Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.execution;
 
 
 
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
 
 import static com.facebook.presto.SystemSessionProperties.isBigQueryEnabled;
 import static com.facebook.presto.spi.StandardErrorCode.QUERY_QUEUE_FULL;
 import static com.facebook.presto.spi.StandardErrorCode.USER_CANCELED;
 import static com.facebook.presto.util.ImmutableCollectors.toImmutableList;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static io.airlift.concurrent.Threads.threadsNamed;
 import static java.util.concurrent.Executors.newCachedThreadPool;
 
 public class SqlQueryManager
         implements QueryManager
 {
     private static final Logger log = Logger.get(SqlQueryManager.class);
 
     private final SqlParser sqlParser;
 
     private final ExecutorService queryExecutor;
     private final ThreadPoolExecutorMBean queryExecutorMBean;
     private final QueryStarter queryStarter;
 
     private final int maxQueryHistory;
     private final Duration maxQueryAge;
 
     private final ConcurrentMap<QueryIdQueryExecutionqueries = new ConcurrentHashMap<>();
     private final Queue<QueryExecutionexpirationQueue = new LinkedBlockingQueue<>();
 
     private final Duration clientTimeout;
 
 
     private final QueryMonitor queryMonitor;
     private final LocationFactory locationFactory;
     private final QueryIdGenerator queryIdGenerator;
 
     private final Map<Class<? extends Statement>, QueryExecutionFactory<?>> executionFactories;
 
     private final SqlQueryManagerStats stats = new SqlQueryManagerStats();
    /**
     * Wires up the query manager: null-checks its collaborators, reads the retention and
     * timeout settings from {@code config}, creates the "query-scheduler" and
     * "query-management" thread pools, and installs a recurring task that calls
     * {@link #failAbandonedQueries()} and then {@link #removeExpiredQueries()}.
     *
     * NOTE(review): this block is damaged. The assignment targets after {@code this.},
     * the commas between arguments, the logger receiver of {@code .warn}, and the
     * statement that opens the anonymous class (presumably a scheduleAtFixedRate call
     * with a {@code new Runnable()}) are missing, as is the time unit in the final
     * argument list. No assignment of {@code queryExecutorMBean} is visible either.
     * TODO restore from the upstream source before compiling.
     */
    @Inject
    public SqlQueryManager(
            SqlParser sqlParser,
            QueryManagerConfig config,
            QueryMonitor queryMonitor,
            QueryIdGenerator queryIdGenerator,
            LocationFactory locationFactory,
            Map<Class<? extends Statement>, QueryExecutionFactory<?>> executionFactories)
    {
        // Each "this. = ..." below presumably assigns the like-named field.
        this. = checkNotNull(sqlParser"sqlParser is null");
        this. = checkNotNull(executionFactories"executionFactories is null");
        this. = newCachedThreadPool(threadsNamed("query-scheduler-%s"));
        checkNotNull(config"config is null");
        // NOTE(review): QueryStarter's only visible constructor takes (Executor, stats, config);
        // the first two arguments look to have been dropped here.
        this. = new QueryStarter(config);
        this. = checkNotNull(queryMonitor"queryMonitor is null");
        this. = checkNotNull(locationFactory"locationFactory is null");
        this. = checkNotNull(queryIdGenerator"queryIdGenerator is null");
        this. = config.getMaxQueryAge();
        this. = config.getMaxQueryHistory();
        this. = config.getClientTimeout();
        // Scheduled pool for the housekeeping task; the target field name is missing.
         = Executors.newScheduledThreadPool(config.getQueryManagerExecutorPoolSize(), threadsNamed("query-management-%s"));
        {
            @Override
            public void run()
            {
                // Each step is guarded separately so a failure in one does not skip the
                // other; the catch only logs a warning, so the exception does not
                // propagate out of run().
                try {
                    failAbandonedQueries();
                }
                catch (Throwable e) {
                    .warn(e"Error cancelling abandoned queries");
                }
                try {
                    removeExpiredQueries();
                }
                catch (Throwable e) {
                    .warn(e"Error removing expired queries");
                }
            }
        }, 1, 1, .);
    }
    /**
     * Lifecycle hook that shuts an executor down immediately.
     *
     * NOTE(review): the receiver of {@code shutdownNow()} is missing, and the class
     * creates two thread pools in its constructor; it is unclear from here whether one
     * or both are meant to be stopped -- TODO confirm against the upstream source.
     */
    @PreDestroy
    public void stop()
    {
        .shutdownNow();
    }
    @Override
    public List<QueryInfogetAllQueryInfo()
    {
        return .values().stream()
                .map(queryExecution -> {
                    try {
                        return queryExecution.getQueryInfo();
                    }
                    catch (RuntimeException ignored) {
                        return null;
                    }
                })
                .filter(Objects::nonNull)
                .collect(toImmutableList());
    }
    @Override
    public Duration waitForStateChange(QueryId queryIdQueryState currentStateDuration maxWait)
            throws InterruptedException
    {
        Preconditions.checkNotNull(queryId"queryId is null");
        Preconditions.checkNotNull(maxWait"maxWait is null");
        QueryExecution query = .get(queryId);
        if (query == null) {
            return maxWait;
        }
        query.recordHeartbeat();
        return query.waitForStateChange(currentStatemaxWait);
    }
    @Override
    public QueryInfo getQueryInfo(QueryId queryId)
    {
        checkNotNull(queryId"queryId is null");
        QueryExecution query = .get(queryId);
        if (query == null) {
            throw new NoSuchElementException();
        }
        query.recordHeartbeat();
        return query.getQueryInfo();
    }
    @Override
    public QueryInfo createQuery(Session sessionString query)
    {
        checkNotNull(query"query is null");
        Preconditions.checkArgument(!query.isEmpty(), "query must not be empty string");
        QueryId queryId = .createNextQueryId();
        Statement statement;
        try {
            statement = .createStatement(query);
        }
        catch (ParsingException e) {
            return createFailedQuery(sessionqueryqueryIde);
        }
        QueryExecutionFactory<?> queryExecutionFactory = .get(statement.getClass());
        Preconditions.checkState(queryExecutionFactory != null"Unsupported statement type %s"statement.getClass().getName());
        final QueryExecution queryExecution = queryExecutionFactory.createQueryExecution(queryIdquerysessionstatement);
        .createdEvent(queryExecution.getQueryInfo());
        queryExecution.addStateChangeListener(new StateChangeListener<QueryState>()
        {
            @Override
            public void stateChanged(QueryState newValue)
            {
                if (newValue.isDone()) {
                    QueryInfo info = queryExecution.getQueryInfo();
                    .queryFinished(info);
                    .completionEvent(info);
                    .add(queryExecution);
                }
            }
        });
        .put(queryIdqueryExecution);
        // start the query in the background
        if (!.submit(queryExecution)) {
            return createFailedQuery(sessionqueryqueryIdnew PrestoException("Too many queued queries!"));
        }
        return queryExecution.getQueryInfo();
    }
    @Override
    public void cancelQuery(QueryId queryId)
    {
        checkNotNull(queryId"queryId is null");
        .debug("Cancel query %s"queryId);
        QueryExecution query = .get(queryId);
        if (query != null) {
            query.fail(new PrestoException("Query was canceled"));
        }
    }
    @Override
    public void cancelStage(StageId stageId)
    {
        Preconditions.checkNotNull(stageId"stageId is null");
        .debug("Cancel stage %s"stageId);
        QueryExecution query = .get(stageId.getQueryId());
        if (query != null) {
            query.cancelStage(stageId);
        }
    }
    @Managed
    public int getQueryQueueSize()
    {
        return .getQueryQueueSize();
    }
    @Managed
    public int getBigQueryQueueSize()
    {
        return .getBigQueryQueueSize();
    }
    // NOTE(review): the method signature and the returned expression are missing here,
    // apparently lost in extraction. Given @Managed/@Flatten and the fields declared in
    // this class, this presumably exposes the `stats` field -- TODO restore from the
    // upstream source.
    @Managed
    @Flatten
    {
        return ;
    }
    // NOTE(review): the method signature and the returned expression are missing here,
    // apparently lost in extraction. From the description, this presumably returns the
    // `queryExecutorMBean` field -- TODO restore from the upstream source.
    @Managed(description = "Query scheduler executor")
    @Nested
    {
        return ;
    }
    // NOTE(review): the method signature and the returned expression are missing here,
    // apparently lost in extraction. From the description, this presumably returns a
    // management bean for the "query-management" pool created in the constructor; no
    // such field is declared in the visible source -- TODO restore from upstream.
    @Managed(description = "Query garbage collector executor")
    @Nested
    {
        return ;
    }

    
    /** Remove completed queries after a waiting period. */
    public void removeExpiredQueries()
    {
        DateTime timeHorizon = DateTime.now().minus(.toMillis());
        // we're willing to keep queries beyond timeHorizon as long as we have fewer than maxQueryHistory
        while (.size() > ) {
            QueryInfo queryInfo = .peek().getQueryInfo();
            // expirationQueue is FIFO based on query end time. Stop when we see the
            // first query that's too young to expire
            if (queryInfo.getQueryStats().getEndTime().isAfter(timeHorizon)) {
                return;
            }
            // only expire them if they are older than maxQueryAge. We need to keep them
            // around for a while in case clients come back asking for status
            QueryId queryId = queryInfo.getQueryId();
            .debug("Remove query %s"queryId);
            .remove(queryId);
            .remove();
        }
    }
    public void failAbandonedQueries()
    {
        for (QueryExecution queryExecution : .values()) {
            QueryInfo queryInfo = queryExecution.getQueryInfo();
            if (queryInfo.getState().isDone()) {
                continue;
            }
            if (isAbandoned(queryExecution)) {
                .info("Failing abandoned query %s"queryExecution.getQueryInfo().getQueryId());
                queryExecution.fail(new AbandonedException("Query " + queryInfo.getQueryId(), queryInfo.getQueryStats().getLastHeartbeat(), DateTime.now()));
            }
        }
    }
    private boolean isAbandoned(QueryExecution query)
    {
        DateTime oldestAllowedHeartbeat = DateTime.now().minus(.toMillis());
        DateTime lastHeartbeat = query.getQueryInfo().getQueryStats().getLastHeartbeat();
        return lastHeartbeat != null && lastHeartbeat.isBefore(oldestAllowedHeartbeat);
    }
    private QueryInfo createFailedQuery(Session sessionString queryQueryId queryIdThrowable cause)
    {
        URI self = .createQueryLocation(queryId);
        QueryExecution execution = new FailedQueryExecution(queryIdquerysessionselfcause);
        .put(queryIdexecution);
        .queryStarted();
        .createdEvent(execution.getQueryInfo());
        .completionEvent(execution.getQueryInfo());
        .queryFinished(execution.getQueryInfo());
        .add(execution);
        return execution.getQueryInfo();
    }

    
    /** Set up a callback to fire when a query is completed. The callback will be called at most once. */
    private static void addCompletionCallback(QueryExecution queryExecutionRunnable callback)
    {
        AtomicBoolean taskExecuted = new AtomicBoolean();
        queryExecution.addStateChangeListener(newValue -> {
            if (newValue.isDone() && taskExecuted.compareAndSet(falsetrue)) {
                callback.run();
            }
        });
        // Need to do this check in case the state changed before we added the previous state change listener
        if (queryExecution.getQueryInfo().getState().isDone() && taskExecuted.compareAndSet(falsetrue)) {
            callback.run();
        }
    }
    @ThreadSafe
    private static class QueryStarter
    {
        private final QueryQueue queryQueue;
        private final QueryQueue bigQueryQueue;
        public QueryStarter(Executor queryExecutorSqlQueryManagerStats statsQueryManagerConfig config)
        {
            checkNotNull(queryExecutor"queryExecutor is null");
            checkNotNull(stats"stats is null");
            checkNotNull(config"config is null");
            this. = new QueryQueue(queryExecutorstatsconfig.getMaxQueuedQueries(), config.getMaxConcurrentQueries());
            this. = new QueryQueue(queryExecutorstatsconfig.getMaxQueuedBigQueries(), config.getMaxConcurrentBigQueries());
        }
        public boolean submit(QueryExecution queryExecution)
        {
            if (isBigQueryEnabled(queryExecution.getQueryInfo().getSession(), false)) {
                return .enqueue(queryExecution);
            }
            else {
                return .enqueue(queryExecution);
            }
        }
        public int getQueryQueueSize()
        {
            return .getQueueSize();
        }
        public int getBigQueryQueueSize()
        {
            return .getQueueSize();
        }
        private static class QueryQueue
        {
            private final int maxQueuedQueries;
            private final AtomicInteger queryQueueSize = new AtomicInteger();
            private final AsyncSemaphore<QueueEntryasyncSemaphore;
            private QueryQueue(Executor queryExecutorSqlQueryManagerStats statsint maxQueuedQueriesint maxConcurrentQueries)
            {
                checkNotNull(queryExecutor"queryExecutor is null");
                checkNotNull(stats"stats is null");
                checkArgument(maxQueuedQueries > 0, "maxQueuedQueries must be greater than zero");
                checkArgument(maxConcurrentQueries > 0, "maxConcurrentQueries must be greater than zero");
                this. = maxQueuedQueries;
                this. = new AsyncSemaphore<>(maxConcurrentQueries,
                        queryExecutor,
                        queueEntry -> {
                            QueryExecution queryExecution = queueEntry.dequeue();
                            if (queryExecution == null) {
                                // Entry was dequeued earlier and so this query is already done
                                return Futures.immediateFuture(null);
                            }
                            else {
                                SettableFuture<?> settableFuture = SettableFuture.create();
                                addCompletionCallback(queryExecution, () -> settableFuture.set(null));
                                if (!settableFuture.isDone()) { // Only execute if the query is not already completed (e.g. cancelled)
                                    queryExecutor.execute(() -> {
                                        try (SetThreadName setThreadName = new SetThreadName("Query-%s"queryExecution.getQueryInfo().getQueryId())) {
                                            stats.queryStarted();
                                            queryExecution.start();
                                        }
                                    });
                                }
                                return settableFuture;
                            }
                        });
            }
            public int getQueueSize()
            {
                return .get();
            }
            public boolean enqueue(QueryExecution queryExecution)
            {
                if (.incrementAndGet() > ) {
                    .decrementAndGet();
                    return false;
                }
                QueueEntry queueEntry = new QueueEntry(queryExecutionaVoid -> .decrementAndGet());
                // Add a callback to dequeue the entry if it is ever completed.
                // This enables us to remove the entry sooner if is cancelled before starting,
                // and has no effect if called after starting.
                addCompletionCallback(queryExecutionqueueEntry::dequeue);
                .submit(queueEntry);
                return true;
            }
            private static class QueueEntry
            {
                private final AtomicBoolean dequeued = new AtomicBoolean();
                private final AtomicReference<QueryExecutionqueryExecution;
                private final Consumer<VoidonDequeue;
                private QueueEntry(QueryExecution queryExecutionConsumer<VoidonDequeue)
                {
                    checkNotNull(queryExecution"queryExecution is null");
                    checkNotNull(onDequeue"onDequeue is null");
                    this. = new AtomicReference<>(queryExecution);
                    this. = onDequeue;
                }

                
Can be called multiple times on the same QueueEntry, but the onDequeue Consumer will only be called once and only one caller will get the QueryExecution.
                public QueryExecution dequeue()
                {
                    if (.compareAndSet(falsetrue)) {
                        .accept(null);
                    }
                    return .getAndSet(null);
                }
            }
        }
    }
New to GrepCode? Check out our FAQ X