Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
  
  
  package org.apache.hadoop.hive.ql.exec;
  
  import java.util.Arrays;
  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.List;
  import java.util.Map;
  import java.util.Set;
  
  import  org.apache.hadoop.conf.Configuration;
  import  org.apache.hadoop.io.Text;

GroupBy operator implementation.
  
  public class GroupByOperator extends Operator<GroupByDescimplements
      Serializable {
  
    private static final Log LOG = LogFactory.getLog(GroupByOperator.class
        .getName());
  
    private static final long serialVersionUID = 1L;
    private static final int NUMROWSESTIMATESIZE = 1000;
  
    protected transient ExprNodeEvaluator[] keyFields;
    protected transient ObjectInspector[] keyObjectInspectors;
  
    protected transient ExprNodeEvaluator[][] aggregationParameterFields;
    protected transient ObjectInspector[][] aggregationParameterObjectInspectors;
    protected transient Object[][] aggregationParameterObjects;
    // In the future, we may allow both count(DISTINCT a) and sum(DISTINCT a) in
    // the same SQL clause,
    // so aggregationIsDistinct is a boolean array instead of a single number.
    protected transient boolean[] aggregationIsDistinct;
    // Map from integer tag to distinct aggrs
    transient protected Map<IntegerSet<Integer>> distinctKeyAggrs =
      new HashMap<IntegerSet<Integer>>();
    // Map from integer tag to non-distinct aggrs with key parameters.
    transient protected Map<IntegerSet<Integer>> nonDistinctKeyAggrs =
      new HashMap<IntegerSet<Integer>>();
    // List of non-distinct aggrs.
    transient protected List<IntegernonDistinctAggrs = new ArrayList<Integer>();
    // Union expr for distinct keys
    transient ExprNodeEvaluator unionExprEval = null;
  
  
    protected transient ArrayList<ObjectInspectorobjectInspectors;
   transient ArrayList<StringfieldNames;
 
   // Used by sort-based GroupBy: Mode = COMPLETE, PARTIAL1, PARTIAL2,
   // MERGEPARTIAL
   protected transient KeyWrapper currentKeys;
   protected transient KeyWrapper newKeys;
   protected transient AggregationBuffer[] aggregations;
   protected transient Object[][] aggregationsParametersLastInvoke;
 
   // Used by hash-based GroupBy: Mode = HASH, PARTIALS
   protected transient HashMap<KeyWrapperAggregationBuffer[]> hashAggregations;
 
   // Used by hash distinct aggregations when hashGrpKeyNotRedKey is true
   protected transient HashSet<KeyWrapperkeysCurrentGroup;
 
   transient boolean bucketGroup;
 
   transient boolean firstRow;
   transient long totalMemory;
   transient boolean hashAggr;
   // The reduction is happening on the reducer, and the grouping key and
   // reduction keys are different.
   // For example: select a, count(distinct b) from T group by a
   // The data is sprayed by 'b' and the reducer is grouping it by 'a'
   transient boolean groupKeyIsNotReduceKey;
   transient boolean firstRowInGroup;
   transient long numRowsInput;
   transient long numRowsHashTbl;
   transient int groupbyMapAggrInterval;
   transient long numRowsCompareHashAggr;
   transient float minReductionHashAggr;
 
   // current Key ObjectInspectors are standard ObjectInspectors
   protected transient ObjectInspector[] currentKeyObjectInspectors;
   // new Key ObjectInspectors are objectInspectors from the parent
   public static MemoryMXBean memoryMXBean;
   private long maxMemory;
   private float memoryThreshold;

  
This is used to store the position and field names for variable length fields.
 
   class varLenFields {
     int aggrPos;
     List<Fieldfields;
 
     varLenFields(int aggrPosList<Fieldfields) {
       this. = aggrPos;
       this. = fields;
     }
 
     int getAggrPos() {
       return ;
     }
 
     List<FieldgetFields() {
       return ;
     }
   };
 
   // for these positions, some variable primitive type (String) is used, so size
   // cannot be estimated. sample it at runtime.
   transient List<IntegerkeyPositionsSize;
 
   // for these positions, some variable primitive type (String) is used for the
   // aggregation classes
   transient List<varLenFieldsaggrPositions;
 
   transient int fixedRowSize;
   transient long maxHashTblMemory;
   transient int totalVariableSize;
   transient int numEntriesVarSize;
   transient int numEntriesHashTable;
   transient int countAfterReport;
   transient int heartbeatInterval;
 
   protected void initializeOp(Configuration hconfthrows HiveException {
      = Runtime.getRuntime().totalMemory();
      = 0;
      = 0;
 
      = HiveConf.getIntVar(hconf,
      = 0;
 
     ObjectInspector rowInspector = [0];
 
     // init keyFields
     for (int i = 0; i < .i++) {
       [i] = ExprNodeEvaluatorFactory.get(.getKeys().get(i));
       [i] = [i].initialize(rowInspector);
       [i] = ObjectInspectorUtils
           .);
     }
 
     // initialize unionExpr for reduce-side
     // reduce KEY has union field as the last field if there are distinct
     // aggregates in group-by.
     List<? extends StructFieldsfs =
       ((StandardStructObjectInspectorrowInspector).getAllStructFieldRefs();
     if (sfs.size() > 0) {
       StructField keyField = sfs.get(0);
       if (keyField.getFieldName().toUpperCase().equals(
           ...name())) {
         ObjectInspector keyObjInspector = keyField.getFieldObjectInspector();
         if (keyObjInspector instanceof StandardStructObjectInspector) {
           List<? extends StructFieldkeysfs =
             ((StandardStructObjectInspectorkeyObjInspector).getAllStructFieldRefs();
           if (keysfs.size() > 0) {
             // the last field is the union field, if any
             StructField sf = keysfs.get(keysfs.size() - 1);
             if (sf.getFieldObjectInspector().getCategory().equals(
                 ..)) {
                = ExprNodeEvaluatorFactory.get(
                 new ExprNodeColumnDesc(TypeInfoUtils.getTypeInfoFromObjectInspector(
                 sf.getFieldObjectInspector()),
                 keyField.getFieldName() + "." + sf.getFieldName(), null,
                 false));
               .initialize(rowInspector);
             }
           }
         }
       }
     }
     // init aggregationParameterFields
      = new Object[aggrs.size()][];
      = new boolean[aggrs.size()];
     for (int i = 0; i < aggrs.size(); i++) {
       AggregationDesc aggr = aggrs.get(i);
       ArrayList<ExprNodeDescparameters = aggr.getParameters();
       [i] = new ExprNodeEvaluator[parameters.size()];
       [i] = new ObjectInspector[parameters
           .size()];
           .size()];
       [i] = new Object[parameters.size()];
       for (int j = 0; j < parameters.size(); j++) {
         [i][j] = ExprNodeEvaluatorFactory
             .get(parameters.get(j));
             .initialize(rowInspector);
         if ( != null) {
           String[] names = parameters.get(j).getExprString().split("\\.");
           // parameters of the form : KEY.colx:t.coly
           if (...name().equals(names[0])) {
             String name = names[names.length - 2];
             int tag = Integer.parseInt(name.split("\\:")[1]);
             if (aggr.getDistinct()) {
               // is distinct
               Set<Integerset = .get(tag);
               if (null == set) {
                 set = new HashSet<Integer>();
                 .put(tagset);
               }
               if (!set.contains(i)) {
                 set.add(i);
               }
             } else {
               Set<Integerset = .get(tag);
               if (null == set) {
                 set = new HashSet<Integer>();
                 .put(tagset);
               }
               if (!set.contains(i)) {
                 set.add(i);
               }
             }
           } else {
             // will be VALUE._COLx
             if (!.contains(i)) {
               .add(i);
             }
           }
         } else {
           if (aggr.getDistinct()) {
             [i] = true;
           }
         }
         [i][j] = ObjectInspectorUtils
             .getStandardObjectInspector(
             [i][j],
             .);
         [i][j] = null;
       }
       if (parameters.size() == 0) {
         // for ex: count(*)
         if (!.contains(i)) {
           .add(i);
         }
       }
     }
 
     // init aggregationClasses
         .size()];
     for (int i = 0; i < .i++) {
       AggregationDesc agg = .getAggregators().get(i);
     }
 
     // init objectInspectors
     int totalFields = . + .;
      = new ArrayList<ObjectInspector>(totalFields);
     for (ExprNodeEvaluator keyField : ) {
       .add(null);
     }
     for (int i = 0; i < .i++) {
           .get(i).getMode(), [i]);
       .add(roi);
     }
 
     if (.getMode() != .. || ) {
        = newAggregations();
        = false;
     } else {
        = new HashMap<KeyWrapperAggregationBuffer[]>(256);
        = newAggregations();
        = true;
        = new ArrayList<Integer>();
        = new ArrayList<varLenFields>();
        = HiveConf.getIntVar(hconf,
 
       // compare every groupbyMapAggrInterval rows
        = HiveConf.getFloatVar(hconf,
       if () {
          = new HashSet<KeyWrapper>();
       }
     }
 
 
     for (int i = 0; i < .i++) {
     }
 
     // Generate key names
     ArrayList<StringkeyNames = new ArrayList<String>(.);
     for (int i = 0; i < .i++) {
       keyNames.add(.get(i));
     }
      = ObjectInspectorFactory
         .getStandardStructObjectInspector(keyNames, Arrays
         .asList());
      = ObjectInspectorFactory
         .getStandardStructObjectInspector(keyNames, Arrays
 
      = ObjectInspectorFactory
 
 
 
      = true;
     // estimate the number of hash table entries based on the size of each
     // entry. Since the size of a entry
     // is not known, estimate that based on the number of entries
     if () {
       computeMaxEntriesHashAggr(hconf);
     }
      = ManagementFactory.getMemoryMXBean();
     initializeChildren(hconf);
   }

  
Estimate the number of entries in map-side hash table. The user can specify the total amount of memory to be used by the map-side hash. By default, all available memory is used. The size of each row is estimated, rather crudely, and the number of entries are figure out based on that.

Returns:
number of entries that can fit in hash table - useful for map-side aggregation only
 
   private void computeMaxEntriesHashAggr(Configuration hconfthrows HiveException {
     float memoryPercentage = this.getConf().getGroupByMemoryUsage();
      = (long) (memoryPercentage * Runtime.getRuntime().maxMemory());
     estimateRowSize();
   }
 
   private static final int javaObjectOverHead = 64;
   private static final int javaHashEntryOverHead = 64;
   private static final int javaSizePrimitiveType = 16;
   private static final int javaSizeUnknownType = 256;

  
The size of the element at position 'pos' is returned, if possible. If the datatype is of variable length, STRING, a list of such key positions is maintained, and the size for such positions is then actually calculated at runtime.

Parameters:
pos the position of the key
c the type of the key
Returns:
the size of this datatype
 
   private int getSize(int posPrimitiveCategory category) {
     switch (category) {
     case :
     case :
     case :
     case :
     case :
     case :
     case :
     case :
       return ;
     case :
       .add(new Integer(pos));
       return ;
     default:
       return ;
     }
   }

  
The size of the element at position 'pos' is returned, if possible. If the field is of variable length, STRING, a list of such field names for the field position is maintained, and the size for such positions is then actually calculated at runtime.

Parameters:
pos the position of the key
c the type of the key
f the field to be added
Returns:
the size of this datatype
 
   private int getSize(int posClass<?> cField f) {
     if (c.isPrimitive()
         || c.isInstance(new Boolean(true))
         || c.isInstance(new Byte((byte) 0))
         || c.isInstance(new Short((short) 0))
         || c.isInstance(new Integer(0))
         || c.isInstance(new Long(0))
         || c.isInstance(new Float(0))
         || c.isInstance(new Double(0))) {
       return ;
     }
 
     if (c.isInstance(new String())) {
       int idx = 0;
       varLenFields v = null;
       for (idx = 0; idx < .size(); idx++) {
         v = .get(idx);
         if (v.getAggrPos() == pos) {
           break;
         }
       }
 
       if (idx == .size()) {
         v = new varLenFields(posnew ArrayList<Field>());
         .add(v);
       }
 
       v.getFields().add(f);
       return ;
     }
 
     return ;
   }

  

Parameters:
pos position of the key
typeinfo type of the input
Returns:
the size of this datatype
 
   private int getSize(int posTypeInfo typeInfo) {
     if (typeInfo instanceof PrimitiveTypeInfo) {
       return getSize(pos, ((PrimitiveTypeInfotypeInfo).getPrimitiveCategory());
     }
     return ;
   }

  

Returns:
the size of each row
 
   private void estimateRowSize() throws HiveException {
     // estimate the size of each entry -
     // a datatype with unknown size (String/Struct etc. - is assumed to be 256
     // bytes for now).
     // 64 bytes is the overhead for a reference
 
     ArrayList<ExprNodeDesckeys = .getKeys();
 
     // Go over all the keys and get the size of the fields of fixed length. Keep
     // track of the variable length keys
     for (int pos = 0; pos < keys.size(); pos++) {
        += getSize(poskeys.get(pos).getTypeInfo());
     }
 
     // Go over all the aggregation classes and and get the size of the fields of
     // fixed length. Keep track of the variable length
     // fields in these aggregation classes.
     for (int i = 0; i < .i++) {
 
       Class<? extends AggregationBufferagg = [i]
           .getNewAggregationBuffer().getClass();
       Field[] fArr = ObjectInspectorUtils.getDeclaredNonStaticFields(agg);
       for (Field f : fArr) {
          += getSize(if.getType(), f);
       }
     }
   }
 
   protected AggregationBuffer[] newAggregations() throws HiveException {
     for (int i = 0; i < .i++) {
       aggs[i] = [i].getNewAggregationBuffer();
       // aggregationClasses[i].reset(aggs[i]);
     }
     return aggs;
   }
 
   protected void resetAggregations(AggregationBuffer[] aggsthrows HiveException {
     for (int i = 0; i < aggs.lengthi++) {
       [i].reset(aggs[i]);
     }
   }
 
   /*
    * Update aggregations. If the aggregation is for distinct, in case of hash
    * aggregation, the client tells us whether it is a new entry. For sort-based
    * aggregations, the last row is compared with the current one to figure out
    * whether it has changed. As a cleanup, the lastInvoke logic can be pushed in
    * the caller, and this function can be independent of that. The client should
    * always notify whether it is a different row or not.
    *
    * @param aggs the aggregations to be evaluated
    *
    * @param row the row being processed
    *
    * @param rowInspector the inspector for the row
    *
    * @param hashAggr whether hash aggregation is being performed or not
    *
    * @param newEntryForHashAggr only valid if it is a hash aggregation, whether
    * it is a new entry or not
    */
   protected void updateAggregations(AggregationBuffer[] aggsObject row,
       ObjectInspector rowInspectorboolean hashAggr,
       boolean newEntryForHashAggrObject[][] lastInvokethrows HiveException {
     if ( == null) {
       for (int ai = 0; ai < aggs.lengthai++) {
         // Calculate the parameters
         Object[] o = new Object[[ai].length];
         for (int pi = 0; pi < [ai].lengthpi++) {
           o[pi] = [ai][pi].evaluate(row);
         }
 
         // Update the aggregations.
         if ([ai]) {
           if (hashAggr) {
             if (newEntryForHashAggr) {
               [ai].aggregate(aggs[ai], o);
             }
           } else {
             if (lastInvoke[ai] == null) {
               lastInvoke[ai] = new Object[o.length];
             }
             if (ObjectInspectorUtils.compare(o,
                 [ai], lastInvoke[ai],
                 [ai]) != 0) {
               [ai].aggregate(aggs[ai], o);
               for (int pi = 0; pi < o.lengthpi++) {
                 lastInvoke[ai][pi] = ObjectInspectorUtils.copyToStandardObject(
                     o[pi], [ai][pi],
                     .);
               }
             }
           }
         } else {
           [ai].aggregate(aggs[ai], o);
         }
       }
       return;
     }
 
     if (.size() > 0) {
       // evaluate union object
       UnionObject uo = (UnionObject) (.evaluate(row));
       int unionTag = uo.getTag();
 
       // update non-distinct key aggregations : "KEY._colx:t._coly"
       if (.get(unionTag) != null) {
         for (int pos : .get(unionTag)) {
           Object[] o = new Object[[pos].length];
           for (int pi = 0; pi < [pos].lengthpi++) {
             o[pi] = [pos][pi].evaluate(row);
           }
           [pos].aggregate(aggs[pos], o);
         }
       }
       // there may be multi distinct clauses for one column
       // update them all.
       if (.get(unionTag) != null) {
         for (int i : .get(unionTag)) {
           Object[] o = new Object[[i].length];
           for (int pi = 0; pi < [i].lengthpi++) {
             o[pi] = [i][pi].evaluate(row);
           }
 
           if (hashAggr) {
             if (newEntryForHashAggr) {
               [i].aggregate(aggs[i], o);
             }
           } else {
             if (lastInvoke[i] == null) {
               lastInvoke[i] = new Object[o.length];
             }
             if (ObjectInspectorUtils.compare(o,
                 [i],
                 lastInvoke[i],
                 [i]) != 0) {
               [i].aggregate(aggs[i], o);
               for (int pi = 0; pi < o.lengthpi++) {
                 lastInvoke[i][pi] = ObjectInspectorUtils.copyToStandardObject(
                     o[pi], [i][pi],
                     .);
               }
             }
           }
         }
       }
 
       // update non-distinct value aggregations: 'VALUE._colx'
       // these aggregations should be updated only once.
       if (unionTag == 0) {
         for (int pos : ) {
           Object[] o = new Object[[pos].length];
           for (int pi = 0; pi < [pos].lengthpi++) {
             o[pi] = [pos][pi].evaluate(row);
           }
           [pos].aggregate(aggs[pos], o);
         }
       }
     } else {
       for (int ai = 0; ai < aggs.lengthai++) {
         // there is no distinct aggregation,
         // update all aggregations
         Object[] o = new Object[[ai].length];
         for (int pi = 0; pi < [ai].lengthpi++) {
           o[pi] = [ai][pi].evaluate(row);
         }
         [ai].aggregate(aggs[ai], o);
       }
     }
   }
 
   public void startGroup() throws HiveException {
      = true;
   }
 
   public void endGroup() throws HiveException {
     if () {
       .clear();
     }
   }
 
   public void processOp(Object rowint tagthrows HiveException {
      = false;
     ObjectInspector rowInspector = [tag];
     // Total number of input rows is needed for hash aggregation only
     if ( && !) {
       ++;
       // if hash aggregation is not behaving properly, disable it
       if ( == ) {
         // map-side aggregation should reduce the entries by at-least half
         if ( >  * ) {
           .warn("Disable Hash Aggr: #hash table = " + 
               + " #total = " +  + " reduction = " + 1.0
               * ( / ) + " minReduction = "
               + );
           flush(true);
            = false;
         } else {
           .trace("Hash Aggr Enabled: #hash table = " + 
               + " #total = " +  + " reduction = " + 1.0
               * ( / ) + " minReduction = "
               + );
         }
       }
     }
 
     try {
       ++;
 
       .getNewKey(rowrowInspector);
       if () {
         .setHashKey();
         processHashAggr(rowrowInspector);
       } else {
         processAggr(rowrowInspector);
       }
 
        = false;
 
       if ( != 0 && ( % ) == 0
           && ( != null)) {
         .progress();
          = 0;
       }
     } catch (HiveException e) {
       throw e;
     } catch (Exception e) {
       throw new HiveException(e);
     }
   }
 
   private void processHashAggr(Object rowObjectInspector rowInspector,
       KeyWrapper newKeysthrows HiveException {
     // Prepare aggs for updating
     AggregationBuffer[] aggs = null;
     boolean newEntryForHashAggr = false;
 
     // hash-based aggregations
     aggs = .get(newKeys);
     if (aggs == null) {
       KeyWrapper newKeyProber = newKeys.copyKey();
       aggs = newAggregations();
       .put(newKeyProberaggs);
       newEntryForHashAggr = true;
       ++; // new entry in the hash table
     }
 
     // If the grouping key and the reduction key are different, a set of
     // grouping keys for the current reduction key are maintained in
     // keysCurrentGroup
     // Peek into the set to find out if a new grouping key is seen for the given
     // reduction key
     if () {
       newEntryForHashAggr = .add(newKeys.copyKey());
     }
 
     // Update the aggs
     updateAggregations(aggsrowrowInspectortruenewEntryForHashAggrnull);
 
     // We can only flush after the updateAggregations is done, or the
     // potentially new entry "aggs"
     // can be flushed out of the hash table.
 
     // Based on user-specified parameters, check if the hash table needs to be
     // flushed.
     // If the grouping key is not the same as reduction key, flushing can only
     // happen at boundaries
         && shouldBeFlushed(newKeys)) {
       flush(false);
     }
   }
 
   // Non-hash aggregation
   private void processAggr(Object rowObjectInspector rowInspector,
       KeyWrapper newKeysthrows HiveException {
     // Prepare aggs for updating
     AggregationBuffer[] aggs = null;
     Object[][] lastInvoke = null;
     //boolean keysAreEqual = (currentKeys != null && newKeys != null)?
     //  newKeyStructEqualComparer.areEqual(currentKeys, newKeys) : false;
 
     boolean keysAreEqual = ( != null && newKeys != null)?
         newKeys.equals() : false;
 
 
     // Forward the current keys if needed for sort-based aggregation
     if ( != null && !keysAreEqual) {
        = 0;
     }
 
     // Need to update the keys?
     if ( == null || !keysAreEqual) {
       if ( == null) {
          = newKeys.copyKey();
       } else {
         .copyKey(newKeys);
       }
 
       // Reset the aggregations
 
       // clear parameters in last-invoke
       for (int i = 0; i < .i++) {
         [i] = null;
       }
     }
 
     aggs = ;
 
     lastInvoke = ;
     // Update the aggs
 
     updateAggregations(aggsrowrowInspectorfalsefalselastInvoke);
   }

  
Based on user-parameters, should the hash table be flushed.

Parameters:
newKeys keys for the row under consideration
 
   private boolean shouldBeFlushed(KeyWrapper newKeys) {
     int numEntries = .size();
     long usedMemory;
     float rate;
 
     // The fixed size for the aggregation class is already known. Get the
     // variable portion of the size every NUMROWSESTIMATESIZE rows.
     if (( == 0) || ((numEntries % ) == 0)) {
       //check how much memory left memory
       usedMemory = .getHeapMemoryUsage().getUsed();
       rate = (floatusedMemory / (float;
       if(rate > ){
         return true;
       }
       for (Integer pos : ) {
         Object key = newKeys.getKeyArray()[pos.intValue()];
         // Ignore nulls
         if (key != null) {
           if (key instanceof LazyPrimitive) {
                +=
                   ((LazyPrimitive<LazyStringObjectInspector, Text>) key).
                       getWritableObject().getLength();
           } else if (key instanceof String) {
              += ((Stringkey).length();
           } else if (key instanceof Text) {
              += ((Text) key).getLength();
           }
         }
       }
 
       AggregationBuffer[] aggs = null;
       if (.size() > 0) {
         KeyWrapper newKeyProber = newKeys.copyKey();
         aggs = .get(newKeyProber);
       }
 
       for (varLenFields v : ) {
         int aggrPos = v.getAggrPos();
         List<FieldfieldsVarLen = v.getFields();
         AggregationBuffer agg = aggs[aggrPos];
 
         try {
           for (Field f : fieldsVarLen) {
              += ((Stringf.get(agg)).length();
           }
         } catch (IllegalAccessException e) {
           assert false;
         }
       }
 
       ++;
 
       // Update the number of entries that can fit in the hash table
        =
           (int) ( / ( + ( / )));
       .trace("Hash Aggr: #hash table = " + numEntries
           + " #max in hash table = " + );
     }
 
     // flush if necessary
     if (numEntries >= ) {
       return true;
     }
     return false;
   }
 
   private void flush(boolean completethrows HiveException {
 
      = 0;
 
     // Currently, the algorithm flushes 10% of the entries - this can be
     // changed in the future
 
     if (complete) {
           .entrySet().iterator();
       while (iter.hasNext()) {
         Map.Entry<KeyWrapperAggregationBuffer[]> m = iter.next();
         forward(m.getKey().getKeyArray(), m.getValue());
       }
       .clear();
        = null;
       .warn("Hash Table completed flushed");
       return;
     }
 
     int oldSize = .size();
     .warn("Hash Tbl flush: #hash table = " + oldSize);
         .entrySet().iterator();
     int numDel = 0;
     while (iter.hasNext()) {
       Map.Entry<KeyWrapperAggregationBuffer[]> m = iter.next();
       forward(m.getKey().getKeyArray(), m.getValue());
       iter.remove();
       numDel++;
       if (numDel * 10 >= oldSize) {
         .warn("Hash Table flushed: new size = " + .size());
         return;
       }
     }
   }
 
   transient Object[] forwardCache;

  
Forward a record of keys and aggregation results.

Parameters:
keys The keys in the record
Throws:
HiveException
 
   protected void forward(Object[] keysAggregationBuffer[] aggs)
       throws HiveException {
     int totalFields = keys.lengthaggs.length;
     if ( == null) {
        = new Object[totalFields];
     }
     for (int i = 0; i < keys.lengthi++) {
       [i] = keys[i];
     }
     for (int i = 0; i < aggs.lengthi++) {
       [keys.length + i] = [i]
           .evaluate(aggs[i]);
     }
 
   }

  
We need to forward all the aggregations to children.
 
   public void closeOp(boolean abortthrows HiveException {
     if (!abort) {
       try {
         // If there is no grouping key and no row came to this operator
         if ( && (. == 0)) {
            = false;
 
           // There is no grouping key - simulate a null row
           // This is based on the assumption that a null row is ignored by
           // aggregation functions
           for (int ai = 0; ai < .ai++) {
 
             // o is set to NULL in order to distinguish no rows at all
             Object[] o;
             if ([ai].length > 0) {
               o = new Object[[ai].length];
             } else {
               o = null;
             }
 
             // Calculate the parameters
             for (int pi = 0; pi < [ai].lengthpi++) {
               o[pi] = null;
             }
             [ai].aggregate([ai], o);
           }
 
           // create dummy keys - size 0
           forward(new Object[0], );
         } else {
           if ( != null) {
             .warn("Begin Hash Table flush at close: size = "
                 + .size());
            Iterator iter = .entrySet().iterator();
            while (iter.hasNext()) {
              Map.Entry<KeyWrapperAggregationBuffer[]> m = (Map.Entryiter
                  .next();
              forward(m.getKey().getKeyArray(), m.getValue());
              iter.remove();
            }
            .clear();
          } else if ( != null) {
            // sort-based aggregations
            if ( != null) {
              forward(.getKeyArray(), );
            }
             = null;
          } else {
            // The GroupByOperator is not initialized, which means there is no
            // data
            // (since we initialize the operators when we see the first record).
            // Just do nothing here.
          }
        }
      } catch (Exception e) {
        e.printStackTrace();
        throw new HiveException(e);
      }
    }
  }
  // Group by contains the columns needed - no need to aggregate from children
  public List<StringgenColLists(
      HashMap<Operator<? extends Serializable>, OpParseContextopParseCtx) {
    List<StringcolLists = new ArrayList<String>();
    ArrayList<ExprNodeDesckeys = .getKeys();
    for (ExprNodeDesc key : keys) {
      colLists = Utilities.mergeUniqElems(colListskey.getCols());
    }
    for (AggregationDesc aggr : aggrs) {
      ArrayList<ExprNodeDescparams = aggr.getParameters();
      for (ExprNodeDesc param : params) {
        colLists = Utilities.mergeUniqElems(colListsparam.getCols());
      }
    }