Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.server;
 
 
 
 import java.util.List;
 
 import static com.facebook.presto.PrestoMediaTypes.PRESTO_PAGES;
 import static com.facebook.presto.client.PrestoHeaders.PRESTO_CURRENT_STATE;
 import static com.facebook.presto.client.PrestoHeaders.PRESTO_MAX_WAIT;
 import static com.facebook.presto.client.PrestoHeaders.PRESTO_PAGE_NEXT_TOKEN;
 import static com.facebook.presto.client.PrestoHeaders.PRESTO_PAGE_TOKEN;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.collect.Iterables.transform;
 import static io.airlift.http.server.AsyncResponseHandler.bindAsyncResponse;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;

 /**
  * Manages tasks on this worker node
  */
 
 @Path("/v1/task")
 public class TaskResource
 {
     private static final DataSize DEFAULT_MAX_SIZE = new DataSize(10, .);
     private static final Duration DEFAULT_MAX_WAIT_TIME = new Duration(1, );
 
     private final TaskManager taskManager;
     private final ScheduledExecutorService executor;
 
     @Inject
     public TaskResource(TaskManager taskManager, @ForAsyncHttpResponse ScheduledExecutorService executor)
     {
         this. = checkNotNull(taskManager"taskManager is null");
         this. = checkNotNull(executor"executor is null");
     }
 
     @GET
     public List<TaskInfogetAllTaskInfo(@Context UriInfo uriInfo)
     {
         List<TaskInfoallTaskInfo = .getAllTaskInfo();
         if (shouldSummarize(uriInfo)) {
             allTaskInfo = ImmutableList.copyOf(transform(allTaskInfo, TaskInfo::summarize));
         }
         return allTaskInfo;
     }
 
     @POST
     @Path("{taskId}")
     public Response createOrUpdateTask(@PathParam("taskId"TaskId taskIdTaskUpdateRequest taskUpdateRequest, @Context UriInfo uriInfo)
    {
        checkNotNull(taskUpdateRequest"taskUpdateRequest is null");
        TaskInfo taskInfo = .updateTask(taskUpdateRequest.getSession(),
                taskId,
                taskUpdateRequest.getFragment(),
                taskUpdateRequest.getSources(),
                taskUpdateRequest.getOutputIds());
        if (shouldSummarize(uriInfo)) {
            taskInfo = taskInfo.summarize();
        }
        return Response.ok().entity(taskInfo).build();
    }
    @GET
    @Path("{taskId}")
    public void getTaskInfo(@PathParam("taskId"final TaskId taskId,
            @HeaderParam(TaskState currentState,
            @HeaderParam(Duration maxWait,
            @Context UriInfo uriInfo,
            @Suspended AsyncResponse asyncResponse)
    {
        checkNotNull(taskId"taskId is null");
        if (currentState == null || maxWait == null) {
            TaskInfo taskInfo = .getTaskInfo(taskId);
            if (shouldSummarize(uriInfo)) {
                taskInfo = taskInfo.summarize();
            }
            asyncResponse.resume(taskInfo);
            return;
        }
        ListenableFuture<TaskInfofutureTaskInfo = MoreFutures.addTimeout(
                .getTaskInfo(taskIdcurrentState),
                () -> .getTaskInfo(taskId),
                maxWait,
                );
        if (shouldSummarize(uriInfo)) {
            futureTaskInfo = Futures.transform(futureTaskInfo, TaskInfo::summarize);
        }
        // For hard timeout, add an additional 5 seconds to max wait for thread scheduling contention and GC
        Duration timeout = new Duration(maxWait.toMillis() + 5000, );
        bindAsyncResponse(asyncResponsefutureTaskInfo)
                .withTimeout(timeout);
    }
    @DELETE
    @Path("{taskId}")
    public Response deleteTask(@PathParam("taskId"TaskId taskId,
            @QueryParam("abort") @DefaultValue("true"boolean abort,
            @Context UriInfo uriInfo)
    {
        checkNotNull(taskId"taskId is null");
        TaskInfo taskInfo;
        if (abort) {
            taskInfo = .abortTask(taskId);
        }
        else {
            taskInfo = .cancelTask(taskId);
        }
        if (shouldSummarize(uriInfo)) {
            taskInfo = taskInfo.summarize();
        }
        return Response.ok(taskInfo).build();
    }
    @GET
    @Path("{taskId}/results/{outputId}/{token}")
    public void getResults(@PathParam("taskId"TaskId taskId,
            @PathParam("outputId"TaskId outputId,
            @PathParam("token"final long token,
            @Suspended AsyncResponse asyncResponse)
            throws InterruptedException
    {
        checkNotNull(taskId"taskId is null");
        checkNotNull(outputId"outputId is null");
        ListenableFuture<BufferResultbufferResultFuture = .getTaskResults(taskIdoutputIdtoken);
        bufferResultFuture = MoreFutures.addTimeout(
                bufferResultFuture,
                () -> BufferResult.emptyResults(tokenfalse),
                ,
                );
        ListenableFuture<ResponseresponseFuture = Futures.transform(bufferResultFuture, (BufferResult result) -> {
            List<Pagepages = result.getPages();
            GenericEntity<?> entity = null;
            Status status;
            if (!pages.isEmpty()) {
                entity = new GenericEntity<>(pagesnew TypeToken<List<Page>>() {}.getType());
                status = .;
            }
            else if (result.isBufferClosed()) {
                status = .;
            }
            else {
                status = .;
            }
            return Response.status(status)
                    .entity(entity)
                    .header(result.getToken())
                    .header(result.getNextToken())
                    .build();
        });
        // For hard timeout, add an additional 5 seconds to max wait for thread scheduling contention and GC
        Duration timeout = new Duration(.toMillis() + 5000, );
        bindAsyncResponse(asyncResponseresponseFuture)
                .withTimeout(timeout,
                        Response.status(.)
                                .header(token)
                                .header(token)
                                .build());
    }
    @DELETE
    @Path("{taskId}/results/{outputId}")
    public Response abortResults(@PathParam("taskId"TaskId taskId, @PathParam("outputId"TaskId outputId, @Context UriInfo uriInfo)
    {
        checkNotNull(taskId"taskId is null");
        checkNotNull(outputId"outputId is null");
        TaskInfo taskInfo = .abortTaskResults(taskIdoutputId);
        if (shouldSummarize(uriInfo)) {
            taskInfo = taskInfo.summarize();
        }
        return Response.ok(taskInfo).build();
    }
    private static boolean shouldSummarize(UriInfo uriInfo)
    {
        return uriInfo.getQueryParameters().containsKey("summarize");
    }
New to GrepCode? Check out our FAQ X