Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.server;
 
 
 
 import java.net.URI;
 import java.util.List;
 import java.util.Set;
 
 import static com.facebook.presto.spi.StandardErrorCode.REMOTE_TASK_ERROR;
 import static com.facebook.presto.spi.StandardErrorCode.TOO_MANY_REQUESTS_FAILED;
 import static com.facebook.presto.util.Failures.WORKER_NODE_ERROR;
 import static com.facebook.presto.util.Failures.toFailure;
 import static com.facebook.presto.util.ImmutableCollectors.toImmutableList;
 import static com.google.common.base.MoreObjects.toStringHelper;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static io.airlift.http.client.FullJsonResponseHandler.createFullJsonResponseHandler;
 import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
 import static io.airlift.http.client.JsonBodyGenerator.jsonBodyGenerator;
 import static io.airlift.http.client.Request.Builder.prepareDelete;
 import static io.airlift.http.client.Request.Builder.prepareGet;
 import static io.airlift.http.client.Request.Builder.preparePost;
 import static io.airlift.http.client.StatusResponseHandler.createStatusResponseHandler;
 import static java.lang.String.format;
 
 public class HttpRemoteTask
        implements RemoteTask
    private static final Logger log = Logger.get(HttpRemoteTask.class);
    private final TaskId taskId;
    private final Session session;
    private final String nodeId;
    private final PlanFragment planFragment;
    private final int maxConsecutiveErrorCount;
    private final Duration minErrorDuration;
    private final AtomicLong nextSplitId = new AtomicLong();
    private final StateMachine<TaskInfotaskInfo;
    @GuardedBy("this")
    private Future<?> currentRequest;
    @GuardedBy("this")
    private long currentRequestStartNanos;
    @GuardedBy("this")
    private final SetMultimap<PlanNodeIdScheduledSplitpendingSplits = HashMultimap.create();
    @GuardedBy("this")
    private final Set<PlanNodeIdnoMoreSplits = new HashSet<>();
    @GuardedBy("this")
    private final AtomicReference<OutputBuffersoutputBuffers = new AtomicReference<>();
    @GuardedBy("this")
    private final HttpClient httpClient;
    private final Executor executor;
    private final JsonCodec<TaskInfotaskInfoCodec;
    private final RateLimiter errorRequestRateLimiter = RateLimiter.create(0.1);
    private final AtomicLong lastSuccessfulRequest = new AtomicLong(System.nanoTime());
    private final AtomicLong errorCount = new AtomicLong();
    private final Queue<ThrowableerrorsSinceLastSuccess = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean needsUpdate = new AtomicBoolean(true);
    public HttpRemoteTask(Session session,
            TaskId taskId,
            String nodeId,
            URI location,
            PlanFragment planFragment,
            Multimap<PlanNodeIdSplitinitialSplits,
            OutputBuffers outputBuffers,
            HttpClient httpClient,
            Executor executor,
            int maxConsecutiveErrorCount,
            Duration minErrorDuration,
            JsonCodec<TaskInfotaskInfoCodec,
            JsonCodec<TaskUpdateRequesttaskUpdateRequestCodec)
    {
        checkNotNull(session"session is null");
        checkNotNull(taskId"taskId is null");
        checkNotNull(nodeId"nodeId is null");
        checkNotNull(location"location is null");
        checkNotNull(planFragment"planFragment1 is null");
        checkNotNull(outputBuffers"outputBuffers is null");
        checkNotNull(httpClient"httpClient is null");
        checkNotNull(executor"executor is null");
        checkNotNull(taskInfoCodec"taskInfoCodec is null");
        checkNotNull(taskUpdateRequestCodec"taskUpdateRequestCodec is null");
        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s"taskId)) {
            this. = taskId;
            this. = session;
            this. = nodeId;
            this. = planFragment;
            this..set(outputBuffers);
            this. = httpClient;
            this. = executor;
            this. = taskInfoCodec;
            this. = taskUpdateRequestCodec;
            this. = maxConsecutiveErrorCount;
            this. = minErrorDuration;
            for (Entry<PlanNodeIdSplitentry : checkNotNull(initialSplits"initialSplits is null").entries()) {
                ScheduledSplit scheduledSplit = new ScheduledSplit(.getAndIncrement(), entry.getValue());
                .put(entry.getKey(), scheduledSplit);
            }
            List<BufferInfobufferStates = outputBuffers.getBuffers()
                    .keySet().stream()
                    .map(outputId -> new BufferInfo(outputIdfalse, 0, 0))
                    .collect(toImmutableList());
            TaskStats taskStats = new TaskContext(taskIdexecutorsession).getTaskStats();
             = new StateMachine<>("task " + taskIdexecutornew TaskInfo(
                    taskId,
                    .,
                    .,
                    location,
                    DateTime.now(),
                    new SharedBufferInfo(., 0, 0, bufferStates),
                    ImmutableSet.<PlanNodeId>of(),
                    taskStats,
                    ImmutableList.<ExecutionFailureInfo>of()));
        }
    }
    @Override
    public String getNodeId()
    {
        return ;
    }
    @Override
    public TaskInfo getTaskInfo()
    {
        return .get();
    }
    @Override
    public void start()
    {
        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s")) {
            // to start we just need to trigger an update
            scheduleUpdate();
        }
    }
    @Override
    public synchronized void addSplits(PlanNodeId sourceIdIterable<Splitsplits)
    {
        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s")) {
            checkNotNull(sourceId"sourceId is null");
            checkNotNull(splits"splits is null");
            checkState(!.contains(sourceId), "noMoreSplits has already been set for %s"sourceId);
            // only add pending split if not done
            if (!getTaskInfo().getState().isDone()) {
                for (Split split : splits) {
                    .put(sourceIdnew ScheduledSplit(.getAndIncrement(), split));
                }
                .set(true);
            }
            scheduleUpdate();
        }
    }
    @Override
    public synchronized void noMoreSplits(PlanNodeId sourceId)
    {
        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s")) {
            if (.add(sourceId)) {
                .set(true);
                scheduleUpdate();
            }
        }
    }
    @Override
    public synchronized void setOutputBuffers(OutputBuffers newOutputBuffers)
    {
        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s")) {
            if (getTaskInfo().getState().isDone()) {
                return;
            }
            if (newOutputBuffers.getVersion() > .get().getVersion()) {
                .set(newOutputBuffers);
                .set(true);
                scheduleUpdate();
            }
        }
    }
    @Override
    public synchronized int getPartitionedSplitCount()
    {
        int splitCount = .get(.getPartitionedSource()).size();
    }
    @Override
    public synchronized int getQueuedPartitionedSplitCount()
    {
        int splitCount = .get(.getPartitionedSource()).size();
        return splitCount + .get().getStats().getQueuedPartitionedDrivers();
    }
    @Override
    public void addStateChangeListener(StateChangeListener<TaskInfostateChangeListener)
    {
        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s")) {
            .addStateChangeListener(stateChangeListener);
        }
    }
    @Override
    public Duration waitForTaskToFinish(Duration maxWait)
            throws InterruptedException
    {
        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s")) {
            while (true) {
                TaskInfo currentState = .get();
                if (maxWait.toMillis() <= 1 || currentState.getState().isDone()) {
                    return maxWait;
                }
                maxWait = .waitForStateChange(currentStatemaxWait);
            }
        }
    }
    private synchronized void updateTaskInfo(final TaskInfo newValue)
    {
        if (newValue.getState().isDone()) {
            // splits can be huge so clear the list
            .clear();
        }
        // change to new value if old value is not changed and new value has a newer version
        .setIf(newValueoldValue -> {
            if (oldValue.getState().isDone()) {
                // never update if the task has reached a terminal state
                return false;
            }
            if (newValue.getVersion() < oldValue.getVersion()) {
                // don't update to an older version (same version is ok)
                return false;
            }
            return true;
        });
    }
    private synchronized void scheduleUpdate()
    {
        // don't update if the task hasn't been started yet or if it is already finished
        if (!.get() || .get().getState().isDone()) {
            return;
        }
        // if we have an old request outstanding, cancel it
        if ( != null && Duration.nanosSince().compareTo(new Duration(2, .)) >= 0) {
            .set(true);
            .cancel(true);
             = null;
             = 0;
        }
        // if there is a request already running, wait for it to complete
        if (this. != null && !this..isDone()) {
            return;
        }
        // don't update too fast in the face of errors
        if (.get() > 0) {
            .acquire();
        }
        List<TaskSourcesources = getSources();
        TaskUpdateRequest updateRequest = new TaskUpdateRequest(,
                ,
                sources,
                .get());
        Request request = preparePost()
                .setUri(uriBuilderFrom(.get().getSelf()).addParameter("summarize").build())
                .setHeader(...toString())
                .setBodyGenerator(jsonBodyGenerator(updateRequest))
                .build();
         = future;
         = System.nanoTime();
        // The needsUpdate flag needs to be set to false BEFORE adding the Future callback since callback might change the flag value
        // and does so without grabbing the instance lock.
        .set(false);
        Futures.addCallback(futurenew SimpleHttpResponseHandler<>(new UpdateResponseHandler(sources), request.getUri()), );
    }
    private synchronized List<TaskSourcegetSources()
    {
        ImmutableList.Builder<TaskSourcesources = ImmutableList.builder();
        for (PlanNodeId planNodeId : .getSourceIds()) {
            Set<ScheduledSplitsplits = .get(planNodeId);
            boolean noMoreSplits = this..contains(planNodeId);
            if (!splits.isEmpty() || noMoreSplits) {
                sources.add(new TaskSource(planNodeIdsplitsnoMoreSplits));
            }
        }
        return sources.build();
    }
    @Override
    public synchronized void cancel()
    {
        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s")) {
            if (getTaskInfo().getState().isDone()) {
                return;
            }
            URI uri = getTaskInfo().getSelf();
            if (uri == null) {
                return;
            }
            // send cancel to task and ignore response
            final long start = System.nanoTime();
            final Request request = prepareDelete()
                    .setUri(uriBuilderFrom(uri).addParameter("abort""false").addParameter("summarize").build())
                    .build();
            {
                @Override
                public void onSuccess(StatusResponse result)
                {
                    // assume any response is good enough
                }
                @Override
                public void onFailure(Throwable t)
                {
                    if (t instanceof RejectedExecutionException) {
                        // client has been shutdown
                        return;
                    }
                    // reschedule
                    if (Duration.nanosSince(start).compareTo(new Duration(2, .)) < 0) {
                        Futures.addCallback(.executeAsync(requestcreateStatusResponseHandler()), this);
                    }
                    else {
                        logError(t"Unable to cancel task at %s"request.getUri());
                    }
                }
            }, );
        }
    }
    @Override
    public synchronized void abort()
    {
        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s")) {
            // clear pending splits to free memory
            .clear();
            // cancel pending request
            if ( != null) {
                .cancel(true);
                 = null;
                 = 0;
            }
            // mark task as canceled (if not already done)
            TaskInfo taskInfo = getTaskInfo();
            URI uri = taskInfo.getSelf();
            updateTaskInfo(new TaskInfo(taskInfo.getTaskId(),
                    .,
                    .,
                    uri,
                    taskInfo.getLastHeartbeat(),
                    taskInfo.getOutputBuffers(),
                    taskInfo.getNoMoreSplits(),
                    taskInfo.getStats(),
                    ImmutableList.<ExecutionFailureInfo>of()));
            if (uri == null) {
                return;
            }
            // send abort to task and ignore response
            final long start = System.nanoTime();
            final Request request = prepareDelete()
                    .setUri(uriBuilderFrom(uri).addParameter("summarize").build())
                    .build();
            {
                @Override
                public void onSuccess(StatusResponse result)
                {
                    // assume any response is good enough
                }
                @Override
                public void onFailure(Throwable t)
                {
                    if (t instanceof RejectedExecutionException) {
                        // client has been shutdown
                        return;
                    }
                    // reschedule
                    if (Duration.nanosSince(start).compareTo(new Duration(2, .)) < 0) {
                        Futures.addCallback(.executeAsync(requestcreateStatusResponseHandler()), this);
                    }
                    else {
                        logError(t"Unable to abort task at %s"request.getUri());
                    }
                }
            }, );
        }
    }
    private synchronized void requestSucceeded(TaskInfo newValueList<TaskSourcesources)
    {
        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s")) {
            updateTaskInfo(newValue);
            .set(System.nanoTime());
            .set(0);
            .clear();
            // remove acknowledged splits, which frees memory
            for (TaskSource source : sources) {
                PlanNodeId planNodeId = source.getPlanNodeId();
                for (ScheduledSplit split : source.getSplits()) {
                    .remove(planNodeIdsplit);
                }
            }
            if ( == null) {
                 = new ContinuousTaskInfoFetcher();
                .start();
            }
        }
    }
    private synchronized void requestFailed(Throwable reason)
    {
        // cancellation is not a failure
        if (reason instanceof CancellationException) {
            return;
        }
        // if task is done, ignore the error
        TaskInfo taskInfo = getTaskInfo();
        if (taskInfo.getState().isDone()) {
            return;
        }
        // log failure message
        if (isExpectedError(reason)) {
            // don't print a stack for known errors
            .warn("Error updating task %s: %s: %s"taskInfo.getTaskId(), reason.getMessage(), taskInfo.getSelf());
        }
        else {
            .warn(reason"Error updating task %s: %s"taskInfo.getTaskId(), taskInfo.getSelf());
        }
        // remember the first 10 errors
        if (.size() < 10) {
            .add(reason);
        }
        // fail the task, if we have more than X failures in a row and more than Y seconds have passed since the last request
        long errorCount = this..incrementAndGet();
        Duration timeSinceLastSuccess = Duration.nanosSince(.get());
        if (errorCount >  && timeSinceLastSuccess.compareTo() > 0) {
            // it is weird to mark the task failed locally and then cancel the remote task, but there is no way to tell a remote task that it is failed
            PrestoException exception = new PrestoException(,
                    format("%s (%s - %s failures, time since last success %s)",
                            ,
                            taskInfo.getSelf(),
                            errorCount,
                            timeSinceLastSuccess.convertTo(.)));
            for (Throwable error : ) {
                exception.addSuppressed(error);
            }
            failTask(exception);
            abort();
        }
    }

    
