Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.server;
 
 
 
 import java.net.URI;
 import java.util.List;
 import java.util.Set;
 
 import static com.facebook.presto.spi.StandardErrorCode.TOO_MANY_REQUESTS_FAILED;
 import static com.facebook.presto.util.Failures.toFailure;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.Iterables.transform;
 import static io.airlift.http.client.FullJsonResponseHandler.createFullJsonResponseHandler;
 import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
 import static io.airlift.http.client.JsonBodyGenerator.jsonBodyGenerator;
 import static io.airlift.http.client.Request.Builder.prepareDelete;
 import static io.airlift.http.client.Request.Builder.prepareGet;
 import static io.airlift.http.client.Request.Builder.preparePost;
 import static io.airlift.http.client.StatusResponseHandler.createStatusResponseHandler;
 import static java.lang.String.format;
 
 public class HttpRemoteTask
         implements RemoteTask
    private static final Logger log = Logger.get(HttpRemoteTask.class);
    private final TaskId taskId;
    private final Session session;
    private final String nodeId;
    private final PlanFragment planFragment;
    private final int maxConsecutiveErrorCount;
    private final Duration minErrorDuration;
    private final AtomicLong nextSplitId = new AtomicLong();
    private final StateMachine<TaskInfotaskInfo;
    @GuardedBy("this")
    private Future<?> currentRequest;
    @GuardedBy("this")
    private long currentRequestStartNanos;
    @GuardedBy("this")
    private final SetMultimap<PlanNodeIdScheduledSplitpendingSplits = HashMultimap.create();
    @GuardedBy("this")
    private final Set<PlanNodeIdnoMoreSplits = new HashSet<>();
    @GuardedBy("this")
    private final AtomicReference<OutputBuffersoutputBuffers = new AtomicReference<>();
    @GuardedBy("this")
    private final AsyncHttpClient httpClient;
    private final Executor executor;
    private final JsonCodec<TaskInfotaskInfoCodec;
    private final List<TupleInfotupleInfos;
    private final RateLimiter errorRequestRateLimiter = RateLimiter.create(0.1);
    private final AtomicLong lastSuccessfulRequest = new AtomicLong(System.nanoTime());
    private final AtomicLong errorCount = new AtomicLong();
    private final Queue<ThrowableerrorsSinceLastSuccess = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean needsUpdate = new AtomicBoolean(true);
    public HttpRemoteTask(Session session,
            TaskId taskId,
            String nodeId,
            URI location,
            PlanFragment planFragment,
            Multimap<PlanNodeIdSplitinitialSplits,
            OutputBuffers outputBuffers,
            AsyncHttpClient httpClient,
            Executor executor,
            int maxConsecutiveErrorCount,
            Duration minErrorDuration,
            JsonCodec<TaskInfotaskInfoCodec,
            JsonCodec<TaskUpdateRequesttaskUpdateRequestCodec)
    {
        checkNotNull(session"session is null");
        checkNotNull(taskId"taskId is null");
        checkNotNull(nodeId"nodeId is null");
        checkNotNull(location"location is null");
        checkNotNull(planFragment"planFragment1 is null");
        checkNotNull(outputBuffers"outputBuffers is null");
        checkNotNull(httpClient"httpClient is null");
        checkNotNull(executor"executor is null");
        checkNotNull(taskInfoCodec"taskInfoCodec is null");
        checkNotNull(taskUpdateRequestCodec"taskUpdateRequestCodec is null");
        try (SetThreadName setThreadName = new SetThreadName("HttpRemoteTask-%s"taskId)) {
            this. = taskId;
            this. = session;
            this. = nodeId;
            this. = planFragment;
            this..set(outputBuffers);
            this. = httpClient;
            this. = executor;
            this. = taskInfoCodec;
            this. = taskUpdateRequestCodec;
            this. = planFragment.getTupleInfos();
            this. = maxConsecutiveErrorCount;
            this. = minErrorDuration;
            for (Entry<PlanNodeIdSplitentry : checkNotNull(initialSplits"initialSplits is null").entries()) {
                ScheduledSplit scheduledSplit = new ScheduledSplit(.getAndIncrement(), entry.getValue());
                .put(entry.getKey(), scheduledSplit);
            }
            List<BufferInfobufferStates = ImmutableList.copyOf(transform(outputBuffers.getBuffers().keySet(), new Function<StringBufferInfo>()
            {
                @Override
                public BufferInfo apply(String outputId)
                {
                    return new BufferInfo(outputIdfalse, 0, 0);
                }
            }));
            TaskStats taskStats = new TaskContext(taskIdexecutorsession).getTaskStats();
             = new StateMachine<>("task " + taskIdexecutornew TaskInfo(
                    taskId,
                    .,
                    .,
                    location,
                    DateTime.now(),
                    new SharedBufferInfo(., 0, 0, bufferStates),
                    ImmutableSet.<PlanNodeId>of(),
                    taskStats,
                    ImmutableList.<ExecutionFailureInfo>of()));
        }
    }
    @Override
    public String getNodeId()
    {
        return ;
    }
    @Override
    public TaskInfo getTaskInfo()
    {
        return .get();
    }
    @Override
    public void start()
    {
        try (SetThreadName setThreadName = new SetThreadName("HttpRemoteTask-%s")) {
            // to start we just need to trigger an update
            scheduleUpdate();
        }
    }
    @Override
    public synchronized void addSplits(PlanNodeId sourceIdIterable<? extends Splitsplits)
    {
        try (SetThreadName setThreadName = new SetThreadName("HttpRemoteTask-%s")) {
            checkNotNull(sourceId"sourceId is null");
            checkNotNull(splits"splits is null");
            checkState(!.contains(sourceId), "noMoreSplits has already been set for %s"sourceId);
            // only add pending split if not done
            if (!getTaskInfo().getState().isDone()) {
                for (Split split : splits) {
                    .put(sourceIdnew ScheduledSplit(.getAndIncrement(), split));
                }
                .set(true);
            }
            scheduleUpdate();
        }
    }
    @Override
    public synchronized void noMoreSplits(PlanNodeId sourceId)
    {
        try (SetThreadName setThreadName = new SetThreadName("HttpRemoteTask-%s")) {
            if (.add(sourceId)) {
                .set(true);
                scheduleUpdate();
            }
        }
    }
    @Override
    public synchronized void setOutputBuffers(OutputBuffers newOutputBuffers)
    {
        try (SetThreadName setThreadName = new SetThreadName("HttpRemoteTask-%s")) {
            if (getTaskInfo().getState().isDone()) {
                return;
            }
            if (newOutputBuffers.getVersion() > .get().getVersion()) {
                .set(newOutputBuffers);
                .set(true);
                scheduleUpdate();
            }
        }
    }
    @Override
    public synchronized int getQueuedSplits()
    {
        try (SetThreadName setThreadName = new SetThreadName("HttpRemoteTask-%s")) {
            int pendingSplitCount = 0;
            pendingSplitCount = .get(.getPartitionedSource()).size();
            return pendingSplitCount + .get().getStats().getQueuedDrivers();
        }
    }
    @Override
    public void addStateChangeListener(StateChangeListener<TaskInfostateChangeListener)
    {
        try (SetThreadName setThreadName = new SetThreadName("HttpRemoteTask-%s")) {
            .addStateChangeListener(stateChangeListener);
        }
    }
    @Override
    public Duration waitForTaskToFinish(Duration maxWait)
            throws InterruptedException
    {
        try (SetThreadName setThreadName = new SetThreadName("HttpRemoteTask-%s")) {
            while (true) {
                TaskInfo currentState = .get();
                if (maxWait.toMillis() <= 1 || currentState.getState().isDone()) {
                    return maxWait;
                }
                maxWait = .waitForStateChange(currentStatemaxWait);
            }
        }
    }
    private synchronized void updateTaskInfo(final TaskInfo newValue)
    {
        if (newValue.getState().isDone()) {
            // splits can be huge so clear the list
            .clear();
        }
        // change to new value if old value is not changed and new value has a newer version
        .setIf(newValuenew Predicate<TaskInfo>()
        {
            public boolean apply(TaskInfo oldValue)
            {
                if (oldValue.getState().isDone()) {
                    // never update if the task has reached a terminal state
                    return false;
                }
                if (newValue.getVersion() < oldValue.getVersion()) {
                    // don't update to an older version (same version is ok)
                    return false;
                }
                return true;
            }
        });
    }
    private synchronized void scheduleUpdate()
    {
        // don't update if the task hasn't been started yet or if it is already finished
        if (!.get() || .get().getState().isDone()) {
            return;
        }
        // if we have an old request outstanding, cancel it
        if ( != null && Duration.nanosSince().compareTo(new Duration(2, .)) >= 0) {
            .set(true);
            .cancel(true);
             = null;
             = 0;
        }
        // if there is a request already running, wait for it to complete
        if (this. != null && !this..isDone()) {
            return;
        }
        // don't update too fast in the face of errors
        if (.get() > 0) {
            .acquire();
        }
        List<TaskSourcesources = getSources();
        TaskUpdateRequest updateRequest = new TaskUpdateRequest(,
                ,
                sources,
                .get());
        Request request = preparePost()
                .setUri(uriBuilderFrom(.get().getSelf()).build())
                .setHeader(...toString())
                .setBodyGenerator(jsonBodyGenerator(updateRequest))
                .build();
         = future;
         = System.nanoTime();
        Futures.addCallback(futurenew SimpleHttpResponseHandler<>(new UpdateResponseHandler(sources), request.getUri()), );
        .set(false);
    }
    private synchronized List<TaskSourcegetSources()
    {
        ImmutableList.Builder<TaskSourcesources = ImmutableList.builder();
        for (PlanNodeId planNodeId : .getSourceIds()) {
            Set<ScheduledSplitsplits = .get(planNodeId);
            boolean noMoreSplits = this..contains(planNodeId);
            if (!splits.isEmpty() || noMoreSplits) {
                sources.add(new TaskSource(planNodeIdsplitsnoMoreSplits));
            }
        }
        return sources.build();
    }
    @Override
    public synchronized void cancel()
    {
        try (SetThreadName setThreadName = new SetThreadName("HttpRemoteTask-%s")) {
            // clear pending splits to free memory
            .clear();
            // cancel pending request
            if ( != null) {
                .cancel(true);
                 = null;
                 = 0;
            }
            // mark task as canceled (if not already done)
            TaskInfo taskInfo = getTaskInfo();
            updateTaskInfo(new TaskInfo(taskInfo.getTaskId(),
                    .,
                    .,
                    taskInfo.getSelf(),
                    taskInfo.getLastHeartbeat(),
                    taskInfo.getOutputBuffers(),
                    taskInfo.getNoMoreSplits(),
                    taskInfo.getStats(),
                    ImmutableList.<ExecutionFailureInfo>of()));
            // fire delete to task and ignore response
            if (taskInfo.getSelf() != null) {
                final long start = System.nanoTime();
                final Request request = prepareDelete().setUri(taskInfo.getSelf()).build();
                Futures.addCallback(.executeAsync(requestcreateStatusResponseHandler()), new FutureCallback<StatusResponse>()
                {
                    @Override
                    public void onSuccess(StatusResponse result)
                    {
                        // assume any response is good enough
                    }
                    @Override
                    public void onFailure(Throwable t)
                    {
                        if (t instanceof RejectedExecutionException) {
                            // client has been shutdown
                            return;
                        }
                        // reschedule
                        if (Duration.nanosSince(start).compareTo(new Duration(2, .)) < 0) {
                            Futures.addCallback(.executeAsync(requestcreateStatusResponseHandler()), this);
                        }
                        else {
                            .error(t"Unable to cancel task at %s"request.getUri());
                        }
                    }
                }, );
            }
        }
    }
    private synchronized void requestSucceeded(TaskInfo newValueList<TaskSourcesources)
    {
        try (SetThreadName setThreadName = new SetThreadName("HttpRemoteTask-%s")) {
            updateTaskInfo(newValue);
            .set(System.nanoTime());
            .set(0);
            .clear();
            // remove acknowledged splits, which frees memory
            for (TaskSource source : sources) {
                PlanNodeId planNodeId = source.getPlanNodeId();
                for (ScheduledSplit split : source.getSplits()) {
                    .remove(planNodeIdsplit);
                }
            }
            if ( == null) {
                 = new ContinuousTaskInfoFetcher();
                .start();
            }
        }
    }
    private synchronized void requestFailed(Throwable reason)
    {
        // cancellation is not a failure
        if (reason instanceof CancellationException) {
            return;
        }
        // if task is done, ignore the error
        TaskInfo taskInfo = getTaskInfo();
        if (taskInfo.getState().isDone()) {
            return;
        }
        // log failure message
        if (isSocketError(reason)) {
            // don't print a stack for a socket error
            .warn("Error updating task %s: %s: %s"taskInfo.getTaskId(), reason.getMessage(), taskInfo.getSelf());
        }
        else {
            .warn(reason"Error updating task %s: %s"taskInfo.getTaskId(), taskInfo.getSelf());
        }
        // remember the first 10 errors
        if (.size() < 10) {
            .add(reason);
        }
        // fail the task, if we have more than X failures in a row and more than Y seconds have passed since the last request
        long errorCount = this..incrementAndGet();
        Duration timeSinceLastSuccess = Duration.nanosSince(.get());
        if (errorCount >  && timeSinceLastSuccess.compareTo() > 0) {
            // it is weird to mark the task failed locally and then cancel the remote task, but there is no way to tell a remote task that it is failed
            PrestoException exception = new PrestoException(.toErrorCode(), format("Too many requests to %s failed: %s failures: Time since last success %s",
                    taskInfo.getSelf(),
                    errorCount,
                    timeSinceLastSuccess));
            for (Throwable error : ) {
                exception.addSuppressed(error);
            }
            failTask(exception);
            cancel();
        }
    }

    
