Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.operator;
 
 
 import java.util.List;
 
 import static com.facebook.presto.spi.type.BigintType.BIGINT;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 public class HashAggregationOperator
         implements Operator
 {
     /**
      * Factory that holds the aggregation configuration (group-by types and channels,
      * aggregation step, accumulator factories, optional hash channel, expected group
      * count and a partial-memory limit) and creates {@link HashAggregationOperator}s.
      *
      * NOTE(review): this text appears to be damaged by extraction: generic type
      * arguments are fused with the following identifier, argument-separating commas
      * are missing, and bare field references (including the name after "this.") are
      * blank. Any comment below that names a blanked identifier is a guess and must
      * be confirmed against the real source.
      */
     public static class HashAggregationOperatorFactory
             implements OperatorFactory
     {
         private final int operatorId;
         private final List<TypegroupByTypes;
         private final List<IntegergroupByChannels;
         private final Step step;
         private final List<AccumulatorFactoryaccumulatorFactories;
         private final Optional<IntegerhashChannel;
 
         private final int expectedGroups;
         // Output types; computed once in the constructor through toTypes(...).
         private final List<Typetypes;
         // Only non-final field of the factory.
         private boolean closed;
         // Declared as a long although the constructor receives a DataSize.
         private final long maxPartialMemory;
 
         /**
          * Captures the configuration. List arguments are copied with
          * ImmutableList.copyOf; hashChannel and maxPartialMemory are null-checked.
          */
         public HashAggregationOperatorFactory(
                 int operatorId,
                 List<? extends TypegroupByTypes,
                 List<IntegergroupByChannels,
                 Step step,
                 List<AccumulatorFactoryaccumulatorFactories,
                 Optional<IntegerhashChannel,
                 int expectedGroups,
                 DataSize maxPartialMemory)
         {
             // NOTE(review): every assignment target after "this." is blank; presumably each
             // is the field named like the right-hand-side parameter. Confirm.
             this. = operatorId;
             this. = checkNotNull(hashChannel"hashChannel is null");
             this. = ImmutableList.copyOf(groupByTypes);
             this. = ImmutableList.copyOf(groupByChannels);
             this. = step;
             this. = ImmutableList.copyOf(accumulatorFactories);
             this. = expectedGroups;
             // The DataSize is converted with toBytes() before being stored.
             this. = checkNotNull(maxPartialMemory"maxPartialMemory is null").toBytes();
 
             // Presumably assigns the "types" field -- confirm.
             this. = toTypes(groupByTypesstepaccumulatorFactorieshashChannel);
         }
 
         /**
          * NOTE(review): the returned expression is blank; presumably the "types" field.
          */
         @Override
         public List<TypegetTypes()
         {
             return ;
         }
 
         /**
          * Registers an operator context on the given driver context and builds a new
          * HashAggregationOperator around it. The checkState call carries the message
          * "Factory is already closed".
          */
         @Override
         public Operator createOperator(DriverContext driverContext)
         {
             // NOTE(review): the checked operand is blank; presumably "closed".
             checkState(!"Factory is already closed");
 
             OperatorContext operatorContext;
             // NOTE(review): both operands of this comparison are blank. The first branch
             // passes an extra (blank) trailing argument to addOperatorContext and the
             // second does not; presumably the step decides whether the partial-memory
             // limit is supplied. Confirm against the real source.
             if ( == .) {
                 operatorContext = driverContext.addOperatorContext(HashAggregationOperator.class.getSimpleName(), );
             }
             else {
                 operatorContext = driverContext.addOperatorContext(HashAggregationOperator.class.getSimpleName());
             }
             // Six further constructor arguments follow operatorContext; all are blank.
             return new HashAggregationOperator(
                     operatorContext,
                     ,
                    ,
                    ,
                    ,
                    ,
                    );
        }
        /**
         * NOTE(review): assigns true to a blank target; presumably the "closed" flag.
         */
        @Override
        public void close()
        {
             = true;
        }
    }
    // Operator state.
    // NOTE(review): this text appears to be damaged by extraction (generic type
    // arguments fused with the next identifier, missing commas, blank field
    // references). Comments that name a blanked identifier are guesses to confirm.
    private final OperatorContext operatorContext;
    private final List<TypegroupByTypes;
    private final List<IntegergroupByChannels;
    private final Step step;
    private final Optional<IntegerhashChannel;
    private final int expectedGroups;
    private final List<Typetypes;
    private final MemoryManager memoryManager;
    // Not final; the remaining fields above are all final.
    private Iterator<PageoutputIterator;
    private boolean finishing;
    // NOTE(review): the constructor header line (modifier and name) is missing here.
    // The seven parameters below match the argument count of the
    // "new HashAggregationOperator(...)" call in the factory, so this is presumably
    // the HashAggregationOperator constructor. Confirm.
            OperatorContext operatorContext,
            List<TypegroupByTypes,
            List<IntegergroupByChannels,
            Step step,
            List<AccumulatorFactoryaccumulatorFactories,
            Optional<IntegerhashChannel,
            int expectedGroups)
    {
        this. = checkNotNull(operatorContext"operatorContext is null");
        checkNotNull(step"step is null");
        checkNotNull(accumulatorFactories"accumulatorFactories is null");
        // NOTE(review): operatorContext was already null-checked two statements above;
        // this second check is redundant.
        checkNotNull(operatorContext"operatorContext is null");
        this. = ImmutableList.copyOf(groupByTypes);
        this. = ImmutableList.copyOf(groupByChannels);
        // NOTE(review): a copy of accumulatorFactories is stored, but no matching field
        // declaration is visible in the field list above; it may have been lost.
        this. = ImmutableList.copyOf(accumulatorFactories);
        this. = checkNotNull(hashChannel"hashChannel is null");
        this. = step;
        this. = expectedGroups;
        // The memory manager is constructed from the operator context.
        this. = new MemoryManager(operatorContext);
        // Presumably assigns the "types" field -- confirm.
        this. = toTypes(groupByTypesstepaccumulatorFactorieshashChannel);
    }
    // NOTE(review): the method signature line is missing between the annotation and
    // the body, and the returned expression is blank. Presumably this is the accessor
    // for the operatorContext field -- confirm against the real source.
    @Override
    {
        return ;
    }
    /**
     * Returns a list of types.
     *
     * NOTE(review): the returned expression is blank (extraction damage); presumably
     * the "types" field computed in the constructor -- confirm.
     */
    @Override
    public List<TypegetTypes()
    {
        return ;
    }
    /**
     * Sets a boolean flag to true.
     *
     * NOTE(review): the assignment target is blank (extraction damage); presumably
     * the "finishing" field -- confirm.
     */
    @Override
    public void finish()
    {
         = true;
    }
    /**
     * Returns the conjunction of three conditions: a boolean, a null check, and
     * "null or has no next element".
     *
     * NOTE(review): all operands are blank (extraction damage). Presumably: finishing,
     * no aggregation builder, and the output iterator absent or exhausted -- confirm.
     */
    @Override
    public boolean isFinished()
    {
        return  &&  == null && ( == null || !.hasNext());
    }
    /**
     * Returns the conjunction of three conditions: a negated boolean, a null check,
     * and "null or not full".
     *
     * NOTE(review): all operands are blank (extraction damage). Presumably: not
     * finishing, no pending output iterator, and the aggregation builder absent or
     * not full -- confirm.
     */
    @Override
    public boolean needsInput()
    {
        return ! &&  == null && ( == null || !.isFull());
    }
    /**
     * Feeds one page into the aggregation. A GroupByHashAggregationBuilder is created
     * when the checked reference is null; otherwise the existing one must not be full.
     * The page is then handed to processPage.
     *
     * The precondition messages are "Operator is already finishing", "page is null"
     * and "Aggregation buffer is full".
     *
     * NOTE(review): field references are blank (extraction damage); the receiver is
     * presumably the aggregationBuilder named in the inline comment -- confirm.
     */
    @Override
    public void addInput(Page page)
    {
        // NOTE(review): the checked operand is blank; presumably "finishing".
        checkState(!"Operator is already finishing");
        checkNotNull(page"page is null");
        if ( == null) {
            // Seven constructor arguments, all blank in this text.
             = new GroupByHashAggregationBuilder(
                    ,
                    ,
                    ,
                    ,
                    ,
                    ,
                    );
            // assume initial aggregationBuilder is not full
        }
        else {
            checkState(!.isFull(), "Aggregation buffer is full");
        }
        .processPage(page);
    }
    /**
     * Returns the next output page, or null when nothing can be produced yet.
     *
     * When there is no live iterator, the method returns null in three cases: a null
     * check succeeds, the "finishing or full" test fails, or the freshly built
     * iterator is empty. Otherwise it builds an iterator via build() and returns the
     * next element.
     *
     * A guard before build() throws ExceededMemoryLimitException.
     *
     * NOTE(review): all field references are blank (extraction damage). The existing
     * inline comments suggest they are the output iterator, the aggregation builder,
     * the finishing flag and the step -- confirm against the real source.
     */
    @Override
    public Page getOutput()
    {
        if ( == null || !.hasNext()) {
            // current output iterator is done
             = null;
            // no data
            if ( == null) {
                return null;
            }
            // only flush if we are finishing or the aggregation builder is full
            if (! && !.isFull()) {
                return null;
            }
            // Only partial aggregation can flush early. Also, check that we are not flushing tiny bits at a time
            // NOTE(review): operands are blank; presumably "not finishing and step is not
            // the partial step". The exception argument comes from getMaxMemorySize() on
            // a blank receiver (presumably the memory manager) -- confirm.
            if (! &&  != .) {
                throw new ExceededMemoryLimitException(.getMaxMemorySize());
            }
            // Build the iterator, then null out a second reference; presumably the
            // builder is dropped once it has been turned into output -- confirm.
             = .build();
             = null;
            if (!.hasNext()) {
                // current output iterator is done
                 = null;
                return null;
            }
        }
        return .next();
    }
    /**
     * Builds an immutable list of types in this order: the group-by types, then one
     * extra entry when hashChannel is present, then the type reported by a
     * throwaway Aggregator created for each accumulator factory.
     *
     * NOTE(review): the parameter list is damaged by extraction (type arguments fused
     * with names, commas missing); the parameters appear to be groupByType, step,
     * factories and hashChannel.
     */
    private static List<TypetoTypes(List<? extends TypegroupByTypeStep stepList<AccumulatorFactoryfactoriesOptional<IntegerhashChannel)
    {
        ImmutableList.Builder<Typetypes = ImmutableList.builder();
        types.addAll(groupByType);
        if (hashChannel.isPresent()) {
            // NOTE(review): the added value is blank; presumably the statically imported
            // BIGINT constant -- confirm.
            types.add();
        }
        for (AccumulatorFactory factory : factories) {
            // The Aggregator is created only to ask it for its type.
            types.add(new Aggregator(factorystep).getType());
        }
        return types.build();
    }
    /**
     * Combines a GroupByHash with one Aggregator per accumulator factory, and can
     * turn the accumulated state into an iterator of pages.
     *
     * NOTE(review): this text appears to be damaged by extraction (generic type
     * arguments fused with the next identifier, missing commas, blank field
     * references). Comments that name a blanked identifier are guesses to confirm.
     */
    private static class GroupByHashAggregationBuilder
    {
        private final GroupByHash groupByHash;
        private final List<Aggregatoraggregators;
        private final MemoryManager memoryManager;
        /**
         * Creates the GroupByHash and wraps every accumulator factory in an Aggregator
         * for the given step.
         */
        private GroupByHashAggregationBuilder(
                List<AccumulatorFactoryaccumulatorFactories,
                Step step,
                int expectedGroups,
                List<TypegroupByTypes,
                List<IntegergroupByChannels,
                Optional<IntegerhashChannel,
                MemoryManager memoryManager)
        {
            // The channel list is converted to an int array with Ints.toArray.
            this. = new GroupByHash(groupByTypes, Ints.toArray(groupByChannels), hashChannelexpectedGroups);
            this. = memoryManager;
            // wrapper each function with an aggregator
            ImmutableList.Builder<Aggregatorbuilder = ImmutableList.builder();
            checkNotNull(accumulatorFactories"accumulatorFactories is null");
            for (int i = 0; i < accumulatorFactories.size(); i++) {
                AccumulatorFactory accumulatorFactory = accumulatorFactories.get(i);
                builder.add(new Aggregator(accumulatorFactorystep));
            }
            // NOTE(review): blank target; presumably the "aggregators" field.
             = builder.build();
        }
        /**
         * Obtains group ids for the page and passes them, with the page, to every
         * aggregator. Receivers are blank; presumably groupByHash and aggregators.
         */
        private void processPage(Page page)
        {
            GroupByIdBlock groupIds = .getGroupIds(page);
            for (Aggregator aggregator : ) {
                aggregator.processPage(groupIdspage);
            }
        }
        /**
         * Sums an estimated size from a blank receiver (presumably groupByHash) with
         * the estimated size of every aggregator, and returns true when canUse(...)
         * rejects that total. The canUse receiver is blank; presumably memoryManager.
         */
        public boolean isFull()
        {
            long memorySize = .getEstimatedSize();
            for (Aggregator aggregator : ) {
                memorySize += aggregator.getEstimatedSize();
            }
            return !.canUse(memorySize);
        }
        /**
         * Returns an iterator whose computeNext() fills a PageBuilder until it is full
         * or the loop bound is reached, and signals end of data once a counter reaches
         * its limit. The page layout is the types from getTypes() on a blank receiver
         * (presumably groupByHash) followed by one type per aggregator.
         */
        public Iterator<Pagebuild()
        {
            List<Typetypes = new ArrayList<>(.getTypes());
            for (Aggregator aggregator : ) {
                types.add(aggregator.getType());
            }
            final PageBuilder pageBuilder = new PageBuilder(types);
            return new AbstractIterator<Page>()
            {
                private final int groupCount = .getGroupCount();
                private int groupId;
                @Override
                protected Page computeNext()
                {
                    // NOTE(review): blank operands; presumably groupId >= groupCount.
                    if ( >= ) {
                        return endOfData();
                    }
                    pageBuilder.reset();
                    // This local "types" shadows the outer one and holds only the types
                    // returned by getTypes(); its size is the column offset used below.
                    List<Typetypes = .getTypes();
                    while (!pageBuilder.isFull() &&  < ) {
                        .appendValuesTo(pageBuilder, 0);
                        pageBuilder.declarePosition();
                        for (int i = 0; i < .size(); i++) {
                            Aggregator aggregator = .get(i);
                            // Aggregator columns are placed after the leading types.size() columns.
                            BlockBuilder output = pageBuilder.getBlockBuilder(types.size() + i);
                            // NOTE(review): Aggregator.evaluate is declared with two
                            // parameters (group id, output); a group id argument was
                            // presumably blanked here.
                            aggregator.evaluate(output);
                        }
                        // NOTE(review): blank operand; presumably groupId is incremented.
                        ++;
                    }
                    return pageBuilder.build();
                }
            };
        }
    }
    /**
     * Wraps a GroupedAccumulator together with the aggregation step. Each method
     * branches on a comparison to pick between an "intermediate" and an
     * "input"/"final" variant of the accumulator call.
     *
     * NOTE(review): this text appears to be damaged by extraction. The Step constants
     * in every comparison and all field references are blank, so which steps select
     * which branch cannot be read from here -- confirm against the real source.
     */
    private static class Aggregator
    {
        private final GroupedAccumulator aggregation;
        private final Step step;
        private final int intermediateChannel;
        /**
         * Creates the grouped accumulator from the factory. In the first branch the
         * factory must report exactly one input channel, which is recorded, and an
         * intermediate accumulator is created; otherwise -1 is recorded and a regular
         * grouped accumulator is created.
         */
        private Aggregator(AccumulatorFactory accumulatorFactoryStep step)
        {
            if (step == .) {
                checkArgument(accumulatorFactory.getInputChannels().size() == 1, "expected 1 input channel for intermediate aggregation");
                // NOTE(review): blank targets; presumably intermediateChannel, then aggregation.
                 = accumulatorFactory.getInputChannels().get(0);
                 = accumulatorFactory.createGroupedIntermediateAccumulator();
            }
            else {
                // -1 is stored when no single input channel is recorded.
                 = -1;
                 = accumulatorFactory.createGroupedAccumulator();
            }
            this. = step;
        }
        /**
         * Delegates to getEstimatedSize() on a blank receiver (presumably the accumulator).
         */
        public long getEstimatedSize()
        {
            return .getEstimatedSize();
        }
        /**
         * Returns either the intermediate type or the final type, depending on a
         * comparison whose operands are blank.
         */
        public Type getType()
        {
            if ( == .) {
                return .getIntermediateType();
            }
            else {
                return .getFinalType();
            }
        }
        /**
         * Adds a page to the accumulator: either a single block of the page via
         * addIntermediate, or the whole page via addInput.
         */
        public void processPage(GroupByIdBlock groupIdsPage page)
        {
            if ( == .) {
                // NOTE(review): the getBlock argument is blank; presumably intermediateChannel.
                .addIntermediate(groupIdspage.getBlock());
            }
            else {
                .addInput(groupIdspage);
            }
        }
        /**
         * Writes the result for one group id into the output builder, using either
         * evaluateIntermediate or evaluateFinal.
         */
        public void evaluate(int groupIdBlockBuilder output)
        {
            if ( == .) {
                .evaluateIntermediate(groupIdoutput);
            }
            else {
                .evaluateFinal(groupIdoutput);
            }
        }
    }
New to GrepCode? Check out our FAQ X