// NOTE(review): removed web-scrape viewer chrome ("Start line / End line /
// Snippet Preview / Snippet HTML Code / Stack Overflow Questions") — not part
// of the original source file.
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.execution;
 
 
 
 import java.net.URI;
 import java.util.List;
 
 import static com.facebook.presto.OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS;
 import static com.facebook.presto.SystemSessionProperties.getHashPartitionCount;
 import static com.facebook.presto.SystemSessionProperties.isBigQueryEnabled;
 import static com.facebook.presto.spi.StandardErrorCode.INTERNAL_ERROR;
 import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
 import static com.facebook.presto.spi.StandardErrorCode.USER_CANCELED;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.util.Objects.requireNonNull;
 
 public final class SqlQueryExecution
         implements QueryExecution
 {
             .withBuffer(new TaskId("output""buffer""id"), new UnpartitionedPagePartitionFunction())
             .withNoMoreBufferIds();
 
     private final QueryStateMachine stateMachine;
 
     private final Statement statement;
     private final Metadata metadata;
     private final SqlParser sqlParser;
     private final SplitManager splitManager;
     private final NodeScheduler nodeScheduler;
     private final List<PlanOptimizerplanOptimizers;
     private final RemoteTaskFactory remoteTaskFactory;
     private final LocationFactory locationFactory;
     private final int scheduleSplitBatchSize;
     private final int initialHashPartitions;
     private final boolean experimentalSyntaxEnabled;
     private final ExecutorService queryExecutor;
 
     private final QueryExplainer queryExplainer;
     private final AtomicReference<SqlStageExecutionoutputStage = new AtomicReference<>();
     private final AtomicReference<QueryInfofinalQueryInfo = new AtomicReference<>();
     private final NodeTaskMap nodeTaskMap;
     private final Session session;
 
     public SqlQueryExecution(QueryId queryId,
             String query,
             Session session,
             URI self,
             Statement statement,
             Metadata metadata,
             SqlParser sqlParser,
            SplitManager splitManager,
            NodeScheduler nodeScheduler,
            List<PlanOptimizerplanOptimizers,
            RemoteTaskFactory remoteTaskFactory,
            LocationFactory locationFactory,
            int scheduleSplitBatchSize,
            int initialHashPartitions,
            boolean experimentalSyntaxEnabled,
            ExecutorService queryExecutor,
            NodeTaskMap nodeTaskMap)
    {
        try (SetThreadName ignored = new SetThreadName("Query-%s"queryId)) {
            this. = checkNotNull(statement"statement is null");
            this. = checkNotNull(metadata"metadata is null");
            this. = checkNotNull(sqlParser"sqlParser is null");
            this. = checkNotNull(splitManager"splitManager is null");
            this. = checkNotNull(nodeScheduler"nodeScheduler is null");
            this. = checkNotNull(planOptimizers"planOptimizers is null");
            this. = checkNotNull(remoteTaskFactory"remoteTaskFactory is null");
            this. = checkNotNull(locationFactory"locationFactory is null");
            this. = checkNotNull(queryExecutor"queryExecutor is null");
            this. = experimentalSyntaxEnabled;
            this. = checkNotNull(nodeTaskMap"nodeTaskMap is null");
            this. = checkNotNull(session"session is null");
            checkArgument(scheduleSplitBatchSize > 0, "scheduleSplitBatchSize must be greater than 0");
            this. = scheduleSplitBatchSize;
            checkArgument(initialHashPartitions > 0, "initialHashPartitions must be greater than 0");
            this. = initialHashPartitions;
            checkNotNull(queryId"queryId is null");
            checkNotNull(query"query is null");
            checkNotNull(session"session is null");
            checkNotNull(self"self is null");
            this. = new QueryStateMachine(queryIdquerysessionselfqueryExecutor);
            // when the query finishes cache the final query info, and clear the reference to the output stage
            .addStateChangeListener(state -> {
                if (!state.isDone()) {
                    return;
                }
                // query is now done, so abort any work that is still running
                SqlStageExecution stage = .get();
                if (stage != null) {
                    stage.abort();
                }
                // capture the final query state and drop reference to the output stage
                .compareAndSet(nullgetQueryInfo(stage));
                .set(null);
            });
            this. = new QueryExplainer(sessionplanOptimizersmetadatasqlParserexperimentalSyntaxEnabled);
        }
    }
    @Override
    {
        return .getMemoryPool();
    }
    @Override
    public void setMemoryPool(VersionedMemoryPoolId poolId)
    {
        .setMemoryPool(poolId);
    }
    @Override
    public long getTotalMemoryReservation()
    {
        // acquire reference to outputStage before checking finalQueryInfo, because
        // state change listener sets finalQueryInfo and then clears outputStage when
        // the query finishes.
        SqlStageExecution stage = .get();
        QueryInfo queryInfo = .get();
        if (queryInfo != null) {
            return queryInfo.getQueryStats().getTotalMemoryReservation().toBytes();
        }
        return stage.getTotalMemoryReservation();
    }
    @Override
    public Session getSession()
    {
        return ;
    }
    @Override
    public void start()
    {
        try (SetThreadName ignored = new SetThreadName("Query-%s".getQueryId())) {
            try {
                // transition to planning
                if (!.transitionToPlanning()) {
                    // query already started or finished
                    return;
                }
                // analyze query
                SubPlan subplan = analyzeQuery();
                // plan distribution of query
                planDistribution(subplan);
                // transition to starting
                if (!.transitionToStarting()) {
                    // query already started or finished
                    return;
                }
                // if query is not finished, start the stage, otherwise cancel it
                SqlStageExecution stage = .get();
                if (!.isDone()) {
                    stage.start();
                }
            }
            catch (Throwable e) {
                fail(e);
                Throwables.propagateIfInstanceOf(eError.class);
            }
        }
    }
    @Override
    public void addStateChangeListener(StateChangeListener<QueryStatestateChangeListener)
    {
        try (SetThreadName ignored = new SetThreadName("Query-%s".getQueryId())) {
            .addStateChangeListener(stateChangeListener);
        }
    }
    private SubPlan analyzeQuery()
    {
        try {
            return doAnalyzeQuery();
        }
        catch (StackOverflowError e) {
            throw new PrestoException("statement is too large (stack overflow during analysis)"e);
        }
    }
    private SubPlan doAnalyzeQuery()
    {
        // time analysis phase
        long analysisStart = System.nanoTime();
        // analyze query
        Analysis analysis = analyzer.analyze();
        .setUpdateType(analysis.getUpdateType());
        // plan query
        PlanNodeIdAllocator idAllocator = new PlanNodeIdAllocator();
        LogicalPlanner logicalPlanner = new LogicalPlanner(.getSession(), idAllocator);
        Plan plan = logicalPlanner.plan(analysis);
        // extract inputs
        List<Inputinputs = new InputExtractor().extract(plan.getRoot());
        .setInputs(inputs);
        // fragment the plan
        SubPlan subplan = new PlanFragmenter().createSubPlans(plan);
        // record analysis time
        .recordAnalysisTime(analysisStart);
        return subplan;
    }
    private void planDistribution(SubPlan subplan)
    {
        // time distribution planning
        long distributedPlanningStart = System.nanoTime();
        // plan the execution on the active nodes
        DistributedExecutionPlanner distributedPlanner = new DistributedExecutionPlanner();
        StageExecutionPlan outputStageExecutionPlan = distributedPlanner.plan(subplan);
        if (.isDone()) {
            return;
        }
        // record field names
        .setOutputFieldNames(outputStageExecutionPlan.getFieldNames());
        // build the stage execution objects (this doesn't schedule execution)
        SqlStageExecution outputStage = new SqlStageExecution(.getQueryId(),
                ,
                outputStageExecutionPlan,
                ,
                ,
                .getSession(),
                ,
                ,
                ,
                ,
                );
        outputStage.addStateChangeListener(state -> {
            if (state == .) {
                .transitionToFinished();
            }
            else if (state == .) {
                // output stage was canceled
                .transitionToFailed(new PrestoException("Query was canceled"));
            }
        });
        for (SqlStageExecution stage : getAllStages(outputStage)) {
            stage.addStateChangeListener(state -> {
                if (.isDone()) {
                    return;
                }
                if (.getQueryState() == .) {
                    // if any stage has at least one task, we are running
                    if (!stage.getStageInfo().getTasks().isEmpty()) {
                        .transitionToRunning();
                    }
                }
                else if (state == .) {
                    .transitionToFailed(stage.getStageInfo().getFailureCause().toException());
                }
                else if (state == .) {
                    // this should never happen, since abort can only be triggered in query clean up after the query is finished
                    .transitionToFailed(new PrestoException("Query stage was aborted"));
                }
            });
        }
        // only export output stage reference after listeners are added
        this..set(outputStage);
        // if query was canceled during stage creation, abort the output stage
        // directly since the callback may have already fired
        if (.isDone()) {
            outputStage.abort();
        }
        // record planning time
        .recordDistributedPlanningTime(distributedPlanningStart);
    }
    @Override
    public void cancelStage(StageId stageId)
    {
        checkNotNull(stageId"stageId is null");
        try (SetThreadName ignored = new SetThreadName("Query-%s".getQueryId())) {
            SqlStageExecution stageExecution = .get();
            if (stageExecution != null) {
                stageExecution.cancelStage(stageId);
            }
        }
    }
    @Override
    public void fail(Throwable cause)
    {
        requireNonNull(cause"cause is null");
        try (SetThreadName ignored = new SetThreadName("Query-%s".getQueryId())) {
            .transitionToFailed(cause);
        }
    }
    @Override
    public Duration waitForStateChange(QueryState currentStateDuration maxWait)
            throws InterruptedException
    {
        try (SetThreadName ignored = new SetThreadName("Query-%s".getQueryId())) {
            return .waitForStateChange(currentStatemaxWait);
        }
    }
    @Override
    public void recordHeartbeat()
    {
        .recordHeartbeat();
    }
    @Override
    public void pruneInfo()
    {
        QueryInfo queryInfo = .get();
        if (queryInfo == null || queryInfo.getOutputStage() == null) {
            return;
        }
        StageInfo prunedOutputStage = new StageInfo(
                queryInfo.getOutputStage().getStageId(),
                queryInfo.getOutputStage().getState(),
                queryInfo.getOutputStage().getSelf(),
                null// Remove the plan
                queryInfo.getOutputStage().getTypes(),
                queryInfo.getOutputStage().getStageStats(),
                ImmutableList.of(), // Remove the tasks
                ImmutableList.of(), // Remove the substages
                queryInfo.getOutputStage().getFailureCause()
        );
        QueryInfo prunedQueryInfo = new QueryInfo(
                queryInfo.getQueryId(),
                queryInfo.getSession(),
                queryInfo.getState(),
                getMemoryPool().getId(),
                queryInfo.isScheduled(),
                queryInfo.getSelf(),
                queryInfo.getFieldNames(),
                queryInfo.getQuery(),
                queryInfo.getQueryStats(),
                queryInfo.getSetSessionProperties(),
                queryInfo.getResetSessionProperties(),
                queryInfo.getUpdateType(),
                prunedOutputStage,
                queryInfo.getFailureInfo(),
                queryInfo.getErrorCode(),
                queryInfo.getInputs()
        );
        .compareAndSet(queryInfoprunedQueryInfo);
    }
    @Override
    public QueryId getQueryId()
    {
        return .getQueryId();
    }
    @Override
    public QueryInfo getQueryInfo()
    {
        try (SetThreadName ignored = new SetThreadName("Query-%s".getQueryId())) {
            // acquire reference to outputStage before checking finalQueryInfo, because
            // state change listener sets finalQueryInfo and then clears outputStage when
            // the query finishes.
            SqlStageExecution outputStage = this..get();
            QueryInfo finalQueryInfo = this..get();
            if (finalQueryInfo != null) {
                return finalQueryInfo;
            }
            return getQueryInfo(outputStage);
        }
    }
    @Override
    public QueryState getState()
    {
        return .getQueryState();
    }
    private QueryInfo getQueryInfo(SqlStageExecution outputStage)
    {
        StageInfo stageInfo = null;
        if (outputStage != null) {
            stageInfo = outputStage.getStageInfo();
        }
        return .getQueryInfo(stageInfo);
    }
    private static List<SqlStageExecutiongetAllStages(SqlStageExecution stage)
    {
        ImmutableList.Builder<SqlStageExecutioncollector = ImmutableList.builder();
        if (stage != null) {
            addAllStages(stagecollector);
        }
        return collector.build();
    }
    private static void addAllStages(SqlStageExecution stageImmutableList.Builder<SqlStageExecutioncollector)
    {
        collector.add(stage);
        for (SqlStageExecution subStage : stage.getSubStages()) {
            addAllStages(subStagecollector);
        }
    }
    public static class SqlQueryExecutionFactory
            implements QueryExecutionFactory<SqlQueryExecution>
    {
        private final int scheduleSplitBatchSize;
        private final int initialHashPartitions;
        private final Integer bigQueryInitialHashPartitions;
        private final boolean experimentalSyntaxEnabled;
        private final Metadata metadata;
        private final SqlParser sqlParser;
        private final SplitManager splitManager;
        private final NodeScheduler nodeScheduler;
        private final List<PlanOptimizerplanOptimizers;
        private final RemoteTaskFactory remoteTaskFactory;
        private final LocationFactory locationFactory;
        private final ExecutorService executor;
        private final NodeTaskMap nodeTaskMap;
        private final NodeManager nodeManager;
        @Inject
                FeaturesConfig featuresConfig,
                Metadata metadata,
                SqlParser sqlParser,
                LocationFactory locationFactory,
                SplitManager splitManager,
                NodeScheduler nodeScheduler,
                NodeManager nodeManager,
                List<PlanOptimizerplanOptimizers,
                RemoteTaskFactory remoteTaskFactory,
                @ForQueryExecution ExecutorService executor,
                NodeTaskMap nodeTaskMap)
        {
            checkNotNull(config"config is null");
            this. = config.getScheduleSplitBatchSize();
            this. = config.getInitialHashPartitions();
            this. = config.getBigQueryInitialHashPartitions();
            this. = checkNotNull(metadata"metadata is null");
            this. = checkNotNull(sqlParser"sqlParser is null");
            this. = checkNotNull(locationFactory"locationFactory is null");
            this. = checkNotNull(splitManager"splitManager is null");
            this. = checkNotNull(nodeScheduler"nodeScheduler is null");
            this. = checkNotNull(planOptimizers"planOptimizers is null");
            this. = checkNotNull(remoteTaskFactory"remoteTaskFactory is null");
            checkNotNull(featuresConfig"featuresConfig is null");
            this. = featuresConfig.isExperimentalSyntaxEnabled();
            this. = checkNotNull(executor"executor is null");
            this. = checkNotNull(nodeTaskMap"nodeTaskMap is null");
            this. = checkNotNull(nodeManager"nodeManager is null");
        }
        @Override
        public SqlQueryExecution createQueryExecution(QueryId queryIdString querySession sessionStatement statement)
        {
            int initialHashPartitions = this.;
            if (isBigQueryEnabled(sessionfalse)) {
                initialHashPartitions = ( == null) ? .getActiveNodes().size() : ;
            }
            initialHashPartitions = getHashPartitionCount(sessioninitialHashPartitions);
            SqlQueryExecution queryExecution = new SqlQueryExecution(queryId,
                    query,
                    session,
                    .createQueryLocation(queryId),
                    statement,
                    ,
                    ,
                    ,
                    ,
                    ,
                    ,
                    ,
                    ,
                    initialHashPartitions,
                    ,
                    ,
                    );
            return queryExecution;
        }
    }
}
// NOTE(review): scrape footer removed; restored the closing brace of
// SqlQueryExecution, which the extraction dropped.