Move the task directly to the failed state
    private void failTask(Throwable cause)
    {
        TaskInfo taskInfo = getTaskInfo();
        if (!taskInfo.getState().isDone()) {
            .debug(cause"Remote task failed: %s"taskInfo.getSelf());
        }
        updateTaskInfo(new TaskInfo(taskInfo.getTaskId(),
                .,
                .,
                taskInfo.getSelf(),
                taskInfo.getLastHeartbeat(),
                taskInfo.getOutputBuffers(),
                taskInfo.getNoMoreSplits(),
                taskInfo.getStats(),
                ImmutableList.of(toFailure(cause))));
    }
    @Override
    public String toString()
    {
        return Objects.toStringHelper(this)
                .addValue(getTaskInfo())
                .toString();
    }
    private class UpdateResponseHandler
            implements SimpleHttpResponseCallback<TaskInfo>
    {
        private final List<TaskSourcesources;
        private UpdateResponseHandler(List<TaskSourcesources)
        {
            this. = ImmutableList.copyOf(checkNotNull(sources"sources is null"));
        }
        @Override
        public void success(TaskInfo value)
        {
            try (SetThreadName setThreadName = new SetThreadName("UpdateResponseHandler-%s")) {
                try {
                    requestSucceeded(value);
                }
                finally {
                    scheduleUpdate();
                }
            }
        }
        @Override
        public void failed(Throwable cause)
        {
            try (SetThreadName setThreadName = new SetThreadName("UpdateResponseHandler-%s")) {
                try {
                    // on failure assume we need to update again
                    .set(true);
                    requestFailed(cause);
                }
                finally {
                    scheduleUpdate();
                }
            }
        }
        @Override
        public void fatal(Throwable cause)
        {
            try (SetThreadName setThreadName = new SetThreadName("UpdateResponseHandler-%s")) {
                failTask(cause);
            }
        }
    }

    
