Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.server;
 
 
 
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import static com.facebook.presto.client.PrestoHeaders.PRESTO_CLEAR_SESSION;
 import static com.facebook.presto.client.PrestoHeaders.PRESTO_SET_SESSION;
 import static com.facebook.presto.server.ResourceUtil.assertRequest;
 import static com.facebook.presto.server.ResourceUtil.createSessionForRequest;
 import static com.facebook.presto.spi.StandardErrorCode.INTERNAL_ERROR;
 import static com.facebook.presto.spi.StandardErrorCode.toErrorType;
 import static com.facebook.presto.util.Failures.toFailure;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.isNullOrEmpty;
import static io.airlift.concurrent.Threads.threadsNamed;
import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.lang.String.format;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
@Path("/v1/statement")
public class StatementResource
    // Logger for this resource.
    // NOTE(review): later statements in this file appear as ".warn(...)" / ".info(...)" with no receiver;
    // presumably they refer to this logger and the identifier was lost during extraction - confirm.
    private static final Logger log = Logger.get(StatementResource.class);
    // NOTE(review): the unit argument is missing; the file statically imports SECONDS and MILLISECONDS,
    // so one of those is presumably intended - confirm against the upstream source.
    private static final Duration MAX_WAIT_TIME = new Duration(1, );
    // Natural ordering that places null after every non-null value.
    private static final Ordering<Comparable<Duration>> WAIT_ORDERING = Ordering.natural().nullsLast();
    // Byte size of a DataSize of 1 unit; a comment in getData() says it tries to return at least this much data.
    // NOTE(review): the unit argument is missing; MEGABYTE is the only DataSize unit statically imported - confirm.
    private static final long DESIRED_RESULT_BYTES = new DataSize(1, ).toBytes();
    private final QueryManager queryManager;
    // NOTE(review): reads like ConcurrentMap<QueryId, Query> queries with the separators lost - confirm.
    private final ConcurrentMap<QueryIdQueryqueries = new ConcurrentHashMap<>();
    /**
     * Injection constructor. Null-checks both collaborators and schedules a
     * {@code PurgeQueriesRunnable} with a fixed delay of 200 (initial delay 200).
     *
     * NOTE(review): this block is not valid Java as shown. The assignment targets
     * after {@code this.}, the parameter separator, the receiver of
     * {@code scheduleWithFixedDelay} and its time-unit argument are all missing,
     * apparently lost during extraction. The runnable is also constructed with a
     * single visible argument although its constructor declares two parameters.
     * Restore from the upstream source before compiling.
     */
    @Inject
    public StatementResource(QueryManager queryManagerSupplier<ExchangeClientexchangeClientSupplier)
    {
        this. = checkNotNull(queryManager"queryManager is null");
        this. = checkNotNull(exchangeClientSupplier"exchangeClientSupplier is null");
        .scheduleWithFixedDelay(new PurgeQueriesRunnable(queryManager), 200, 200, );
    }
    /**
     * {@code @PreDestroy} lifecycle hook that calls {@code shutdownNow()}.
     *
     * NOTE(review): the receiver of {@code shutdownNow()} is missing from this text;
     * presumably it is the executor the constructor schedules the purge task on - confirm.
     */
    @PreDestroy
    public void stop()
    {
        .shutdownNow();
    }
    /**
     * POST handler: starts a new query from the request body and returns its first results.
     *
     * Rejects a null or empty statement via {@code assertRequest}, builds a session from the
     * servlet request, creates a {@code Query}, registers it under its query id, and delegates
     * to the private {@code getQueryResults} with an empty token.
     *
     * NOTE(review): several tokens are missing here (the receiver of {@code get()} and
     * {@code put(...)}, argument separators, and the unit of the final {@code Duration}).
     * The {@code Query} constructor declares four parameters, so an argument is presumably
     * missing from the {@code new Query(...)} call as well - confirm against upstream.
     *
     * @param statement SQL text from the request body
     * @param servletRequest request used to create the session
     * @param uriInfo request URI context used to build result links
     * @return response wrapping the first results
     * @throws InterruptedException propagated from the result fetch
     */
    @POST
    public Response createQuery(
            String statement,
            @Context HttpServletRequest servletRequest,
            @Context UriInfo uriInfo)
            throws InterruptedException
    {
        assertRequest(!isNullOrEmpty(statement), "SQL statement is empty");
        Session session = createSessionForRequest(servletRequest);
        ExchangeClient exchangeClient = .get();
        Query query = new Query(sessionstatementexchangeClient);
        .put(query.getQueryId(), query);
        return getQueryResults(query, Optional.empty(), uriInfonew Duration(1, ));
    }
    /**
     * GET {queryId}/{token}: returns the results identified by {@code token} for a known query.
     *
     * When no query is found for {@code queryId}, a response carrying only a status is returned.
     * Otherwise the wait is computed through a {@code min(...)} call involving {@code maxWait}
     * and the request is delegated to the private {@code getQueryResults}.
     *
     * NOTE(review): the lookup receiver, the status constant and the receiver/first argument of
     * {@code min} are missing from this text, and the annotation parentheses are not closed
     * ({@code @PathParam("queryId"QueryId queryId}). Presumably the status is "not found" and
     * the wait is capped by MAX_WAIT_TIME through WAIT_ORDERING - confirm against upstream.
     *
     * @throws InterruptedException propagated from the result fetch
     */
    @GET
    @Path("{queryId}/{token}")
    public Response getQueryResults(
            @PathParam("queryId"QueryId queryId,
            @PathParam("token"long token,
            @QueryParam("maxWait"Duration maxWait,
            @Context UriInfo uriInfo)
            throws InterruptedException
    {
        Query query = .get(queryId);
        if (query == null) {
            return Response.status(.).build();
        }
        Duration wait = .min(maxWait);
        return getQueryResults(query, Optional.of(token), uriInfowait);
    }
    /**
     * Shared implementation behind the POST and GET endpoints.
     *
     * With a token present it asks the query for the results of that token; without one it
     * asks for the next results. The results become the body of an OK response, and one
     * header is added per set-session property (formatted as {@code key=value}) and per
     * reset-session property.
     *
     * NOTE(review): both {@code response.header(...)} calls show a single argument. The file
     * statically imports PRESTO_SET_SESSION and PRESTO_CLEAR_SESSION, so those header names
     * were presumably the first arguments and were lost during extraction - confirm. The
     * parameter list has also lost its separators.
     *
     * @throws InterruptedException propagated from the query's result methods
     */
    private static Response getQueryResults(Query queryOptional<LongtokenUriInfo uriInfoDuration wait)
            throws InterruptedException
    {
        QueryResults queryResults;
        if (token.isPresent()) {
            queryResults = query.getResults(token.get(), uriInfowait);
        }
        else {
            queryResults = query.getNextResults(uriInfowait);
        }
        ResponseBuilder response = Response.ok(queryResults);
        // add set session properties
        query.getSetSessionProperties().entrySet().stream()
                .forEach(entry -> response.header(entry.getKey() + '=' + entry.getValue()));
        // add clear session properties
        query.getResetSessionProperties().stream()
                .forEach(name -> response.header(name));
        return response.build();
    }
    /**
     * DELETE {queryId}/{token}: closes the query registered under {@code queryId}.
     *
     * Returns a status-only response when the lookup yields no query; otherwise closes the
     * query and answers with "no content". The {@code token} path parameter is bound but
     * not used in the body.
     *
     * NOTE(review): the lookup receiver and the status constant are missing from this text
     * (presumably the query map and a "not found" status) - confirm against upstream.
     */
    @DELETE
    @Path("{queryId}/{token}")
    public Response cancelQuery(@PathParam("queryId"QueryId queryId,
            @PathParam("token"long token)
    {
        Query query = .get(queryId);
        if (query == null) {
            return Response.status(.).build();
        }
        query.close();
        return Response.noContent().build();
    }
    @ThreadSafe
    public static class Query
            implements Closeable
    {
        // Collaborators and identity, assigned once in the constructor.
        private final QueryManager queryManager;
        private final QueryId queryId;
        private final ExchangeClient exchangeClient;
        // Counter incremented each time a next-results URI is created (see createNextResultsUri).
        private final AtomicLong resultId = new AtomicLong();
        private final Session session;
        // The fields below are annotated as guarded by this object's monitor.
        // Most recently produced results, cached so a repeated request can be answered again.
        @GuardedBy("this")
        private QueryResults lastResult;
        // Path of the cached results' next URI, or null.
        @GuardedBy("this")
        private String lastResultPath;
        // NOTE(review): the three generic declarations below have lost their separators
        // (presumably List<Column> columns, Map<String, String> setSessionProperties,
        // Set<String> resetSessionProperties) - confirm.
        @GuardedBy("this")
        private List<Columncolumns;
        @GuardedBy("this")
        private Map<StringStringsetSessionProperties;
        @GuardedBy("this")
        private Set<StringresetSessionProperties;
        // Update count captured for non-query statements; stays null until one is read.
        @GuardedBy("this")
        private Long updateCount;
        /**
         * Null-checks all four arguments, asks the query manager to create the query, and
         * records the id taken from the returned {@code QueryInfo}.
         *
         * NOTE(review): the assignment targets ({@code this. = ...}, {@code  = ...}) and the
         * argument separators are missing from this text; presumably they are the
         * same-named fields declared on this class - confirm against upstream.
         *
         * @param session session the query runs under
         * @param query SQL text
         * @param queryManager manager that creates and tracks the query
         * @param exchangeClient client stored on this instance
         */
        public Query(Session session,
                String query,
                QueryManager queryManager,
                ExchangeClient exchangeClient)
        {
            checkNotNull(session"session is null");
            checkNotNull(query"query is null");
            checkNotNull(queryManager"queryManager is null");
            checkNotNull(exchangeClient"exchangeClient is null");
            this. = session;
            this. = queryManager;
            QueryInfo queryInfo = queryManager.createQuery(sessionquery);
             = queryInfo.getQueryId();
            this. = exchangeClient;
        }
        /**
         * Cancels the query and then closes a second resource, which the inline comment
         * says frees buffers in the client.
         *
         * NOTE(review): both receivers and the {@code cancelQuery} argument are missing from
         * this text (presumably queryManager.cancelQuery(queryId) and exchangeClient.close())
         * - confirm against upstream.
         */
        @Override
        public void close()
        {
            .cancelQuery();
            // frees buffers in the client
            .close();
        }
        /**
         * Accessor for this query's id.
         *
         * NOTE(review): the returned expression is missing from this text (presumably the
         * {@code queryId} field) - confirm.
         */
        public QueryId getQueryId()
        {
            return ;
        }
        /**
         * Synchronized accessor for the session properties to set on the client.
         *
         * NOTE(review): the return type has lost its separators and the returned expression is
         * missing (presumably the {@code setSessionProperties} field) - confirm.
         */
        public synchronized Map<StringStringgetSetSessionProperties()
        {
            return ;
        }
        /**
         * Synchronized accessor for the session property names to clear on the client.
         *
         * NOTE(review): the return type has lost its closing bracket and the returned expression
         * is missing (presumably the {@code resetSessionProperties} field) - confirm.
         */
        public synchronized Set<StringgetResetSessionProperties()
        {
            return ;
        }
        /**
         * Returns the results for a specific token.
         *
         * Per the inline comments: a repeated request for the last results is answered from the
         * cache after refreshing the query info and recording a heartbeat; a token below the
         * current result counter raises a {@code WebApplicationException}; a path that is not
         * the cached next URI raises one as well ("unknown token"); otherwise the call falls
         * through to {@link #getNextResults}.
         *
         * NOTE(review): field names, call receivers and both exception status constants are
         * missing from this text - restore from upstream before relying on the exact statuses.
         *
         * @throws InterruptedException propagated from {@code getNextResults}
         */
        public synchronized QueryResults getResults(long tokenUriInfo uriInfoDuration maxWaitTime)
                throws InterruptedException
        {
            // is this a repeated request for the last results?
            String requestedPath = uriInfo.getAbsolutePath().getPath();
            if ( != null && requestedPath.equals()) {
                // tell query manager we are still interested in the query
                .getQueryInfo();
                .recordHeartbeat();
                return ;
            }
            if (token < .get()) {
                throw new WebApplicationException(.);
            }
            // if this is not a request for the next results, return not found
            if (.getNextUri() == null || !requestedPath.equals(.getNextUri().getPath())) {
                // unknown token
                throw new WebApplicationException(.);
            }
            return getNextResults(uriInfomaxWaitTime);
        }
        /**
         * Produces the next batch of results and caches it as the last result.
         *
         * Steps, in order: fetch data via {@code getData}; refresh the query info and record a
         * heartbeat; optionally wait for a state change when all output has been received but
         * the query is not done; capture an update count for non-query statements; close the
         * exchange when the query is done (synthesizing a single {@code true} row when there is
         * no output stage); compute the next-results URI only while the query is not done or
         * data remains; copy the set/reset session properties; build the {@code QueryResults};
         * and update the cached path and result.
         *
         * NOTE(review): most field names, call receivers, enum constants and several constructor
         * arguments are missing from this text, apparently lost during extraction. The statement
         * order matters (e.g. the cache update reads the previous result before overwriting it),
         * so restore tokens from upstream rather than rewriting.
         *
         * @throws InterruptedException propagated from waiting on data or state changes
         */
        public synchronized QueryResults getNextResults(UriInfo uriInfoDuration maxWaitTime)
                throws InterruptedException
        {
            Iterable<List<Object>> data = getData(maxWaitTime);
            // get the query info before returning
            // force update if query manager is closed
            QueryInfo queryInfo = .getQueryInfo();
            .recordHeartbeat();
            // if we have received all of the output data and the query is not marked as done, wait for the query to finish
            if (.isClosed() && !queryInfo.getState().isDone()) {
                .waitForStateChange(queryInfo.getState(), maxWaitTime);
                queryInfo = .getQueryInfo();
            }
            // TODO: figure out a better way to do this
            // grab the update count for non-queries
            if ((data != null) && (queryInfo.getUpdateType() != null) && ( == null) &&
                    (.size() == 1) && (.get(0).getType().equals(.))) {
                Iterator<List<Object>> iterator = data.iterator();
                if (iterator.hasNext()) {
                     = ((Numberiterator.next().get(0)).longValue();
                }
            }
            // close exchange client if the query has failed
            if (queryInfo.getState().isDone()) {
                if (queryInfo.getState() != .) {
                    .close();
                }
                else if (queryInfo.getOutputStage() == null) {
                    // For simple executions (e.g. drop table), there will never be an output stage,
                    // so close the exchange as soon as the query is done.
                    .close();
                    // Return a single value for clients that require a result.
                     = ImmutableList.of(new Column("result""boolean"new ClientTypeSignature(., ImmutableList.<ClientTypeSignature>of(), ImmutableList.of())));
                    data = ImmutableSet.<List<Object>>of(ImmutableList.<Object>of(true));
                }
            }
            // only return a next if the query is not done or there is more data to send (due to buffering)
            URI nextResultsUri = null;
            if ((!queryInfo.getState().isDone()) || (!.isClosed())) {
                nextResultsUri = createNextResultsUri(uriInfo);
            }
            // update setSessionProperties
             = queryInfo.getSetSessionProperties();
             = queryInfo.getResetSessionProperties();
            // first time through, self is null
            QueryResults queryResults = new QueryResults(
                    .toString(),
                    uriInfo.getRequestUriBuilder().replaceQuery("").replacePath(queryInfo.getSelf().getPath()).build(),
                    findCancelableLeafStage(queryInfo),
                    nextResultsUri,
                    ,
                    data,
                    toStatementStats(queryInfo),
                    toQueryError(queryInfo),
                    queryInfo.getUpdateType(),
                    );
            // cache the last results
            if ( != null && .getNextUri() != null) {
                 = .getNextUri().getPath();
            }
            else {
                 = null;
            }
             = queryResults;
            return queryResults;
        }
        /**
         * Collects the rows to send in one response, or returns null when there is nothing to send.
         *
         * Waits (bounded by {@code maxWait}) until the query has started, returns null if it has
         * not started or has no output stage, then pulls pages and wraps each in a
         * {@code RowIterable} until a byte threshold is reached or no page is available. Only the
         * first page fetch waits; later fetches use a zero duration. Returns null when no bytes
         * were collected.
         *
         * NOTE(review): call receivers, the column field, the loop's byte threshold (presumably
         * DESIRED_RESULT_BYTES) and the zero-duration unit are missing from this text - confirm
         * against upstream.
         *
         * @param maxWait upper bound on waiting for the query to start and for the first page
         * @throws InterruptedException if interrupted while waiting
         */
        private synchronized Iterable<List<Object>> getData(Duration maxWait)
                throws InterruptedException
        {
            // wait for query to start
            QueryInfo queryInfo = .getQueryInfo();
            while (maxWait.toMillis() > 1 && !isQueryStarted(queryInfo)) {
                .recordHeartbeat();
                maxWait = .waitForStateChange(queryInfo.getState(), maxWait);
                queryInfo = .getQueryInfo();
            }
            // if query did not finish starting or does not have output, just return
            if (!isQueryStarted(queryInfo) || queryInfo.getOutputStage() == null) {
                return null;
            }
            if ( == null) {
                 = createColumnsList(queryInfo);
            }
            List<Typetypes = queryInfo.getOutputStage().getTypes();
            updateExchangeClient(queryInfo.getOutputStage());
            ImmutableList.Builder<RowIterablepages = ImmutableList.builder();
            // wait up to max wait for data to arrive; then try to return at least DESIRED_RESULT_BYTES
            long bytes = 0;
            while (bytes < ) {
                Page page = .getNextPage(maxWait);
                if (page == null) {
                    break;
                }
                bytes += page.getSizeInBytes();
                pages.add(new RowIterable(.toConnectorSession(), typespage));
                // only wait on first call
                maxWait = new Duration(0, );
            }
            if (bytes == 0) {
                return null;
            }
            return Iterables.concat(pages.build());
        }
        /**
         * Reports whether the query's state differs from three specific states.
         *
         * NOTE(review): the three state constants are missing from this text; presumably they are
         * the pre-execution states of {@code QueryState} - confirm against upstream. The local
         * {@code state} is only used for the first comparison; the other two re-read the state.
         */
        private static boolean isQueryStarted(QueryInfo queryInfo)
        {
            QueryState state = queryInfo.getState();
            return state != . && queryInfo.getState() != . && queryInfo.getState() != .;
        }
        /**
         * Registers output-buffer locations of the output stage's tasks.
         *
         * While the stage is not done, each task whose buffer list is non-empty and whose buffer
         * state can no longer add buffers must expose exactly one buffer; a results URI is built
         * from the task's self URI plus {@code "results"} and the buffer id, and added as a
         * location. Once {@code allOutputBuffersCreated} holds, no-more-locations is signalled.
         *
         * NOTE(review): the receiver of {@code addLocation} / {@code noMoreLocations} is missing
         * (presumably the exchange client) and one generic declaration has lost its closing
         * bracket - confirm against upstream.
         */
        private synchronized void updateExchangeClient(StageInfo outputStage)
        {
            // add any additional output locations
            if (!outputStage.getState().isDone()) {
                for (TaskInfo taskInfo : outputStage.getTasks()) {
                    SharedBufferInfo outputBuffers = taskInfo.getOutputBuffers();
                    List<BufferInfobuffers = outputBuffers.getBuffers();
                    if (buffers.isEmpty() || outputBuffers.getState().canAddBuffers()) {
                        // output buffer has not been created yet
                        continue;
                    }
                    Preconditions.checkState(buffers.size() == 1,
                            "Expected a single output buffer for task %s, but found %s",
                            taskInfo.getTaskId(),
                            buffers);
                    TaskId bufferId = Iterables.getOnlyElement(buffers).getBufferId();
                    URI uri = uriBuilderFrom(taskInfo.getSelf()).appendPath("results").appendPath(bufferId.toString()).build();
                    .addLocation(uri);
                }
            }
            if (allOutputBuffersCreated(outputStage)) {
                .noMoreLocations();
            }
        }
        /**
         * Decides whether every output buffer of the stage has been created.
         *
         * True when the stage is done; false while the stage is in either of two specific
         * states (the inline comment frames this as "has the stage finished scheduling?");
         * otherwise true only if no task's output buffers can still add buffers.
         *
         * NOTE(review): the two {@code StageState} constants are missing from this text -
         * confirm against upstream.
         */
        private static boolean allOutputBuffersCreated(StageInfo outputStage)
        {
            StageState stageState = outputStage.getState();
            // if the stage is already done, then there will be no more buffers
            if (stageState.isDone()) {
                return true;
            }
            // has the stage finished scheduling?
            if (stageState == . || stageState == .) {
                return false;
            }
            // have all tasks finished adding buffers
            return outputStage.getTasks().stream()
                    .allMatch(taskInfo -> !taskInfo.getOutputBuffers().getState().canAddBuffers());
        }
        /**
         * Builds the URI of the next result page: base URI with path {@code /v1/statement},
         * followed by two path segments and an empty query string. The second segment comes
         * from an {@code incrementAndGet()} call, so each invocation advances a counter.
         *
         * NOTE(review): the receivers of {@code toString()} and {@code incrementAndGet()} are
         * missing from this text (presumably {@code queryId} and {@code resultId}) - confirm.
         */
        private synchronized URI createNextResultsUri(UriInfo uriInfo)
        {
            return uriInfo.getBaseUriBuilder().replacePath("/v1/statement").path(.toString()).path(String.valueOf(.incrementAndGet())).replaceQuery("").build();
        }
        /**
         * Builds the client column list from the query's field names and the output stage's types.
         *
         * Requires a non-null query info and output stage and equally sized name/type lists;
         * each column carries the name, the type signature's string form, and a
         * {@code ClientTypeSignature} built from the signature.
         *
         * NOTE(review): generic brackets and argument separators are missing from this text
         * (e.g. {@code List<ColumncreateColumnsList}, {@code new Column(nametypenew ...)}) -
         * restore from upstream before compiling.
         */
        private static List<ColumncreateColumnsList(QueryInfo queryInfo)
        {
            checkNotNull(queryInfo"queryInfo is null");
            StageInfo outputStage = queryInfo.getOutputStage();
            checkNotNull(outputStage"outputStage is null");
            List<Stringnames = queryInfo.getFieldNames();
            List<Typetypes = outputStage.getTypes();
            checkArgument(names.size() == types.size(), "names and types size mismatch");
            ImmutableList.Builder<Columnlist = ImmutableList.builder();
            for (int i = 0; i < names.size(); i++) {
                String name = names.get(i);
                TypeSignature typeSignature = types.get(i).getTypeSignature();
                String type = typeSignature.toString();
                list.add(new Column(nametypenew ClientTypeSignature(typeSignature)));
            }
            return list.build();
        }
        private static StatementStats toStatementStats(QueryInfo queryInfo)
        {
            QueryStats queryStats = queryInfo.getQueryStats();
            return StatementStats.builder()
                    .setState(queryInfo.getState().toString())
                    .setScheduled(queryInfo.isScheduled())
                    .setNodes(globalUniqueNodes(queryInfo.getOutputStage()).size())
                    .setTotalSplits(queryStats.getTotalDrivers())
                    .setQueuedSplits(queryStats.getQueuedDrivers())
                    .setRunningSplits(queryStats.getRunningDrivers())
                    .setCompletedSplits(queryStats.getCompletedDrivers())
                    .setUserTimeMillis(queryStats.getTotalUserTime().toMillis())
                    .setCpuTimeMillis(queryStats.getTotalCpuTime().toMillis())
                    .setWallTimeMillis(queryStats.getTotalScheduledTime().toMillis())
                    .setProcessedRows(queryStats.getRawInputPositions())
                    .setProcessedBytes(queryStats.getRawInputDataSize().toBytes())
                    .setRootStage(toStageStats(queryInfo.getOutputStage()))
                    .build();
        }
        /**
         * Recursively converts a {@code StageInfo} into the client-facing {@code StageStats}.
         *
         * Returns null for a null stage. Sub-stages are converted first, then the distinct
         * {@code host:port} pairs of the stage's own tasks are counted as its node count, and
         * the execution-side stage statistics are copied into the builder (driver counts are
         * reported as splits, total scheduled time as wall time).
         *
         * NOTE(review): two generic declarations have lost their closing bracket
         * ({@code ImmutableList.Builder<StageStatssubStages}, {@code Set<StringuniqueNodes}) -
         * restore before compiling.
         */
        private static StageStats toStageStats(StageInfo stageInfo)
        {
            if (stageInfo == null) {
                return null;
            }
            // fully qualified because the simple name StageStats refers to the client type in this file
            com.facebook.presto.execution.StageStats stageStats = stageInfo.getStageStats();
            ImmutableList.Builder<StageStatssubStages = ImmutableList.builder();
            for (StageInfo subStage : stageInfo.getSubStages()) {
                subStages.add(toStageStats(subStage));
            }
            Set<StringuniqueNodes = new HashSet<>();
            for (TaskInfo task : stageInfo.getTasks()) {
                // todo add nodeId to TaskInfo
                URI uri = task.getSelf();
                uniqueNodes.add(uri.getHost() + ":" + uri.getPort());
            }
            return StageStats.builder()
                    .setStageId(String.valueOf(stageInfo.getStageId().getId()))
                    .setState(stageInfo.getState().toString())
                    .setDone(stageInfo.getState().isDone())
                    .setNodes(uniqueNodes.size())
                    .setTotalSplits(stageStats.getTotalDrivers())
                    .setQueuedSplits(stageStats.getQueuedDrivers())
                    .setRunningSplits(stageStats.getRunningDrivers())
                    .setCompletedSplits(stageStats.getCompletedDrivers())
                    .setUserTimeMillis(stageStats.getTotalUserTime().toMillis())
                    .setCpuTimeMillis(stageStats.getTotalCpuTime().toMillis())
                    .setWallTimeMillis(stageStats.getTotalScheduledTime().toMillis())
                    .setProcessedRows(stageStats.getRawInputPositions())
                    .setProcessedBytes(stageStats.getRawInputDataSize().toBytes())
                    .setSubStages(subStages.build())
                    .build();
        }
        /**
         * Collects the distinct {@code host:port} strings of every task in the given stage and,
         * recursively, in all of its sub-stages. Returns an empty set for a null stage.
         *
         * NOTE(review): the return type and the builder declaration have lost their closing
         * generic bracket (presumably {@code Set<String>} and
         * {@code ImmutableSet.Builder<String>}) - restore before compiling.
         */
        private static Set<StringglobalUniqueNodes(StageInfo stageInfo)
        {
            if (stageInfo == null) {
                return ImmutableSet.of();
            }
            ImmutableSet.Builder<Stringnodes = ImmutableSet.builder();
            for (TaskInfo task : stageInfo.getTasks()) {
                // todo add nodeId to TaskInfo
                URI uri = task.getSelf();
                nodes.add(uri.getHost() + ":" + uri.getPort());
            }
            for (StageInfo subStage : stageInfo.getSubStages()) {
                nodes.addAll(globalUniqueNodes(subStage));
            }
            return nodes.build();
        }
        private static URI findCancelableLeafStage(QueryInfo queryInfo)
        {
            if (queryInfo.getOutputStage() == null) {
                // query is not running yet, cannot cancel leaf stage
                return null;
            }
            // query is running, find the leaf-most running stage
            return findCancelableLeafStage(queryInfo.getOutputStage());
        }
        private static URI findCancelableLeafStage(StageInfo stage)
        {
            // if this stage is already done, we can't cancel it
            if (stage.getState().isDone()) {
                return null;
            }
            // attempt to find a cancelable sub stage
            // check in reverse order since build side of a join will be later in the list
            for (StageInfo subStage : Lists.reverse(stage.getSubStages())) {
                URI leafStage = findCancelableLeafStage(subStage);
                if (leafStage != null) {
                    return leafStage;
                }
            }
            // no matching sub stage, so return this stage
            return stage.getSelf();
        }
        /**
         * Builds the client-facing {@code QueryError} for a query, or returns null when there is
         * no error to report.
         *
         * With no failure info: returns null if the query is not done or is in one specific
         * state; otherwise logs a warning and synthesizes a failure from a
         * {@code RuntimeException} with the message "Query is <state> (reason unknown)".
         * The error code comes from the query info, with a fallback (and a warning) when absent.
         *
         * NOTE(review): the state constant, the logger receivers, the fallback error code's
         * receiver and argument separators are missing from this text. The file statically
         * imports INTERNAL_ERROR, which is presumably the fallback - confirm against upstream.
         */
        private static QueryError toQueryError(QueryInfo queryInfo)
        {
            FailureInfo failure = queryInfo.getFailureInfo();
            if (failure == null) {
                QueryState state = queryInfo.getState();
                if ((!state.isDone()) || (state == .)) {
                    return null;
                }
                .warn("Query %s in state %s has no failure info"queryInfo.getQueryId(), state);
                failure = toFailure(new RuntimeException(format("Query is %s (reason unknown)"state))).toFailureInfo();
            }
            ErrorCode errorCode;
            if (queryInfo.getErrorCode() != null) {
                errorCode = queryInfo.getErrorCode();
            }
            else {
                errorCode = .toErrorCode();
                .warn("Failed query %s has no error code"queryInfo.getQueryId());
            }
            return new QueryError(
                    failure.getMessage(),
                    null,
                    errorCode.getCode(),
                    errorCode.getName(),
                    toErrorType(errorCode.getCode()).toString(),
                    failure.getErrorLocation(),
                    failure);
        }
        /**
         * Iterable view over the rows of one {@code Page}; each row is a {@code List<Object>}.
         * The type list is defensively copied and both it and the page are null-checked.
         *
         * NOTE(review): field names after {@code this.}, generic brackets and parameter
         * separators are missing from this text. {@code iterator()} also calls
         * {@code new RowIterator()} with no arguments although the only visible
         * {@code RowIterator} constructor declares three parameters; presumably the session,
         * types and page arguments were lost during extraction - confirm against upstream.
         */
        private static class RowIterable
                implements Iterable<List<Object>>
        {
            private final ConnectorSession session;
            private final List<Typetypes;
            private final Page page;
            private RowIterable(ConnectorSession sessionList<TypetypesPage page)
            {
                this. = session;
                this. = ImmutableList.copyOf(checkNotNull(types"types is null"));
                this. = checkNotNull(page"page is null");
            }
            @Override
            public Iterator<List<Object>> iterator()
            {
                return new RowIterator();
            }
        }
        /**
         * Iterator that yields one unmodifiable {@code List<Object>} per position of a page,
         * with one element per channel, and signals end of data once the position reaches the
         * page's position count. The position starts at -1 and is advanced before each read.
         *
         * NOTE(review): field names, the increment target, call receivers and generic brackets
         * are missing from this text. {@code type.getObjectValue(block)} shows a single
         * argument; since the class also holds a session and a position, further arguments
         * were presumably lost during extraction - confirm against upstream.
         */
        private static class RowIterator
                extends AbstractIterator<List<Object>>
        {
            private final ConnectorSession session;
            private final List<Typetypes;
            private final Page page;
            private int position = -1;
            private RowIterator(ConnectorSession sessionList<TypetypesPage page)
            {
                this. = session;
                this. = types;
                this. = page;
            }
            @Override
            protected List<ObjectcomputeNext()
            {
                ++;
                if ( >= .getPositionCount()) {
                    return endOfData();
                }
                List<Objectvalues = new ArrayList<>(.getChannelCount());
                for (int channel = 0; channel < .getChannelCount(); channel++) {
                    Type type = .get(channel);
                    Block block = .getBlock(channel);
                    values.add(type.getObjectValue(block));
                }
                return Collections.unmodifiableList(values);
            }
        }
    }
    /**
     * Periodic task that removes and closes queries the query manager no longer reports.
     *
     * Each run snapshots the tracked query ids, then fetches the manager's query ids, and
     * removes every id that is in the snapshot but not in the manager's set. Any
     * {@code Throwable} is caught and logged as a warning rather than rethrown.
     *
     * NOTE(review): field names after {@code this.}, call receivers, generic brackets and
     * argument separators are missing from this text, apparently lost during extraction -
     * restore from upstream before compiling.
     */
    private static class PurgeQueriesRunnable
            implements Runnable
    {
        private final ConcurrentMap<QueryIdQueryqueries;
        private final QueryManager queryManager;
        public PurgeQueriesRunnable(ConcurrentMap<QueryIdQueryqueriesQueryManager queryManager)
        {
            this. = queries;
            this. = queryManager;
        }
        @Override
        public void run()
        {
            try {
                // Queries are added to the query manager before being recorded in queryIds set.
                // Therefore, we take a snapshot of queryIds before getting the live queries
                // from the query manager.  Then we remove only the queries in the snapshot and
                // not live queries set.  If we did this in the other order, a query could be
                // registered between fetching the live queries and inspecting the queryIds set.
                Set<QueryIdqueryIdsSnapshot = ImmutableSet.copyOf(.keySet());
                Set<QueryIdliveQueries = ImmutableSet.copyOf(.getAllQueryIds());
                Set<QueryIddeadQueries = Sets.difference(queryIdsSnapshotliveQueries);
                for (QueryId deadQueryId : deadQueries) {
                    // remove() returns null if the entry is already gone, so close only what was removed here
                    Query query = .remove(deadQueryId);
                    if (query != null) {
                        query.close();
                        .info("Removed expired query %s"deadQueryId);
                    }
                }
            }
            catch (Throwable e) {
                .warn(e"Error removing old queries");
            }
        }
    }
New to GrepCode? Check out our FAQ X