Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.operator;
 
 
 import java.util.List;
 import java.util.Map;
 
 import static com.facebook.presto.spi.type.BigintType.BIGINT;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 public class TopNRowNumberOperator
         implements Operator
 {
     public static class TopNRowNumberOperatorFactory
             implements OperatorFactory
     {
         private final int operatorId;
 
         private final List<TypesourceTypes;
 
         private final List<IntegeroutputChannels;
         private final List<IntegerpartitionChannels;
         private final List<TypepartitionTypes;
         private final List<IntegersortChannels;
         private final List<SortOrdersortOrder;
         private final int maxRowCountPerPartition;
         private final Optional<IntegerhashChannel;
         private final int expectedPositions;
 
         private final List<Typetypes;
         private final List<TypesortTypes;
         private final boolean generateRowNumber;
         private boolean closed;
 
         public TopNRowNumberOperatorFactory(
                 int operatorId,
                 List<? extends TypesourceTypes,
                 List<IntegeroutputChannels,
                 List<IntegerpartitionChannels,
                 List<? extends TypepartitionTypes,
                 List<IntegersortChannels,
                 List<SortOrdersortOrder,
                 int maxRowCountPerPartition,
                 boolean partial,
                 Optional<IntegerhashChannel,
                 int expectedPositions)
         {
             this. = operatorId;
             this. = ImmutableList.copyOf(sourceTypes);
             this. = ImmutableList.copyOf(checkNotNull(outputChannels"outputChannels is null"));
             this. = ImmutableList.copyOf(checkNotNull(partitionChannels"partitionChannels is null"));
             this. = ImmutableList.copyOf(checkNotNull(partitionTypes"partitionTypes is null"));
             this. = ImmutableList.copyOf(checkNotNull(sortChannels));
             this. = ImmutableList.copyOf(checkNotNull(sortOrder));
             this. = checkNotNull(hashChannel"hashChannel is null");
             checkArgument(maxRowCountPerPartition > 0, "maxRowCountPerPartition must be > 0");
             this. = maxRowCountPerPartition;
             checkArgument(expectedPositions > 0, "expectedPositions must be > 0");
             this. = !partial || !partitionChannels.isEmpty();
             this. = expectedPositions;
 
             this. = toTypes(sourceTypesoutputChannels);
             ImmutableList.Builder<TypesortTypes = ImmutableList.builder();
             for (int channel : sortChannels) {
                 sortTypes.add(.get(channel));
             }
             this. = sortTypes.build();
         }
 
         @Override
         public List<TypegetTypes()
        {
            return ;
        }
        @Override
        public Operator createOperator(DriverContext driverContext)
        {
            checkState(!"Factory is already closed");
            OperatorContext operatorContext = driverContext.addOperatorContext(TopNRowNumberOperator.class.getSimpleName());
            return new TopNRowNumberOperator(
                    operatorContext,
                    ,
                    ,
                    ,
                    ,
                    ,
                    ,
                    ,
                    ,
                    ,
                    ,
                    );
        }
        @Override
        public void close()
        {
             = true;
        }
    }
    private static final DataSize OVERHEAD_PER_VALUE = new DataSize(100, ..); // for estimating in-memory size. This is a completely arbitrary number
    private final OperatorContext operatorContext;
    private boolean finishing;
    private final List<Typetypes;
    private final int[] outputChannels;
    private final List<IntegersortChannels;
    private final List<SortOrdersortOrders;
    private final List<TypesortTypes;
    private final boolean generateRowNumber;
    private final int maxRowCountPerPartition;
    private final MemoryManager memoryManager;
    private final Map<LongPartitionBuilderpartitionRows;
    private final PageBuilder pageBuilder;
    private final Optional<GroupByHashgroupByHash;
    public TopNRowNumberOperator(
            OperatorContext operatorContext,
            List<? extends TypesourceTypes,
            List<IntegeroutputChannels,
            List<IntegerpartitionChannels,
            List<TypepartitionTypes,
            List<IntegersortChannels,
            List<SortOrdersortOrders,
            List<TypesortTypes,
            int maxRowCountPerPartition,
            boolean generateRowNumber,
            Optional<IntegerhashChannel,
            int expectedPositions)
    {
        this. = checkNotNull(operatorContext"operatorContext is null");
        this. = Ints.toArray(checkNotNull(outputChannels"outputChannels is null"));
        this. = checkNotNull(sortChannels"sortChannels is null");
        this. = checkNotNull(sortOrders"sortOrders is null");
        this. = checkNotNull(sortTypes"sortTypes is null");
        checkArgument(maxRowCountPerPartition > 0, "maxRowCountPerPartition must be > 0");
        this. = maxRowCountPerPartition;
        this. = generateRowNumber;
        checkArgument(expectedPositions > 0, "expectedPositions must be > 0");
        this. = toTypes(sourceTypesoutputChannelsgenerateRowNumber);
        this. = new MemoryManager(operatorContext);
        this. = new HashMap<>();
        if (partitionChannels.isEmpty()) {
            this. = Optional.empty();
        }
        else {
            this. = Optional.of(new GroupByHash(partitionTypes, Ints.toArray(partitionChannels), hashChannelexpectedPositions));
        }
        this. = Optional.empty();
        this. = new PageBuilder();
    }
    @Override
    {
        return ;
    }
    @Override
    public List<TypegetTypes()
    {
        return ;
    }
    @Override
    public void finish()
    {
         = true;
    }
    @Override
    public boolean isFinished()
    {
        return  && isEmpty();
    }
    @Override
    public boolean needsInput()
    {
        return ! && !.isFull() && !isFlushing();
    }
    @Override
    public void addInput(Page page)
    {
        checkState(!"Operator is already finishing");
        checkNotNull(page"page is null");
        checkState(!isFlushing(), "Cannot add input with the operator is flushing data");
        processPage(page);
    }
    @Override
    public Page getOutput()
    {
        if ( && !isEmpty()) {
            return getPage();
        }
        return null;
    }
    private void processPage(Page page)
    {
        Optional<GroupByIdBlockpartitionIds = Optional.empty();
        if (.isPresent()) {
            GroupByHash hash = .get();
            long groupByHashSize = hash.getEstimatedSize();
            partitionIds = Optional.of(hash.getGroupIds(page));
            if (!.canUseDelta(hash.getEstimatedSize() - groupByHashSize)) {
                throw new ExceededMemoryLimitException(.getMaxMemorySize());
            }
        }
        long sizeDelta = 0;
        Block[] blocks = page.getBlocks();
        for (int position = 0; position < page.getPositionCount(); position++) {
            long partitionId = .isPresent() ? partitionIds.get().getGroupId(position) : 0;
            if (!.containsKey(partitionId)) {
                .put(partitionIdnew PartitionBuilder());
            }
            PartitionBuilder partitionBuilder = .get(partitionId);
            if (partitionBuilder.getRowCount() < ) {
                Block[] row = getSingleValueBlocks(pageposition);
                sizeDelta += partitionBuilder.addRow(row);
            }
            else if (compare(positionblockspartitionBuilder.peekLastRow()) < 0) {
                Block[] row = getSingleValueBlocks(pageposition);
                sizeDelta += partitionBuilder.replaceRow(row);
            }
        }
        if (!.canUseDelta(sizeDelta)) {
            throw new ExceededMemoryLimitException(.getMaxMemorySize());
        }
    }
    private int compare(int positionBlock[] blocksBlock[] currentMax)
    {
        for (int i = 0; i < .size(); i++) {
            Type type = .get(i);
            int sortChannel = .get(i);
            SortOrder sortOrder = .get(i);
            Block block = blocks[sortChannel];
            Block currentMaxValue = currentMax[sortChannel];
            int compare = sortOrder.compareBlockValue(typeblockpositioncurrentMaxValue, 0);
            if (compare != 0) {
                return compare;
            }
        }
        return 0;
    }
    private Page getPage()
    {
        if (!.isPresent()) {
             = getFlushingPartition();
        }
        .reset();
        long sizeDelta = 0;
        while (!.isFull() && .isPresent()) {
            FlushingPartition currentFlushingPartition = .get();
            while (!.isFull() && currentFlushingPartition.hasNext()) {
                Block[] next = currentFlushingPartition.next();
                sizeDelta += sizeOfRow(next);
                .declarePosition();
                for (int i = 0; i < .i++) {
                    int channel = [i];
                    Type type = .get(channel);
                    type.appendTo(next[channel], 0, .getBlockBuilder(i));
                }
                if () {
                    .writeLong(.getBlockBuilder(.), currentFlushingPartition.getRowNumber());
                }
            }
            if (!currentFlushingPartition.hasNext()) {
                 = getFlushingPartition();
            }
        }
        if (.isEmpty()) {
            return null;
        }
        Page page = .build();
        .freeMemory(-sizeDelta);
        return page;
    }
    {
        int maxPartitionSize = 0;
        PartitionBuilder chosenPartitionBuilder = null;
        long chosenPartitionId = -1;
        for (Map.Entry<LongPartitionBuilderentry : .entrySet()) {
            if (entry.getValue().getRowCount() > maxPartitionSize) {
                chosenPartitionBuilder = entry.getValue();
                maxPartitionSize = chosenPartitionBuilder.getRowCount();
                chosenPartitionId = entry.getKey();
                if (maxPartitionSize == ) {
                    break;
                }
            }
        }
        if (chosenPartitionBuilder == null) {
            return Optional.empty();
        }
        FlushingPartition flushingPartition = new FlushingPartition(chosenPartitionBuilder.build());
        .remove(chosenPartitionId);
        return Optional.of(flushingPartition);
    }
    public boolean isFlushing()
    {
        return .isPresent();
    }
    public boolean isEmpty()
    {
        return .isEmpty();
    }
    private static Block[] getSingleValueBlocks(Page pageint position)
    {
        Block[] blocks = page.getBlocks();
        Block[] row = new Block[blocks.length];
        for (int i = 0; i < blocks.lengthi++) {
            row[i] = blocks[i].getSingleValueBlock(position);
        }
        return row;
    }
    private static List<TypetoTypes(List<? extends TypesourceTypesList<IntegeroutputChannelsboolean generateRowNumber)
    {
        ImmutableList.Builder<Typetypes = ImmutableList.builder();
        for (int channel : outputChannels) {
            types.add(sourceTypes.get(channel));
        }
        if (generateRowNumber) {
            types.add();
        }
        return types.build();
    }
    private static long sizeOfRow(Block[] row)
    {
        long size = .toBytes();
        for (Block value : row) {
            size += value.getSizeInBytes();
        }
        return size;
    }
    private static class PartitionBuilder
    {
        private final MinMaxPriorityQueue<Block[]> candidateRows;
        private final int maxRowCountPerPartition;
        private PartitionBuilder(List<TypesortTypesList<IntegersortChannelsList<SortOrdersortOrdersint maxRowCountPerPartition)
        {
            this. = maxRowCountPerPartition;
            Ordering<Block[]> comparator = Ordering.from(new RowComparator(sortTypessortChannelssortOrders));
            this. = MinMaxPriorityQueue.orderedBy(comparator).maximumSize(maxRowCountPerPartition).create();
        }
        private long replaceRow(Block[] row)
        {
            checkState(.size() == );
            Block[] previousRow = .removeLast();
            long sizeDelta = addRow(row);
            return sizeDelta - sizeOfRow(previousRow);
        }
        private long addRow(Block[] row)
        {
            long sizeDelta = sizeOfRow(row);
            .add(row);
            return sizeDelta;
        }
        private Iterator<Block[]> build()
        {
            ImmutableList.Builder<Block[]> sortedRows = ImmutableList.builder();
            while (!.isEmpty()) {
                sortedRows.add(.poll());
            }
            return sortedRows.build().iterator();
        }
        private int getRowCount()
        {
            return .size();
        }
        private Block[] peekLastRow()
        {
            return .peekLast();
        }
    }
    private static class FlushingPartition
            implements Iterator<Block[]>
    {
        private final Iterator<Block[]> outputIterator;
        private int rowNumber;
        private FlushingPartition(Iterator<Block[]> outputIterator)
        {
            this. = outputIterator;
        }
        @Override
        public boolean hasNext()
        {
            return .hasNext();
        }
        @Override
        public Block[] next()
        {
            ++;
            return .next();
        }
        @Override
        public void remove()
        {
            throw new UnsupportedOperationException();
        }
        public int getRowNumber()
        {
            return ;
        }
    }
New to GrepCode? Check out our FAQ X