Move the task directly to the failed state
    private void failTask(Throwable cause)
    {
        TaskInfo taskInfo = getTaskInfo();
        if (!taskInfo.getState().isDone()) {
            .debug(cause"Remote task failed: %s"taskInfo.getSelf());
        }
        updateTaskInfo(new TaskInfo(taskInfo.getTaskId(),
                .,
                .,
                taskInfo.getSelf(),
                taskInfo.getLastHeartbeat(),
                taskInfo.getOutputBuffers(),
                taskInfo.getNoMoreSplits(),
                taskInfo.getStats(),
                ImmutableList.of(toFailure(cause))));
    }
    @Override
    public String toString()
    {
        return toStringHelper(this)
                .addValue(getTaskInfo())
                .toString();
    }
    private class UpdateResponseHandler
            implements SimpleHttpResponseCallback<TaskInfo>
    {
        private final List<TaskSourcesources;
        private UpdateResponseHandler(List<TaskSourcesources)
        {
            this. = ImmutableList.copyOf(checkNotNull(sources"sources is null"));
        }
        @Override
        public void success(TaskInfo value)
        {
            try (SetThreadName ignored = new SetThreadName("UpdateResponseHandler-%s")) {
                try {
                    requestSucceeded(value);
                }
                finally {
                    scheduleUpdate();
                }
            }
        }
        @Override
        public void failed(Throwable cause)
        {
            try (SetThreadName ignored = new SetThreadName("UpdateResponseHandler-%s")) {
                try {
                    // on failure assume we need to update again
                    .set(true);
                    requestFailed(cause);
                }
                finally {
                    scheduleUpdate();
                }
            }
        }
        @Override
        public void fatal(Throwable cause)
        {
            try (SetThreadName ignored = new SetThreadName("UpdateResponseHandler-%s")) {
                failTask(cause);
            }
        }
    }

    
