Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.execution;
 
 
 import java.net.URI;
 
 import static com.facebook.presto.OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS;
 import static com.facebook.presto.SessionTestUtils.TEST_SESSION;
 import static com.facebook.presto.execution.TaskTestUtils.EMPTY_SOURCES;
 import static com.facebook.presto.execution.TaskTestUtils.PLAN_FRAGMENT;
 import static com.facebook.presto.execution.TaskTestUtils.SPLIT;
 import static com.facebook.presto.execution.TaskTestUtils.TABLE_SCAN_NODE_ID;
 import static com.facebook.presto.execution.TaskTestUtils.createTestingPlanner;
 import static com.facebook.presto.execution.TaskTestUtils.updateTask;
 import static io.airlift.concurrent.Threads.threadsNamed;
 import static io.airlift.units.DataSize.Unit.MEGABYTE;
 import static java.util.concurrent.Executors.newScheduledThreadPool;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 public class TestSqlTask
 {
     public static final TaskId OUT = new TaskId("query""stage""out");
     private final TaskExecutor taskExecutor;
 
     private final AtomicLong nextTaskId = new AtomicLong();
 
     /**
      * Builds the fixtures used by the tests: a {@code TaskExecutor} constructed with the
      * argument 8, a scheduled thread pool with core size 5 whose threads are named
      * {@code task-notification-%d}, and a testing {@code LocalExecutionPlanner}.
      *
      * NOTE(review): in this copy of the file the assignment targets, the receiver of
      * {@code start()}, and the head plus first two arguments of the final constructor call
      * are missing. They must be restored from the upstream source before this compiles.
      */
     public TestSqlTask()
     {
          = new TaskExecutor(8);
         .start();
 
          = newScheduledThreadPool(5, threadsNamed("task-notification-%d"));
 
         LocalExecutionPlanner planner = createTestingPlanner();
 
                 ,
                 ,
                 planner,
                 new QueryMonitor(new ObjectMapperProvider().get(), new NullEventClient(), new NodeInfo("test"), new NodeVersion("testVersion")),
                 new TaskManagerConfig());
     }
 
     /**
      * Class-level teardown: calls {@code stop()} on a fixture.
      *
      * NOTE(review): the receiver of {@code stop()} is missing in this copy of the file;
      * presumably it is {@code taskExecutor} — confirm against the upstream source.
      *
      * @throws Exception declared so that any failure during teardown propagates to the test runner
      */
     @AfterClass
     public void destroy()
             throws Exception
     {
         .stop();
     }
 
     /**
      * Updates a fresh task twice and checks the reported state after each update, both on the
      * {@code TaskInfo} returned by {@code updateTask} and on a subsequent {@code getTaskInfo()}.
      * The first update passes an empty list of task sources; the second passes a single
      * {@code TaskSource} with an empty split set and a trailing {@code true} flag, together with
      * output buffers on which {@code withNoMoreBufferIds()} has been called.
      *
      * NOTE(review): the session/fragment arguments, the plan node id, the output-buffer
      * expression and every expected state constant are missing in this copy of the file.
      */
     @Test
     public void testEmptyQuery()
             throws Exception
     {
        SqlTask sqlTask = createInitialTask();
        // first update: no task sources at all
        TaskInfo taskInfo = sqlTask.updateTask(,
                ,
                ImmutableList.<TaskSource>of(),
                );
        assertEquals(taskInfo.getState(), .);
        // the state seen through getTaskInfo() is checked as well
        taskInfo = sqlTask.getTaskInfo();
        assertEquals(taskInfo.getState(), .);
        // second update: one source with no splits, and no more buffer ids will be added
        taskInfo = sqlTask.updateTask(,
                ,
                ImmutableList.of(new TaskSource(, ImmutableSet.<ScheduledSplit>of(), true)),
                .withNoMoreBufferIds());
        assertEquals(taskInfo.getState(), .);
        taskInfo = sqlTask.getTaskInfo();
        assertEquals(taskInfo.getState(), .);
    }
    /**
     * Updates a fresh task with a single {@code TaskSource}, then reads its results twice.
     * The first read (starting at token 0) must return an open buffer holding exactly one page
     * with one position. The second read, starting at the previous token plus the number of
     * pages received, must return a closed buffer with no pages. Finally the test waits on
     * {@code getTaskInfo(state)} with a timeout value of 1 and re-checks the task state.
     *
     * NOTE(review): the {@code updateTask} call is truncated in this copy of the file (its last
     * argument and closing parenthesis are gone), and buffer ids, size/time units and the
     * expected state constants are missing. Restore from the upstream source.
     */
    @Test
    public void testSimpleQuery()
            throws Exception
    {
        SqlTask sqlTask = createInitialTask();
        TaskInfo taskInfo = sqlTask.updateTask(,
                ,
                ImmutableList.of(new TaskSource(, ImmutableSet.of(), true)),
        assertEquals(taskInfo.getState(), .);
        taskInfo = sqlTask.getTaskInfo();
        assertEquals(taskInfo.getState(), .);
        // first read: one page with a single position, buffer still open
        BufferResult results = sqlTask.getTaskResults(, 0, new DataSize(1, .)).get();
        assertEquals(results.isBufferClosed(), false);
        assertEquals(results.getPages().size(), 1);
        assertEquals(results.getPages().get(0).getPositionCount(), 1);
        // second read continues after the pages already received: nothing left, buffer closed
        results = sqlTask.getTaskResults(results.getToken() + results.getPages().size(), new DataSize(1, .)).get();
        assertEquals(results.isBufferClosed(), true);
        assertEquals(results.getPages().size(), 0);
        // wait (bounded by the timeout) on the future returned for the last observed state
        taskInfo = sqlTask.getTaskInfo(taskInfo.getState()).get(1, .);
        assertEquals(taskInfo.getState(), .);
        taskInfo = sqlTask.getTaskInfo();
        assertEquals(taskInfo.getState(), .);
    }
    /**
     * Checks the effect of {@code cancel()} on the task info. Before the cancel, the stats
     * carry no end time, both on the info returned by {@code updateTask} and on a fresh
     * {@code getTaskInfo()}. After the cancel, the end time is non-null on the info returned by
     * {@code cancel()} and on a fresh {@code getTaskInfo()}. The task state is asserted at each
     * of the four points.
     *
     * NOTE(review): the {@code updateTask} arguments (other than the empty source list) and the
     * expected state constants are missing in this copy of the file.
     */
    @Test
    public void testCancel()
            throws Exception
    {
        SqlTask sqlTask = createInitialTask();
        TaskInfo taskInfo = sqlTask.updateTask(,
                ,
                ImmutableList.<TaskSource>of(),
                );
        // before cancel: no end time recorded
        assertEquals(taskInfo.getState(), .);
        assertNull(taskInfo.getStats().getEndTime());
        taskInfo = sqlTask.getTaskInfo();
        assertEquals(taskInfo.getState(), .);
        assertNull(taskInfo.getStats().getEndTime());
        // after cancel: end time must be set
        taskInfo = sqlTask.cancel();
        assertEquals(taskInfo.getState(), .);
        assertNotNull(taskInfo.getStats().getEndTime());
        taskInfo = sqlTask.getTaskInfo();
        assertEquals(taskInfo.getState(), .);
        assertNotNull(taskInfo.getStats().getEndTime());
    }
    /**
     * Updates a fresh task with a single {@code TaskSource}, calls {@code abortTaskResults},
     * then waits on {@code getTaskInfo(state)} with a timeout value of 1 and checks the
     * resulting task state, both on the returned info and on a fresh {@code getTaskInfo()}.
     *
     * NOTE(review): the {@code updateTask} call is truncated in this copy of the file, and the
     * argument of {@code abortTaskResults}, the time unit and the expected state constants are
     * missing. Restore from the upstream source.
     */
    @Test
    public void testAbort()
            throws Exception
    {
        SqlTask sqlTask = createInitialTask();
        TaskInfo taskInfo = sqlTask.updateTask(,
                ,
                ImmutableList.of(new TaskSource(, ImmutableSet.of(), true)),
        assertEquals(taskInfo.getState(), .);
        taskInfo = sqlTask.getTaskInfo();
        assertEquals(taskInfo.getState(), .);
        sqlTask.abortTaskResults();
        // wait (bounded by the timeout) on the future returned for the last observed state
        taskInfo = sqlTask.getTaskInfo(taskInfo.getState()).get(1, .);
        assertEquals(taskInfo.getState(), .);
        taskInfo = sqlTask.getTaskInfo();
        assertEquals(taskInfo.getState(), .);
    }
    /**
     * Verifies that a pending result read completes with a closed buffer once the task finishes.
     * A read is issued first and must not be done; the task is then updated with a source that
     * carries no splits and a trailing {@code true} flag; the pending read must then report a
     * closed buffer within the timeout, and a new read must be done immediately and also report
     * a closed buffer.
     *
     * NOTE(review): the first {@code updateTask} call and the {@code ListenableFuture}
     * declaration are garbled in this copy of the file, and buffer ids, size/time units and the
     * expected state constant are missing. Restore from the upstream source.
     */
    @Test
    public void testBufferCloseOnFinish()
            throws Exception
    {
        SqlTask sqlTask = createInitialTask();
        updateTask(sqlTaskoutputBuffers);
        ListenableFuture<BufferResultbufferResult = sqlTask.getTaskResults(, 0, new DataSize(1, ));
        // nothing has been produced yet, so the read must still be pending
        assertFalse(bufferResult.isDone());
        // finish the task by closing the sources (no splits will ever be added)
        updateTask(sqlTask, ImmutableList.of(new TaskSource(, ImmutableSet.<ScheduledSplit>of(), true)), outputBuffers);
        assertEquals(sqlTask.getTaskInfo().getState(), .);
        // buffer will be closed when the task finishes; wait with a timeout value of 200 for the pending read to complete
        assertTrue(bufferResult.get(200, ).isBufferClosed());
        // verify the buffer is closed
        bufferResult = sqlTask.getTaskResults(, 0, new DataSize(1, ));
        assertTrue(bufferResult.isDone());
        assertTrue(bufferResult.get().isBufferClosed());
    }
    /**
     * Verifies that a pending result read completes with a closed buffer after the task is
     * cancelled. A read is issued first and must not be done; after {@code cancel()} the pending
     * read must report a closed buffer within the timeout, and a new read must be done
     * immediately and also report a closed buffer.
     *
     * NOTE(review): the {@code ListenableFuture} declaration is garbled in this copy of the
     * file, and buffer ids, size/time units and the expected state constant are missing.
     */
    @Test
    public void testBufferCloseOnCancel()
            throws Exception
    {
        SqlTask sqlTask = createInitialTask();
        ListenableFuture<BufferResultbufferResult = sqlTask.getTaskResults(, 0, new DataSize(1, ));
        // nothing has been produced yet, so the read must still be pending
        assertFalse(bufferResult.isDone());
        sqlTask.cancel();
        assertEquals(sqlTask.getTaskInfo().getState(), .);
        // buffer will be closed by cancel event.. the event is async so wait a bit for event to propagate
        assertTrue(bufferResult.get(200, ).isBufferClosed());
        // a read issued after the cancel completes immediately with a closed buffer
        bufferResult = sqlTask.getTaskResults(, 0, new DataSize(1, ));
        assertTrue(bufferResult.isDone());
        assertTrue(bufferResult.get().isBufferClosed());
    }
    /**
     * Verifies that failing a task does not complete pending result reads. A read is issued
     * first and must not be done; the task is then failed with a test exception and the
     * resulting state is awaited; the pending read is then expected to time out rather than
     * return, and a new read must not be done either.
     *
     * NOTE(review): the {@code ListenableFuture} declaration is garbled in this copy of the
     * file, and buffer ids, size/time units and the expected state constant are missing.
     */
    @Test
    public void testBufferNotCloseOnFail()
            throws Exception
    {
        SqlTask sqlTask = createInitialTask();
        ListenableFuture<BufferResultbufferResult = sqlTask.getTaskResults(, 0, new DataSize(1, ));
        assertFalse(bufferResult.isDone());
        // remember the state before failing so the state-change future can be awaited
        TaskState taskState = sqlTask.getTaskInfo().getState();
        sqlTask.failed(new Exception("test"));
        assertEquals(sqlTask.getTaskInfo(taskState).get(200, ).getState(), .);
        // buffer will not be closed by fail event.  event is async so wait a bit for event to fire
        try {
            assertTrue(bufferResult.get(200, ).isBufferClosed());
            fail("expected TimeoutException");
        }
        catch (TimeoutException expected) {
            // expected: the pending read must still be incomplete when the timeout elapses
        }
        assertFalse(sqlTask.getTaskResults(, 0, new DataSize(1, )).isDone());
    }
    /**
     * Creates a new {@code SqlTask} with a freshly built task id and a location URI of the form
     * {@code fake://task/<taskId>}.
     *
     * NOTE(review): this copy of the file is damaged here. The separators between the
     * {@code TaskId} string arguments, the receiver of {@code incrementAndGet()} (presumably
     * {@code nextTaskId}), two {@code SqlTask} constructor arguments and the {@code DataSize}
     * unit are missing. Restore from the upstream source.
     *
     * @return the newly constructed task
     */
    public SqlTask createInitialTask()
    {
        TaskId taskId = new TaskId("query""stage""task" + .incrementAndGet());
        // the location is never dereferenced as written here; it only embeds the task id
        URI location = URI.create("fake://task/" + taskId);
        return new SqlTask(
                taskId,
                location,
                ,
                ,
                Functions.<SqlTask>identity(),
                new DataSize(32, ));
    }
New to GrepCode? Check out our FAQ X