Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.operator;
 
 
 import java.util.List;
 
 import static com.facebook.presto.spi.type.BigintType.BIGINT;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 public class RowNumberOperator
         implements Operator
 {
     public static class RowNumberOperatorFactory
             implements OperatorFactory
     {
         private final int operatorId;
         private final Optional<IntegermaxRowsPerPartition;
         private final List<TypesourceTypes;
         private final List<IntegeroutputChannels;
         private final List<IntegerpartitionChannels;
         private final List<TypepartitionTypes;
         private final Optional<IntegerhashChannel;
         private final int expectedPositions;
         private final List<Typetypes;
         private boolean closed;
 
         public RowNumberOperatorFactory(
                 int operatorId,
                 List<? extends TypesourceTypes,
                 List<IntegeroutputChannels,
                 List<IntegerpartitionChannels,
                 List<? extends TypepartitionTypes,
                 Optional<IntegermaxRowsPerPartition,
                 Optional<IntegerhashChannel,
                 int expectedPositions)
         {
             this. = operatorId;
             this. = ImmutableList.copyOf(sourceTypes);
             this. = ImmutableList.copyOf(checkNotNull(outputChannels"outputChannels is null"));
             this. = ImmutableList.copyOf(checkNotNull(partitionChannels"partitionChannels is null"));
             this. = ImmutableList.copyOf(checkNotNull(partitionTypes"partitionTypes is null"));
             this. = checkNotNull(maxRowsPerPartition"maxRowsPerPartition is null");
 
             this. = checkNotNull(hashChannel"hashChannel is null");
             checkArgument(expectedPositions > 0, "expectedPositions < 0");
             this. = expectedPositions;
             this. = toTypes(sourceTypesoutputChannels);
         }
 
         @Override
         public List<TypegetTypes()
         {
             return ;
         }
 
         @Override
         public Operator createOperator(DriverContext driverContext)
         {
             checkState(!"Factory is already closed");
 
             OperatorContext operatorContext = driverContext.addOperatorContext(RowNumberOperator.class.getSimpleName());
             return new RowNumberOperator(
                     operatorContext,
                     ,
                     ,
                     ,
                     ,
                     ,
                     ,
                     );
         }
 
         @Override
         public void close()
         {
             = true;
        }
    }
    private final OperatorContext operatorContext;
    private boolean finishing;
    private final int[] outputChannels;
    private final List<Typetypes;
    private GroupByIdBlock partitionIds;
    private final Optional<GroupByHashgroupByHash;
    private Page inputPage;
    private final LongBigArray partitionRowCount;
    private final Optional<IntegermaxRowsPerPartition;
    public RowNumberOperator(
            OperatorContext operatorContext,
            List<TypesourceTypes,
            List<IntegeroutputChannels,
            List<IntegerpartitionChannels,
            List<TypepartitionTypes,
            Optional<IntegermaxRowsPerPartition,
            Optional<IntegerhashChannel,
            int expectedPositions)
    {
        this. = checkNotNull(operatorContext"operatorContext is null");
        this. = Ints.toArray(outputChannels);
        this. = maxRowsPerPartition;
        this. = new LongBigArray(0);
        if (partitionChannels.isEmpty()) {
            this. = Optional.empty();
        }
        else {
            int[] channels = Ints.toArray(partitionChannels);
            this. = Optional.of(new GroupByHash(partitionTypeschannelshashChannelexpectedPositions));
        }
        this. = toTypes(sourceTypesoutputChannels);
    }
    @Override
    {
        return ;
    }
    @Override
    public List<TypegetTypes()
    {
        return ;
    }
    @Override
    public void finish()
    {
         = true;
    }
    @Override
    public boolean isFinished()
    {
        if (isSinglePartition() && .isPresent()) {
            if ( &&  == null) {
                return true;
            }
            return .get(0) == .get();
        }
        return  &&  == null;
    }
    @Override
    public boolean needsInput()
    {
        if (isSinglePartition() && .isPresent()) {
            // Check if single partition is done
            return .get(0) < .get();
        }
        return ! &&  == null;
    }
    @Override
    public void addInput(Page page)
    {
        checkState(!"Operator is already finishing");
        checkNotNull(page"page is null");
        checkState( == null);
         = page;
        if (.isPresent()) {
             = .get().getGroupIds();
        }
    }
    @Override
    public Page getOutput()
    {
        if ( == null) {
            return null;
        }
        Page outputPage;
        if (.isPresent()) {
            outputPage = getSelectedRows();
        }
        else {
            outputPage = getRowsWithRowNumber();
        }
         = null;
        return outputPage;
    }
    private boolean isSinglePartition()
    {
        return !.isPresent();
    }
    private Page getRowsWithRowNumber()
    {
        Block rowNumberBlock = createRowNumberBlock();
        Block[] sourceBlocks = new Block[.getChannelCount()];
        for (int i = 0; i < .i++) {
            sourceBlocks[i] = .getBlock([i]);
        }
        Block[] outputBlocks = Arrays.copyOf(sourceBlockssourceBlocks.length + 1); // +1 for the row number column
        outputBlocks[sourceBlocks.length] = rowNumberBlock;
        return new Page(.getPositionCount(), outputBlocks);
    }
    private Block createRowNumberBlock()
    {
        for (int currentPosition = 0; currentPosition < .getPositionCount(); currentPosition++) {
            long partitionId = getPartitionId(currentPosition);
            long nextRowCount = .get(partitionId) + 1;
            .writeLong(rowNumberBlocknextRowCount);
            .set(partitionIdnextRowCount);
        }
        return rowNumberBlock.build();
    }
    private Page getSelectedRows()
    {
        PageBuilder pageBuilder = new PageBuilder();
        int rowNumberChannel = .size() - 1;
        for (int currentPosition = 0; currentPosition < .getPositionCount(); currentPosition++) {
            long partitionId = getPartitionId(currentPosition);
            long rowCount = .get(partitionId);
            if (rowCount == .get()) {
                continue;
            }
            pageBuilder.declarePosition();
            for (int i = 0; i < .i++) {
                int channel = [i];
                Type type = .get(channel);
                type.appendTo(.getBlock(channel), currentPositionpageBuilder.getBlockBuilder(i));
            }
            .writeLong(pageBuilder.getBlockBuilder(rowNumberChannel), rowCount + 1);
            .set(partitionIdrowCount + 1);
        }
        if (pageBuilder.isEmpty()) {
            return null;
        }
        return pageBuilder.build();
    }
    private long getPartitionId(int position)
    {
        return isSinglePartition() ? 0 : .getGroupId(position);
    }
    private static List<TypetoTypes(List<? extends TypesourceTypesList<IntegeroutputChannels)
    {
        ImmutableList.Builder<Typetypes = ImmutableList.builder();
        for (int channel : outputChannels) {
            types.add(sourceTypes.get(channel));
        }
        types.add();
        return types.build();
    }
New to GrepCode? Check out our FAQ X