Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.operator;
 
 
 import java.util.List;
 
 import static com.facebook.presto.util.ImmutableCollectors.toImmutableList;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;

Group input data and produce a single block for each sequence of identical values.
 
 public class AggregationOperator
         implements Operator
 {
     public static class AggregationOperatorFactory
             implements OperatorFactory
     {
         private final int operatorId;
         private final Step step;
         private final List<AccumulatorFactoryaccumulatorFactories;
         private final List<Typetypes;
         private boolean closed;
 
         public AggregationOperatorFactory(int operatorIdStep stepList<AccumulatorFactoryaccumulatorFactories)
         {
             this. = operatorId;
             this. = step;
             this. = ImmutableList.copyOf(accumulatorFactories);
             this. = toTypes(stepaccumulatorFactories);
         }
 
         @Override
         public List<TypegetTypes()
         {
             return ;
         }
 
         @Override
         public Operator createOperator(DriverContext driverContext)
         {
             checkState(!"Factory is already closed");
             OperatorContext operatorContext = driverContext.addOperatorContext(AggregationOperator.class.getSimpleName());
             return new AggregationOperator(operatorContext);
         }
 
         @Override
         public void close()
         {
              = true;
         }
     }
 
     private enum State
     {
         NEEDS_INPUT,
         HAS_OUTPUT,
         FINISHED
     }
 
     private final OperatorContext operatorContext;
     private final List<Typetypes;
     private final List<Aggregatoraggregates;
 
     private State state = .;
 
     public AggregationOperator(OperatorContext operatorContextStep stepList<AccumulatorFactoryaccumulatorFactories)
     {
         this. = checkNotNull(operatorContext"operatorContext is null");
 
         checkNotNull(step"step is null");
         checkNotNull(accumulatorFactories"accumulatorFactories is null");
 
         this. = toTypes(stepaccumulatorFactories);
         MemoryManager memoryManager = new MemoryManager(operatorContext);
 
        // wrapper each function with an aggregator
        ImmutableList.Builder<Aggregatorbuilder = ImmutableList.builder();
        for (AccumulatorFactory accumulatorFactory : accumulatorFactories) {
            builder.add(new Aggregator(accumulatorFactorystepmemoryManager));
        }
         = builder.build();
    }
    @Override
    {
        return ;
    }
    @Override
    public List<TypegetTypes()
    {
        return ;
    }
    @Override
    public void finish()
    {
        if ( == .) {
             = .;
        }
    }
    @Override
    public boolean isFinished()
    {
        return  == .;
    }
    @Override
    public boolean needsInput()
    {
        return  == .;
    }
    @Override
    public void addInput(Page page)
    {
        checkState(needsInput(), "Operator is already finishing");
        checkNotNull(page"page is null");
        for (Aggregator aggregate : ) {
            aggregate.processPage(page);
        }
    }
    @Override
    public Page getOutput()
    {
        if ( != .) {
            return null;
        }
        // project results into output blocks
        List<Typetypes = .stream().map(Aggregator::getType).collect(toImmutableList());
        PageBuilder pageBuilder = new PageBuilder(types);
        pageBuilder.declarePosition();
        for (int i = 0; i < .size(); i++) {
            Aggregator aggregator = .get(i);
            BlockBuilder blockBuilder = pageBuilder.getBlockBuilder(i);
            aggregator.evaluate(blockBuilder);
        }
         = .;
        return pageBuilder.build();
    }
    private static List<TypetoTypes(Step stepList<AccumulatorFactoryaccumulatorFactories)
    {
        ImmutableList.Builder<Typetypes = ImmutableList.builder();
        for (AccumulatorFactory accumulatorFactory : accumulatorFactories) {
            types.add(new Aggregator(accumulatorFactorystepnull).getType());
        }
        return types.build();
    }
    private static class Aggregator
    {
        private final Accumulator aggregation;
        private final Step step;
        private final MemoryManager memoryManager;
        private final int intermediateChannel;
        private Aggregator(AccumulatorFactory accumulatorFactoryStep stepMemoryManager memoryManager)
        {
            if (step == .) {
                checkArgument(accumulatorFactory.getInputChannels().size() == 1, "expected 1 input channel for intermediate aggregation");
                 = accumulatorFactory.getInputChannels().get(0);
                 = accumulatorFactory.createIntermediateAccumulator();
            }
            else {
                 = -1;
                 = accumulatorFactory.createAccumulator();
            }
            this. = step;
            this. = memoryManager;
        }
        public Type getType()
        {
            if ( == .) {
                return .getIntermediateType();
            }
            else {
                return .getFinalType();
            }
        }
        public void processPage(Page page)
        {
            if ( == .) {
                .addIntermediate(page.getBlock());
            }
            else {
                .addInput(page);
            }
            if (!.canUse(.getEstimatedSize())) {
                throw new ExceededMemoryLimitException(.getMaxMemorySize());
            }
        }
        public void evaluate(BlockBuilder blockBuilder)
        {
            if ( == .) {
                .evaluateIntermediate(blockBuilder);
            }
            else {
                .evaluateFinal(blockBuilder);
            }
        }
    }
New to GrepCode? Check out our FAQ X