Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.operator;
 
 
 import java.util.List;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;

 /**
  * Returns the top N rows from the source sorted according to the specified ordering in the keyChannelIndex channel.
  */
 
 public class TopNOperator
         implements Operator
 {
     /**
      * Factory that captures a top-N configuration once and creates operators from it.
      *
      * NOTE(review): this listing has lost tokens (assignment targets after {@code this.},
      * returned expressions, constructor arguments, generic closing brackets and some
      * argument separators), apparently during extraction. The comments below describe
      * only what is still visible; anything marked "presumably" must be confirmed
      * against the real source.
      */
     public static class TopNOperatorFactory
             implements OperatorFactory
     {
         private final int operatorId;
         private final List<TypesourceTypes;
         private final int n;
         private final List<TypesortTypes;
         private final List<IntegersortChannels;
         private final List<SortOrdersortOrders;
         private final boolean partial;
         // The only non-final field of the factory; see close() and createOperator().
         private boolean closed;
 
         /**
          * Stores the configuration, taking immutable copies of the list arguments.
          *
          * The sort types are derived here by looking up each sort channel in {@code types}.
          */
         public TopNOperatorFactory(
                 int operatorId,
                 List<? extends Typetypes,
                 int n,
                 List<IntegersortChannels,
                 List<SortOrdersortOrders,
                 boolean partial)
         {
             this. = operatorId;
             this. = ImmutableList.copyOf(checkNotNull(types"types is null"));
             this. = n;
             // NOTE(review): sortChannels is iterated here before the checkNotNull below,
             // so a null sortChannels fails in this loop without the "sortChannels is null" message.
             ImmutableList.Builder<TypesortTypes = ImmutableList.builder();
             for (int channel : sortChannels) {
                 sortTypes.add(types.get(channel));
             }
             this. = sortTypes.build();
             this. = ImmutableList.copyOf(checkNotNull(sortChannels"sortChannels is null"));
             this. = ImmutableList.copyOf(checkNotNull(sortOrders"sortOrders is null"));
             this. = partial;
         }
 
         /**
          * Returns a list of types; the returned expression is missing from this listing
          * (presumably the source types stored by the constructor -- confirm).
          */
         @Override
         public List<TypegetTypes()
         {
             return ;
         }
 
         /**
          * Creates a new {@link TopNOperator} bound to an operator context obtained from
          * the given driver context.
          *
          * The state check carries the message "Factory is already closed"; its condition
          * is missing from this listing (presumably the negated closed flag -- confirm).
          */
         @Override
         public Operator createOperator(DriverContext driverContext)
         {
             checkState(!"Factory is already closed");
             OperatorContext operatorContext = driverContext.addOperatorContext(TopNOperator.class.getSimpleName());
             // Six further constructor arguments are missing from this listing; the TopNOperator
             // constructor visible in this file takes types, n, sortTypes, sortChannels, sortOrders, partial.
             return new TopNOperator(
                     operatorContext,
                     ,
                     ,
                     ,
                     ,
                     ,
                     );
         }
 
         /**
          * Assigns {@code true} to a field whose name is missing from this listing
          * (presumably {@code closed} -- confirm).
          */
         @Override
         public void close()
         {
              = true;
         }
     }
    // Constant named as a cap on the initial priority-queue size; the call that would use it
    // (the Math.min(...) in the TopNBuilder constructor) has lost its arguments in this listing.
    private static final int MAX_INITIAL_PRIORITY_QUEUE_SIZE = 10000;
    // NOTE(review): the unit argument of this DataSize is missing from this listing.
    private static final DataSize OVERHEAD_PER_VALUE = new DataSize(100, ..); // for estimating in-memory size. This is a completely arbitrary number
    // Configuration supplied through the constructor.
    private final OperatorContext operatorContext;
    private final List<Typetypes;
    private final int n;
    private final List<TypesortTypes;
    private final List<IntegersortChannels;
    private final List<SortOrdersortOrders;
    private final TopNMemoryManager memoryManager;
    private final boolean partial;
    private final PageBuilder pageBuilder;
    // Mutable operator state (the only non-final fields of the operator).
    private TopNBuilder topNBuilder;
    private boolean finishing;
    private Iterator<Block[]> outputIterator;
    /**
     * Creates the operator, validating its arguments.
     *
     * Every reference argument is null-checked and {@code n} must be greater than zero.
     * A {@link TopNMemoryManager} wrapping the operator context and a {@link PageBuilder}
     * for {@code types} are created here.
     *
     * NOTE(review): the assignment targets after {@code this.} are missing from this
     * listing; presumably each value is stored in the field of the same name -- confirm.
     */
    public TopNOperator(
            OperatorContext operatorContext,
            List<Typetypes,
            int n,
            List<TypesortTypes,
            List<IntegersortChannels,
            List<SortOrdersortOrders,
            boolean partial)
    {
        this. = checkNotNull(operatorContext"operatorContext is null");
        this. = checkNotNull(types"types is null");
        checkArgument(n > 0, "n must be greater than zero");
        this. = n;
        this. = checkNotNull(sortTypes"sortTypes is null");
        this. = checkNotNull(sortChannels"sortChannels is null");
        this. = checkNotNull(sortOrders"sortOrders is null");
        this. = partial;
        // operatorContext was already null-checked above; this second check is redundant.
        this. = new TopNMemoryManager(checkNotNull(operatorContext"operatorContext is null"));
        this. = new PageBuilder(types);
    }
    // NOTE(review): the method signature and the returned expression are both missing from
    // this listing. Presumably this is the operator-context accessor required by Operator
    // (returning the operatorContext field) -- confirm against the real source.
    @Override
    {
        return ;
    }
    /**
     * Returns a list of types; the returned expression is missing from this listing
     * (presumably the {@code types} field -- confirm).
     */
    @Override
    public List<TypegetTypes()
    {
        return ;
    }
    /**
     * Assigns {@code true} to a field whose name is missing from this listing
     * (presumably {@code finishing} -- confirm).
     */
    @Override
    public void finish()
    {
         = true;
    }
    /**
     * Reports whether the operator is done: a conjunction of three conditions, the last of
     * which is "absent, or has no next element".
     *
     * NOTE(review): the operands are missing from this listing. Presumably:
     * finishing, and no active topNBuilder, and outputIterator absent or exhausted -- confirm.
     */
    @Override
    public boolean isFinished()
    {
        return  &&  == null && ( == null || !.hasNext());
    }
    /**
     * Reports whether more input can be accepted: a conjunction of three conditions, the
     * last of which is "absent, or not full".
     *
     * NOTE(review): the operands are missing from this listing. Presumably:
     * not finishing, and no pending outputIterator, and topNBuilder absent or not full -- confirm.
     */
    @Override
    public boolean needsInput()
    {
        return ! &&  == null && ( == null || !.isFull());
    }
    /**
     * Feeds one page into the top-N builder.
     *
     * Fails the state check with "Operator is already finishing" or "Aggregation buffer is
     * full", and rejects a null page with "page is null".
     *
     * NOTE(review): receivers, assignment targets and the TopNBuilder constructor arguments
     * are missing from this listing; descriptions marked "presumably" need confirmation.
     */
    @Override
    public void addInput(Page page)
    {
        checkState(!"Operator is already finishing");
        checkNotNull(page"page is null");
        // A TopNBuilder is created on demand when the checked reference (presumably topNBuilder) is null.
        if ( == null) {
             = new TopNBuilder(
                    ,
                    ,
                    ,
                    ,
                    );
        }
        // Refuse input once the builder reports full; otherwise hand the page to processPage.
        checkState(!.isFull(), "Aggregation buffer is full");
        .processPage(page);
    }
    /**
     * Produces the next output page, or {@code null} when nothing can be emitted yet.
     *
     * When there is no iterator with remaining rows, the method returns {@code null} in the
     * two early-exit cases below, otherwise either builds a new row iterator or throws
     * {@link ExceededMemoryLimitException}. It then copies rows from the iterator into a
     * page until the page is full or the iterator is exhausted.
     *
     * NOTE(review): most receivers and operands are missing from this listing (presumably
     * outputIterator, topNBuilder, finishing, partial, pageBuilder, memoryManager and
     * types) -- confirm against the real source.
     */
    @Override
    public Page getOutput()
    {
        if ( == null || !.hasNext()) {
            // no data
            if ( == null) {
                return null;
            }
            // only flush if we are finishing or the aggregation builder is full
            if (! && !.isFull()) {
                return null;
            }
            // Only partial aggregation can flush early. Also, check that we are not flushing tiny bits at a time
            if ( || ) {
                 = .build();
                 = null;
            }
            else {
                throw new ExceededMemoryLimitException(.getMaxMemorySize());
            }
        }
        .reset();
        while (!.isFull() && .hasNext()) {
            Block[] next = .next();
            .declarePosition();
            // Each row is an array of blocks; position 0 of every block is appended
            // to the block builder with the same index.
            for (int i = 0; i < next.lengthi++) {
                Type type = .get(i);
                type.appendTo(next[i], 0, .getBlockBuilder(i));
            }
        }
        return .build();
    }
    /**
     * Accumulates candidate rows in a priority queue and tracks an estimate of their size.
     *
     * Rows are stored as {@code Block[]} arrays with one single-value block per input block.
     *
     * NOTE(review): field names after {@code this.}, method receivers and some arguments
     * are missing from this listing; descriptions marked "presumably" need confirmation.
     */
    private static class TopNBuilder
    {
        private final int n;
        private final List<TypesortTypes;
        private final List<IntegersortChannels;
        private final List<SortOrdersortOrders;
        private final TopNMemoryManager memoryManager;
        private final PriorityQueue<Block[]> globalCandidates;
        // Running total of the size deltas returned by mergeWithGlobalCandidates (see processPage).
        private long memorySize;
        /**
         * Stores the configuration and creates the priority queue with a reversed
         * {@code RowComparator} ordering.
         */
        private TopNBuilder(int n,
                List<TypesortTypes,
                List<IntegersortChannels,
                List<SortOrdersortOrders,
                TopNMemoryManager memoryManager)
        {
            this. = n;
            this. = sortTypes;
            this. = sortChannels;
            this. = sortOrders;
            this. = memoryManager;
            // The row ordering is reversed before being handed to the queue.
            Ordering<Block[]> comparator = Ordering.from(new RowComparator(sortTypessortChannelssortOrders)).reverse();
            // NOTE(review): Math.min has lost an argument here (presumably a cap on the initial capacity -- confirm).
            this. = new PriorityQueue<>(Math.min(n), comparator);
        }
        /**
         * Merges the page into the candidates and adds the resulting size delta to a
         * running total (target missing from this listing; presumably {@code memorySize}).
         */
        public void processPage(Page page)
        {
            long sizeDelta = mergeWithGlobalCandidates(page);
             += sizeDelta;
        }
        /**
         * Visits every position of the page and adds the row when either the size check in
         * the first branch passes or {@code compare} returns a negative value.
         *
         * @return the sum of the size deltas reported by {@code addRow}
         */
        private long mergeWithGlobalCandidates(Page page)
        {
            long sizeDelta = 0;
            Block[] blocks = page.getBlocks();
            for (int position = 0; position < page.getPositionCount(); position++) {
                // Operands missing from this listing (presumably queue size < n -- confirm).
                if (.size() < ) {
                    sizeDelta += addRow(positionblocks);
                }
                // Otherwise the row is compared with the element returned by peek().
                else if (compare(positionblocks.peek()) < 0) {
                    sizeDelta += addRow(positionblocks);
                }
            }
            return sizeDelta;
        }
        /**
         * Compares the row at {@code position} with {@code currentMax}, one sort channel at
         * a time, returning the first non-zero result or 0 when every channel compares equal.
         */
        private int compare(int positionBlock[] blocksBlock[] currentMax)
        {
            for (int i = 0; i < .size(); i++) {
                Type type = .get(i);
                int sortChannel = .get(i);
                SortOrder sortOrder = .get(i);
                Block block = blocks[sortChannel];
                Block currentMaxValue = currentMax[sortChannel];
                // compare the right value to the left block but negate the result since we are evaluating in the opposite order
                int compare = -sortOrder.compareBlockValue(typecurrentMaxValue, 0, blockposition);
                if (compare != 0) {
                    return compare;
                }
            }
            return 0;
        }
        /**
         * Adds the row at {@code position}, then removes elements while the size check in
         * the loop holds (operands missing; presumably while the queue holds more than n rows).
         *
         * @return size of the added row minus the sizes of the removed rows
         */
        private long addRow(int positionBlock[] blocks)
        {
            long sizeDelta = 0;
            Block[] row = getValues(positionblocks);
            sizeDelta += sizeOfRow(row);
            .add(row);
            while (.size() > ) {
                Block[] previous = .remove();
                    sizeDelta -= sizeOfRow(previous);
            }
            return sizeDelta;
        }
        /**
         * Estimates the size of a row: a base value obtained through {@code toBytes()}
         * (receiver missing from this listing; presumably the per-value overhead constant)
         * plus {@code getSizeInBytes()} of every block in the row.
         */
        private static long sizeOfRow(Block[] row)
        {
            long size = .toBytes();
            for (Block value : row) {
                size += value.getSizeInBytes();
            }
            return size;
        }
        /**
         * Builds a row for {@code position} by taking {@code getSingleValueBlock(position)}
         * from each input block.
         */
        private static Block[] getValues(int positionBlock[] blocks)
        {
            Block[] row = new Block[blocks.length];
            for (int i = 0; i < blocks.lengthi++) {
                row[i] = blocks[i].getSingleValueBlock(position);
            }
            return row;
        }
        /**
         * Returns the result of a {@code canUse} call (receiver and argument missing from
         * this listing; presumably {@code memoryManager.canUse(memorySize)}).
         *
         * Note that {@code TopNMemoryManager.canUse} in this file returns {@code true} when
         * the memory reservation fails.
         */
        private boolean isFull()
        {
            return .canUse();
        }
        /**
         * Drains the queue by repeated {@code remove()} calls and returns an iterator over
         * the removed rows in the reverse of their removal order.
         */
        public Iterator<Block[]> build()
        {
            ImmutableList.Builder<Block[]> minSortedGlobalCandidates = ImmutableList.builder();
            while (!.isEmpty()) {
                Block[] row = .remove();
                minSortedGlobalCandidates.add(row);
            }
            return minSortedGlobalCandidates.build().reverse().iterator();
        }
    }
    /**
     * Tracks how much memory has been reserved through an {@link OperatorContext}.
     *
     * NOTE(review): field names after {@code this.} and method receivers are missing from
     * this listing; descriptions marked "presumably" need confirmation.
     */
    public static class TopNMemoryManager
    {
        private final OperatorContext operatorContext;
        private long currentMemoryReservation;
        public TopNMemoryManager(OperatorContext operatorContext)
        {
            this. = operatorContext;
        }
        /**
         * Attempts to grow the reservation to cover {@code memorySize}, after subtracting
         * the pre-allocated memory.
         *
         * NOTE(review): despite the name, the visible return values are inverted relative
         * to "can use" -- see below.
         *
         * @return {@code true} only when {@code reserveMemory(delta)} fails; {@code false}
         *         when no additional memory is needed or the reservation succeeds
         */
        public boolean canUse(long memorySize)
        {
            // remove the pre-allocated memory from this size
            memorySize -= .getOperatorPreAllocatedMemory().toBytes();
            // Subtrahend missing from this listing (presumably currentMemoryReservation -- confirm).
            long delta = memorySize - ;
            if (delta <= 0) {
                return false;
            }
            if (!.reserveMemory(delta)) {
                return true;
            }
            // reservation worked, record the reservation
             = Math.max(memorySize);
            return false;
        }
        /**
         * Returns the value of a {@code getMaxMemorySize()} call (receiver missing from this
         * listing; presumably the operator context).
         */
        public DataSize getMaxMemorySize()
        {
            return .getMaxMemorySize();
        }
    }
New to GrepCode? Check out our FAQ X