Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.server;
 
 
 
 import java.net.URI;
 import java.util.List;
 import java.util.Set;
 
 import static com.facebook.presto.client.PrestoHeaders.PRESTO_CATALOG;
 import static com.facebook.presto.client.PrestoHeaders.PRESTO_SCHEMA;
 import static com.facebook.presto.client.PrestoHeaders.PRESTO_SOURCE;
 import static com.facebook.presto.client.PrestoHeaders.PRESTO_USER;
 import static com.facebook.presto.execution.QueryInfo.queryIdGetter;
 import static com.facebook.presto.execution.StageInfo.getAllStages;
 import static com.facebook.presto.execution.StageInfo.stageStateGetter;
 import static com.facebook.presto.util.Failures.toFailure;
 import static com.facebook.presto.util.Threads.threadsNamed;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Strings.isNullOrEmpty;
 import static com.google.common.collect.Iterables.transform;
 import static com.google.common.net.HttpHeaders.USER_AGENT;
import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.lang.String.format;
@Path("/v1/statement")
public class StatementResource
    private static final Logger log = Logger.get(StatementResource.class);
    private static final Duration MAX_WAIT_TIME = new Duration(1, .);
    private static final Ordering<Comparable<Duration>> WAIT_ORDERING = Ordering.natural().nullsLast();
    private static final long DESIRED_RESULT_BYTES = new DataSize(1, ).toBytes();
    private final QueryManager queryManager;
    private final ConcurrentMap<QueryIdQueryqueries = new ConcurrentHashMap<>();
    private final ScheduledExecutorService queryPurger = Executors.newSingleThreadScheduledExecutor(threadsNamed("query-purger-%d"));
    @Inject
    public StatementResource(QueryManager queryManagerSupplier<ExchangeClientexchangeClientSupplier)
    {
        this. = checkNotNull(queryManager"queryManager is null");
        this. = checkNotNull(exchangeClientSupplier"exchangeClientSupplier is null");
        .scheduleWithFixedDelay(new PurgeQueriesRunnable(.keySet(), queryManager), 200, 200, .);
    }
    @PreDestroy
    public void stop()
    {
        .shutdownNow();
    }
    @POST
    public Response createQuery(
            String statement,
            @HeaderParam(String user,
            @HeaderParam(String source,
            @HeaderParam(String catalog,
            @HeaderParam(String schema,
            @HeaderParam(String userAgent,
            @Context HttpServletRequest requestContext,
            @Context UriInfo uriInfo)
            throws InterruptedException
    {
        assertRequest(!isNullOrEmpty(statement), "SQL statement is empty");
        assertRequest(!isNullOrEmpty(user), "User (%s) is empty");
        assertRequest(!isNullOrEmpty(catalog), "Catalog (%s) is empty");
        assertRequest(!isNullOrEmpty(schema), "Schema (%s) is empty");
        String remoteUserAddress = requestContext.getRemoteAddr();
        Session session = new Session(usersourcecatalogschemaremoteUserAddressuserAgent);
        ExchangeClient exchangeClient = .get();
        Query query = new Query(sessionstatementexchangeClient);
        .put(query.getQueryId(), query);
        return Response.ok(query.getNextResults(uriInfonew Duration(1, .))).build();
    }
    static void assertRequest(boolean expressionString formatObject... args)
    {
        if (!expression) {
            Response request = Response
                    .status(.)
                    .type(.)
                    .entity(format(formatargs))
                    .build();
            throw new WebApplicationException(request);
        }
    }
    @GET
    @Path("{queryId}/{token}")
    public Response getQueryResults(
            @PathParam("queryId"QueryId queryId,
            @PathParam("token"long token,
            @QueryParam("maxWait"Duration maxWait,
            @Context UriInfo uriInfo)
            throws InterruptedException
    {
        Query query = .get(queryId);
        if (query == null) {
            return Response.status(.).build();
        }
        Duration wait = .min(maxWait);
        return Response.ok(query.getResults(tokenuriInfowait)).build();
    }
    @DELETE
    @Path("{queryId}/{token}")
    public Response cancelQuery(@PathParam("queryId"QueryId queryId,
            @PathParam("token"long token)
    {
        Query query = .get(queryId);
        if (query == null) {
            return Response.status(.).build();
        }
        query.close();
        return Response.noContent().build();
    }
    @ThreadSafe
    public static class Query
            implements Closeable
    {
        private final QueryManager queryManager;
        private final QueryId queryId;
        private final ExchangeClient exchangeClient;
        private final AtomicLong resultId = new AtomicLong();
        @GuardedBy("this")
        private QueryResults lastResult;
        @GuardedBy("this")
        private String lastResultPath;
        @GuardedBy("this")
        private List<Columncolumns;
        public Query(Session session,
                String query,
                QueryManager queryManager,
                ExchangeClient exchangeClient)
        {
            checkNotNull(session"session is null");
            checkNotNull(query"query is null");
            checkNotNull(queryManager"queryManager is null");
            checkNotNull(exchangeClient"exchangeClient is null");
            this. = queryManager;
            QueryInfo queryInfo = queryManager.createQuery(sessionquery);
             = queryInfo.getQueryId();
            this. = exchangeClient;
        }
        @Override
        public void close()
        {
            .cancelQuery();
        }
        public QueryId getQueryId()
        {
            return ;
        }
        public synchronized QueryResults getResults(long tokenUriInfo uriInfoDuration maxWaitTime)
                throws InterruptedException
        {
            // is the a repeated request for the last results?
            String requestedPath = uriInfo.getAbsolutePath().getPath();
            if ( != null && requestedPath.equals()) {
                // tell query manager we are still interested in the query
                .getQueryInfo();
                return ;
            }
            if (token < .get()) {
                throw new WebApplicationException(.);
            }
            // if this is not a request for the next results, return not found
            if (.getNextUri() == null || !requestedPath.equals(.getNextUri().getPath())) {
                // unknown token
                throw new WebApplicationException(.);
            }
            return getNextResults(uriInfomaxWaitTime);
        }
        public synchronized QueryResults getNextResults(UriInfo uriInfoDuration maxWaitTime)
                throws InterruptedException
        {
            Iterable<List<Object>> data = getData(maxWaitTime);
            // get the query info before returning
            // force update if query manager is closed
            QueryInfo queryInfo = .getQueryInfo();
            // if we have received all of the output data and the query is not marked as done, wait for the query to finish
            if (.isClosed() && !queryInfo.getState().isDone()) {
                .waitForStateChange(queryInfo.getState(), maxWaitTime);
                queryInfo = .getQueryInfo();
            }
            // close exchange client if the query has failed
            if (queryInfo.getState().isDone()) {
                if (queryInfo.getState() != .) {
                    .close();
                }
                else if (queryInfo.getOutputStage() == null) {
                    // For simple executions (e.g. drop table), there will never be an output stage,
                    // so close the exchange as soon as the query is done.
                    .close();
                    // this is a hack to suppress the warn message in the client saying that there are no columns.
                    // The reason for this is that the current API definition assumes that everything is a query,
                    // so statements without results produce an error in the client otherwise.
                    //
                    // TODO: add support to the API for non-query statements.
                     = ImmutableList.of(new Column("result""varchar"));
                    data = ImmutableSet.<List<Object>>of(ImmutableList.<Object>of("true"));
                }
            }
            // only return a next if the query is not done or there is more data to send (due to buffering)
            URI nextResultsUri = null;
            if ((!queryInfo.getState().isDone()) || (!.isClosed())) {
                nextResultsUri = createNextResultsUri(uriInfo);
            }
            // first time through, self is null
            QueryResults queryResults = new QueryResults(
                    .toString(),
                    uriInfo.getRequestUriBuilder().replaceQuery("").replacePath(queryInfo.getSelf().getPath()).build(),
                    findCancelableLeafStage(queryInfo),
                    nextResultsUri,
                    ,
                    data,
                    toStatementStats(queryInfo),
                    toQueryError(queryInfo));
            // cache the last results
            if ( != null) {
                 = .getNextUri().getPath();
            }
            else {
                 = null;
            }
             = queryResults;
            return queryResults;
        }
        private synchronized Iterable<List<Object>> getData(Duration maxWait)
                throws InterruptedException
        {
            // wait for query to start
            QueryInfo queryInfo = .getQueryInfo();
            while (maxWait.toMillis() > 1 && !isQueryStarted(queryInfo)) {
                maxWait = .waitForStateChange(queryInfo.getState(), maxWait);
                queryInfo = .getQueryInfo();
            }
            // if query did not finish starting or does not have output, just return
            if (!isQueryStarted(queryInfo) || queryInfo.getOutputStage() == null) {
                return null;
            }
            if ( == null) {
                 = createColumnsList(queryInfo);
            }
            updateExchangeClient(queryInfo.getOutputStage());
            ImmutableList.Builder<RowIterablepages = ImmutableList.builder();
            // wait up to max wait for data to arrive; then try to return at least DESIRED_RESULT_BYTES
            int bytes = 0;
            while (bytes < ) {
                Page page = .getNextPage(maxWait);
                if (page == null) {
                    break;
                }
                bytes += page.getDataSize().toBytes();
                pages.add(new RowIterable(page));
                // only wait on first call
                maxWait = new Duration(0, .);
            }
            if (bytes == 0) {
                return null;
            }
            return Iterables.concat(pages.build());
        }
        private static boolean isQueryStarted(QueryInfo queryInfo)
        {
            QueryState state = queryInfo.getState();
            return state != . && queryInfo.getState() != . && queryInfo.getState() != .;
        }
        private synchronized void updateExchangeClient(StageInfo outputStage)
        {
            // if the output stage is not done, update the exchange client with any additional locations
            if (!outputStage.getState().isDone()) {
                for (TaskInfo taskInfo : outputStage.getTasks()) {
                    List<BufferInfobuffers = taskInfo.getOutputBuffers().getBuffers();
                    Preconditions.checkState(buffers.size() == 1,
                            "Expected a single output buffer for task %s, but found %s",
                            taskInfo.getTaskId(),
                            buffers);
                    String bufferId = Iterables.getOnlyElement(buffers).getBufferId();
                    URI uri = uriBuilderFrom(taskInfo.getSelf()).appendPath("results").appendPath(bufferId).build();
                    .addLocation(uri);
                }
            }
            // if the output stage has finished scheduling, set no more locations
            if ((outputStage.getState() != .) && (outputStage.getState() != .)) {
                .noMoreLocations();
            }
        }
        private synchronized URI createNextResultsUri(UriInfo uriInfo)
        {
            return uriInfo.getBaseUriBuilder().replacePath("/v1/statement").path(.toString()).path(String.valueOf(.incrementAndGet())).replaceQuery("").build();
        }
        private static List<ColumncreateColumnsList(QueryInfo queryInfo)
        {
            checkNotNull(queryInfo"queryInfo is null");
            StageInfo outputStage = queryInfo.getOutputStage();
            if (outputStage == null) {
                checkNotNull(outputStage"outputStage is null");
            }
            List<Stringnames = queryInfo.getFieldNames();
            ArrayList<Typetypes = new ArrayList<>();
            for (TupleInfo tupleInfo : outputStage.getTupleInfos()) {
                types.add(tupleInfo.getType());
            }
            checkArgument(names.size() == types.size(), "names and types size mismatch");
            ImmutableList.Builder<Columnlist = ImmutableList.builder();
            for (int i = 0; i < names.size(); i++) {
                String name = names.get(i);
                Type type = types.get(i);
                switch (type) {
                    case :
                        list.add(new Column(name"boolean"));
                        break;
                    case :
                        list.add(new Column(name"bigint"));
                        break;
                    case :
                        list.add(new Column(name"double"));
                        break;
                    case :
                        list.add(new Column(name"varchar"));
                        break;
                    default:
                        throw new IllegalArgumentException("unhandled type: " + type);
                }
            }
            return list.build();
        }
        private static StatementStats toStatementStats(QueryInfo queryInfo)
        {
            QueryStats queryStats = queryInfo.getQueryStats();
            return StatementStats.builder()
                    .setState(queryInfo.getState().toString())
                    .setScheduled(isScheduled(queryInfo))
                    .setNodes(globalUniqueNodes(queryInfo.getOutputStage()).size())
                    .setTotalSplits(queryStats.getTotalDrivers())
                    .setQueuedSplits(queryStats.getQueuedDrivers())
                    .setRunningSplits(queryStats.getRunningDrivers())
                    .setCompletedSplits(queryStats.getCompletedDrivers())
                    .setUserTimeMillis(queryStats.getTotalUserTime().toMillis())
                    .setCpuTimeMillis(queryStats.getTotalCpuTime().toMillis())
                    .setWallTimeMillis(queryStats.getTotalScheduledTime().toMillis())
                    .setProcessedRows(queryStats.getRawInputPositions())
                    .setProcessedBytes(queryStats.getRawInputDataSize().toBytes())
                    .setRootStage(toStageStats(queryInfo.getOutputStage()))
                    .build();
        }
        private static StageStats toStageStats(StageInfo stageInfo)
        {
            if (stageInfo == null) {
                return null;
            }
            com.facebook.presto.execution.StageStats stageStats = stageInfo.getStageStats();
            ImmutableList.Builder<StageStatssubStages = ImmutableList.builder();
            for (StageInfo subStage : stageInfo.getSubStages()) {
                subStages.add(toStageStats(subStage));
            }
            Set<StringuniqueNodes = new HashSet<>();
            for (TaskInfo task : stageInfo.getTasks()) {
                // todo add nodeId to TaskInfo
                URI uri = task.getSelf();
                uniqueNodes.add(uri.getHost() + ":" + uri.getPort());
            }
            return StageStats.builder()
                    .setStageId(String.valueOf(stageInfo.getStageId().getId()))
                    .setState(stageInfo.getState().toString())
                    .setDone(stageInfo.getState().isDone())
                    .setNodes(uniqueNodes.size())
                    .setTotalSplits(stageStats.getTotalDrivers())
                    .setQueuedSplits(stageStats.getQueuedDrivers())
                    .setRunningSplits(stageStats.getRunningDrivers())
                    .setCompletedSplits(stageStats.getCompletedDrivers())
                    .setUserTimeMillis(stageStats.getTotalUserTime().toMillis())
                    .setCpuTimeMillis(stageStats.getTotalCpuTime().toMillis())
                    .setWallTimeMillis(stageStats.getTotalScheduledTime().toMillis())
                    .setProcessedRows(stageStats.getRawInputPositions())
                    .setProcessedBytes(stageStats.getRawInputDataSize().toBytes())
                    .setSubStages(subStages.build())
                    .build();
        }
        private static Set<StringglobalUniqueNodes(StageInfo stageInfo)
        {
            if (stageInfo == null) {
                return ImmutableSet.of();
            }
            ImmutableSet.Builder<Stringnodes = ImmutableSet.builder();
            for (TaskInfo task : stageInfo.getTasks()) {
                // todo add nodeId to TaskInfo
                URI uri = task.getSelf();
                nodes.add(uri.getHost() + ":" + uri.getPort());
            }
            for (StageInfo subStage : stageInfo.getSubStages()) {
                nodes.addAll(globalUniqueNodes(subStage));
            }
            return nodes.build();
        }
        private static boolean isScheduled(QueryInfo queryInfo)
        {
            StageInfo stage = queryInfo.getOutputStage();
            if (stage == null) {
                return false;
            }
            return IterableTransformer.on(getAllStages(stage))
                    .transform(stageStateGetter())
                    .all(isStageRunningOrDone());
        }
        private static Predicate<StageStateisStageRunningOrDone()
        {
            return new Predicate<StageState>()
            {
                @Override
                public boolean apply(StageState state)
                {
                    return (state == .) || state.isDone();
                }
            };
        }
        private static URI findCancelableLeafStage(QueryInfo queryInfo)
        {
            if (queryInfo.getOutputStage() == null) {
                // query is not running yet, cannot cancel leaf stage
                return null;
            }
            // query is running, find the leaf-most running stage
            return findCancelableLeafStage(queryInfo.getOutputStage());
        }
        private static URI findCancelableLeafStage(StageInfo stage)
        {
            // if this stage is already done, we can't cancel it
            if (stage.getState().isDone()) {
                return null;
            }
            // attempt to find a cancelable sub stage
            // check in reverse order since build side of a join will be later in the list
            for (StageInfo subStage : Lists.reverse(stage.getSubStages())) {
                URI leafStage = findCancelableLeafStage(subStage);
                if (leafStage != null) {
                    return leafStage;
                }
            }
            // no matching sub stage, so return this stage
            return stage.getSelf();
        }
        private static QueryError toQueryError(QueryInfo queryInfo)
        {
            FailureInfo failure = queryInfo.getFailureInfo();
            if (failure == null) {
                QueryState state = queryInfo.getState();
                if ((!state.isDone()) || (state == .)) {
                    return null;
                }
                .warn("Query %s in state %s has no failure info"queryInfo.getQueryId(), state);
                failure = toFailure(new RuntimeException(format("Query is %s (reason unknown)"state))).toFailureInfo();
            }
            return new QueryError(failure.getMessage(), null, 0, failure.getErrorLocation(), failure);
        }
        private static class RowIterable
                implements Iterable<List<Object>>
        {
            private final Page page;
            private RowIterable(Page page)
            {
                this. = checkNotNull(page"page is null");
            }
            @Override
            public Iterator<List<Object>> iterator()
            {
                return new RowIterator();
            }
        }
        private static class RowIterator
                extends AbstractIterator<List<Object>>
        {
            private final BlockCursor[] cursors;
            private RowIterator(Page page)
            {
                 = new BlockCursor[page.getChannelCount()];
                for (int channel = 0; channel < .channel++) {
                    [channel] = page.getBlock(channel).cursor();
                }
            }
            @Override
            protected List<ObjectcomputeNext()
            {
                List<Objectrow = new ArrayList<>(.);
                for (BlockCursor cursor : ) {
                    if (!cursor.advanceNextPosition()) {
                        Preconditions.checkState(row.isEmpty(), "Page is unaligned");
                        return endOfData();
                    }
                    row.add(cursor.getTuple().getObjectValue());
                }
                return row;
            }
        }
    }
    private static class PurgeQueriesRunnable
            implements Runnable
    {
        private final Set<QueryIdqueryIds;
        private final QueryManager queryManager;
        public PurgeQueriesRunnable(Set<QueryIdqueryIdsQueryManager queryManager)
        {
            this. = queryIds;
            this. = queryManager;
        }
        @Override
        public void run()
        {
            try {
                // Queries are added to the query manager before being recorded in queryIds set.
                // Therefore, we take a snapshot if queryIds before getting the live queries
                // from the query manager.  Then we remove only the queries in the snapshot and
                // not live queries set.  If we did this in the other order, a query could be
                // registered between fetching the live queries and inspecting the queryIds set.
                Set<QueryIdqueryIdsSnapshot = ImmutableSet.copyOf();
                // do not call queryManager.getQueryInfo() since it updates the heartbeat time
                Set<QueryIdliveQueries = ImmutableSet.copyOf(transform(.getAllQueryInfo(), queryIdGetter()));
                Set<QueryIddeadQueries = Sets.difference(queryIdsSnapshotliveQueries);
                for (QueryId deadQuery : deadQueries) {
                    .remove(deadQuery);
                    .debug("Removed expired query %s"deadQuery);
                }
            }
            catch (Throwable e) {
                .warn(e"Error removing old queries");
            }
        }
    }
New to GrepCode? Check out our FAQ X