Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.operator;
 
 
 
 import java.net.URI;
 import java.util.List;
 
 import static com.facebook.presto.PrestoMediaTypes.PRESTO_PAGES_TYPE;
 import static com.facebook.presto.block.PagesSerde.readPages;
 import static com.facebook.presto.client.PrestoHeaders.PRESTO_MAX_SIZE;
 import static com.facebook.presto.client.PrestoHeaders.PRESTO_PAGE_NEXT_TOKEN;
 import static com.facebook.presto.client.PrestoHeaders.PRESTO_PAGE_TOKEN;
 import static com.facebook.presto.operator.HttpPageBufferClient.PagesResponse.createClosedResponse;
 import static com.facebook.presto.operator.HttpPageBufferClient.PagesResponse.createEmptyPagesResponse;
 import static com.facebook.presto.operator.HttpPageBufferClient.PagesResponse.createPagesResponse;
 import static com.facebook.presto.util.Failures.WORKER_NODE_ERROR;
 import static com.google.common.base.MoreObjects.toStringHelper;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.net.HttpHeaders.CONTENT_TYPE;
 import static io.airlift.http.client.Request.Builder.prepareDelete;
 import static io.airlift.http.client.Request.Builder.prepareGet;
 import static io.airlift.http.client.ResponseHandlerUtils.propagate;
 import static io.airlift.http.client.StatusResponseHandler.createStatusResponseHandler;
 import static java.lang.Math.min;
 import static java.lang.String.format;
 
 public final class HttpPageBufferClient
         implements Closeable
 {
     private static final int INITIAL_DELAY_MILLIS = 1;
     private static final int MAX_DELAY_MILLIS = 100;
 
     private static final Logger log = Logger.get(HttpPageBufferClient.class);

    
For each request, the addPage method will be called zero or more times, followed by either requestComplete or bufferFinished. If the client is closed, requestComplete or bufferFinished may never be called.

NOTE: Implementations of this interface are not allowed to perform blocking operations.

 
     public interface ClientCallback
     {
         void addPage(HttpPageBufferClient clientPage page);
 
         void requestComplete(HttpPageBufferClient client);
 
         void clientFinished(HttpPageBufferClient client);
 
         void clientFailed(HttpPageBufferClient clientThrowable cause);
     }
 
     private final HttpClient httpClient;
    private final DataSize maxResponseSize;
    private final Duration minErrorDuration;
    private final URI location;
    private final ClientCallback clientCallback;
    private final BlockEncodingSerde blockEncodingSerde;
    private final ScheduledExecutorService executor;
    @GuardedBy("this")
    private final Stopwatch errorStopwatch;
    @GuardedBy("this")
    private boolean closed;
    @GuardedBy("this")
    @GuardedBy("this")
    private DateTime lastUpdate = DateTime.now();
    @GuardedBy("this")
    private long token;
    @GuardedBy("this")
    private boolean scheduled;
    @GuardedBy("this")
    private long errorDelayMillis;
    private final AtomicInteger pagesReceived = new AtomicInteger();
    private final AtomicInteger requestsScheduled = new AtomicInteger();
    private final AtomicInteger requestsCompleted = new AtomicInteger();
    private final AtomicInteger requestsFailed = new AtomicInteger();
    public HttpPageBufferClient(
            HttpClient httpClient,
            DataSize maxResponseSize,
            Duration minErrorDuration,
            URI location,
            ClientCallback clientCallback,
            BlockEncodingSerde blockEncodingSerde,
            ScheduledExecutorService executor)
    {
        this(httpClientmaxResponseSizeminErrorDurationlocationclientCallbackblockEncodingSerdeexecutor, Stopwatch.createUnstarted());
    }
    public HttpPageBufferClient(
            HttpClient httpClient,
            DataSize maxResponseSize,
            Duration minErrorDuration,
            URI location,
            ClientCallback clientCallback,
            BlockEncodingSerde blockEncodingSerde,
            ScheduledExecutorService executor,
            Stopwatch errorStopwatch)
    {
        this. = checkNotNull(httpClient"httpClient is null");
        this. = checkNotNull(maxResponseSize"maxResponseSize is null");
        this. = checkNotNull(minErrorDuration"minErrorDuration is null");
        this. = checkNotNull(location"location is null");
        this. = checkNotNull(clientCallback"clientCallback is null");
        this. = checkNotNull(blockEncodingSerde"blockEncodingManager is null");
        this. = checkNotNull(executor"executor is null");
        this. = checkNotNull(errorStopwatch"errorStopwatch is null").reset();
    }
    public synchronized PageBufferClientStatus getStatus()
    {
        String state;
        if () {
            state = "closed";
        }
        else if ( != null) {
            state = "running";
        }
        else if () {
            state = "scheduled";
        }
        else {
            state = "queued";
        }
        String httpRequestState = "not scheduled";
        if ( != null) {
            httpRequestState = .getState();
        }
        return new PageBufferClientStatus(
                ,
                state,
                ,
                .get(),
                .get(),
                .get(),
                .get(),
                httpRequestState);
    }
    public synchronized boolean isRunning()
    {
        return  != null;
    }
    @Override
    public void close()
    {
        boolean shouldSendDelete;
        Future<?> future;
        synchronized (this) {
            shouldSendDelete = !;
             = true;
            future = this.;
            this. = null;
             = DateTime.now();
        }
        if (future != null) {
            future.cancel(true);
        }
        // abort the output buffer on the remote node; response of delete is ignored
        if (shouldSendDelete) {
        }
    }
    public synchronized void scheduleRequest()
    {
        if ( || ( != null) || ) {
            return;
        }
         = true;
        // start before scheduling to include error delay
        .start();
        .schedule(new Runnable()
        {
            @Override
            public void run()
            {
                try {
                    initiateRequest();
                }
                catch (Throwable t) {
                    // should not happen, but be safe and fail the operator
                    .clientFailed(HttpPageBufferClient.thist);
                }
            }
        }, .);
         = DateTime.now();
    }
    private synchronized void initiateRequest()
    {
         = false;
        if ( || ( != null)) {
            return;
        }
        final URI uri = HttpUriBuilder.uriBuilderFrom().appendPath(String.valueOf()).build();
         = .executeAsync(
                prepareGet()
                        .setHeader(.toString())
                        .setUri(uri).build(),
                new PageResponseHandler());
        Futures.addCallback(new FutureCallback<PagesResponse>()
        {
            @Override
            public void onSuccess(PagesResponse result)
            {
                if (Thread.holdsLock(HttpPageBufferClient.this)) {
                    .error("Can not handle callback while holding a lock on this");
                }
                resetErrors();
                .incrementAndGet();
                List<Pagepages;
                synchronized (HttpPageBufferClient.this) {
                    if (result.getToken() == ) {
                        pages = result.getPages();
                         = result.getNextToken();
                    }
                    else {
                        pages = ImmutableList.of();
                    }
                }
                // add pages
                for (Page page : pages) {
                    .incrementAndGet();
                    .addPage(HttpPageBufferClient.thispage);
                }
                // complete request or close client
                if (result.isClientClosed()) {
                    synchronized (HttpPageBufferClient.this) {
                         = true;
                         = null;
                         = DateTime.now();
                    }
                    .clientFinished(HttpPageBufferClient.this);
                }
                else {
                    synchronized (HttpPageBufferClient.this) {
                         = null;
                         = DateTime.now();
                    }
                    .requestComplete(HttpPageBufferClient.this);
                }
            }
            @Override
            public void onFailure(Throwable t)
            {
                .debug("Request to %s failed %s"urit);
                if (Thread.holdsLock(HttpPageBufferClient.this)) {
                    .error("Can not handle callback while holding a lock on this");
                }
                t = rewriteException(t);
                if (t instanceof PrestoException) {
                    .clientFailed(HttpPageBufferClient.thist);
                }
                Duration errorDuration = elapsedErrorDuration();
                if (errorDuration.compareTo() > 0) {
                    String message = format("%s (%s - requests failed for %s)"urierrorDuration);
                    .clientFailed(HttpPageBufferClient.thisnew PageTransportTimeoutException(messaget));
                }
                increaseErrorDelay();
                .incrementAndGet();
                .incrementAndGet();
                synchronized (HttpPageBufferClient.this) {
                     = null;
                     = DateTime.now();
                }
                .requestComplete(HttpPageBufferClient.this);
            }
        }, );
         = DateTime.now();
    }
    @Override
    public boolean equals(Object o)
    {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        HttpPageBufferClient that = (HttpPageBufferCliento;
        if (!.equals(that.location)) {
            return false;
        }
        return true;
    }
    @Override
    public int hashCode()
    {
        return .hashCode();
    }
    @Override
    public String toString()
    {
        String state;
        synchronized (this) {
            if () {
                state = "CLOSED";
            }
            else if ( != null) {
                state = "RUNNING";
            }
            else {
                state = "QUEUED";
            }
        }
        return toStringHelper(this)
                .add("location")
                .addValue(state)
                .toString();
    }
    private static Throwable rewriteException(Throwable t)
    {
        if (t instanceof ResponseTooLargeException) {
            return new PageTooLargeException();
        }
        return t;
    }
    private synchronized Duration elapsedErrorDuration()
    {
        if (.isRunning()) {
            .stop();
        }
        long nanos = .elapsed(.);
        return new Duration(nanos.).convertTo(.);
    }
    private synchronized void increaseErrorDelay()
    {
        if ( == 0) {
             = ;
        }
        else {
             = min( * 2, );
        }
    }
    private synchronized void resetErrors()
    {
        .reset();
    }
    public static class PageResponseHandler
            implements ResponseHandler<PagesResponseRuntimeException>
    {
        private final BlockEncodingSerde blockEncodingSerde;
        public PageResponseHandler(BlockEncodingSerde blockEncodingSerde)
        {
            this. = blockEncodingSerde;
        }
        @Override
        public PagesResponse handleException(Request requestException exception)
        {
            throw propagate(requestexception);
        }
        @Override
        public PagesResponse handle(Request requestResponse response)
        {
            // job is finished when we get a GONE response
            if (response.getStatusCode() == ..code()) {
                return createClosedResponse(getToken(response));
            }
            // no content means no content was created within the wait period, but query is still ok
            if (response.getStatusCode() == ..code()) {
                return createEmptyPagesResponse(getToken(response), getNextToken(response));
            }
            // otherwise we must have gotten an OK response, everything else is considered fatal
            if (response.getStatusCode() != ..code()) {
                throw new PageTransportErrorException(format("Expected response code to be 200, but was %s %s: %s"response.getStatusCode(), response.getStatusMessage(), request.getUri()));
            }
            String contentType = response.getHeader();
            if ((contentType == null) || !mediaTypeMatches(contentType)) {
                // this can happen when an error page is returned, but is unlikely given the above 200
                throw new PageTransportErrorException(format("Expected %s response from server but got %s: %s"contentTyperequest.getUri()));
            }
            long token = getToken(response);
            long nextToken = getNextToken(response);
            try (SliceInput input = new InputStreamSliceInput(response.getInputStream())) {
                List<Pagepages = ImmutableList.copyOf(readPages(input));
                return createPagesResponse(tokennextTokenpages);
            }
            catch (IOException e) {
                throw Throwables.propagate(e);
            }
        }
        private static long getToken(Response response)
        {
            String tokenHeader = response.getHeader();
            if (tokenHeader == null) {
                throw new PageTransportErrorException(format("Expected %s header"));
            }
            return Long.parseLong(tokenHeader);
        }
        private static long getNextToken(Response response)
        {
            String nextTokenHeader = response.getHeader();
            if (nextTokenHeader == null) {
                throw new PageTransportErrorException(format("Expected %s header"));
            }
            return Long.parseLong(nextTokenHeader);
        }
        private static boolean mediaTypeMatches(String valueMediaType range)
        {
            try {
                return MediaType.parse(value).is(range);
            }
            catch (IllegalArgumentException | IllegalStateException e) {
                return false;
            }
        }
    }
    public static class PagesResponse
    {
        public static PagesResponse createPagesResponse(long tokenlong nextTokenIterable<Pagepages)
        {
            return new PagesResponse(tokennextTokenpagesfalse);
        }
        public static PagesResponse createEmptyPagesResponse(long tokenlong nextToken)
        {
            return new PagesResponse(tokennextToken, ImmutableList.<Page>of(), false);
        }
        public static PagesResponse createClosedResponse(long token)
        {
            return new PagesResponse(token, -1, ImmutableList.<Page>of(), true);
        }
        private final long token;
        private final long nextToken;
        private final List<Pagepages;
        private final boolean clientClosed;
        private PagesResponse(long tokenlong nextTokenIterable<Pagepagesboolean clientClosed)
        {
            this. = token;
            this. = nextToken;
            this. = ImmutableList.copyOf(pages);
            this. = clientClosed;
        }
        public long getToken()
        {
            return ;
        }
        public long getNextToken()
        {
            return ;
        }
        public List<PagegetPages()
        {
            return ;
        }
        public boolean isClientClosed()
        {
            return ;
        }
        @Override
        public String toString()
        {
            return toStringHelper(this)
                    .add("token")
                    .add("nextToken")
                    .add("pagesSize".size())
                    .add("clientClosed")
                    .toString();
        }
    }
New to GrepCode? Check out our FAQ X