// NOTE: web-page navigation text from the code-listing site was removed here; it is not part of the source file.
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.execution;
 
 
 
 import java.util.List;
 import java.util.Set;
 
 import static com.facebook.presto.OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS;
 import static com.facebook.presto.execution.BufferResult.emptyResults;
 import static com.facebook.presto.execution.PageSplitterUtil.splitPage;
 import static com.facebook.presto.execution.SharedBuffer.BufferState.FINISHED;
 import static com.facebook.presto.execution.SharedBuffer.BufferState.FLUSHING;
 import static com.facebook.presto.execution.SharedBuffer.BufferState.NO_MORE_BUFFERS;
 import static com.facebook.presto.execution.SharedBuffer.BufferState.NO_MORE_PAGES;
 import static com.facebook.presto.execution.SharedBuffer.BufferState.OPEN;
 import static com.facebook.presto.spi.block.BlockBuilderStatus.DEFAULT_MAX_PAGE_SIZE_IN_BYTES;
 import static com.google.common.base.MoreObjects.toStringHelper;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.util.concurrent.Futures.immediateFuture;
 
 public class SharedBuffer
 {
     public enum BufferState
     {
        
Additional buffers can be added. Any next state is allowed.
 
         OPEN(truetrue),
        
No more buffers can be added. Next state is FLUSHING.
 
         NO_MORE_BUFFERS(truefalse),
        
No more pages can be added. Next state is FLUSHING.
 
         NO_MORE_PAGES(falsetrue),
        
No more pages or buffers can be added, and buffer is waiting for the final pages to be consumed. Next state is FINISHED.
 
         FLUSHING(falsefalse),
        
No more buffers can be added and all pages have been consumed. This is the terminal state.
 
         FINISHED(falsefalse);
 
         private final boolean newPagesAllowed;
         private final boolean newBuffersAllowed;
 
         BufferState(boolean newPagesAllowedboolean newBuffersAllowed)
         {
             this. = newPagesAllowed;
             this. = newBuffersAllowed;
        }
        public boolean canAddPages()
        {
            return ;
        }
        public boolean canAddBuffers()
        {
            return ;
        }
    }
    private final long maxBufferedBytes;
    @GuardedBy("this")
    @GuardedBy("this")
    private long bufferedBytes;
    @GuardedBy("this")
    private final LinkedList<PagemasterBuffer = new LinkedList<>();
    @GuardedBy("this")
    private final BlockingQueue<QueuedPagequeuedPages = new LinkedBlockingQueue<>();
    @GuardedBy("this")
    private final AtomicLong masterSequenceId = new AtomicLong();
    @GuardedBy("this")
    private final ConcurrentMap<TaskIdNamedBuffernamedBuffers = new ConcurrentHashMap<>();
    @GuardedBy("this")
    private final Set<TaskIdabortedBuffers = new HashSet<>();
    private final StateMachine<BufferStatestate;
    @GuardedBy("this")
    private final List<GetBufferResultstateChangeListeners = new ArrayList<>();
    private final AtomicLong pagesAdded = new AtomicLong();
    public SharedBuffer(TaskId taskIdExecutor executorDataSize maxBufferSize)
    {
        checkNotNull(taskId"taskId is null");
        checkNotNull(executor"executor is null");
         = new StateMachine<>(taskId + "-buffer"executor);
        checkNotNull(maxBufferSize"maxBufferSize is null");
        checkArgument(maxBufferSize.toBytes() > 0, "maxBufferSize must be at least 1");
        this. = maxBufferSize.toBytes();
    }
    public void addStateChangeListener(StateChangeListener<BufferStatestateChangeListener)
    {
        .addStateChangeListener(stateChangeListener);
    }
    public boolean isFinished()
    {
        return .get() == ;
    }
    public SharedBufferInfo getInfo()
    {
        //
        // NOTE: this code must be lock free to we are not hanging state machine updates
        //
        checkState(!Thread.holdsLock(this), "Thread must NOT hold a lock on the %s"SharedBuffer.class.getSimpleName());
        ImmutableList.Builder<BufferInfoinfos = ImmutableList.builder();
        for (NamedBuffer namedBuffer : .values()) {
            infos.add(namedBuffer.getInfo());
        }
        return new SharedBufferInfo(.get(), .get(), .get(), infos.build());
    }
    public synchronized void setOutputBuffers(OutputBuffers newOutputBuffers)
    {
        checkNotNull(newOutputBuffers"newOutputBuffers is null");
        // ignore buffers added after query finishes, which can happen when a query is canceled
        // also ignore old versions, which is normal
        if (.get() ==  || .getVersion() >= newOutputBuffers.getVersion()) {
            return;
        }
        // verify this is valid state change
        SetView<TaskIdmissingBuffers = Sets.difference(.getBuffers().keySet(), newOutputBuffers.getBuffers().keySet());
        checkArgument(missingBuffers.isEmpty(), "newOutputBuffers does not have existing buffers %s"missingBuffers);
        checkArgument(!.isNoMoreBufferIds() || newOutputBuffers.isNoMoreBufferIds(), "Expected newOutputBuffers to have noMoreBufferIds set");
         = newOutputBuffers;
        // add the new buffers
        for (Entry<TaskIdPagePartitionFunctionentry : .getBuffers().entrySet()) {
            TaskId bufferId = entry.getKey();
            if (!.containsKey(bufferId)) {
                checkState(.get().canAddBuffers(), "Cannot add buffers to %s"SharedBuffer.class.getSimpleName());
                NamedBuffer namedBuffer = new NamedBuffer(bufferIdentry.getValue());
                // the buffer may have been aborted before the creation message was received
                if (.contains(bufferId)) {
                    namedBuffer.abort();
                }
                .put(bufferIdnamedBuffer);
            }
        }
        // update state if no more buffers is set
        if (.isNoMoreBufferIds()) {
            .compareAndSet();
            .compareAndSet();
        }
        updateState();
    }
    public synchronized ListenableFuture<?> enqueue(Page page)
    {
        checkNotNull(page"page is null");
        // ignore pages after no more pages is set
        // this can happen with a limit query
        if (!.get().canAddPages()) {
            return immediateFuture(true);
        }
        // is there room in the buffer
        if ( < ) {
            addInternal(page);
            return immediateFuture(true);
        }
        QueuedPage queuedPage = new QueuedPage(page);
        .add(queuedPage);
        updateState();
        return queuedPage.getFuture();
    }
    private synchronized void addInternal(Page page)
    {
        List<Pagepages = splitPage(page);
        .addAll(pages);
        .addAndGet(pages.size());
        for (Page p : pages) {
             += p.getSizeInBytes();
        }
        processPendingReads();
    }
    public synchronized ListenableFuture<BufferResultget(TaskId outputIdlong startingSequenceIdDataSize maxSize)
    {
        checkNotNull(outputId"outputId is null");
        checkArgument(maxSize.toBytes() > 0, "maxSize must be at least 1 byte");
        // if no buffers can be added, and the requested buffer does not exist, return a closed empty result
        // this can happen with limit queries
        if (!.get().canAddBuffers() && .get(outputId) == null) {
            return immediateFuture(emptyResults(0, true));
        }
        // return a future for data
        GetBufferResult getBufferResult = new GetBufferResult(outputIdstartingSequenceIdmaxSize);
        .add(getBufferResult);
        updateState();
        return getBufferResult.getFuture();
    }
    private synchronized List<PagegetPagesInternal(DataSize maxSizelong sequenceId)
    {
        long maxBytes = maxSize.toBytes();
        List<Pagepages = new ArrayList<>();
        long bytes = 0;
        int listOffset = Ints.checkedCast(sequenceId - .get());
        while (listOffset < .size()) {
            Page page = .get(listOffset++);
            bytes += page.getSizeInBytes();
            // break (and don't add) if this page would exceed the limit
            if (!pages.isEmpty() && bytes > maxBytes) {
                break;
            }
            pages.add(page);
        }
        return ImmutableList.copyOf(pages);
    }
    public synchronized void abort(TaskId outputId)
    {
        checkNotNull(outputId"outputId is null");
        .add(outputId);
        NamedBuffer namedBuffer = .get(outputId);
        if (namedBuffer != null) {
            namedBuffer.abort();
        }
        updateState();
    }
    public synchronized void setNoMorePages()
    {
            updateState();
        }
    }

    
