Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.operator;
 
 
 
 import java.net.URI;
 import java.util.List;
 import java.util.Set;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.Sets.newConcurrentHashSet;
 
 public class ExchangeClient
         implements Closeable
 {
     private static final Page NO_MORE_PAGES = new Page(0);
 
     private final BlockEncodingSerde blockEncodingSerde;
     private final long maxBufferedBytes;
     private final DataSize maxResponseSize;
     private final int concurrentRequestMultiplier;
     private final Duration minErrorDuration;
     private final HttpClient httpClient;
     private final ScheduledExecutorService executor;
 
     @GuardedBy("this")
     private final Set<URIlocations = new HashSet<>();
 
     @GuardedBy("this")
     private boolean noMoreLocations;
 
     private final ConcurrentMap<URIHttpPageBufferClientallClients = new ConcurrentHashMap<>();
 
     @GuardedBy("this")
     private final Deque<HttpPageBufferClientqueuedClients = new LinkedList<>();
 
     private final LinkedBlockingDeque<PagepageBuffer = new LinkedBlockingDeque<>();
 
     @GuardedBy("this")
     private final List<SettableFuture<?>> blockedCallers = new ArrayList<>();
 
     @GuardedBy("this")
     private long bufferBytes;
     @GuardedBy("this")
     private long successfulRequests;
     @GuardedBy("this")
     private long averageBytesPerRequest;
 
     private final AtomicBoolean closed = new AtomicBoolean();
     private final AtomicReference<Throwablefailure = new AtomicReference<>();
 
     public ExchangeClient(
             BlockEncodingSerde blockEncodingSerde,
             DataSize maxBufferedBytes,
             DataSize maxResponseSize,
             int concurrentRequestMultiplier,
             Duration minErrorDuration,
             HttpClient httpClient,
            ScheduledExecutorService executor)
    {
        this. = blockEncodingSerde;
        this. = maxBufferedBytes.toBytes();
        this. = maxResponseSize;
        this. = concurrentRequestMultiplier;
        this. = minErrorDuration;
        this. = httpClient;
        this. = executor;
    }
    public synchronized ExchangeClientStatus getStatus()
    {
        int bufferedPages = .size();
        if (bufferedPages > 0 && .peekLast() == ) {
            bufferedPages--;
        }
        ImmutableList.Builder<PageBufferClientStatusexchangeStatus = ImmutableList.builder();
        for (HttpPageBufferClient client : .values()) {
            exchangeStatus.add(client.getStatus());
        }
        return new ExchangeClientStatus(bufferedPagesexchangeStatus.build());
    }
    public synchronized void addLocation(URI location)
    {
        checkNotNull(location"location is null");
        if (.contains(location)) {
            return;
        }
        checkState(!"No more locations already set");
        .add(location);
    }
    public synchronized void noMoreLocations()
    {
         = true;
    }
    @Nullable
    public Page pollPage()
    {
        checkState(!Thread.holdsLock(this), "Can not get next page while holding a lock on this");
        throwIfFailed();
        if (.get()) {
            return null;
        }
        Page page = .poll();
        page = postProcessPage(page);
        return page;
    }
    @Nullable
    public Page getNextPage(Duration maxWaitTime)
            throws InterruptedException
    {
        checkState(!Thread.holdsLock(this), "Can not get next page while holding a lock on this");
        throwIfFailed();
        if (.get()) {
            return null;
        }
        Page page = .poll();
        // only wait for a page if we have remote clients
        if (page == null && maxWaitTime.toMillis() >= 1 && !.isEmpty()) {
            page = .poll(maxWaitTime.toMillis(), .);
        }
        page = postProcessPage(page);
        return page;
    }
    private Page postProcessPage(Page page)
    {
        checkState(!Thread.holdsLock(this), "Can not get next page while holding a lock on this");
        if (page == ) {
            // mark client closed
            .set(true);
            // add end marker back to queue
            checkState(.add(), "Could not add no more pages marker");
            notifyBlockedCallers();
            // don't return end of stream marker
            page = null;
        }
        if (page != null) {
            synchronized (this) {
                 -= page.getSizeInBytes();
            }
            if (!.get() && .peek() == ) {
                .set(true);
            }
            scheduleRequestIfNecessary();
        }
        return page;
    }
    public boolean isClosed()
    {
        return .get();
    }
    @Override
    public synchronized void close()
    {
        .set(true);
        for (HttpPageBufferClient client : .values()) {
            closeQuietly(client);
        }
        .clear();
         = 0;
        if (.peekLast() != ) {
            checkState(.add(), "Could not add no more pages marker");
        }
        notifyBlockedCallers();
    }
    public synchronized void scheduleRequestIfNecessary()
    {
        if (isClosed() || isFailed()) {
            return;
        }
        // if finished, add the end marker
        if ( && .size() == .size()) {
            if (.peekLast() != ) {
                checkState(.add(), "Could not add no more pages marker");
            }
            if (!.get() && .peek() == ) {
                .set(true);
            }
            notifyBlockedCallers();
            return;
        }
        // add clients for new locations
        for (URI location : ) {
            if (!.containsKey(location)) {
                HttpPageBufferClient client = new HttpPageBufferClient(
                        ,
                        ,
                        ,
                        location,
                        new ExchangeClientCallback(),
                        ,
                        );
                .put(locationclient);
                .add(client);
            }
        }
        if ( > ) {
            return;
        }
        long neededBytes =  - ;
        if (neededBytes <= 0) {
            return;
        }
        int clientCount = (int) ((1.0 * neededBytes / ) * );
        clientCount = Math.max(clientCount, 1);
        int pendingClients = .size() - .size() - .size();
        clientCount -= pendingClients;
        for (int i = 0; i < clientCounti++) {
            HttpPageBufferClient client = .poll();
            if (client == null) {
                // no more clients available
                return;
            }
            client.scheduleRequest();
        }
    }
    public synchronized ListenableFuture<?> isBlocked()
    {
        if (isClosed() || isFailed() || .peek() != null) {
            return Futures.immediateFuture(true);
        }
        SettableFuture<?> future = SettableFuture.create();
        .add(future);
        return future;
    }
    private synchronized void addPage(Page page)
    {
        if (isClosed() || isFailed()) {
            return;
        }
        .add(page);
        // notify all blocked callers
        notifyBlockedCallers();
         += page.getSizeInBytes();
        ++;
        // AVG_n = AVG_(n-1) * (n-1)/n + VALUE_n / n
    }
    private synchronized void notifyBlockedCallers()
    {
        List<SettableFuture<?>> callers = ImmutableList.copyOf();
        .clear();
        for (SettableFuture<?> blockedCaller : callers) {
            blockedCaller.set(null);
        }
    }
    private synchronized void requestComplete(HttpPageBufferClient client)
    {
        if (!.contains(client)) {
            .add(client);
        }
    }
    private synchronized void clientFinished(HttpPageBufferClient client)
    {
        checkNotNull(client"client is null");
        .add(client);
    }
    private synchronized void clientFailed(Throwable cause)
    {
        // TODO: properly handle the failed vs closed state
        // it is important not to treat failures as a successful close
        if (!isClosed()) {
            .compareAndSet(nullcause);
            notifyBlockedCallers();
        }
    }
    private boolean isFailed()
    {
        return .get() != null;
    }
    private void throwIfFailed()
    {
        Throwable t = .get();
        if (t != null) {
            throw Throwables.propagate(t);
        }
    }
    private class ExchangeClientCallback
            implements ClientCallback
    {
        @Override
        public void addPage(HttpPageBufferClient clientPage page)
        {
            checkNotNull(client"client is null");
            checkNotNull(page"page is null");
            ExchangeClient.this.addPage(page);
            scheduleRequestIfNecessary();
        }
        @Override
        public void requestComplete(HttpPageBufferClient client)
        {
            checkNotNull(client"client is null");
            ExchangeClient.this.requestComplete(client);
        }
        @Override
        public void clientFinished(HttpPageBufferClient client)
        {
            ExchangeClient.this.clientFinished(client);
        }
        @Override
        public void clientFailed(HttpPageBufferClient clientThrowable cause)
        {
            checkNotNull(client"client is null");
            checkNotNull(cause"cause is null");
            ExchangeClient.this.clientFailed(cause);
        }
    }
    private static void closeQuietly(HttpPageBufferClient client)
    {
        try {
            client.close();
        }
        catch (RuntimeException e) {
            // ignored
        }
    }
New to GrepCode? Check out our FAQ X