Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.execution;
 
 
 
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static com.facebook.presto.SystemSessionProperties.isBigQueryEnabled;
 import static com.facebook.presto.spi.StandardErrorCode.QUERY_QUEUE_FULL;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.collect.Iterables.filter;
 import static com.google.common.collect.Iterables.transform;
 import static io.airlift.concurrent.Threads.threadsNamed;
 import static java.util.concurrent.Executors.newCachedThreadPool;
 
 public class SqlQueryManager
         implements QueryManager
 {
     private static final Logger log = Logger.get(SqlQueryManager.class);
 
     private final SqlParser sqlParser;
 
     private final ExecutorService queryExecutor;
     private final ThreadPoolExecutorMBean queryExecutorMBean;
     private final QueryStarter queryStarter;
 
     private final int maxQueryHistory;
     private final Duration maxQueryAge;
 
     private final ConcurrentMap<QueryIdQueryExecutionqueries = new ConcurrentHashMap<>();
     private final Queue<QueryExecutionexpirationQueue = new LinkedBlockingQueue<>();
 
     private final Duration clientTimeout;
 
 
     private final QueryMonitor queryMonitor;
     private final LocationFactory locationFactory;
     private final QueryIdGenerator queryIdGenerator;
 
     private final Map<Class<? extends Statement>, QueryExecutionFactory<?>> executionFactories;
 
     private final SqlQueryManagerStats stats = new SqlQueryManagerStats();
 
    @Inject
    public SqlQueryManager(
            SqlParser sqlParser,
            QueryManagerConfig config,
            QueryMonitor queryMonitor,
            QueryIdGenerator queryIdGenerator,
            LocationFactory locationFactory,
            Map<Class<? extends Statement>, QueryExecutionFactory<?>> executionFactories)
    {
        this. = checkNotNull(sqlParser"sqlParser is null");
        this. = checkNotNull(executionFactories"executionFactories is null");
        this. = newCachedThreadPool(threadsNamed("query-scheduler-%d"));
        checkNotNull(config"config is null");
        this. = new QueryStarter(config);
        this. = checkNotNull(queryMonitor"queryMonitor is null");
        this. = checkNotNull(locationFactory"locationFactory is null");
        this. = checkNotNull(queryIdGenerator"queryIdGenerator is null");
        this. = config.getMaxQueryAge();
        this. = config.getMaxQueryHistory();
        this. = config.getClientTimeout();
         = Executors.newScheduledThreadPool(config.getQueryManagerExecutorPoolSize(), threadsNamed("query-management-%d"));
        {
            @Override
            public void run()
            {
                try {
                    failAbandonedQueries();
                }
                catch (Throwable e) {
                    .warn(e"Error cancelling abandoned queries");
                }
                try {
                    removeExpiredQueries();
                }
                catch (Throwable e) {
                    .warn(e"Error removing expired queries");
                }
            }
        }, 1, 1, .);
    }
    @PreDestroy
    public void stop()
    {
        .shutdownNow();
    }
    @Override
    public List<QueryInfogetAllQueryInfo()
    {
        return ImmutableList.copyOf(filter(transform(.values(), new Function<QueryExecutionQueryInfo>()
        {
            @Override
            public QueryInfo apply(QueryExecution queryExecution)
            {
                try {
                    return queryExecution.getQueryInfo();
                }
                catch (RuntimeException ignored) {
                    return null;
                }
            }
        }), Predicates.notNull()));
    }
    @Override
    public Duration waitForStateChange(QueryId queryIdQueryState currentStateDuration maxWait)
            throws InterruptedException
    {
        Preconditions.checkNotNull(queryId"queryId is null");
        Preconditions.checkNotNull(maxWait"maxWait is null");
        QueryExecution query = .get(queryId);
        if (query == null) {
            return maxWait;
        }
        query.recordHeartbeat();
        return query.waitForStateChange(currentStatemaxWait);
    }
    @Override
    public QueryInfo getQueryInfo(QueryId queryId)
    {
        checkNotNull(queryId"queryId is null");
        QueryExecution query = .get(queryId);
        if (query == null) {
            throw new NoSuchElementException();
        }
        query.recordHeartbeat();
        return query.getQueryInfo();
    }
    @Override
    public QueryInfo createQuery(Session sessionString query)
    {
        checkNotNull(query"query is null");
        Preconditions.checkArgument(!query.isEmpty(), "query must not be empty string");
        QueryId queryId = .createNextQueryId();
        Statement statement;
        try {
            statement = .createStatement(query);
        }
        catch (ParsingException e) {
            return createFailedQuery(sessionqueryqueryIde);
        }
        QueryExecutionFactory<?> queryExecutionFactory = .get(statement.getClass());
        Preconditions.checkState(queryExecutionFactory != null"Unsupported statement type %s"statement.getClass().getName());
        final QueryExecution queryExecution = queryExecutionFactory.createQueryExecution(queryIdquerysessionstatement);
        .createdEvent(queryExecution.getQueryInfo());
        queryExecution.addStateChangeListener(new StateChangeListener<QueryState>()
        {
            @Override
            public void stateChanged(QueryState newValue)
            {
                if (newValue.isDone()) {
                    QueryInfo info = queryExecution.getQueryInfo();
                    .queryFinished(info);
                    .completionEvent(info);
                    .add(queryExecution);
                }
            }
        });
        .put(queryIdqueryExecution);
        // start the query in the background
        if (!.submit(queryExecution)) {
            return createFailedQuery(sessionqueryqueryIdnew PrestoException("Too many queued queries!"));
        }
        return queryExecution.getQueryInfo();
    }
    @Override
    public void cancelQuery(QueryId queryId)
    {
        checkNotNull(queryId"queryId is null");
        .debug("Cancel query %s"queryId);
        QueryExecution query = .get(queryId);
        if (query != null) {
            query.cancel();
        }
    }
    @Override
    public void cancelStage(StageId stageId)
    {
        Preconditions.checkNotNull(stageId"stageId is null");
        .debug("Cancel stage %s"stageId);
        QueryExecution query = .get(stageId.getQueryId());
        if (query != null) {
            query.cancelStage(stageId);
        }
    }
    @Managed
    public int getQueryQueueSize()
    {
        return .getQueryQueueSize();
    }
    @Managed
    public int getBigQueryQueueSize()
    {
        return .getBigQueryQueueSize();
    }
    @Managed
    @Flatten
    {
        return ;
    }
    @Managed(description = "Query scheduler executor")
    @Nested
    {
        return ;
    }
    @Managed(description = "Query garbage collector executor")
    @Nested
    {
        return ;
    }
    public void removeQuery(QueryId queryId)
    {
        Preconditions.checkNotNull(queryId"queryId is null");
        .debug("Remove query %s"queryId);
        QueryExecution query = .remove(queryId);
        if (query != null) {
            query.cancel();
        }
    }

    
    /**
     * Remove completed queries after a waiting period
     */
    public void removeExpiredQueries()
    {
        while (.size() > ) {
            QueryExecution query = .remove();
            removeQuery(query.getQueryInfo().getQueryId());
        }
        DateTime oldestAllowedQuery = DateTime.now().minus(.toMillis());
        while (.size() > 0) {
            QueryExecution query = .peek();
            if (query.getQueryInfo().getQueryStats().getEndTime().isAfter(oldestAllowedQuery)) {
                return;
            }
            removeQuery(query.getQueryInfo().getQueryId());
            .remove();
        }
    }
    public void failAbandonedQueries()
    {
        for (QueryExecution queryExecution : .values()) {
            QueryInfo queryInfo = queryExecution.getQueryInfo();
            if (queryInfo.getState().isDone()) {
                continue;
            }
            if (isAbandoned(queryExecution)) {
                .info("Failing abandoned query %s"queryExecution.getQueryInfo().getQueryId());
                queryExecution.fail(new AbandonedException("Query " + queryInfo.getQueryId(), queryInfo.getQueryStats().getLastHeartbeat(), DateTime.now()));
            }
        }
    }
    private boolean isAbandoned(QueryExecution query)
    {
        DateTime oldestAllowedHeartbeat = DateTime.now().minus(.toMillis());
        DateTime lastHeartbeat = query.getQueryInfo().getQueryStats().getLastHeartbeat();
        return lastHeartbeat != null && lastHeartbeat.isBefore(oldestAllowedHeartbeat);
    }
    private QueryInfo createFailedQuery(Session sessionString queryQueryId queryIdThrowable cause)
    {
        URI self = .createQueryLocation(queryId);
        QueryExecution execution = new FailedQueryExecution(queryIdquerysessionselfcause);
        .put(queryIdexecution);
        .queryStarted();
        .createdEvent(execution.getQueryInfo());
        .completionEvent(execution.getQueryInfo());
        .queryFinished(execution.getQueryInfo());
        .add(execution);
        return execution.getQueryInfo();
    }
    private static Function<QueryExecutionDateTimeendTimeGetter()
    {
        return new Function<QueryExecutionDateTime>()
        {
            @Nullable
            @Override
            public DateTime apply(QueryExecution input)
            {
                return input.getQueryInfo().getQueryStats().getEndTime();
            }
        };
    }
    private static class QueryStarter
    {
        private final int maxQueuedQueries;
        private final AtomicInteger queryQueueSize = new AtomicInteger();
        private final AsyncSemaphore<QueryExecutionqueryAsyncSemaphore;
        private final int maxQueuedBigQueries;
        private final AtomicInteger bigQueryQueueSize = new AtomicInteger();
        private final AsyncSemaphore<QueryExecutionbigQueryAsyncSemaphore;
        public QueryStarter(Executor queryExecutorSqlQueryManagerStats statsQueryManagerConfig config)
        {
            checkNotNull(queryExecutor"queryExecutor is null");
            checkNotNull(stats"stats is null");
            this. = config.getMaxQueuedQueries();
            this. = new AsyncSemaphore<>(config.getMaxConcurrentQueries(), queryExecutornew QuerySubmitter(queryExecutorstats));
            this. = config.getMaxQueuedBigQueries();
            this. = new AsyncSemaphore<>(config.getMaxConcurrentBigQueries(), queryExecutornew QuerySubmitter(queryExecutorstats));
        }
        public boolean submit(QueryExecution queryExecution)
        {
            AtomicInteger queueSize;
            int maxQueueSize;
            AsyncSemaphore<QueryExecutionasyncSemaphore;
            if (isBigQueryEnabled(queryExecution.getQueryInfo().getSession(), false)) {
                queueSize = ;
                maxQueueSize = ;
                asyncSemaphore = ;
            }
            else {
                queueSize = ;
                maxQueueSize = ;
                asyncSemaphore = ;
            }
            if (queueSize.incrementAndGet() > maxQueueSize) {
                queueSize.decrementAndGet();
                return false;
            }
            asyncSemaphore.submit(queryExecution);
            return true;
        }
        public int getQueryQueueSize()
        {
            return .get();
        }
        public int getBigQueryQueueSize()
        {
            return .get();
        }
        private static class QuerySubmitter
                implements Function<QueryExecutionListenableFuture<?>>
        {
            private final Executor queryExecutor;
            private final SqlQueryManagerStats stats;
            private final AtomicInteger queueSize;
            public QuerySubmitter(Executor queryExecutorSqlQueryManagerStats statsAtomicInteger queueSize)
            {
                this. = checkNotNull(queryExecutor"queryExecutor is null");
                this. = checkNotNull(stats"stats is null");
                this. = checkNotNull(queueSize"queueSize is null");
            }
            @Override
            public ListenableFuture<?> apply(final QueryExecution queryExecution)
            {
                .decrementAndGet();
                final SettableFuture<?> settableFuture = SettableFuture.create();
                queryExecution.addStateChangeListener(new StateChangeListener<QueryState>()
                {
                    @Override
                    public void stateChanged(QueryState newValue)
                    {
                        if (newValue.isDone()) {
                            settableFuture.set(null);
                        }
                    }
                });
                if (queryExecution.getQueryInfo().getState().isDone()) {
                    settableFuture.set(null);
                }
                else {
                    .execute(new Runnable()
                    {
                        @Override
                        public void run()
                        {
                            try (SetThreadName setThreadName = new SetThreadName("Query-%s"queryExecution.getQueryInfo().getQueryId())) {
                                .queryStarted();
                                queryExecution.start();
                            }
                        }
                    });
                }
                return settableFuture;
            }
        }
    }
New to GrepCode? Check out our FAQ X