Destroys the buffer, discarding all pages.
    public synchronized void destroy()
    {
        .set();
        // clear the buffer
        .clear();
         = 0;
        // free queued page waiters
        for (QueuedPage queuedPage : ) {
            queuedPage.getFuture().set(null);
        }
        .clear();
        for (NamedBuffer namedBuffer : .values()) {
            namedBuffer.abort();
        }
        processPendingReads();
    }
    private void checkFlushComplete()
    {
        checkState(Thread.holdsLock(this), "Thread must hold a lock on the %s"SharedBuffer.class.getSimpleName());
        if (.get() == ) {
            for (NamedBuffer namedBuffer : .values()) {
                if (!namedBuffer.checkCompletion()) {
                    return;
                }
            }
            destroy();
        }
    }
    private void updateState()
    {
        checkState(Thread.holdsLock(this), "Thread must hold a lock on the %s"SharedBuffer.class.getSimpleName());
        try {
            processPendingReads();
            BufferState state = this..get();
            if (state == ) {
                return;
            }
            if (!state.canAddPages()) {
                // discard queued pages (not officially in the buffer)
                for (QueuedPage queuedPage : ) {
                    queuedPage.getFuture().set(null);
                }
                .clear();
            }
            // advanced master queue
            if (!state.canAddBuffers() && !.isEmpty()) {
                // advance master sequence id
                long oldMasterSequenceId = .get();
                long newMasterSequenceId = .;
                for (NamedBuffer namedBuffer : .values()) {
                    newMasterSequenceId = Math.min(namedBuffer.getSequenceId(), newMasterSequenceId);
                }
                .set(newMasterSequenceId);
                // drop consumed pages
                int pagesToRemove = Ints.checkedCast(newMasterSequenceId - oldMasterSequenceId);
                checkState(pagesToRemove >= 0,
                        "Master sequence id moved backwards: oldMasterSequenceId=%s, newMasterSequenceId=%s",
                        oldMasterSequenceId,
                        newMasterSequenceId);
                for (int i = 0; i < pagesToRemovei++) {
                    Page page = .removeFirst();
                     -= page.getSizeInBytes();
                }
                // refill buffer from queued pages
                while (!.isEmpty() &&  < ) {
                    QueuedPage queuedPage = .remove();
                    addInternal(queuedPage.getPage());
                    queuedPage.getFuture().set(null);
                }
            }
            // remove any completed buffers
            if (!state.canAddPages()) {
                for (NamedBuffer namedBuffer : .values()) {
                    namedBuffer.checkCompletion();
                }
            }
        }
        finally {
            checkFlushComplete();
        }
    }
    private void processPendingReads()
    {
        checkState(Thread.holdsLock(this), "Thread must hold a lock on the %s"SharedBuffer.class.getSimpleName());
        for (GetBufferResult getBufferResult : ImmutableList.copyOf()) {
            if (getBufferResult.execute()) {
                .remove(getBufferResult);
            }
        }
    }
    @ThreadSafe
    private final class NamedBuffer
    {
        private final TaskId bufferId;
        private final PagePartitionFunction partitionFunction;
        private final AtomicLong sequenceId = new AtomicLong();
        private final AtomicBoolean finished = new AtomicBoolean();
        private NamedBuffer(TaskId bufferIdPagePartitionFunction partitionFunction)
        {
            this. = bufferId;
            this. = partitionFunction;
        }
        public BufferInfo getInfo()
        {
            //
            // NOTE: this code must be lock free to we are not hanging state machine updates
            //
            checkState(!Thread.holdsLock(this), "Thread must NOT hold a lock on the %s"SharedBuffer.class.getSimpleName());
            long sequenceId = this..get();
            if (.get()) {
                return new BufferInfo(true, 0, sequenceId);
            }
            int size = Math.max(Ints.checkedCast(.get() + .size() - sequenceId), 0);
            return new BufferInfo(.get(), sizesequenceId);
        }
        public long getSequenceId()
        {
            checkState(Thread.holdsLock(SharedBuffer.this), "Thread must hold a lock on the %s"SharedBuffer.class.getSimpleName());
            return .get();
        }
        public BufferResult getPages(long startingSequenceIdDataSize maxSize)
        {
            checkState(Thread.holdsLock(SharedBuffer.this), "Thread must hold a lock on the %s"SharedBuffer.class.getSimpleName());
            checkArgument(maxSize.toBytes() > 0, "maxSize must be at least 1 byte");
            long sequenceId = this..get();
            checkArgument(startingSequenceId >= sequenceId"startingSequenceId is before the beginning of the buffer");
            // acknowledge previous pages
            if (startingSequenceId > sequenceId) {
                this..set(startingSequenceId);
                sequenceId = startingSequenceId;
            }
            if (checkCompletion()) {
                return emptyResults(startingSequenceIdtrue);
            }
            List<Pagepages = getPagesInternal(maxSizesequenceId);
            return new BufferResult(startingSequenceIdstartingSequenceId + pages.size(), falsepages);
        }
        public void abort()
        {
            checkState(Thread.holdsLock(SharedBuffer.this), "Thread must hold a lock on the %s"SharedBuffer.class.getSimpleName());
            .set(true);
        }
        public boolean checkCompletion()
        {
            checkState(Thread.holdsLock(SharedBuffer.this), "Thread must hold a lock on the %s"SharedBuffer.class.getSimpleName());
            // WARNING: finish must short circuit this call, or the call to checkFlushComplete below will cause an infinite recursion
            if (.get()) {
                return true;
            }
            if (!.get().canAddPages() && .get() >= .get()) {
                // WARNING: finish must set before the call to checkFlushComplete of the short circuit above will not trigger and the code enter an infinite recursion
                .set(true);
                // check if master buffer is finished
                checkFlushComplete();
            }
            return .get();
        }
        @Override
        public String toString()
        {
            return toStringHelper(this)
                    .add("bufferId")
                    .add("sequenceId".get())
                    .add("finished".get())
                    .toString();
        }
    }
    @Immutable
    private static final class QueuedPage
    {
        private final Page page;
        private final SettableFuture<?> future = SettableFuture.create();
        private QueuedPage(Page page)
        {
            this. = page;
        }
        private Page getPage()
        {
            return ;
        }
        private SettableFuture<?> getFuture()
        {
            return ;
        }
    }
    @Immutable
    private class GetBufferResult
    {
        private final SettableFuture<BufferResultfuture = SettableFuture.create();
        private final TaskId outputId;
        private final long startingSequenceId;
        private final DataSize maxSize;
        public GetBufferResult(TaskId outputIdlong startingSequenceIdDataSize maxSize)
        {
            this. = outputId;
            this. = startingSequenceId;
            this. = maxSize;
        }
        public SettableFuture<BufferResultgetFuture()
        {
            return ;
        }
        public boolean execute()
        {
            checkState(Thread.holdsLock(SharedBuffer.this), "Thread must hold a lock on the %s"SharedBuffer.class.getSimpleName());
            if (.isDone()) {
                return true;
            }
            try {
                NamedBuffer namedBuffer = .get();
                // if buffer is finished return an empty page
                // this could be a request for a buffer that never existed, but that is ok since the buffer
                // could have been destroyed before the creation message was received
                if (.get() == ) {
                    .set(emptyResults(namedBuffer == null ? 0 : namedBuffer.getSequenceId(), true));
                    return true;
                }
                // buffer doesn't exist yet
                if (namedBuffer == null) {
                    return false;
                }
                // if request is for pages before the current position, just return an empty page
                if ( < namedBuffer.getSequenceId()) {
                    .set(emptyResults(false));
                    return true;
                }
                // read pages from the buffer
                BufferResult bufferResult = namedBuffer.getPages();
                // if this was the last page, we're done
                checkFlushComplete();
                // if we got an empty result, wait for more pages
                if (bufferResult.isEmpty() && !bufferResult.isBufferClosed()) {
                    return false;
                }
                .set(bufferResult);
            }
            catch (Throwable throwable) {
                .setException(throwable);
            }
            return true;
        }
    }
// NOTE: web-page footer text from the code-listing site was removed here; it is not part of the source file.