Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.operator;
 
 
 
 import java.util.List;
 
 import static com.facebook.presto.operator.Operator.NOT_BLOCKED;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static io.airlift.units.DataSize.Unit.MEGABYTE;
 
 public class InMemoryExchange
 {
     private final List<Typetypes;
     private final Queue<Pagebuffer;
     private final long maxBufferedBytes;
 
     @GuardedBy("this")
     private boolean finishing;
 
     @GuardedBy("this")
     private boolean noMoreSinkFactories;
 
     @GuardedBy("this")
     private int sinkFactories;
 
     @GuardedBy("this")
     private int sinks;
 
     @GuardedBy("this")
     private long bufferBytes;
 
     @GuardedBy("this")
     private SettableFuture<?> readerFuture;
 
     @GuardedBy("this")
     private SettableFuture<?> writerFuture;
 
     public InMemoryExchange(List<Typetypes)
     {
         this(typesnew DataSize(32, ));
     }
 
     public InMemoryExchange(List<TypetypesDataSize maxBufferedBytes)
     {
         this. = ImmutableList.copyOf(checkNotNull(types"types is null"));
         this. = new ConcurrentLinkedQueue<>();
 
         checkArgument(maxBufferedBytes.toBytes() > 0, "maxBufferedBytes must be greater than zero");
         this. = maxBufferedBytes.toBytes();
     }
 
     public List<TypegetTypes()
     {
         return ;
     }
 
     public synchronized OperatorFactory createSinkFactory(int operatorId)
     {
         ++;
         return new InMemoryExchangeSinkOperatorFactory(operatorId);
     }
 
     private synchronized void addSink()
     {
         checkState( > 0, "All sink factories already closed");
         ++;
     }
 
     public synchronized void sinkFinished()
     {
         checkState( != 0, "All sinks are already complete");
         --;
         updateState();
    }
    public synchronized void noMoreSinkFactories()
    {
        this. = true;
        updateState();
    }
    private synchronized void sinkFactoryClosed()
    {
        checkState( != 0, "All sinks factories are already closed");
        --;
        updateState();
    }
    private void updateState()
    {
        if ( && ( == 0) && ( == 0)) {
            finish();
        }
    }
    public synchronized boolean isFinishing()
    {
        return ;
    }
    public synchronized void finish()
    {
         = true;
        notifyBlockedReaders();
        notifyBlockedWriters();
    }
    public synchronized boolean isFinished()
    {
        return  && .isEmpty();
    }
    public synchronized void addPage(Page page)
    {
        if () {
            return;
        }
        .add(page);
         += page.getSizeInBytes();
        // TODO: record memory usage using OperatorContext.setMemoryReservation()
        notifyBlockedReaders();
    }
    private synchronized void notifyBlockedReaders()
    {
        if ( != null) {
            .set(null);
             = null;
        }
    }
    public synchronized ListenableFuture<?> waitForReading()
    {
        if ( || !.isEmpty()) {
            return ;
        }
        if ( == null) {
             = SettableFuture.create();
        }
        return ;
    }
    public synchronized Page removePage()
    {
        Page page = .poll();
        if (page != null) {
             -= page.getSizeInBytes();
        }
        if ( < ) {
            notifyBlockedWriters();
        }
        return page;
    }
    private synchronized void notifyBlockedWriters()
    {
        if ( != null) {
            .set(null);
             = null;
        }
    }
    public synchronized ListenableFuture<?> waitForWriting()
    {
        if ( < ) {
            return ;
        }
        if ( == null) {
             = SettableFuture.create();
        }
        return ;
    }
            implements OperatorFactory
    {
        private final int operatorId;
        private boolean closed;
        private InMemoryExchangeSinkOperatorFactory(int operatorId)
        {
            this. = operatorId;
        }
        @Override
        public List<TypegetTypes()
        {
            return ;
        }
        @Override
        public Operator createOperator(DriverContext driverContext)
        {
            checkState(!"Factory is already closed");
            OperatorContext operatorContext = driverContext.addOperatorContext(InMemoryExchangeSinkOperator.class.getSimpleName());
            addSink();
            return new InMemoryExchangeSinkOperator(operatorContextInMemoryExchange.this);
        }
        @Override
        public void close()
        {
            if (!) {
                 = true;
                sinkFactoryClosed();
            }
        }
    }
New to GrepCode? Check out our FAQ X