Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.execution;
 
 
 import java.util.List;
 
 import static com.facebook.presto.OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS;
 import static com.facebook.presto.execution.BufferResult.emptyResults;
 import static com.facebook.presto.spi.type.BigintType.BIGINT;
 import static io.airlift.concurrent.Threads.daemonThreadsNamed;
 import static io.airlift.units.DataSize.Unit.BYTE;
 import static java.util.concurrent.Executors.newScheduledThreadPool;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 public class TestSharedBuffer
 {
     private static final Duration NO_WAIT = new Duration(0, .);
     private static final Duration MAX_WAIT = new Duration(1, .);
     private static final DataSize PAGE_SIZE = new DataSize(createPage(42).getSizeInBytes(), );
     private static final TaskId TASK_ID = new TaskId("query""stage""task");
 
     private static final ImmutableList<BigintTypeTYPES = ImmutableList.of();
     public static final TaskId FIRST = new TaskId("query""stage""first_task");
     public static final TaskId SECOND = new TaskId("query""stage""second_task");
     public static final TaskId QUEUE = new TaskId("query""stage""queue");
     public static final TaskId FOO = new TaskId("foo""bar""baz");
 
     private static Page createPage(int i)
     {
         return new Page(BlockAssertions.createLongsBlock(i));
     }
 
     public static DataSize sizeOfPages(int count)
     {
         return new DataSize(.toBytes() * count.);
     }
 
 
     @BeforeClass
     public void setUp()
             throws Exception
     {
     }
 
     @AfterClass
     public void tearDown()
             throws Exception
     {
         if ( != null) {
             .shutdownNow();
              = null;
         }
     }
 
     @Test
     public void testInvalidConstructorArg()
             throws Exception
     {
         try {
             new SharedBuffer(new DataSize(0, .));
             fail("Expected IllegalStateException");
         }
        catch (IllegalArgumentException e) {
        }
    }
    @Test
    public void testSimple()
            throws Exception
    {
        SharedBuffer sharedBuffer = new SharedBuffer(sizeOfPages(10));
        // add three items
        for (int i = 0; i < 3; i++) {
            addPage(sharedBuffercreatePage(i));
        }
        // add a queue
        sharedBuffer.setOutputBuffers(outputBuffers);
        assertQueueState(sharedBuffer, 3, 0);
        // get the three elements
        assertBufferResultEquals(getBufferResult(sharedBuffer, 0, sizeOfPages(10), ), bufferResult(0, createPage(0), createPage(1), createPage(2)));
        // pages not acknowledged yet so state is the same
        assertQueueState(sharedBuffer, 3, 0);
        // acknowledge first three pages
        sharedBuffer.get(, 3, sizeOfPages(10)).cancel(true);
        // pages now acknowledged
        assertQueueState(sharedBuffer, 0, 3);
        // fill the buffer (we already added 3 pages)
        for (int i = 3; i < 10; i++) {
            addPage(sharedBuffercreatePage(i));
        }
        assertQueueState(sharedBuffer, 7, 3);
        // try to add one more page, which should block
        ListenableFuture<?> future = enqueuePage(sharedBuffercreatePage(10));
        // remove a page
        assertBufferResultEquals(getBufferResult(sharedBuffer, 3, sizeOfPages(1), ), bufferResult(3, createPage(3)));
        // page not acknowledged yet so sent count is the same
        assertQueueState(sharedBuffer, 8, 3);
        // we should still be blocked
        assertFalse(future.isDone());
        //
        // add another buffer and verify it sees all pages
        outputBuffers = outputBuffers.withBuffer(new UnpartitionedPagePartitionFunction());
        sharedBuffer.setOutputBuffers(outputBuffers);
        assertQueueState(sharedBuffer, 11, 0);
        assertBufferResultEquals(getBufferResult(sharedBuffer, 0, sizeOfPages(10), ), bufferResult(0, createPage(0),
                createPage(1),
                createPage(2),
                createPage(3),
                createPage(4),
                createPage(5),
                createPage(6),
                createPage(7),
                createPage(8),
                createPage(9)));
        // page not acknowledged yet so sent count is still zero
        assertQueueState(sharedBuffer, 11, 0);
        // acknowledge the 10 pages
        sharedBuffer.get(, 10, sizeOfPages(10)).cancel(true);
        assertQueueState(sharedBuffer, 1, 10);
        //
        // tell shared buffer there will be no more queues
        outputBuffers = outputBuffers.withNoMoreBufferIds();
        sharedBuffer.setOutputBuffers(outputBuffers);
        // since both queues consumed the first three pages, the blocked page future from above should be done
        future.get(1, .);
        // we should be able to add 3 more pages (the third will be queued)
        // although the first queue fetched the 4th page, the page has not been acknowledged yet
        addPage(sharedBuffercreatePage(11));
        addPage(sharedBuffercreatePage(12));
        future = enqueuePage(sharedBuffercreatePage(13));
        assertQueueState(sharedBuffer, 11, 3);
        assertQueueState(sharedBuffer, 4, 10);
        // remove a page from the first queue
        assertBufferResultEquals(getBufferResult(sharedBuffer, 4, sizeOfPages(1), ), bufferResult(4, createPage(4)));
        // the blocked page future above should be done
        future.get(1, .);
        assertQueueState(sharedBuffer, 10, 4);
        assertQueueState(sharedBuffer, 4, 10);
        //
        // finish the buffer
        assertFalse(sharedBuffer.isFinished());
        sharedBuffer.setNoMorePages();
        assertQueueState(sharedBuffer, 10, 4);
        assertQueueState(sharedBuffer, 4, 10);
        // not fully finished until all pages are consumed
        assertFalse(sharedBuffer.isFinished());
        // remove a page, not finished
        assertBufferResultEquals(getBufferResult(sharedBuffer, 5, sizeOfPages(1), ), bufferResult(5, createPage(5)));
        assertQueueState(sharedBuffer, 9, 5);
        assertQueueState(sharedBuffer, 4, 10);
        assertFalse(sharedBuffer.isFinished());
        // remove all remaining pages from first queue, should not be finished
        BufferResult x = getBufferResult(sharedBuffer, 6, sizeOfPages(10), );
                createPage(7),
                createPage(8),
                createPage(9),
                createPage(10),
                createPage(11),
                createPage(12),
                createPage(13)));
        assertQueueState(sharedBuffer, 8, 6);
        assertBufferResultEquals(getBufferResult(sharedBuffer, 14, sizeOfPages(10), ), emptyResults(14, true));
        assertQueueClosed(sharedBuffer, 14);
        assertQueueState(sharedBuffer, 4, 10);
        assertFalse(sharedBuffer.isFinished());
        // remove all remaining pages from second queue, should be finished
        assertBufferResultEquals(getBufferResult(sharedBuffer, 10, sizeOfPages(10), ), bufferResult(10, createPage(10),
                createPage(11),
                createPage(12),
                createPage(13)));
        assertQueueState(sharedBuffer, 4, 10);
        assertBufferResultEquals(getBufferResult(sharedBuffer, 14, sizeOfPages(10), ), emptyResults(14, true));
        assertQueueClosed(sharedBuffer, 14);
        assertQueueClosed(sharedBuffer, 14);
        assertFinished(sharedBuffer);
        assertBufferResultEquals(getBufferResult(sharedBuffer, 14, sizeOfPages(10), ), emptyResults(14, true));
        assertBufferResultEquals(getBufferResult(sharedBuffer, 14, sizeOfPages(10), ), emptyResults(14, true));
    }
    public static BufferResult getBufferResult(SharedBuffer sharedBufferTaskId outputIdlong sequenceIdDataSize maxSizeDuration maxWait)
    {
        ListenableFuture<BufferResultfuture = sharedBuffer.get(outputIdsequenceIdmaxSize);
        return getFuture(futuremaxWait);
    }
    public static BufferResult getFuture(ListenableFuture<BufferResultfutureDuration maxWait)
    {
        try {
            return future.get(maxWait.toMillis(), .);
        }
        catch (TimeoutException e) {
            throw Throwables.propagate(e);
        }
        catch (ExecutionException e) {
            throw Throwables.propagate(e.getCause());
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw Throwables.propagate(e);
        }
    }
    @Test
    public void testDuplicateRequests()
            throws Exception
    {
        SharedBuffer sharedBuffer = new SharedBuffer(sizeOfPages(10));
        // add three items
        for (int i = 0; i < 3; i++) {
            addPage(sharedBuffercreatePage(i));
        }
        // add a queue
        OutputBuffers outputBuffers = ;
        outputBuffers = outputBuffers.withBuffer(new UnpartitionedPagePartitionFunction());
        sharedBuffer.setOutputBuffers(outputBuffers);
        assertQueueState(sharedBuffer, 3, 0);
        // get the three elements
        assertBufferResultEquals(getBufferResult(sharedBuffer, 0, sizeOfPages(10), ), bufferResult(0, createPage(0), createPage(1), createPage(2)));
        // pages not acknowledged yet so state is the same
        assertQueueState(sharedBuffer, 3, 0);
        // get the three elements again
        assertBufferResultEquals(getBufferResult(sharedBuffer, 0, sizeOfPages(10), ), bufferResult(0, createPage(0), createPage(1), createPage(2)));
        // pages not acknowledged yet so state is the same
        assertQueueState(sharedBuffer, 3, 0);
        // acknowledge the pages
        sharedBuffer.get(, 3, sizeOfPages(10)).cancel(true);
        // attempt to get the three elements again
        assertBufferResultEquals(getBufferResult(sharedBuffer, 0, sizeOfPages(10), ), emptyResults(0, false));
        // pages not acknowledged yet so state is the same
        assertQueueState(sharedBuffer, 0, 3);
    }
    @Test
    public void testAddQueueAfterNoMoreQueues()
            throws Exception
    {
        SharedBuffer sharedBuffer = new SharedBuffer(sizeOfPages(10));
        assertFalse(sharedBuffer.isFinished());
        // tell buffer no more queues will be added
        assertFalse(sharedBuffer.isFinished());
        // set no more queues a second time to assure that we don't get an exception or such
        assertFalse(sharedBuffer.isFinished());
        // set no more queues a third time to assure that we don't get an exception or such
        assertFalse(sharedBuffer.isFinished());
        try {
            OutputBuffers outputBuffers = 
                    .withBuffer(new UnpartitionedPagePartitionFunction())
                    .withNoMoreBufferIds();
            sharedBuffer.setOutputBuffers(outputBuffers);
            fail("Expected IllegalStateException from addQueue after noMoreQueues has been called");
        }
        catch (IllegalStateException expected) {
        }
    }
    @Test
    public void testAddQueueAfterDestroy()
            throws Exception
    {
        SharedBuffer sharedBuffer = new SharedBuffer(sizeOfPages(10));
        assertFalse(sharedBuffer.isFinished());
        // destroy buffer
        sharedBuffer.destroy();
        assertFinished(sharedBuffer);
        // set no more queues to assure that we don't get an exception or such
        assertFinished(sharedBuffer);
        // set no more queues a second time to assure that we don't get an exception or such
        assertFinished(sharedBuffer);
        // add queue calls after finish should be ignored
    }
    @Test
    public void testGetBeforeCreate()
            throws Exception
    {
        SharedBuffer sharedBuffer = new SharedBuffer(sizeOfPages(10));
        assertFalse(sharedBuffer.isFinished());
        // get a page from a buffer that doesn't exist yet
        ListenableFuture<BufferResultfuture = sharedBuffer.get(, (long) 0, sizeOfPages(1));
        assertFalse(future.isDone());
        // add a page and verify the future is not complete
        addPage(sharedBuffercreatePage(33));
        assertFalse(future.isDone());
        // add the buffer and verify the future completed
    }
    @Test
    public void testAbortBeforeCreate()
            throws Exception
    {
        SharedBuffer sharedBuffer = new SharedBuffer(sizeOfPages(10));
        assertFalse(sharedBuffer.isFinished());
        // get a page from a buffer that doesn't exist yet
        ListenableFuture<BufferResultfuture = sharedBuffer.get(, (long) 0, sizeOfPages(1));
        assertFalse(future.isDone());
        // abort that buffer
        sharedBuffer.abort();
        // add a page and verify the future is not complete
        addPage(sharedBuffercreatePage(33));
        assertFalse(future.isDone());
        // add the buffer and verify we did not get the page
        assertBufferResultEquals(getFuture(future), emptyResults(0, true));
        // verify that a normal read returns a closed empty result
        assertBufferResultEquals(getBufferResult(sharedBuffer, 0, sizeOfPages(10), ), emptyResults(0, true));
    }
    @Test
    public void testAddStateMachine()
            throws Exception
    {
        // add after finish
        SharedBuffer sharedBuffer = new SharedBuffer(sizeOfPages(10));
        sharedBuffer.setNoMorePages();
        addPage(sharedBuffercreatePage(0));
        addPage(sharedBuffercreatePage(0));
        assertEquals(sharedBuffer.getInfo().getPagesAdded(), 0);
        // add after destroy
        sharedBuffer = new SharedBuffer(sizeOfPages(10));
        sharedBuffer.destroy();
        addPage(sharedBuffercreatePage(0));
        addPage(sharedBuffercreatePage(0));
        assertEquals(sharedBuffer.getInfo().getPagesAdded(), 0);
    }
    @Test
    public void testAbort()
            throws Exception
    {
        SharedBuffer sharedBuffer = new SharedBuffer(sizeOfPages(10));
        // fill the buffer
        for (int i = 0; i < 10; i++) {
            addPage(sharedBuffercreatePage(i));
        }
        sharedBuffer.setNoMorePages();
        OutputBuffers outputBuffers = ;
        outputBuffers = outputBuffers.withBuffer(new UnpartitionedPagePartitionFunction());
        sharedBuffer.setOutputBuffers(outputBuffers);
        assertBufferResultEquals(getBufferResult(sharedBuffer, 0, sizeOfPages(1), ), bufferResult(0, createPage(0)));
        sharedBuffer.abort();
        assertQueueClosed(sharedBuffer, 0);
        assertBufferResultEquals(getBufferResult(sharedBuffer, 1, sizeOfPages(1), ), emptyResults(1, true));
        outputBuffers = outputBuffers.withBuffer(new UnpartitionedPagePartitionFunction()).withNoMoreBufferIds();
        sharedBuffer.setOutputBuffers(outputBuffers);
        assertBufferResultEquals(getBufferResult(sharedBuffer, 0, sizeOfPages(1), ), bufferResult(0, createPage(0)));
        sharedBuffer.abort();
        assertQueueClosed(sharedBuffer, 0);
        assertFinished(sharedBuffer);
        assertBufferResultEquals(getBufferResult(sharedBuffer, 1, sizeOfPages(1), ), emptyResults(0, true));
    }
    @Test
    public void testFinishClosesEmptyQueues()
            throws Exception
    {
        SharedBuffer sharedBuffer = new SharedBuffer(sizeOfPages(10));
                .withBuffer(new UnpartitionedPagePartitionFunction())
                .withBuffer(new UnpartitionedPagePartitionFunction()));
        // finish while queues are empty
        sharedBuffer.setNoMorePages();
        assertQueueClosed(sharedBuffer, 0);
        assertQueueClosed(sharedBuffer, 0);
    }
    @Test
    public void testAbortFreesReader()
            throws Exception
    {
        SharedBuffer sharedBuffer = new SharedBuffer(sizeOfPages(5));
        assertFalse(sharedBuffer.isFinished());
        // attempt to get a page
        ListenableFuture<BufferResultfuture = sharedBuffer.get(, 0, sizeOfPages(10));
        // verify we are waiting for a page
        assertFalse(future.isDone());
        // add one item
        addPage(sharedBuffercreatePage(0));
        // verify we got one page
        // attempt to get another page, and verify we are blocked
        future = sharedBuffer.get(, 1, sizeOfPages(10));
        assertFalse(future.isDone());
        // abort the buffer
        sharedBuffer.abort();
        assertQueueClosed(sharedBuffer, 1);
        // verify the future completed
        assertBufferResultEquals(getFuture(future), emptyResults(1, true));
    }
    @Test
    public void testFinishFreesReader()
            throws Exception
    {
        SharedBuffer sharedBuffer = new SharedBuffer(sizeOfPages(5));
        assertFalse(sharedBuffer.isFinished());
        // attempt to get a page
        ListenableFuture<BufferResultfuture = sharedBuffer.get(, 0, sizeOfPages(10));
        // verify we are waiting for a page
        assertFalse(future.isDone());
        // add one item
        addPage(sharedBuffercreatePage(0));
        // verify we got one page
        // attempt to get another page, and verify we are blocked
        future = sharedBuffer.get(, 1, sizeOfPages(10));
        assertFalse(future.isDone());
        // finish the buffer
        sharedBuffer.setNoMorePages();
        assertQueueClosed(sharedBuffer, 1);
        // verify the future completed
        assertBufferResultEquals(getFuture(future), emptyResults(1, true));
    }
    @Test
    public void testFinishFreesWriter()
            throws Exception
    {
        SharedBuffer sharedBuffer = new SharedBuffer(sizeOfPages(5));
                .withBuffer(new UnpartitionedPagePartitionFunction())
                .withNoMoreBufferIds());
        assertFalse(sharedBuffer.isFinished());
        // fill the buffer
        for (int i = 0; i < 5; i++) {
            addPage(sharedBuffercreatePage(i));
        }
        // enqueue the addition two pages more pages
        ListenableFuture<?> firstEnqueuePage = enqueuePage(sharedBuffercreatePage(5));
        ListenableFuture<?> secondEnqueuePage = enqueuePage(sharedBuffercreatePage(6));
        // get and acknowledge one page
        assertBufferResultEquals(getBufferResult(sharedBuffer, 0, sizeOfPages(1), ), bufferResult(0, createPage(0)));
        sharedBuffer.get(, 1, sizeOfPages(1)).cancel(true);
        // verify the first blocked page was accepted but the second one was not
        assertTrue(firstEnqueuePage.isDone());
        assertFalse(secondEnqueuePage.isDone());
        // finish the query
        sharedBuffer.setNoMorePages();
        assertFalse(sharedBuffer.isFinished());
        // verify second future was completed
        assertTrue(secondEnqueuePage.isDone());
        // get the last 5 page (page 6 was never accepted)
        assertBufferResultEquals(getBufferResult(sharedBuffer, 1, sizeOfPages(100), ),
                bufferResult(1, createPage(1), createPage(2), createPage(3), createPage(4), createPage(5)));
        assertBufferResultEquals(getBufferResult(sharedBuffer, 6, sizeOfPages(100), ), emptyResults(6, true));
        // verify finished
        assertFinished(sharedBuffer);
    }
    @Test
    public void testDestroyFreesReader()
            throws Exception
    {
        SharedBuffer sharedBuffer = new SharedBuffer(sizeOfPages(5));
                .withBuffer(new UnpartitionedPagePartitionFunction())
                .withNoMoreBufferIds());
        assertFalse(sharedBuffer.isFinished());
        // attempt to get a page
        ListenableFuture<BufferResultfuture = sharedBuffer.get(, 0, sizeOfPages(10));
        // verify we are waiting for a page
        assertFalse(future.isDone());
        // add one page
        addPage(sharedBuffercreatePage(0));
        // verify we got one page
        // attempt to get another page, and verify we are blocked
        future = sharedBuffer.get(, 1, sizeOfPages(10));
        assertFalse(future.isDone());
        // destroy the buffer
        sharedBuffer.destroy();
        assertQueueClosed(sharedBuffer, 1);
        // verify the future completed
        assertBufferResultEquals(getFuture(future), emptyResults(1, true));
    }
    @Test
    public void testDestroyFreesWriter()
            throws Exception
    {
        SharedBuffer sharedBuffer = new SharedBuffer(sizeOfPages(5));
                .withBuffer(new UnpartitionedPagePartitionFunction())
                .withNoMoreBufferIds());
        assertFalse(sharedBuffer.isFinished());
        // fill the buffer
        for (int i = 0; i < 5; i++) {
            addPage(sharedBuffercreatePage(i));
        }
        // enqueue the addition two pages more pages
        ListenableFuture<?> firstEnqueuePage = enqueuePage(sharedBuffercreatePage(5));
        ListenableFuture<?> secondEnqueuePage = enqueuePage(sharedBuffercreatePage(6));
        // get and acknowledge one page
        assertBufferResultEquals(getBufferResult(sharedBuffer, 0, sizeOfPages(1), ), bufferResult(0, createPage(0)));
        sharedBuffer.get(, 1, sizeOfPages(1)).cancel(true);
        // verify the first blocked page was accepted but the second one was not
        assertTrue(firstEnqueuePage.isDone());
        assertFalse(secondEnqueuePage.isDone());
        // destroy the buffer (i.e., cancel the query)
        sharedBuffer.destroy();
        assertFinished(sharedBuffer);
        // verify the second future was completed
        assertTrue(secondEnqueuePage.isDone());
    }
    private static ListenableFuture<?> enqueuePage(SharedBuffer sharedBufferPage page)
    {
        ListenableFuture<?> future = sharedBuffer.enqueue(page);
        assertFalse(future.isDone());
        return future;
    }
    private static void addPage(SharedBuffer sharedBufferPage page)
    {
        assertTrue(sharedBuffer.enqueue(page).isDone());
    }
    private static void assertQueueState(SharedBuffer sharedBufferTaskId queueIdint sizeint pagesSent)
    {
        assertEquals(getBufferInfo(sharedBufferqueueId), new BufferInfo(queueIdfalsesizepagesSent));
    }
    private static void assertQueueClosed(SharedBuffer sharedBufferTaskId queueIdint pagesSent)
    {
        assertEquals(getBufferInfo(sharedBufferqueueId), new BufferInfo(queueIdtrue, 0, pagesSent));
    }
    private static BufferInfo getBufferInfo(SharedBuffer sharedBufferTaskId queueId)
    {
        for (BufferInfo bufferInfo : sharedBuffer.getInfo().getBuffers()) {
            if (bufferInfo.getBufferId().equals(queueId)) {
                return bufferInfo;
            }
        }
        return null;
    }
    private static void assertFinished(SharedBuffer sharedBuffer)
            throws Exception
    {
        assertTrue(sharedBuffer.isFinished());
        for (BufferInfo bufferInfo : sharedBuffer.getInfo().getBuffers()) {
            assertTrue(bufferInfo.isFinished());
            assertEquals(bufferInfo.getBufferedPages(), 0);
        }
    }
//<<<<<<< HEAD
    private static void assertBufferResultEquals(List<? extends TypetypesBufferResult actualBufferResult expected)
//=======
//    private static void assertBufferResultEquals(BufferResult actual, BufferResult expected)
//>>>>>>> Use async http responses for task communication
    {
        assertEquals(actual.getPages().size(), expected.getPages().size());
        assertEquals(actual.getToken(), expected.getToken());
        for (int i = 0; i < actual.getPages().size(); i++) {
            Page actualPage = actual.getPages().get(i);
            Page expectedPage = expected.getPages().get(i);
            assertEquals(actualPage.getChannelCount(), expectedPage.getChannelCount());
            PageAssertions.assertPageEquals(typesactualPageexpectedPage);
        }
        assertEquals(actual.isBufferClosed(), expected.isBufferClosed());
    }
    public static BufferResult bufferResult(long tokenPage firstPagePage... otherPages)
    {
        List<Pagepages = ImmutableList.<Page>builder().add(firstPage).add(otherPages).build();
        return new BufferResult(tokentoken + pages.size(), falsepages);
    }
New to GrepCode? Check out our FAQ X