Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.operator;
 
 
 
 import java.util.List;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
         implements SourceOperatorCloseable
 {
     private static final int ROWS_PER_PAGE = 16384;
 
     private final OperatorContext operatorContext;
     private final PlanNodeId planNodeId;
     private final PageSourceProvider pageSourceProvider;
     private final List<Typetypes;
     private final List<ColumnHandlecolumns;
     private final PageBuilder pageBuilder;
     private final CursorProcessor cursorProcessor;
     private final PageProcessor pageProcessor;
 
     @GuardedBy("this")
     private RecordCursor cursor;
 
     @GuardedBy("this")
     private ConnectorPageSource pageSource;
 
     private Page currentPage;
     private int currentPosition;
 
     private boolean finishing;
 
     private long completedBytes;
     private long readTimeNanos;
 
     protected ScanFilterAndProjectOperator(
             OperatorContext operatorContext,
             PlanNodeId sourceId,
             PageSourceProvider pageSourceProvider,
             CursorProcessor cursorProcessor,
             PageProcessor pageProcessor,
             Iterable<ColumnHandlecolumns,
             Iterable<Typetypes)
     {
         this. = checkNotNull(cursorProcessor"cursorProcessor is null");
         this. = checkNotNull(pageProcessor"pageProcessor is null");
         this. = checkNotNull(operatorContext"operatorContext is null");
         this. = checkNotNull(sourceId"sourceId is null");
         this. = checkNotNull(pageSourceProvider"pageSourceManager is null");
         this. = ImmutableList.copyOf(checkNotNull(types"types is null"));
         this. = ImmutableList.copyOf(checkNotNull(columns"columns is null"));
 
         this. = new PageBuilder(getTypes());
     }
 
     @Override
     {
         return ;
     }
 
     @Override
     public PlanNodeId getSourceId()
     {
         return ;
     }
 
     @Override
    public synchronized void addSplit(Split split)
    {
        checkNotNull(split"split is null");
        checkState( == null &&  == null"split already set");
        ConnectorPageSource pageSource = .createPageSource(split);
        if (pageSource instanceof RecordPageSource) {
             = ((RecordPageSourcepageSource).getCursor();
        }
        else {
            this. = pageSource;
        }
        Object splitInfo = split.getInfo();
        if (splitInfo != null) {
            .setInfoSupplier(Suppliers.ofInstance(splitInfo));
        }
    }
    @Override
    public synchronized void noMoreSplits()
    {
        if ( == null &&  == null) {
             = true;
        }
    }
    @Override
    public final List<TypegetTypes()
    {
        return ;
    }
    @Override
    public final void finish()
    {
        close();
    }
    @Override
    public void close()
    {
        if ( != null) {
            try {
                .close();
            }
            catch (IOException e) {
                throw Throwables.propagate(e);
            }
        }
        else if ( != null) {
            .close();
        }
         = true;
    }
    @Override
    public final boolean isFinished()
    {
        if ( != null && .isFinished() &&  == null) {
             = true;
        }
        return  && .isEmpty();
    }
    @Override
    public final boolean needsInput()
    {
        return false;
    }
    @Override
    public final void addInput(Page page)
    {
        throw new UnsupportedOperationException();
    }
    @Override
    public Page getOutput()
    {
        if (!) {
            if ( != null) {
                int rowsProcessed = .process(.getSession().toConnectorSession(), );
                long bytesProcessed = .getCompletedBytes() - ;
                long elapsedNanos = .getReadTimeNanos() - ;
                .recordGeneratedInput(bytesProcessedrowsProcessedelapsedNanos);
                 = .getCompletedBytes();
                 = .getReadTimeNanos();
                if (rowsProcessed == 0) {
                     = true;
                }
            }
            else {
                if ( == null) {
                     = .getNextPage();
                    if ( != null) {
                        // update operator stats
                        long endCompletedBytes = .getCompletedBytes();
                        long endReadTimeNanos = .getReadTimeNanos();
                        .recordGeneratedInput(endCompletedBytes - .getPositionCount(), endReadTimeNanos - );
                         = endCompletedBytes;
                         = endReadTimeNanos;
                    }
                     = 0;
                }
                if ( != null) {
                    if ( == .getPositionCount()) {
                         = null;
                         = 0;
                    }
                }
            }
        }
        // only return a full page if buffer is full or we are finishing
        if (.isEmpty() || (! && !.isFull())) {
            return null;
        }
        Page page = .build();
        .reset();
        return page;
    }
    public static class ScanFilterAndProjectOperatorFactory
            implements SourceOperatorFactory
    {
        private final int operatorId;
        private final CursorProcessor cursorProcessor;
        private final PageProcessor pageProcessor;
        private final PlanNodeId sourceId;
        private final PageSourceProvider pageSourceProvider;
        private final List<ColumnHandlecolumns;
        private final List<Typetypes;
        private boolean closed;
                int operatorId,
                PlanNodeId sourceId,
                PageSourceProvider pageSourceProvider,
                CursorProcessor cursorProcessor,
                PageProcessor pageProcessor,
                Iterable<ColumnHandlecolumns,
                List<Typetypes)
        {
            this. = operatorId;
            this. = checkNotNull(cursorProcessor"cursorProcessor is null");
            this. = checkNotNull(pageProcessor"pageProcessor is null");
            this. = checkNotNull(sourceId"sourceId is null");
            this. = checkNotNull(pageSourceProvider"pageSourceProvider is null");
            this. = ImmutableList.copyOf(checkNotNull(columns"columns is null"));
            this. = checkNotNull(types"types is null");
        }
        @Override
        public PlanNodeId getSourceId()
        {
            return ;
        }
        @Override
        public List<TypegetTypes()
        {
            return ;
        }
        @Override
        public SourceOperator createOperator(DriverContext driverContext)
        {
            checkState(!"Factory is already closed");
            OperatorContext operatorContext = driverContext.addOperatorContext(ScanFilterAndProjectOperator.class.getSimpleName());
            return new ScanFilterAndProjectOperator(
                    operatorContext,
                    ,
                    ,
                    ,
                    ,
                    ,
                    );
        }
        @Override
        public void close()
        {
             = true;
        }
    }
New to GrepCode? Check out our FAQ X