Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.execution;
 
 
 
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static com.facebook.presto.execution.QueryState.RUNNING;
 import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
 import static com.facebook.presto.spi.StandardErrorCode.QUERY_QUEUE_FULL;
 import static com.facebook.presto.spi.StandardErrorCode.SERVER_SHUTTING_DOWN;
 import static com.facebook.presto.spi.StandardErrorCode.USER_CANCELED;
 import static com.facebook.presto.util.ImmutableCollectors.toImmutableList;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static io.airlift.concurrent.Threads.threadsNamed;
 import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.Executors.newCachedThreadPool;
 
 public class SqlQueryManager
         implements QueryManager
 {
     private static final Logger log = Logger.get(SqlQueryManager.class);
 
     private final SqlParser sqlParser;
 
     private final ExecutorService queryExecutor;
     private final ThreadPoolExecutorMBean queryExecutorMBean;
     private final QueryQueueManager queueManager;
     private final ClusterMemoryManager memoryManager;
 
     private final int maxQueryHistory;
     private final Duration maxQueryAge;
 
     private final ConcurrentMap<QueryIdQueryExecutionqueries = new ConcurrentHashMap<>();
     private final Queue<QueryExecutionexpirationQueue = new LinkedBlockingQueue<>();
 
     private final Duration clientTimeout;
 
 
     private final QueryMonitor queryMonitor;
     private final LocationFactory locationFactory;
     private final QueryIdGenerator queryIdGenerator;
 
     private final Map<Class<? extends Statement>, QueryExecutionFactory<?>> executionFactories;
 
     private final SqlQueryManagerStats stats = new SqlQueryManagerStats();
 
     @Inject
     public SqlQueryManager(
             SqlParser sqlParser,
             QueryManagerConfig config,
            QueryMonitor queryMonitor,
            QueryQueueManager queueManager,
            ClusterMemoryManager memoryManager,
            QueryIdGenerator queryIdGenerator,
            LocationFactory locationFactory,
            Map<Class<? extends Statement>, QueryExecutionFactory<?>> executionFactories)
    {
        this. = checkNotNull(sqlParser"sqlParser is null");
        this. = checkNotNull(executionFactories"executionFactories is null");
        this. = newCachedThreadPool(threadsNamed("query-scheduler-%s"));
        checkNotNull(config"config is null");
        this. = checkNotNull(queueManager"queueManager is null");
        this. = requireNonNull(memoryManager"memoryManager is null");
        this. = checkNotNull(queryMonitor"queryMonitor is null");
        this. = checkNotNull(locationFactory"locationFactory is null");
        this. = checkNotNull(queryIdGenerator"queryIdGenerator is null");
        this. = config.getMaxQueryAge();
        this. = config.getMaxQueryHistory();
        this. = config.getClientTimeout();
         = Executors.newScheduledThreadPool(config.getQueryManagerExecutorPoolSize(), threadsNamed("query-management-%s"));
        {
            @Override
            public void run()
            {
                try {
                    failAbandonedQueries();
                }
                catch (Throwable e) {
                    .warn(e"Error cancelling abandoned queries");
                }
                try {
                    enforceMemoryLimits();
                }
                catch (Throwable e) {
                    .warn(e"Error enforcing memory limits");
                }
                try {
                    removeExpiredQueries();
                }
                catch (Throwable e) {
                    .warn(e"Error removing expired queries");
                }
                try {
                    pruneExpiredQueries();
                }
                catch (Throwable e) {
                    .warn(e"Error pruning expired queries");
                }
            }
        }, 1, 1, .);
    }
    @PreDestroy
    public void stop()
    {
        boolean queryCancelled = false;
        for (QueryExecution queryExecution : .values()) {
            QueryInfo queryInfo = queryExecution.getQueryInfo();
            if (queryInfo.getState().isDone()) {
                continue;
            }
            .info("Server shutting down. Query %s has been cancelled"queryExecution.getQueryInfo().getQueryId());
            queryExecution.fail(new PrestoException("Server is shutting down. Query " + queryInfo.getQueryId() + " has been cancelled"));
            queryCancelled = true;
        }
        if (queryCancelled) {
            try {
                ..sleep(5);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        .shutdownNow();
    }
    @Override
    public List<QueryIdgetAllQueryIds()
    {
        return .values().stream()
                .map(queryExecution -> {
                    try {
                        return queryExecution.getQueryId();
                    }
                    catch (RuntimeException ignored) {
                        return null;
                    }
                })
                .filter(Objects::nonNull)
                .collect(toImmutableList());
    }
    @Override
    public List<QueryInfogetAllQueryInfo()
    {
        return .values().stream()
                .map(queryExecution -> {
                    try {
                        return queryExecution.getQueryInfo();
                    }
                    catch (RuntimeException ignored) {
                        return null;
                    }
                })
                .filter(Objects::nonNull)
                .collect(toImmutableList());
    }
    @Override
    public Duration waitForStateChange(QueryId queryIdQueryState currentStateDuration maxWait)
            throws InterruptedException
    {
        checkNotNull(queryId"queryId is null");
        checkNotNull(maxWait"maxWait is null");
        QueryExecution query = .get(queryId);
        if (query == null) {
            return maxWait;
        }
        return query.waitForStateChange(currentStatemaxWait);
    }
    @Override
    public QueryInfo getQueryInfo(QueryId queryId)
    {
        checkNotNull(queryId"queryId is null");
        QueryExecution query = .get(queryId);
        if (query == null) {
            throw new NoSuchElementException();
        }
        return query.getQueryInfo();
    }
    @Override
    public void recordHeartbeat(QueryId queryId)
    {
        checkNotNull(queryId"queryId is null");
        QueryExecution query = .get(queryId);
        if (query == null) {
            return;
        }
        query.recordHeartbeat();
    }
    @Override
    public QueryInfo createQuery(Session sessionString query)
    {
        checkNotNull(query"query is null");
        checkArgument(!query.isEmpty(), "query must not be empty string");
        QueryId queryId = .createNextQueryId();
        Statement statement;
        QueryExecutionFactory<?> queryExecutionFactory;
        try {
            statement = .createStatement(query);
            queryExecutionFactory = .get(statement.getClass());
            if (queryExecutionFactory == null) {
                throw new PrestoException("Unsupported statement type: " + statement.getClass().getSimpleName());
            }
        }
        catch (ParsingException | PrestoException e) {
            // This is intentionally not a method, since after the state change listener is registered
            // it's not safe to do any of this, and we had bugs before where people reused this code in a method
            URI self = .createQueryLocation(queryId);
            QueryExecution execution = new FailedQueryExecution(queryIdquerysessionselfe);
            .put(queryIdexecution);
            .createdEvent(execution.getQueryInfo());
            .completionEvent(execution.getQueryInfo());
            .queryFinished(execution.getQueryInfo());
            .add(execution);
            return execution.getQueryInfo();
        }
        QueryExecution queryExecution = queryExecutionFactory.createQueryExecution(queryIdquerysessionstatement);
        .createdEvent(queryExecution.getQueryInfo());
        queryExecution.addStateChangeListener(newValue -> {
            if (newValue.isDone()) {
                QueryInfo info = queryExecution.getQueryInfo();
                .queryFinished(info);
                .completionEvent(info);
                .add(queryExecution);
            }
        });
        .put(queryIdqueryExecution);
        // start the query in the background
        if (!.submit(queryExecution)) {
            queryExecution.fail(new PrestoException("Too many queued queries!"));
        }
        return queryExecution.getQueryInfo();
    }
    @Override
    public void cancelQuery(QueryId queryId)
    {
        checkNotNull(queryId"queryId is null");
        .debug("Cancel query %s"queryId);
        QueryExecution query = .get(queryId);
        if (query != null) {
            query.fail(new PrestoException("Query was canceled"));
        }
    }
    @Override
    public void cancelStage(StageId stageId)
    {
        checkNotNull(stageId"stageId is null");
        .debug("Cancel stage %s"stageId);
        QueryExecution query = .get(stageId.getQueryId());
        if (query != null) {
            query.cancelStage(stageId);
        }
    }
    @Managed
    @Flatten
    {
        return ;
    }
    @Managed(description = "Query scheduler executor")
    @Nested
    {
        return ;
    }
    @Managed(description = "Query garbage collector executor")
    @Nested
    {
        return ;
    }

    
    /**
     * Enforce memory limits at the query level
     */
    public void enforceMemoryLimits()
    {
                .filter(query -> query.getQueryInfo().getState() == )
                .collect(toImmutableList()));
    }

    
    /**
     * Prune extraneous info from old queries
     */
    private void pruneExpiredQueries()
    {
        if (.size() <= ) {
            return;
        }
        int count = 0;
        // we're willing to keep full info for up to maxQueryHistory queries
        for (QueryExecution query : ) {
            if (.size() - count <= ) {
                break;
            }
            query.pruneInfo();
            count++;
        }
    }

    
    /**
     * Remove completed queries after a waiting period
     */
    private void removeExpiredQueries()
    {
        DateTime timeHorizon = DateTime.now().minus(.toMillis());
        // we're willing to keep queries beyond timeHorizon as long as we have fewer than maxQueryHistory
        while (.size() > ) {
            QueryInfo queryInfo = .peek().getQueryInfo();
            // expirationQueue is FIFO based on query end time. Stop when we see the
            // first query that's too young to expire
            if (queryInfo.getQueryStats().getEndTime().isAfter(timeHorizon)) {
                return;
            }
            // only expire them if they are older than maxQueryAge. We need to keep them
            // around for a while in case clients come back asking for status
            QueryId queryId = queryInfo.getQueryId();
            .debug("Remove query %s"queryId);
            .remove(queryId);
            .remove();
        }
    }
    public void failAbandonedQueries()
    {
        for (QueryExecution queryExecution : .values()) {
            QueryInfo queryInfo = queryExecution.getQueryInfo();
            if (queryInfo.getState().isDone()) {
                continue;
            }
            if (isAbandoned(queryExecution)) {
                .info("Failing abandoned query %s"queryExecution.getQueryInfo().getQueryId());
                queryExecution.fail(new AbandonedException("Query " + queryInfo.getQueryId(), queryInfo.getQueryStats().getLastHeartbeat(), DateTime.now()));
            }
        }
    }
    private boolean isAbandoned(QueryExecution query)
    {
        DateTime oldestAllowedHeartbeat = DateTime.now().minus(.toMillis());
        DateTime lastHeartbeat = query.getQueryInfo().getQueryStats().getLastHeartbeat();
        return lastHeartbeat != null && lastHeartbeat.isBefore(oldestAllowedHeartbeat);
    }

    
    /**
     * Set up a callback to fire when a query is completed.
     * The callback will be called at most once.
     */
    static void addCompletionCallback(QueryExecution queryExecutionRunnable callback)
    {
        AtomicBoolean taskExecuted = new AtomicBoolean();
        queryExecution.addStateChangeListener(newValue -> {
            if (newValue.isDone() && taskExecuted.compareAndSet(falsetrue)) {
                callback.run();
            }
        });
        // Need to do this check in case the state changed before we added the previous state change listener
        if (queryExecution.getQueryInfo().getState().isDone() && taskExecuted.compareAndSet(falsetrue)) {
            callback.run();
        }
    }
New to GrepCode? Check out our FAQ X