Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.operator;
 
 
 
 import java.net.URI;
 import java.util.List;
 
 import static com.facebook.presto.PrestoMediaTypes.PRESTO_PAGES;
 import static com.facebook.presto.spi.StandardErrorCode.PAGE_TOO_LARGE;
 import static com.facebook.presto.spi.StandardErrorCode.PAGE_TRANSPORT_ERROR;
 import static com.facebook.presto.spi.StandardErrorCode.PAGE_TRANSPORT_TIMEOUT;
 import static com.facebook.presto.testing.TestingBlockEncodingManager.createTestingBlockEncodingManager;
 import static com.facebook.presto.util.Failures.WORKER_NODE_ERROR;
 import static com.google.common.net.HttpHeaders.CONTENT_TYPE;
 import static io.airlift.concurrent.Threads.daemonThreadsNamed;
 import static io.airlift.testing.Assertions.assertContains;
 import static io.airlift.testing.Assertions.assertInstanceOf;
 import static java.util.concurrent.Executors.newScheduledThreadPool;
 import static org.testng.Assert.assertEquals;
 
 {
 
     /**
      * Class-level fixture: builds a four-thread scheduled pool using the
      * {@code daemonThreadsNamed("test-%s")} thread factory.
      *
      * NOTE(review): the left-hand side of the assignment is missing in this copy of
      * the file (it looks like the field name was dropped during extraction). Restore
      * the executor field reference before trying to compile.
      */
     @BeforeClass
     public void setUp()
     {
          = newScheduledThreadPool(4, daemonThreadsNamed("test-%s"));
     }
 
     /**
      * Class-level cleanup: when the guarded reference is non-null, calls
      * {@code shutdownNow()} on it and then clears it to {@code null}.
      *
      * NOTE(review): the identifier being checked, shut down and cleared is missing
      * on all three lines; presumably it is the pool created in setUp() — confirm
      * against the original source.
      */
     @AfterClass
     public void tearDown()
     {
         if ( != null) {
             .shutdownNow();
              = null;
         }
     }
 
     /**
      * Walks a client through a normal exchange: checks the initial "queued" status,
      * fetches one page, performs a fetch that returns no data, fetches two more pages,
      * and finally marks the source complete and expects the client to report "closed".
      * After every step the callback counters and the client status are asserted.
      *
      * NOTE(review): this copy of the method has lost tokens — the unit arguments
      * (shown as "."), one constructor argument (the bare "," line), and the commas
      * between several call arguments (e.g. "clientlocation"). Restore them from the
      * original source before compiling.
      */
     @Test
     public void testHappyPath()
             throws Exception
     {
         // the page instance queued on the mock processor for every fetch below
         Page expectedPage = new Page(100);
 
         // the same size is handed to both the mock processor and the client
         DataSize expectedMaxSize = new DataSize(11, .);
         MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(expectedMaxSize);
 
         // two parties: this test thread and the callback invoked by the client
         CyclicBarrier requestComplete = new CyclicBarrier(2);
 
         TestingClientCallback callback = new TestingClientCallback(requestComplete);
 
         URI location = URI.create("http://localhost:8080");
         HttpPageBufferClient client = new HttpPageBufferClient(new TestingHttpClient(processor),
                 expectedMaxSize,
                 new Duration(1, .),
                 location,
                 callback,
                createTestingBlockEncodingManager(),
                ,
                Stopwatch.createUnstarted());
        assertStatus(clientlocation"queued", 0, 0, 0, 0, "not scheduled");
        // fetch a page and verify
        processor.addPage(locationexpectedPage);
        callback.resetStats();
        client.scheduleRequest();
        requestComplete.await(1, .);
        assertEquals(callback.getPages().size(), 1);
        assertPageEquals(expectedPagecallback.getPages().get(0));
        assertEquals(callback.getCompletedRequests(), 1);
        assertEquals(callback.getFinishedBuffers(), 0);
        assertStatus(clientlocation"queued", 1, 1, 1, 0, "not scheduled");
        // fetch no data and verify
        callback.resetStats();
        client.scheduleRequest();
        requestComplete.await(1, .);
        assertEquals(callback.getPages().size(), 0);
        assertEquals(callback.getCompletedRequests(), 1);
        assertEquals(callback.getFinishedBuffers(), 0);
        assertStatus(clientlocation"queued", 1, 2, 2, 0, "not scheduled");
        // fetch two more pages and verify
        processor.addPage(locationexpectedPage);
        processor.addPage(locationexpectedPage);
        callback.resetStats();
        client.scheduleRequest();
        requestComplete.await(1, .);
        assertEquals(callback.getPages().size(), 2);
        assertPageEquals(expectedPagecallback.getPages().get(0));
        assertPageEquals(expectedPagecallback.getPages().get(1));
        assertEquals(callback.getCompletedRequests(), 1);
        assertEquals(callback.getFinishedBuffers(), 0);
        assertEquals(callback.getFailedBuffers(), 0);
        callback.resetStats();
        assertStatus(clientlocation"queued", 3, 3, 3, 0, "not scheduled");
        // finish and verify
        callback.resetStats();
        processor.setComplete(location);
        client.scheduleRequest();
        requestComplete.await(1, .);
        // completion is reported through clientFinished, so no completed request is counted here
        assertEquals(callback.getPages().size(), 0);
        assertEquals(callback.getCompletedRequests(), 0);
        assertEquals(callback.getFinishedBuffers(), 1);
        assertEquals(callback.getFailedBuffers(), 0);
        assertStatus(clientlocation"closed", 3, 4, 4, 0, "not scheduled");
    }
    /**
     * Checks the status the client reports at each stage of a single request:
     * "queued" before scheduling, "running" with request state "PROCESSING_REQUEST"
     * while the processor is held at the before-request barrier, "queued" again once
     * the request has completed, and "closed" after close().
     *
     * NOTE(review): unit/status constants (shown as "."), one constructor argument
     * and several argument-separating commas are missing in this copy.
     */
    @Test
    public void testLifecycle()
            throws Exception
    {
        // two-party barriers let the test pause the processor before and after it answers
        CyclicBarrier beforeRequest = new CyclicBarrier(2);
        CyclicBarrier afterRequest = new CyclicBarrier(2);
        StaticRequestProcessor processor = new StaticRequestProcessor(beforeRequestafterRequest);
        processor.setResponse(new TestingResponse(., ImmutableListMultimap.<StringString>of(), new byte[0]));
        CyclicBarrier requestComplete = new CyclicBarrier(2);
        TestingClientCallback callback = new TestingClientCallback(requestComplete);
        URI location = URI.create("http://localhost:8080");
        HttpPageBufferClient client = new HttpPageBufferClient(new TestingHttpClient(processor),
                new DataSize(10, .),
                new Duration(1, .),
                location,
                callback,
                createTestingBlockEncodingManager(),
                ,
                Stopwatch.createUnstarted());
        assertStatus(clientlocation"queued", 0, 0, 0, 0, "not scheduled");
        client.scheduleRequest();
        // the processor is now parked inside apply(), so the request is in flight
        beforeRequest.await(1, .);
        assertStatus(clientlocation"running", 0, 1, 0, 0, "PROCESSING_REQUEST");
        assertEquals(client.isRunning(), true);
        // release the processor, then wait for the callback to signal completion
        afterRequest.await(1, .);
        requestComplete.await(1, .);
        assertStatus(clientlocation"queued", 0, 1, 1, 1, "not scheduled");
        client.close();
        assertStatus(clientlocation"closed", 0, 1, 1, 1, "not scheduled");
    }
    /**
     * Feeds the client three unacceptable responses in turn and, for each one, expects
     * the callback to record exactly one failure of type PageTransportErrorException
     * with the message asserted below, while the client status stays "queued". The
     * client is then closed and must report "closed".
     *
     * NOTE(review): the response status constants (shown as "."), the header-name
     * argument of ImmutableListMultimap.of(...), one constructor argument and several
     * commas are missing in this copy; the comments below describe intent only.
     */
    @Test
    public void testInvalidResponses()
            throws Exception
    {
        // single-party barriers: await() on them returns without a second thread
        CyclicBarrier beforeRequest = new CyclicBarrier(1);
        CyclicBarrier afterRequest = new CyclicBarrier(1);
        StaticRequestProcessor processor = new StaticRequestProcessor(beforeRequestafterRequest);
        CyclicBarrier requestComplete = new CyclicBarrier(2);
        TestingClientCallback callback = new TestingClientCallback(requestComplete);
        URI location = URI.create("http://localhost:8080");
        HttpPageBufferClient client = new HttpPageBufferClient(new TestingHttpClient(processor),
                new DataSize(10, .),
                new Duration(1, .),
                location,
                callback,
                createTestingBlockEncodingManager(),
                ,
                Stopwatch.createUnstarted());
        assertStatus(clientlocation"queued", 0, 0, 0, 0, "not scheduled");
        // send not found response and verify it is reported as a transport error
        processor.setResponse(new TestingResponse(., ImmutableListMultimap.of(), new byte[0]));
        client.scheduleRequest();
        requestComplete.await(1, .);
        assertEquals(callback.getPages().size(), 0);
        assertEquals(callback.getCompletedRequests(), 1);
        assertEquals(callback.getFinishedBuffers(), 0);
        assertEquals(callback.getFailedBuffers(), 1);
        assertInstanceOf(callback.getFailure(), PageTransportErrorException.class);
        assertContains(callback.getFailure().getMessage(), "Expected response code to be 200, but was 404 Not Found");
        assertStatus(clientlocation"queued", 0, 1, 1, 1, "not scheduled");
        // send invalid content type response and verify it is reported as a transport error
        callback.resetStats();
        processor.setResponse(new TestingResponse(., ImmutableListMultimap.of("INVALID_TYPE"), new byte[0]));
        client.scheduleRequest();
        requestComplete.await(1, .);
        assertEquals(callback.getPages().size(), 0);
        assertEquals(callback.getCompletedRequests(), 1);
        assertEquals(callback.getFinishedBuffers(), 0);
        assertEquals(callback.getFailedBuffers(), 1);
        assertInstanceOf(callback.getFailure(), PageTransportErrorException.class);
        assertContains(callback.getFailure().getMessage(), "Expected application/x-presto-pages response from server but got INVALID_TYPE");
        assertStatus(clientlocation"queued", 0, 2, 2, 2, "not scheduled");
        // send unexpected content type response and verify it is reported as a transport error
        callback.resetStats();
        processor.setResponse(new TestingResponse(., ImmutableListMultimap.of("text/plain"), new byte[0]));
        client.scheduleRequest();
        requestComplete.await(1, .);
        assertEquals(callback.getPages().size(), 0);
        assertEquals(callback.getCompletedRequests(), 1);
        assertEquals(callback.getFinishedBuffers(), 0);
        assertEquals(callback.getFailedBuffers(), 1);
        assertInstanceOf(callback.getFailure(), PageTransportErrorException.class);
        assertContains(callback.getFailure().getMessage(), "Expected application/x-presto-pages response from server but got text/plain");
        assertStatus(clientlocation"queued", 0, 3, 3, 3, "not scheduled");
        // close client and verify
        client.close();
        assertStatus(clientlocation"closed", 0, 3, 3, 3, "not scheduled");
    }
    /**
     * Closes the client while a request is still in flight (the processor is parked
     * at the before-request barrier) and expects the final status to be "closed" with
     * one scheduled, one completed and one failed request.
     *
     * NOTE(review): unit/status constants (shown as "."), one constructor argument
     * and several argument-separating commas are missing in this copy.
     */
    @Test
    public void testCloseDuringPendingRequest()
            throws Exception
    {
        CyclicBarrier beforeRequest = new CyclicBarrier(2);
        CyclicBarrier afterRequest = new CyclicBarrier(2);
        StaticRequestProcessor processor = new StaticRequestProcessor(beforeRequestafterRequest);
        processor.setResponse(new TestingResponse(., ImmutableListMultimap.<StringString>of(), new byte[0]));
        CyclicBarrier requestComplete = new CyclicBarrier(2);
        TestingClientCallback callback = new TestingClientCallback(requestComplete);
        URI location = URI.create("http://localhost:8080");
        HttpPageBufferClient client = new HttpPageBufferClient(new TestingHttpClient(processor),
                new DataSize(10, .),
                new Duration(1, .),
                location,
                callback,
                createTestingBlockEncodingManager(),
                ,
                Stopwatch.createUnstarted());
        assertStatus(clientlocation"queued", 0, 0, 0, 0, "not scheduled");
        // send request
        client.scheduleRequest();
        beforeRequest.await(1, .);
        assertStatus(clientlocation"running", 0, 1, 0, 0, "PROCESSING_REQUEST");
        assertEquals(client.isRunning(), true);
        // request is pending, now close it
        client.close();
        try {
            requestComplete.await(1, .);
        }
        catch (BrokenBarrierException ignored) {
            // a broken barrier is tolerated here; only the final status below is checked
        }
        assertStatus(clientlocation"closed", 0, 1, 1, 1, "not scheduled");
    }
    /**
     * Uses a request processor that always throws, and a controllable ticker behind
     * the client's stopwatch, to check when repeated request failures are escalated
     * to a buffer failure. The first two failed requests are counted but do not fail
     * the buffer; after the ticker has been advanced further, the third one produces
     * a PageTransportTimeoutException whose message reports "requests failed for 61.00s".
     *
     * NOTE(review): the time-unit constants (shown as "."), the prefix operand of the
     * expected message on the assertContains line, one constructor argument and
     * several commas / generic closers are missing in this copy.
     */
    @Test
    public void testExceptionFromResponseHandler()
            throws Exception
    {
        final TestingTicker ticker = new TestingTicker();
        // amount the ticker is advanced by on each request; starts at zero
        final AtomicReference<DurationtickerIncrement = new AtomicReference<>(new Duration(0, .));
        // every request advances the ticker by the current increment and then fails
        Function<RequestResponseprocessor = (input) -> {
            Duration delta = tickerIncrement.get();
            ticker.increment(delta.toMillis(), .);
            throw new RuntimeException("Foo");
        };
        CyclicBarrier requestComplete = new CyclicBarrier(2);
        TestingClientCallback callback = new TestingClientCallback(requestComplete);
        URI location = URI.create("http://localhost:8080");
        HttpPageBufferClient client = new HttpPageBufferClient(new TestingHttpClient(processor),
                new DataSize(10, .),
                new Duration(1, .),
                location,
                callback,
                createTestingBlockEncodingManager(),
                ,
                Stopwatch.createUnstarted(ticker));
        assertStatus(clientlocation"queued", 0, 0, 0, 0, "not scheduled");
        // request processor will throw exception, verify the request is marked as completed
        // this starts the error stopwatch
        client.scheduleRequest();
        requestComplete.await(1, .);
        assertEquals(callback.getPages().size(), 0);
        assertEquals(callback.getCompletedRequests(), 1);
        assertEquals(callback.getFinishedBuffers(), 0);
        assertEquals(callback.getFailedBuffers(), 0);
        assertStatus(clientlocation"queued", 0, 1, 1, 1, "not scheduled");
        // advance time forward, but not enough to fail the client
        tickerIncrement.set(new Duration(30, .));
        // verify that the client has not failed
        // (the callback stats are not reset in this test, so completed requests accumulate)
        client.scheduleRequest();
        requestComplete.await(1, .);
        assertEquals(callback.getPages().size(), 0);
        assertEquals(callback.getCompletedRequests(), 2);
        assertEquals(callback.getFinishedBuffers(), 0);
        assertEquals(callback.getFailedBuffers(), 0);
        assertStatus(clientlocation"queued", 0, 2, 2, 2, "not scheduled");
        // advance time forward beyond the minimum error duration
        tickerIncrement.set(new Duration(31, .));
        // verify that the client has failed
        client.scheduleRequest();
        requestComplete.await(1, .);
        assertEquals(callback.getPages().size(), 0);
        assertEquals(callback.getCompletedRequests(), 3);
        assertEquals(callback.getFinishedBuffers(), 0);
        assertEquals(callback.getFailedBuffers(), 1);
        assertInstanceOf(callback.getFailure(), PageTransportTimeoutException.class);
        assertContains(callback.getFailure().getMessage(),  + " (http://localhost:8080/0 - requests failed for 61.00s)");
        assertStatus(clientlocation"queued", 0, 3, 3, 3, "not scheduled");
    }
    /**
     * Placeholder in this copy of the file: the body is empty, so the test passes
     * without checking anything.
     *
     * NOTE(review): the file statically imports PAGE_TOO_LARGE, PAGE_TRANSPORT_ERROR
     * and PAGE_TRANSPORT_TIMEOUT, none of which is referenced by any visible code, so
     * the assertions that belonged here were probably lost — confirm against the
     * original source.
     */
    @Test
    public void testErrorCodes()
            throws Exception
    {
    }
    private static void assertStatus(
            HttpPageBufferClient client,
            URI locationString status,
            int pagesReceived,
            int requestsScheduled,
            int requestsCompleted,
            int requestsFailed,
            String httpRequestState)
    {
        PageBufferClientStatus actualStatus = client.getStatus();
        assertEquals(actualStatus.getUri(), location);
        assertEquals(actualStatus.getState(), status"status");
        assertEquals(actualStatus.getPagesReceived(), pagesReceived"pagesReceived");
        assertEquals(actualStatus.getRequestsScheduled(), requestsScheduled"requestsScheduled");
        assertEquals(actualStatus.getRequestsCompleted(), requestsCompleted"requestsCompleted");
        assertEquals(actualStatus.getRequestsFailed(), requestsFailed"requestsFailed");
        assertEquals(actualStatus.getHttpRequestState(), httpRequestState"httpRequestState");
    }
    private static void assertPageEquals(Page expectedPagePage actualPage)
    {
        assertEquals(actualPage.getPositionCount(), expectedPage.getPositionCount());
        assertEquals(actualPage.getChannelCount(), expectedPage.getChannelCount());
    }
    /**
     * ClientCallback implementation used by the tests: it records the pages, request
     * completions, buffer finishes and buffer failures reported by a client, and waits
     * on a CyclicBarrier so the test thread can synchronize with the callback.
     *
     * NOTE(review): in this copy most field references inside the method bodies are
     * missing (for example "this. = done" and "return ;"), as are the generic closers
     * and spaces in the field declarations and the time-unit argument of await(...).
     * The per-method notes below say which field each body presumably refers to, based
     * on the method names and the declared fields — confirm against the original.
     */
    private static class TestingClientCallback
            implements ClientCallback
    {
        // barrier shared with the test thread
        private final CyclicBarrier done;
        // synchronized list, since the callback and the test run on different threads
        private final List<Pagepages = Collections.synchronizedList(new ArrayList<Page>());
        private final AtomicInteger completedRequests = new AtomicInteger();
        private final AtomicInteger finishedBuffers = new AtomicInteger();
        private final AtomicInteger failedBuffers = new AtomicInteger();
        private final AtomicReference<Throwablefailure = new AtomicReference<>();
        public TestingClientCallback(CyclicBarrier done)
        {
            this. = done;
        }
        // presumably returns the pages list
        public List<PagegetPages()
        {
            return ;
        }
        // presumably reads completedRequests
        private int getCompletedRequests()
        {
            return .get();
        }
        // presumably reads finishedBuffers
        private int getFinishedBuffers()
        {
            return .get();
        }
        // presumably reads failedBuffers
        public int getFailedBuffers()
        {
            return .get();
        }
        // presumably reads the recorded failure; null when none has been recorded
        public Throwable getFailure()
        {
            return .get();
        }
        // records a received page (presumably into pages)
        @Override
        public void addPage(HttpPageBufferClient clientPage page)
        {
            .add(page);
        }
        // bumps a counter (presumably completedRequests) and then waits on the barrier
        @Override
        public void requestComplete(HttpPageBufferClient client)
        {
            .getAndIncrement();
            try {
                .await(1, .);
            }
            catch (InterruptedException e) {
                // restore the interrupt flag before rethrowing
                Thread.currentThread().interrupt();
                throw Throwables.propagate(e);
            }
            catch (BrokenBarrierException | TimeoutException e) {
                throw Throwables.propagate(e);
            }
        }
        // bumps a counter (presumably finishedBuffers) and then waits on the barrier
        @Override
        public void clientFinished(HttpPageBufferClient client)
        {
            .getAndIncrement();
            try {
                .await(1, .);
            }
            catch (InterruptedException e) {
                // restore the interrupt flag before rethrowing
                Thread.currentThread().interrupt();
                throw Throwables.propagate(e);
            }
            catch (BrokenBarrierException | TimeoutException e) {
                throw Throwables.propagate(e);
            }
        }
        // bumps a counter (presumably failedBuffers) and keeps only the first cause:
        // compareAndSet from null leaves an already-recorded failure untouched
        @Override
        public void clientFailed(HttpPageBufferClient clientThrowable cause)
        {
            .getAndIncrement();
            .compareAndSet(nullcause);
            // requestComplete() will be called after this
        }
        // clears the recorded state; presumably pages, the three counters and the failure, in that order
        public void resetStats()
        {
            .clear();
            .set(0);
            .set(0);
            .set(0);
            .set(null);
        }
    }
    /**
     * Request processor that answers every request with one preset response and
     * waits on a barrier before and after producing it, so a test can observe the
     * client while a request is in flight.
     *
     * NOTE(review): in this copy the field names inside the bodies (for example
     * "this. = beforeRequest"), the commas in the generic and parameter lists, and
     * the time-unit argument of await(...) are missing. The notes below say which
     * field each statement presumably refers to — confirm against the original.
     */
    private static class StaticRequestProcessor
            implements Function<RequestResponse>
    {
        // the response handed back by apply(); set through setResponse()
        private final AtomicReference<Responseresponse = new AtomicReference<>();
        private final CyclicBarrier beforeRequest;
        private final CyclicBarrier afterRequest;
        private StaticRequestProcessor(CyclicBarrier beforeRequestCyclicBarrier afterRequest)
        {
            this. = beforeRequest;
            this. = afterRequest;
        }
        // replaces the response returned by subsequent apply() calls
        private void setResponse(Response response)
        {
            this..set(response);
        }
        // The finally block below can throw, hence the suppression.
        @SuppressWarnings("ThrowFromFinallyBlock")
        @Override
        public Response apply(@Nullable Request request)
        {
            // first rendezvous (presumably on beforeRequest) before the response is produced
            try {
                .await(1, .);
            }
            catch (InterruptedException e) {
                // restore the interrupt flag before rethrowing
                Thread.currentThread().interrupt();
                throw Throwables.propagate(e);
            }
            catch (BrokenBarrierException | TimeoutException e) {
                throw Throwables.propagate(e);
            }
            try {
                return .get();
            }
            finally {
                // second rendezvous (presumably on afterRequest) runs even though we are returning
                try {
                    .await(1, .);
                }
                catch (InterruptedException e) {
                    // restore the interrupt flag before rethrowing
                    Thread.currentThread().interrupt();
                    throw Throwables.propagate(e);
                }
                catch (BrokenBarrierException | TimeoutException e) {
                    throw Throwables.propagate(e);
                }
            }
        }
    }
New to GrepCode? Check out our FAQ X