Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.execution;
 
 
 
 import java.util.List;
 
 import static com.facebook.presto.execution.PageSplitterUtil.splitPage;
 import static com.facebook.presto.spi.block.PageBuilderStatus.DEFAULT_MAX_PAGE_SIZE_IN_BYTES;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Verify.verify;
 import static com.google.common.util.concurrent.Futures.immediateFuture;
 import static java.util.Objects.requireNonNull;
 
 public class PartitionBuffer
 {
     private final LinkedList<PagemasterBuffer = new LinkedList<>();
     private final BlockingQueue<QueuedPagequeuedPages = new LinkedBlockingQueue<>();
     private final AtomicLong pagesAdded = new AtomicLong(); // Number of pages added to the masterBuffer
     private final AtomicLong masterSequenceId = new AtomicLong();
     private final AtomicLong bufferedBytes = new AtomicLong();  // Bytes in the master buffer
     private final int partition;
     private final SharedBufferMemoryManager memoryManager;
 
     public PartitionBuffer(int partitionSharedBufferMemoryManager memoryManager)
     {
         checkArgument(partition >= 0, "partition must be >= 0");
         this. = partition;
         this. = requireNonNull(memoryManager"memoryManager is null");
     }
 
     public synchronized ListenableFuture<?> enqueuePage(Page page)
     {
         if (!.isFull()) {
             addToMasterBuffer(page);
             return immediateFuture(true);
         }
         else {
             QueuedPage queuedPage = new QueuedPage(page);
             .add(queuedPage);
             return queuedPage.getFuture();
         }
     }
 
     private synchronized void addToMasterBuffer(Page page)
     {
         long bytesAdded = 0;
         List<Pagepages = splitPage(page);
         .addAll(pages);
         .addAndGet(pages.size());
         for (Page p : pages) {
             bytesAdded += p.getSizeInBytes();
         }
         updateMemoryUsage(bytesAdded);
     }
 
     public synchronized List<PagegetPages(DataSize maxSizelong sequenceId)
     {
         long maxBytes = maxSize.toBytes();
         List<Pagepages = new ArrayList<>();
         long bytes = 0;
 
         int listOffset = Ints.checkedCast(sequenceId - .get());
         while (listOffset < .size()) {
             Page page = .get(listOffset++);
             bytes += page.getSizeInBytes();
             // break (and don't add) if this page would exceed the limit
             if (!pages.isEmpty() && bytes > maxBytes) {
                 break;
             }
             pages.add(page);
         }
        return ImmutableList.copyOf(pages);
    }
    public synchronized void advanceSequenceId(long newSequenceId)
    {
        long oldMasterSequenceId = .get();
        checkArgument(newSequenceId >= oldMasterSequenceId"Master sequence id moved backwards: oldMasterSequenceId=%s, newMasterSequenceId=%s",
                oldMasterSequenceId,
                newSequenceId);
        if (newSequenceId == oldMasterSequenceId) {
            return;
        }
        .set(newSequenceId);
        // drop consumed pages
        int pagesToRemove = Ints.checkedCast(newSequenceId - oldMasterSequenceId);
        checkState(.size() >= pagesToRemove,
                "MasterBuffer does not have any pages to remove: pagesToRemove %s oldMasterSequenceId: %s newSequenceId: %s",
                pagesToRemove,
                oldMasterSequenceId,
                newSequenceId);
        long bytesRemoved = 0;
        for (int i = 0; i < pagesToRemovei++) {
            Page page = .removeFirst();
            bytesRemoved += page.getSizeInBytes();
        }
        updateMemoryUsage(-bytesRemoved);
        dequeuePages();
    }
    public synchronized void dequeuePages()
    {
        // refill buffer from queued pages
        while (!.isEmpty() && !.isFull()) {
            QueuedPage queuedPage = .remove();
            addToMasterBuffer(queuedPage.getPage());
            queuedPage.getFuture().set(null);
        }
    }
    public synchronized void destroy()
    {
        // clear the buffer
        .clear();
        clearQueue();
    }
    public synchronized void clearQueue()
    {
        for (QueuedPage queuedPage : ) {
            queuedPage.getFuture().set(null);
        }
        .clear();
    }
    private void updateMemoryUsage(long bytesAdded)
    {
        .addAndGet(bytesAdded);
        .updateMemoryUsage(bytesAdded);
        verify(.get() >= 0);
    }
    public long getPageCount()
    {
        return .get();
    }
    public long getBufferedBytes()
    {
        return .get();
    }
    public long getBufferedPageCount()
    {
        return .size();
    }
    public long getQueuedPageCount()
    {
        return .size();
    }
    public int getPartition()
    {
        return ;
    }
    public PageBufferInfo getInfo()
    {
    }
    @Immutable
    private static final class QueuedPage
    {
        private final Page page;
        private final SettableFuture<?> future = SettableFuture.create();
        QueuedPage(Page page)
        {
            this. = page;
        }
        public Page getPage()
        {
            return ;
        }
        public SettableFuture<?> getFuture()
        {
            return ;
        }
    }
New to GrepCode? Check out our FAQ X