Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.operator;
 
 
 
 import java.util.List;
 
 import static com.facebook.presto.operator.Operator.NOT_BLOCKED;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static io.airlift.units.DataSize.Unit.MEGABYTE;
 
 public class InMemoryExchange
 {
     private static final DataSize DEFAULT_MAX_BUFFERED_BYTES = new DataSize(32, );
     private final List<Typetypes;
     private final List<Queue<PageReference>> buffers;
     private final long maxBufferedBytes;
 
     @GuardedBy("this")
     private boolean finishing;
 
     @GuardedBy("this")
     private boolean noMoreSinkFactories;
 
     @GuardedBy("this")
     private int sinkFactories;
 
     @GuardedBy("this")
     private int sinks;
 
     @GuardedBy("this")
     private long bufferBytes;
 
     @GuardedBy("this")
     private SettableFuture<?> readerFuture;
 
     @GuardedBy("this")
     private SettableFuture<?> writerFuture;
 
     public InMemoryExchange(List<Typetypes)
     {
         this(types, 1, );
     }
 
     public InMemoryExchange(List<Typetypesint bufferCount)
     {
         this(typesbufferCount);
     }
 
     public InMemoryExchange(List<Typetypesint bufferCountDataSize maxBufferedBytes)
     {
         this. = ImmutableList.copyOf(checkNotNull(types"types is null"));
 
         ImmutableList.Builder<Queue<PageReference>> buffers = ImmutableList.builder();
         for (int i = 0; i < bufferCounti++) {
             buffers.add(new ConcurrentLinkedQueue<>());
         }
         this. = buffers.build();
 
         checkArgument(maxBufferedBytes.toBytes() > 0, "maxBufferedBytes must be greater than zero");
         this. = maxBufferedBytes.toBytes();
     }
 
     public List<TypegetTypes()
     {
         return ;
     }
 
     public int getBufferCount()
     {
         return .size();
     }
 
     public synchronized OperatorFactory createSinkFactory(int operatorId)
    {
        ++;
        return new InMemoryExchangeSinkOperatorFactory(operatorId);
    }
    private synchronized void addSink()
    {
        checkState( > 0, "All sink factories already closed");
        ++;
    }
    public synchronized void sinkFinished()
    {
        checkState( != 0, "All sinks are already complete");
        --;
        updateState();
    }
    public synchronized void noMoreSinkFactories()
    {
        this. = true;
        updateState();
    }
    private synchronized void sinkFactoryClosed()
    {
        checkState( != 0, "All sinks factories are already closed");
        --;
        updateState();
    }
    private void updateState()
    {
        if ( && ( == 0) && ( == 0)) {
            finish();
        }
    }
    public synchronized boolean isFinishing()
    {
        return ;
    }
    public synchronized void finish()
    {
         = true;
        notifyBlockedReaders();
        notifyBlockedWriters();
    }
    public synchronized boolean isFinished(int bufferIndex)
    {
        return  && .get(bufferIndex).isEmpty();
    }
    public synchronized void addPage(Page page)
    {
        if () {
            return;
        }
        PageReference pageReference = new PageReference(page.size());
        for (Queue<PageReferencebuffer : ) {
            buffer.add(pageReference);
        }
         += page.getSizeInBytes();
        // TODO: record memory usage using OperatorContext.setMemoryReservation()
        notifyBlockedReaders();
    }
    private synchronized void notifyBlockedReaders()
    {
        if ( != null) {
            .set(null);
             = null;
        }
    }
    public synchronized ListenableFuture<?> waitForReading(int bufferIndex)
    {
        if ( || !.get(bufferIndex).isEmpty()) {
            return ;
        }
        if ( == null) {
             = SettableFuture.create();
        }
        return ;
    }
    public synchronized Page removePage(int bufferIndex)
    {
        PageReference pageReference = .get(bufferIndex).poll();
        if (pageReference == null) {
            return null;
        }
        Page page = pageReference.removePage();
        if (!pageReference.isReferenced()) {
             -= page.getSizeInBytes();
            if ( < ) {
                notifyBlockedWriters();
            }
        }
        return page;
    }
    private synchronized void notifyBlockedWriters()
    {
        if ( != null) {
            .set(null);
             = null;
        }
    }
    public synchronized ListenableFuture<?> waitForWriting()
    {
        if ( < ) {
            return ;
        }
        if ( == null) {
             = SettableFuture.create();
        }
        return ;
    }
    private static class PageReference
    {
        private final Page page;
        private int referenceCount;
        public PageReference(Page pageint referenceCount)
        {
            this. = page;
            this. = referenceCount;
        }
        public Page removePage()
        {
            checkArgument( > 0);
            --;
            return ;
        }
        public boolean isReferenced()
        {
            return  > 0;
        }
    }
            implements OperatorFactory
    {
        private final int operatorId;
        private boolean closed;
        private InMemoryExchangeSinkOperatorFactory(int operatorId)
        {
            this. = operatorId;
        }
        @Override
        public List<TypegetTypes()
        {
            return ;
        }
        @Override
        public Operator createOperator(DriverContext driverContext)
        {
            checkState(!"Factory is already closed");
            OperatorContext operatorContext = driverContext.addOperatorContext(InMemoryExchangeSinkOperator.class.getSimpleName());
            addSink();
            return new InMemoryExchangeSinkOperator(operatorContextInMemoryExchange.this);
        }
        @Override
        public void close()
        {
            if (!) {
                 = true;
                sinkFactoryClosed();
            }
        }
    }
New to GrepCode? Check out our FAQ X