Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.execution;
 
 
 
 import java.util.List;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Predicates.notNull;
 import static com.google.common.collect.Iterables.filter;
 import static com.google.common.collect.Iterables.transform;
 import static io.airlift.concurrent.Threads.threadsNamed;
 import static java.util.concurrent.Executors.newCachedThreadPool;
 import static java.util.concurrent.Executors.newScheduledThreadPool;
 
 public class SqlTaskManager
         implements TaskManagerCloseable
 {
     private static final Logger log = Logger.get(SqlTaskManager.class);
 
     private final ExecutorService taskNotificationExecutor;
 
 
     private final Duration infoCacheTime;
     private final Duration clientTimeout;
 
     private final LoadingCache<TaskIdSqlTasktasks;
 
     private final SqlTaskIoStats cachedStats = new SqlTaskIoStats();
     private final SqlTaskIoStats finishedTaskStats = new SqlTaskIoStats();
 
     @Inject
     public SqlTaskManager(
             LocalExecutionPlanner planner,
             final LocationFactory locationFactory,
             TaskExecutor taskExecutor,
             QueryMonitor queryMonitor,
             TaskManagerConfig config)
     {
         checkNotNull(config"config is null");
          = config.getInfoMaxAge();
          = config.getClientTimeout();
 
         final DataSize maxBufferSize = config.getSinkMaxBufferSize();
 
          = newCachedThreadPool(threadsNamed("task-notification-%d"));
 
          = newScheduledThreadPool(5, threadsNamed("task-management-%d"));
 
         final SqlTaskExecutionFactory sqlTaskExecutionFactory = new SqlTaskExecutionFactory(taskExecutorplannerqueryMonitorconfig);
 
          = CacheBuilder.newBuilder().build(new CacheLoader<TaskIdSqlTask>()
         {
             @Override
            public SqlTask load(TaskId taskId)
                    throws Exception
            {
                return new SqlTask(
                        taskId,
                        locationFactory.createLocalTaskLocation(taskId),
                        sqlTaskExecutionFactory,
                        ,
                        sqlTask -> {
                                .merge(sqlTask.getIoStats());
                                return null;
                        },
                        maxBufferSize
                );
            }
        });
    }
    public void start()
    {
        {
            @Override
            public void run()
            {
                try {
                    removeOldTasks();
                }
                catch (Throwable e) {
                    .warn(e"Error removing old tasks");
                }
                try {
                    failAbandonedTasks();
                }
                catch (Throwable e) {
                    .warn(e"Error canceling abandoned tasks");
                }
            }
        }, 200, 200, .);
        {
            @Override
            public void run()
            {
                try {
                    updateStats();
                }
                catch (Throwable e) {
                    .warn(e"Error updating stats");
                }
            }
        }, 0, 1, .);
    }
    @Override
    @PreDestroy
    public void close()
    {
    }
    @Managed
    @Flatten
    public SqlTaskIoStats getIoStats()
    {
        return ;
    }
    @Managed(description = "Task notification executor")
    @Nested
    {
        return ;
    }
    @Managed(description = "Task garbage collector executor")
    @Nested
    {
        return ;
    }
    @Override
    public List<TaskInfogetAllTaskInfo()
    {
        return ImmutableList.copyOf(transform(.asMap().values(), SqlTask::getTaskInfo));
    }
    @Override
    public TaskInfo getTaskInfo(TaskId taskId)
    {
        checkNotNull(taskId"taskId is null");
        return .getUnchecked(taskId).getTaskInfo();
    }
    @Override
    public ListenableFuture<TaskInfogetTaskInfo(TaskId taskIdTaskState currentState)
    {
        checkNotNull(taskId"taskId is null");
        checkNotNull(currentState"currentState is null");
        return .getUnchecked(taskId).getTaskInfo(currentState);
    }
    @Override
    public TaskInfo updateTask(Session sessionTaskId taskIdPlanFragment fragmentList<TaskSourcesourcesOutputBuffers outputBuffers)
    {
        checkNotNull(session"session is null");
        checkNotNull(taskId"taskId is null");
        checkNotNull(fragment"fragment is null");
        checkNotNull(sources"sources is null");
        checkNotNull(outputBuffers"outputBuffers is null");
        return .getUnchecked(taskId).updateTask(sessionfragmentsourcesoutputBuffers);
    }
    @Override
    public ListenableFuture<BufferResultgetTaskResults(TaskId taskIdTaskId outputNamelong startingSequenceIdDataSize maxSize)
    {
        checkNotNull(taskId"taskId is null");
        checkNotNull(outputName"outputName is null");
        Preconditions.checkArgument(startingSequenceId >= 0, "startingSequenceId is negative");
        checkNotNull(maxSize"maxSize is null");
        return .getUnchecked(taskId).getTaskResults(outputNamestartingSequenceIdmaxSize);
    }
    @Override
    public TaskInfo abortTaskResults(TaskId taskIdTaskId outputId)
    {
        checkNotNull(taskId"taskId is null");
        checkNotNull(outputId"outputId is null");
        return .getUnchecked(taskId).abortTaskResults(outputId);
    }
    @Override
    public TaskInfo cancelTask(TaskId taskId)
    {
        checkNotNull(taskId"taskId is null");
        return .getUnchecked(taskId).cancel();
    }
    @Override
    public TaskInfo abortTask(TaskId taskId)
    {
        checkNotNull(taskId"taskId is null");
        return .getUnchecked(taskId).abort();
    }
    public void removeOldTasks()
    {
        DateTime oldestAllowedTask = DateTime.now().minus(.toMillis());
        for (TaskInfo taskInfo : filter(transform(.asMap().values(), SqlTask::getTaskInfo), notNull())) {
            try {
                DateTime endTime = taskInfo.getStats().getEndTime();
                if (endTime != null && endTime.isBefore(oldestAllowedTask)) {
                    .asMap().remove(taskInfo.getTaskId());
                }
            }
            catch (RuntimeException e) {
                .warn(e"Error while inspecting age of complete task %s"taskInfo.getTaskId());
            }
        }
    }
    public void failAbandonedTasks()
    {
        DateTime now = DateTime.now();
        DateTime oldestAllowedHeartbeat = now.minus(.toMillis());
        for (SqlTask sqlTask : .asMap().values()) {
            try {
                TaskInfo taskInfo = sqlTask.getTaskInfo();
                if (taskInfo.getState().isDone()) {
                    continue;
                }
                DateTime lastHeartbeat = taskInfo.getLastHeartbeat();
                if (lastHeartbeat != null && lastHeartbeat.isBefore(oldestAllowedHeartbeat)) {
                    .info("Failing abandoned task %s"taskInfo.getTaskId());
                    sqlTask.failed(new AbandonedException("Task " + taskInfo.getTaskId(), lastHeartbeatnow));
                }
            }
            catch (RuntimeException e) {
                .warn(e"Error while inspecting age of task %s"sqlTask.getTaskId());
            }
        }
    }
    //
    // Jmxutils only calls nested getters once, so we are forced to maintain a single
    // instance and periodically recalculate the stats.
    //
    private void updateStats()
    {
        SqlTaskIoStats tempIoStats = new SqlTaskIoStats();
        tempIoStats.merge();
        for (SqlTask task : .asMap().values()) {
            // there is a race here between task completion, which merges stats into
            // finishedTaskStats, and getting the stats from the task.  Since we have
            // already merged the final stats, we could miss the stats from this task
            // which would result in an under-count, but we will not get an over-count.
            if (!task.getTaskInfo().getState().isDone()) {
                tempIoStats.merge(task.getIoStats());
            }
        }
        .resetTo(tempIoStats);
    }
New to GrepCode? Check out our FAQ X