/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.execution;

import com.facebook.presto.OutputBuffers;
import com.facebook.presto.Session;
import com.facebook.presto.UnpartitionedPagePartitionFunction;
import com.facebook.presto.execution.StateMachine.StateChangeListener;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.metadata.NodeManager;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.split.SplitManager;
import com.facebook.presto.sql.analyzer.Analysis;
import com.facebook.presto.sql.analyzer.Analyzer;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.analyzer.QueryExplainer;
import com.facebook.presto.sql.parser.SqlParser;
import com.facebook.presto.sql.planner.DistributedExecutionPlanner;
import com.facebook.presto.sql.planner.InputExtractor;
import com.facebook.presto.sql.planner.LogicalPlanner;
import com.facebook.presto.sql.planner.Plan;
import com.facebook.presto.sql.planner.PlanFragmenter;
import com.facebook.presto.sql.planner.PlanNodeIdAllocator;
import com.facebook.presto.sql.planner.StageExecutionPlan;
import com.facebook.presto.sql.planner.SubPlan;
import com.facebook.presto.sql.planner.optimizations.PlanOptimizer;
import com.facebook.presto.sql.tree.Statement;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import io.airlift.concurrent.SetThreadName;
import io.airlift.units.Duration;

import javax.inject.Inject;

import java.net.URI;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;

import static com.facebook.presto.OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS;
import static com.facebook.presto.SystemSessionProperties.isBigQueryEnabled;
import static com.facebook.presto.spi.StandardErrorCode.USER_CANCELED;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
 
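/**
 * Coordinator-side execution of a single SQL query. Drives the query through
 * analysis, planning, distributed scheduling, and completion, reporting
 * progress via a {@link QueryStateMachine} and a single output
 * {@link SqlStageExecution}.
 */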
 public class SqlQueryExecution
         implements QueryExecution
 {
             .withBuffer(new TaskId("output""buffer""id"), new UnpartitionedPagePartitionFunction())
             .withNoMoreBufferIds();
 
     private final QueryStateMachine stateMachine;
 
     private final Statement statement;
     private final Metadata metadata;
     private final SqlParser sqlParser;
     private final SplitManager splitManager;
     private final NodeScheduler nodeScheduler;
    private final List<PlanOptimizer> planOptimizers;
    private final RemoteTaskFactory remoteTaskFactory;
    private final LocationFactory locationFactory;
    private final int scheduleSplitBatchSize;
    private final int maxPendingSplitsPerNode;
    private final int initialHashPartitions;
    private final boolean experimentalSyntaxEnabled;
    private final ExecutorService queryExecutor;

    private final QueryExplainer queryExplainer;
    private final AtomicReference<SqlStageExecution> outputStage = new AtomicReference<>();
     private final NodeTaskMap nodeTaskMap;
 
     public SqlQueryExecution(QueryId queryId,
             String query,
             Session session,
             URI self,
             Statement statement,
             Metadata metadata,
             SqlParser sqlParser,
             SplitManager splitManager,
             NodeScheduler nodeScheduler,
            List<PlanOptimizer> planOptimizers,
             RemoteTaskFactory remoteTaskFactory,
             LocationFactory locationFactory,
             int scheduleSplitBatchSize,
             int maxPendingSplitsPerNode,
             int initialHashPartitions,
            boolean experimentalSyntaxEnabled,
            ExecutorService queryExecutor,
            NodeTaskMap nodeTaskMap)
    {
        try (SetThreadName ignored = new SetThreadName("Query-%s", queryId)) {
            this.statement = checkNotNull(statement, "statement is null");
            this.metadata = checkNotNull(metadata, "metadata is null");
            this.sqlParser = checkNotNull(sqlParser, "sqlParser is null");
            this.splitManager = checkNotNull(splitManager, "splitManager is null");
            this.nodeScheduler = checkNotNull(nodeScheduler, "nodeScheduler is null");
            this.planOptimizers = checkNotNull(planOptimizers, "planOptimizers is null");
            this.remoteTaskFactory = checkNotNull(remoteTaskFactory, "remoteTaskFactory is null");
            this.locationFactory = checkNotNull(locationFactory, "locationFactory is null");
            this.queryExecutor = checkNotNull(queryExecutor, "queryExecutor is null");
            this.experimentalSyntaxEnabled = experimentalSyntaxEnabled;
            this.nodeTaskMap = checkNotNull(nodeTaskMap, "nodeTaskMap is null");

            checkArgument(scheduleSplitBatchSize > 0, "scheduleSplitBatchSize must be greater than 0");
            this.scheduleSplitBatchSize = scheduleSplitBatchSize;

            checkArgument(maxPendingSplitsPerNode > 0, "maxPendingSplitsPerNode must be greater than 0");
            this.maxPendingSplitsPerNode = maxPendingSplitsPerNode;

            checkArgument(initialHashPartitions > 0, "initialHashPartitions must be greater than 0");
            this.initialHashPartitions = initialHashPartitions;

            checkNotNull(queryId, "queryId is null");
            checkNotNull(query, "query is null");
            checkNotNull(session, "session is null");
            checkNotNull(self, "self is null");
            this.stateMachine = new QueryStateMachine(queryId, query, session, self, queryExecutor);

            this.queryExplainer = new QueryExplainer(session, planOptimizers, metadata, sqlParser, experimentalSyntaxEnabled);
        }
    }
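
    /**
     * Begins execution: transitions the state machine to planning, analyzes and
     * plans the query, then starts the output stage (or aborts it if the query
     * already finished). Any failure marks the query failed instead of escaping,
     * except Errors, which are rethrown.
     */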
    @Override
    public void start()
    {
        try (SetThreadName ignored = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
            try {
                // transition to planning
                if (!stateMachine.beginPlanning()) {
                    // query already started or finished
                    return;
                }

                // analyze query
                SubPlan subplan = analyzeQuery();

                // plan distribution of query
                planDistribution(subplan);

                // transition to starting
                if (!stateMachine.starting()) {
                    // query already started or finished
                    return;
                }

                // if query is not finished, start the stage, otherwise cancel it
                SqlStageExecution stage = outputStage.get();
                if (!stateMachine.isDone()) {
                    stage.start();
                }
                else {
                    stage.abort();
                }
            }
            catch (Throwable e) {
                fail(e);
                Throwables.propagateIfInstanceOf(e, Error.class);
            }
        }
    }
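
    /**
     * Registers a listener for query state transitions.
     */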
    @Override
    public void addStateChangeListener(StateChangeListener<QueryState> stateChangeListener)
    {
        try (SetThreadName ignored = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
            stateMachine.addStateChangeListener(stateChangeListener);
        }
    }
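
    /**
     * Analysis recurses through the statement's expression tree, so a deeply
     * nested statement can exhaust the stack; surface that as a query failure
     * rather than letting the StackOverflowError escape.
     */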
    private SubPlan analyzeQuery()
    {
        try {
            return doAnalyzeQuery();
        }
        catch (StackOverflowError e) {
            throw new RuntimeException("statement is too large (stack overflow during analysis)", e);
        }
    }
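
    /**
     * Runs analysis, logical planning (including the optimizer pipeline), and
     * plan fragmentation, recording timings and inputs on the state machine.
     */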
    private SubPlan doAnalyzeQuery()
    {
        // time analysis phase
        long analysisStart = System.nanoTime();
        // analyze query (Analyzer and LogicalPlanner arguments reconstructed; exact signatures vary by Presto version)
        Analyzer analyzer = new Analyzer(stateMachine.getSession(), metadata, sqlParser, Optional.of(queryExplainer), experimentalSyntaxEnabled);
        Analysis analysis = analyzer.analyze(statement);

        stateMachine.setUpdateType(analysis.getUpdateType());

        // plan query
        PlanNodeIdAllocator idAllocator = new PlanNodeIdAllocator();
        LogicalPlanner logicalPlanner = new LogicalPlanner(stateMachine.getSession(), planOptimizers, idAllocator, metadata);
        Plan plan = logicalPlanner.plan(analysis);

        // extract inputs
        List<Input> inputs = new InputExtractor().extract(plan.getRoot());
        stateMachine.setInputs(inputs);

        // fragment the plan
        SubPlan subplan = new PlanFragmenter().createSubPlans(plan);

        // record analysis time
        stateMachine.recordAnalysisTime(analysisStart);
        return subplan;
    }
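
    /**
     * Maps the plan fragments onto active worker nodes and builds the stage
     * execution tree; building does not schedule any tasks yet.
     */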
    private void planDistribution(SubPlan subplan)
    {
        // time distribution planning
        long distributedPlanningStart = System.nanoTime();
        // plan the execution on the active nodes
        DistributedExecutionPlanner distributedPlanner = new DistributedExecutionPlanner(splitManager);
        StageExecutionPlan outputStageExecutionPlan = distributedPlanner.plan(subplan);

        if (stateMachine.isDone()) {
            return;
        }

        // record field names
        stateMachine.setOutputFieldNames(outputStageExecutionPlan.getFieldNames());

        // build the stage execution objects (this doesn't schedule execution);
        // constructor argument order reconstructed from this class's fields
        SqlStageExecution outputStage = new SqlStageExecution(stateMachine.getQueryId(),
                locationFactory,
                outputStageExecutionPlan,
                nodeScheduler,
                remoteTaskFactory,
                stateMachine.getSession(),
                scheduleSplitBatchSize,
                maxPendingSplitsPerNode,
                initialHashPartitions,
                queryExecutor,
                nodeTaskMap);
        this.outputStage.set(outputStage);
        outputStage.addStateChangeListener(this::doUpdateState);

        // record planning time
        stateMachine.recordDistributedPlanningTime(distributedPlanningStart);

        // update state in case output finished before listener was added
        doUpdateState(outputStage.getStageInfo());
    }
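
    /**
     * Cancels a single stage within the query without failing the whole query.
     */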
    @Override
    public void cancelStage(StageId stageId)
    {
        checkNotNull(stageId, "stageId is null");

        try (SetThreadName ignored = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
            SqlStageExecution stageExecution = outputStage.get();
            if (stageExecution != null) {
                stageExecution.cancelStage(stageId);
            }
        }
    }
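
    /**
     * Marks the query failed (if not already done) and aborts the output stage.
     */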
    @Override
    public void fail(Throwable cause)
    {
        try (SetThreadName ignored = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
            // transition to failed state, only if not already finished
            stateMachine.fail(cause);

            SqlStageExecution stageExecution = outputStage.get();
            if (stageExecution != null) {
                stageExecution.abort();
            }
        }
    }
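
    /**
     * Blocks until the query state changes away from currentState or maxWait elapses.
     */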
    @Override
    public Duration waitForStateChange(QueryState currentState, Duration maxWait)
            throws InterruptedException
    {
        try (SetThreadName ignored = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
            return stateMachine.waitForStateChange(currentState, maxWait);
        }
    }
    @Override
    public void recordHeartbeat()
    {
        stateMachine.recordHeartbeat();
    }
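
    /**
     * Returns a snapshot of query state plus the current output-stage info, if any.
     */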
    @Override
    public QueryInfo getQueryInfo()
    {
        try (SetThreadName ignored = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
            SqlStageExecution outputStage = this.outputStage.get();
            StageInfo stageInfo = null;
            if (outputStage != null) {
                stageInfo = outputStage.getStageInfo();
            }
            return stateMachine.getQueryInfo(stageInfo);
        }
    }
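
    /**
     * Listener invoked on output-stage state changes: propagates stage failure,
     * cancellation, or completion into the query state machine, and moves the
     * query from STARTING to RUNNING once the stage has tasks.
     */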
    private void doUpdateState(StageInfo outputStageInfo)
    {
        // if already complete, just return
        if (stateMachine.isDone()) {
            return;
        }

        // if output stage is done, transition to done
        StageState outputStageState = outputStageInfo.getState();
        if (outputStageState.isDone()) {
            if (outputStageState.isFailure()) {
                stateMachine.fail(failureCause(outputStageInfo));
            }
            else if (outputStageState == StageState.CANCELED) {
                stateMachine.fail(new PrestoException(USER_CANCELED, "Query was canceled"));
            }
            else {
                stateMachine.finished();
            }
        }
        else if (stateMachine.getQueryState() == QueryState.STARTING) {
            // if output stage has at least one task, we are running
            if (!outputStageInfo.getTasks().isEmpty()) {
                stateMachine.running();
                stateMachine.recordExecutionStart();
            }
        }
    }
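
    /**
     * Depth-first search of the stage tree for the first recorded failure:
     * stage failures first, then task failures, then sub-stages; returns null
     * when no failure has been reported yet.
     */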
    private static Throwable failureCause(StageInfo stageInfo)
    {
        if (!stageInfo.getFailures().isEmpty()) {
            return stageInfo.getFailures().get(0).toException();
        }
        for (TaskInfo taskInfo : stageInfo.getTasks()) {
            if (!taskInfo.getFailures().isEmpty()) {
                return taskInfo.getFailures().get(0).toException();
            }
        }
        for (StageInfo subStageInfo : stageInfo.getSubStages()) {
            Throwable cause = failureCause(subStageInfo);
            if (cause != null) {
                return cause;
            }
        }
        return null;
    }
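
    /**
     * Guice-injected factory that captures coordinator services and configuration
     * once, then creates a SqlQueryExecution per submitted query. When the
     * big-query session property is set, the hash partition count falls back to
     * the number of active nodes unless explicitly configured.
     */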
    public static class SqlQueryExecutionFactory
            implements QueryExecutionFactory<SqlQueryExecution>
    {
        private final int scheduleSplitBatchSize;
        private final int maxPendingSplitsPerNode;
        private final int initialHashPartitions;
        private final Integer bigQueryInitialHashPartitions;
        private final boolean experimentalSyntaxEnabled;
        private final Metadata metadata;
        private final SqlParser sqlParser;
        private final SplitManager splitManager;
        private final NodeScheduler nodeScheduler;
        private final List<PlanOptimizer> planOptimizers;
        private final RemoteTaskFactory remoteTaskFactory;
        private final LocationFactory locationFactory;
        private final ExecutorService executor;
        private final NodeTaskMap nodeTaskMap;
        private final NodeManager nodeManager;
        @Inject
        SqlQueryExecutionFactory(QueryManagerConfig config,
                FeaturesConfig featuresConfig,
                Metadata metadata,
                SqlParser sqlParser,
                LocationFactory locationFactory,
                SplitManager splitManager,
                NodeScheduler nodeScheduler,
                NodeManager nodeManager,
                List<PlanOptimizerplanOptimizers,
                RemoteTaskFactory remoteTaskFactory,
                @ForQueryExecution ExecutorService executor,
                NodeTaskMap nodeTaskMap)
        {
            checkNotNull(config, "config is null");
            this.scheduleSplitBatchSize = config.getScheduleSplitBatchSize();
            this.maxPendingSplitsPerNode = config.getMaxPendingSplitsPerNode();
            this.initialHashPartitions = config.getInitialHashPartitions();
            this.bigQueryInitialHashPartitions = config.getBigQueryInitialHashPartitions();
            this.metadata = checkNotNull(metadata, "metadata is null");
            this.sqlParser = checkNotNull(sqlParser, "sqlParser is null");
            this.locationFactory = checkNotNull(locationFactory, "locationFactory is null");
            this.splitManager = checkNotNull(splitManager, "splitManager is null");
            this.nodeScheduler = checkNotNull(nodeScheduler, "nodeScheduler is null");
            this.planOptimizers = checkNotNull(planOptimizers, "planOptimizers is null");
            this.remoteTaskFactory = checkNotNull(remoteTaskFactory, "remoteTaskFactory is null");
            checkNotNull(featuresConfig, "featuresConfig is null");
            this.experimentalSyntaxEnabled = featuresConfig.isExperimentalSyntaxEnabled();
            this.executor = checkNotNull(executor, "executor is null");
            this.nodeTaskMap = checkNotNull(nodeTaskMap, "nodeTaskMap is null");
            this.nodeManager = checkNotNull(nodeManager, "nodeManager is null");
        }
        @Override
        public SqlQueryExecution createQueryExecution(QueryId queryId, String query, Session session, Statement statement)
        {
            int initialHashPartitions;
            if (isBigQueryEnabled(session, false)) {
                if (this.bigQueryInitialHashPartitions == null) {
                    initialHashPartitions = nodeManager.getActiveNodes().size();
                }
                else {
                    initialHashPartitions = this.bigQueryInitialHashPartitions;
                }
            }
            else {
                initialHashPartitions = this.initialHashPartitions;
            }

            SqlQueryExecution queryExecution = new SqlQueryExecution(queryId,
                    query,
                    session,
                    locationFactory.createQueryLocation(queryId),
                    statement,
                    metadata,
                    sqlParser,
                    splitManager,
                    nodeScheduler,
                    planOptimizers,
                    remoteTaskFactory,
                    locationFactory,
                    scheduleSplitBatchSize,
                    maxPendingSplitsPerNode,
                    initialHashPartitions,
                    experimentalSyntaxEnabled,
                    executor,
                    nodeTaskMap);

            return queryExecution;
        }
    }
}