Continuous update loop for task info. Wait for a short period for task state to change, and if it does not, return the current state of the task. This will cause stats to be updated at a regular interval, and state changes will be immediately recorded.
    private class ContinuousTaskInfoFetcher
            implements SimpleHttpResponseCallback<TaskInfo>
    {
        @GuardedBy("this")
        private boolean running;
        @GuardedBy("this")
        private ListenableFuture<JsonResponse<TaskInfo>> future;
        public synchronized void start()
        {
            if () {
                // already running
                return;
            }
             = true;
            scheduleNextRequest();
        }
        public synchronized void stop()
        {
             = false;
            if ( != null) {
                .cancel(true);
                 = null;
            }
        }
        private synchronized void scheduleNextRequest()
        {
            try (SetThreadName ignored = new SetThreadName("ContinuousTaskInfoFetcher-%s")) {
                // stopped or done?
                TaskInfo taskInfo = HttpRemoteTask.this..get();
                if (! || taskInfo.getState().isDone()) {
                    return;
                }
                // outstanding request?
                if ( != null && !.isDone()) {
                    // this should never happen
                    .error("Can not reschedule update because an update is already running");
                    return;
                }
                Request request = prepareGet()
                        .setUri(uriBuilderFrom(taskInfo.getSelf()).addParameter("summarize").build())
                        .setHeader(...toString())
                        .setHeader(.taskInfo.getState().toString())
                        .setHeader(."200ms")
                        .build();
                 = .executeAsync(requestcreateFullJsonResponseHandler());
                Futures.addCallback(new SimpleHttpResponseHandler<>(thisrequest.getUri()), );
            }
        }
        @Override
        public void success(TaskInfo value)
        {
            try (SetThreadName ignored = new SetThreadName("ContinuousTaskInfoFetcher-%s")) {
                synchronized (this) {
                     = null;
                }
                try {
                    requestSucceeded(value, ImmutableList.<TaskSource>of());
                }
                finally {
                    scheduleNextRequest();
                }
            }
        }
        @Override
        public void failed(Throwable cause)
        {
            try (SetThreadName ignored = new SetThreadName("ContinuousTaskInfoFetcher-%s")) {
                synchronized (this) {
                     = null;
                }
                try {
                    requestFailed(cause);
                }
                finally {
                    // there is no back off here so we can get a lot of error messages when a server spins
                    // down, but it typically goes away quickly because the queries get canceled
                    scheduleNextRequest();
                }
            }
        }
        @Override
        public void fatal(Throwable cause)
        {
            try (SetThreadName ignored = new SetThreadName("ContinuousTaskInfoFetcher-%s")) {
                synchronized (this) {
                     = null;
                }
                failTask(cause);
            }
        }
    }
    public static class SimpleHttpResponseHandler<T>
            implements FutureCallback<JsonResponse<T>>
    {
        private final SimpleHttpResponseCallback<T> callback;
        private final URI uri;
        public SimpleHttpResponseHandler(SimpleHttpResponseCallback<T> callbackURI uri)
        {
            this. = callback;
            this. = uri;
        }
        @Override
        public void onSuccess(JsonResponse<T> response)
        {
            try {
                if (response.getStatusCode() == ..code() && response.hasValue()) {
                    .success(response.getValue());
                }
                else if (response.getStatusCode() == ..code()) {
                    .failed(new ServiceUnavailableException());
                }
                else {
                    // Something is broken in the server or the client, so fail the task immediately (includes 500 errors)
                    Exception cause = response.getException();
                    if (cause == null) {
                        if (response.getStatusCode() == ..code()) {
                            cause = new PrestoException(format("Expected response from %s is empty"));
                        }
                        else {
                            cause = new PrestoException(format("Expected response code from %s to be %s, but was %s: %s%n%s",
                                    ,
                                    ..code(),
                                    response.getStatusCode(),
                                    response.getStatusMessage(),
                                    response.getResponseBody()));
                        }
                    }
                    .fatal(cause);
                }
            }
            catch (Throwable t) {
                // this should never happen
                .failed(t);
            }
        }
        @Override
        public void onFailure(Throwable t)
        {
            .failed(t);
        }
    }
    public interface SimpleHttpResponseCallback<T>
    {
        void success(T value);
        void failed(Throwable cause);
        void fatal(Throwable cause);
    }
    private static void logError(Throwable tString formatObject... args)
    {
        if (isExpectedError(t)) {
            .error(format + ": %s", ObjectArrays.concat(argst));
        }
        else {
            .error(tformatargs);
        }
    }
    private static boolean isExpectedError(Throwable t)
    {
        while (t != null) {
            if ((t instanceof SocketException) ||
                    (t instanceof SocketTimeoutException) ||
                    (t instanceof EOFException) ||
                    (t instanceof TimeoutException) ||
                    (t instanceof ServiceUnavailableException)) {
                return true;
            }
            t = t.getCause();
        }
        return false;
    }
    private static class ServiceUnavailableException
            extends RuntimeException
    {
        public ServiceUnavailableException(URI uri)
        {
            super("Server returned SERVICE_UNAVAILABLE: " + uri);
        }
    }
New to GrepCode? Check out our FAQ X