Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.operator;
 
 
 import java.net.URI;
 import java.util.List;
 
 import static com.facebook.presto.PrestoMediaTypes.PRESTO_PAGES;
 import static com.facebook.presto.SequencePageBuilder.createSequencePage;
 import static com.facebook.presto.SessionTestUtils.TEST_SESSION;
 import static com.facebook.presto.client.PrestoHeaders.PRESTO_PAGE_NEXT_TOKEN;
 import static com.facebook.presto.client.PrestoHeaders.PRESTO_PAGE_TOKEN;
 import static com.facebook.presto.operator.PageAssertions.assertPageEquals;
 import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
 import static com.facebook.presto.testing.TestingBlockEncodingManager.createTestingBlockEncodingManager;
 import static io.airlift.concurrent.Threads.daemonThreadsNamed;
 import static io.airlift.units.DataSize.Unit.MEGABYTE;
 import static java.util.concurrent.Executors.newScheduledThreadPool;
 import static javax.ws.rs.core.HttpHeaders.CONTENT_TYPE;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
 
 @Test(singleThreaded = true)
 public class TestExchangeOperator
 {
     private static final List<TypeTYPES = ImmutableList.<Type>of();
     private static final Page PAGE = createSequencePage(, 10, 100);
 
     private static final String TASK_1_ID = "task1";
     private static final String TASK_2_ID = "task2";
     private static final String TASK_3_ID = "task3";
 
     private final LoadingCache<StringTaskBuffertaskBuffers = CacheBuilder.newBuilder().build(new CacheLoader<StringTaskBuffer>()
     {
         @Override
         public TaskBuffer load(String key)
                 throws Exception
         {
             return new TaskBuffer();
         }
     });
 
     private HttpClient httpClient;
 
     @BeforeClass
     public void setUp()
             throws Exception
     {
          = newScheduledThreadPool(4, daemonThreadsNamed("test-%s"));
 
        {
            @Override
            public ExchangeClient get()
            {
                return new ExchangeClient(
                        createTestingBlockEncodingManager(),
                        new DataSize(32, ),
                        new DataSize(10, ),
                        3,
                        new Duration(1, .),
                        ,
                        );
            }
        };
    }
    @AfterClass
    public void tearDown()
            throws Exception
    {
        .close();
         = null;
        .shutdownNow();
         = null;
    }
    public void setUpMethod()
    {
        .invalidateAll();
    }
    @Test
    public void testSimple()
            throws Exception
    {
        SourceOperator operator = createExchangeOperator();
        operator.addSplit(newRemoteSplit());
        operator.addSplit(newRemoteSplit());
        operator.addSplit(newRemoteSplit());
        operator.noMoreSplits();
        // add pages and close the buffers
        .getUnchecked().addPages(10, true);
        .getUnchecked().addPages(10, true);
        .getUnchecked().addPages(10, true);
        // read the pages
        waitForPages(operator, 30);
        // wait for finished
        waitForFinished(operator);
    }
    private Split newRemoteSplit(String taskId)
    {
        return new Split("remote"new RemoteSplit(URI.create("http://localhost/" + taskId)));
    }
    @Test
    public void testWaitForClose()
            throws Exception
    {
        SourceOperator operator = createExchangeOperator();
        operator.addSplit(newRemoteSplit());
        operator.addSplit(newRemoteSplit());
        operator.addSplit(newRemoteSplit());
        operator.noMoreSplits();
        // add pages and leave buffers open
        .getUnchecked().addPages(1, false);
        .getUnchecked().addPages(1, false);
        .getUnchecked().addPages(1, false);
        // read 3 pages
        waitForPages(operator, 3);
        // verify state
        assertEquals(operator.isFinished(), false);
        assertEquals(operator.needsInput(), false);
        assertEquals(operator.getOutput(), null);
        // add more pages and close the buffers
        .getUnchecked().addPages(2, true);
        .getUnchecked().addPages(2, true);
        .getUnchecked().addPages(2, true);
        // read all pages
        waitForPages(operator, 6);
        // wait for finished
        waitForFinished(operator);
    }
    @Test
    public void testWaitForNoMoreSplits()
            throws Exception
    {
        SourceOperator operator = createExchangeOperator();
        // add a buffer location containing one page and close the buffer
        operator.addSplit(newRemoteSplit());
        // add pages and leave buffers open
        .getUnchecked().addPages(1, true);
        // read page
        waitForPages(operator, 1);
        // verify state
        assertEquals(operator.isFinished(), false);
        assertEquals(operator.needsInput(), false);
        assertEquals(operator.getOutput(), null);
        // add a buffer location
        operator.addSplit(newRemoteSplit());
        // set no more splits (buffer locations)
        operator.noMoreSplits();
        // add two pages and close the last buffer
        .getUnchecked().addPages(2, true);
        // read all pages
        waitForPages(operator, 2);
        // wait for finished
        waitForFinished(operator);
    }
    @Test
    public void testFinish()
            throws Exception
    {
        SourceOperator operator = createExchangeOperator();
        operator.addSplit(newRemoteSplit());
        operator.addSplit(newRemoteSplit());
        operator.addSplit(newRemoteSplit());
        operator.noMoreSplits();
        // add pages and leave buffers open
        .getUnchecked().addPages(1, false);
        .getUnchecked().addPages(1, false);
        .getUnchecked().addPages(1, false);
        // read 3 pages
        waitForPages(operator, 3);
        // verify state
        assertEquals(operator.isFinished(), false);
        assertEquals(operator.needsInput(), false);
        assertEquals(operator.getOutput(), null);
        // finish without closing buffers
        operator.finish();
        // wait for finished
        waitForFinished(operator);
    }
    {
        ExchangeOperatorFactory operatorFactory = new ExchangeOperatorFactory(0, new PlanNodeId("test"), );
        DriverContext driverContext = new TaskContext(new TaskId("query""stage""task"), )
                .addPipelineContext(truetrue)
                .addDriverContext();
        return operatorFactory.createOperator(driverContext);
    }
    private List<PagewaitForPages(Operator operatorint expectedPageCount)
            throws InterruptedException
    {
        // read expected pages or until 10 seconds has passed
        long endTime = System.nanoTime() + ..toNanos(10);
        List<PageoutputPages = new ArrayList<>();
        while (outputPages.size() < expectedPageCount && System.nanoTime() < endTime) {
            assertEquals(operator.needsInput(), false);
            if (operator.isFinished()) {
                break;
            }
            Page outputPage = operator.getOutput();
            if (outputPage != null) {
                outputPages.add(outputPage);
            }
            else {
                Thread.sleep(10);
            }
        }
        // sleep for a bit to make sure that there aren't extra pages on the way
        Thread.sleep(10);
        // verify state
        assertEquals(operator.needsInput(), false);
        assertNull(operator.getOutput());
        // verify pages
        assertEquals(outputPages.size(), expectedPageCount);
        for (Page page : outputPages) {
            assertPageEquals(operator.getTypes(), page);
        }
        return outputPages;
    }
    private void waitForFinished(Operator operator)
            throws InterruptedException
    {
        // wait for finished or until 10 seconds has passed
        long endTime = System.nanoTime() + ..toNanos(10);
        while (System.nanoTime() < endTime) {
            assertEquals(operator.needsInput(), false);
            assertNull(operator.getOutput());
            if (operator.isFinished()) {
                break;
            }
            Thread.sleep(10);
        }
        // verify final state
        assertEquals(operator.isFinished(), true);
        assertEquals(operator.needsInput(), false);
        assertNull(operator.getOutput());
    }
    private static class HttpClientHandler
            implements Function<RequestResponse>
    {
        private final LoadingCache<StringTaskBuffertaskBuffers;
        public HttpClientHandler(LoadingCache<StringTaskBuffertaskBuffers)
        {
            this. = taskBuffers;
        }
        @Override
        public Response apply(Request request)
        {
            ImmutableList<Stringparts = ImmutableList.copyOf(Splitter.on("/").omitEmptyStrings().split(request.getUri().getPath()));
            assertEquals(parts.size(), 2);
            String taskId = parts.get(0);
            int pageToken = Integer.parseInt(parts.get(1));
            Builder<StringStringheaders = ImmutableListMultimap.builder();
            headers.put(, String.valueOf(pageToken));
            TaskBuffer taskBuffer = .getUnchecked(taskId);
            Page page = taskBuffer.getPage(pageToken);
            if (page != null) {
                headers.put();
                headers.put(, String.valueOf(pageToken + 1));
                DynamicSliceOutput output = new DynamicSliceOutput(256);
                PagesSerde.writePages(createTestingBlockEncodingManager(), outputpage);
                return new TestingResponse(.headers.build(), output.slice().getInput());
            }
            else if (taskBuffer.isFinished()) {
                headers.put(, String.valueOf(pageToken));
                return new TestingResponse(.headers.build(), new byte[0]);
            }
            else {
                headers.put(, String.valueOf(pageToken));
                return new TestingResponse(.headers.build(), new byte[0]);
            }
        }
    }
    private static class TaskBuffer
    {
        private final List<Pagebuffer = new ArrayList<>();
        private int acknowledgedPages;
        private boolean closed;
        private synchronized void addPages(int pagesboolean close)
        {
            addPages(Collections.nCopies(pages));
            if (close) {
                 = true;
            }
        }
        public synchronized void addPages(Iterable<Pagepages)
        {
            Iterables.addAll(pages);
        }
        public synchronized Page getPage(int pageSequenceId)
        {
             = Math.max(pageSequenceId);
            if (pageSequenceId >= .size()) {
                return null;
            }
            return .get(pageSequenceId);
        }
        private synchronized boolean isFinished()
        {
            return  &&  == .size();
        }
    }
New to GrepCode? Check out our FAQ X