Continuous update loop for task info. Wait for a short period for task state to change, and if it does not, return the current state of the task. This will cause stats to be updated at a regular interval, and state changes will be immediately recorded.
    private class ContinuousTaskInfoFetcher
            implements SimpleHttpResponseCallback<TaskInfo>
    {
        @GuardedBy("this")
        private boolean running;
        @GuardedBy("this")
        private ListenableFuture<JsonResponse<TaskInfo>> future;
        public synchronized void start()
        {
            if () {
                // already running
                return;
            }
             = true;
            scheduleNextRequest();
        }
        public synchronized void stop()
        {
             = false;
            if ( != null) {
                .cancel(true);
                 = null;
            }
        }
        private synchronized void scheduleNextRequest()
        {
            try (SetThreadName setThreadName = new SetThreadName("ContinuousTaskInfoFetcher-%s")) {
                // stopped or done?
                TaskInfo taskInfo = HttpRemoteTask.this..get();
                if (! || taskInfo.getState().isDone()) {
                    return;
                }
                // outstanding request?
                if ( != null && !.isDone()) {
                    // this should never happen
                    .error("Can not reschedule update because an update is already running");
                    return;
                }
                Request request = prepareGet()
                        .setUri(taskInfo.getSelf())
                        .setHeader(...toString())
                        .setHeader(.taskInfo.getState().toString())
                        .setHeader(."200ms")
                        .build();
                 = .executeAsync(requestcreateFullJsonResponseHandler());
                Futures.addCallback(new SimpleHttpResponseHandler<>(thisrequest.getUri()), );
            }
        }
        @Override
        public void success(TaskInfo value)
        {
            try (SetThreadName setThreadName = new SetThreadName("ContinuousTaskInfoFetcher-%s")) {
                synchronized (this) {
                     = null;
                }
                try {
                    requestSucceeded(value, ImmutableList.<TaskSource>of());
                }
                finally {
                    scheduleNextRequest();
                }
            }
        }
        @Override
        public void failed(Throwable cause)
        {
            try (SetThreadName setThreadName = new SetThreadName("ContinuousTaskInfoFetcher-%s")) {
                synchronized (this) {
                     = null;
                }
                try {
                    requestFailed(cause);
                }
                finally {
                    // there is no back off here so we can get a lot of error messages when a server spins
                    // down, but it typically goes away quickly because the queries get canceled
                    scheduleNextRequest();
                }
            }
        }
        @Override
        public void fatal(Throwable cause)
        {
            try (SetThreadName setThreadName = new SetThreadName("ContinuousTaskInfoFetcher-%s")) {
                synchronized (this) {
                     = null;
                }
                failTask(cause);
            }
        }
    }
    public static class SimpleHttpResponseHandler<T>
            implements FutureCallback<JsonResponse<T>>
    {
        private final SimpleHttpResponseCallback<T> callback;
        private final URI uri;
        public SimpleHttpResponseHandler(SimpleHttpResponseCallback<T> callbackURI uri)
        {
            this. = callback;
            this. = uri;
        }
        @Override
        public void onSuccess(JsonResponse<T> response)
        {
            try {
                if (response.getStatusCode() == ..code() && response.hasValue()) {
                    .success(response.getValue());
                }
                else if (response.getStatusCode() == ..code()) {
                    .failed(new RuntimeException("Server at %s returned SERVICE_UNAVAILABLE"));
                }
                else {
                    // Something is broken in the server or the client, so fail the task immediately (includes 500 errors)
                    Exception cause = response.getException();
                    if (cause == null) {
                        if (response.getStatusCode() == ..code()) {
                            cause = new RuntimeException(format("Expected response from %s is empty"));
                        }
                        else {
                            cause = new RuntimeException(format("Expected response code from %s to be %s, but was %s: %s",
                                    ,
                                    ..code(),
                                    response.getStatusCode(),
                                    response.getStatusMessage()));
                        }
                    }
                    .fatal(cause);
                }
            }
            catch (Throwable t) {
                // this should never happen
                .failed(t);
            }
        }
        @Override
        public void onFailure(Throwable t)
        {
            .failed(t);
        }
    }
    public interface SimpleHttpResponseCallback<T>
    {
        void success(T value);
        void failed(Throwable cause);
        void fatal(Throwable cause);
    }
    private static boolean isSocketError(Throwable t)
    {
        while (t != null) {
            // in this case we consider an EOFException a socket error
            if ((t instanceof SocketException) || (t instanceof SocketTimeoutException) || (t instanceof EOFException)) {
                return true;
            }
            t = t.getCause();
        }
        return false;
    }
New to GrepCode? Check out our FAQ X