Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
 import java.util.List;
 import java.util.Map;
 
 
Do partial aggregation in map plan. Inputs are buffered up in a hashmap until a threshold is reached; then the combiner functions are fed these buffered up inputs, and results stored in a secondary map. Once that map fills up or all input has been seen, results are piped out into the next operator (caller of getNext()).
 
 public class POPartialAgg extends PhysicalOperator implements Spillable {
     private static final Log LOG = LogFactory.getLog(POPartialAgg.class);
     private static final long serialVersionUID = 1L;
 
     private static final Result ERR_RESULT = new Result();
     private static final Result EOP_RESULT = new Result(.,
             null);
 
     // number of records to sample to determine average size used by each
     // entry in hash map and average seen reduction
     private static final int NUM_RECS_TO_SAMPLE = 10000;
 
     // We want to avoid massive ArrayList copies as they get big.
     // Array Lists grow by prevSize + prevSize/2. Given default initial size of 10,
     // 9369 is the size of the array after 18 such resizings. This seems like a sufficiently
     // large value to trigger spilling/aggregation instead of paying for yet another data
     // copy.
     private static final int MAX_LIST_SIZE = 9368;
 
     private static final int DEFAULT_MIN_REDUCTION = 10;
 
     // TODO: these are temporary. The real thing should be using memory usage estimation.
     private static final int FIRST_TIER_THRESHOLD = 20000;
     private static final int SECOND_TIER_THRESHOLD =  / ;
 
     private static final WeakHashMap<POPartialAggByteALL_POPARTS = new WeakHashMap<POPartialAggByte>();
 
     private static final TupleFactory TF = TupleFactory.getInstance();
     private static final BagFactory BG = BagFactory.getInstance();
 
     private PhysicalPlan keyPlan;
     private ExpressionOperator keyLeaf;
 
     private List<PhysicalPlanvaluePlans;
     private List<ExpressionOperatorvalueLeaves;
 
     private int numRecsInRawMap = 0;
     private int numRecsInProcessedMap = 0;
 
     private Map<ObjectList<Tuple>> rawInputMap = Maps.newHashMap();
     private Map<ObjectList<Tuple>> processedInputMap = Maps.newHashMap();
    private boolean disableMapAgg = false;
    private boolean sizeReductionChecked = false;
    private boolean inputsExhausted = false;
    private volatile boolean doSpill = false;
    private transient MemoryLimits memLimits;
    private transient boolean initialized = false;
    private int firstTierThreshold = ;
    private int sizeReduction = 1;
    private int avgTupleSize = 0;
    private boolean estimatedMemThresholds = false;
    public POPartialAgg(OperatorKey k) {
        super(k);
    }
    private void init() throws ExecException {
        .put(thisnull);
        float percent = getPercentUsageFromProp();
        if (percent <= 0) {
            .info("No memory allocated to intermediate memory buffers. Turning off partial aggregation.");
            disableMapAgg();
        }
         = true;
        SpillableMemoryManager.getInstance().registerSpillable(this);
    }
    @Override
    public Result getNextTuple() throws ExecException {
        // accumulate tuples from processInput in rawInputMap.
        // when the maps grow to mem limit, go over each item in map, and call
        // combiner aggs on each collection.
        // Store the results into processedInputMap. Clear out rawInputMap.
        // Mem usage is updated every time we modify either of the maps.
        // When processedInputMap is >= 20% of allotted memory, run aggs on it,
        // and output the results as returns of successive calls of this method.
        // Then reset processedInputMap.
        // The fact that we are in the latter stage is communicated via the doSpill
        // flag.
        if (! && !.containsKey(this)) {
            init();
        }
        while (true) {
            if (! &&  >= ) {
                checkSizeReduction();
            }
            if (! &&  >= ) {
                estimateMemThresholds();
            }
            if () {
                startSpill();
                Result result = spillResult();
                if (result == ) {
                     = false;
                }
                if (result !=  || ) {
                    return result;
                }
            }
            if (mapAggDisabled()) {
                // disableMapAgg() sets doSpill, so we can't get here while there is still contents in the buffered maps.
                // if we get to this point, everything is flushed, so we can simply return the raw tuples from now on.
                return processInput();
            } else {
                Result inp = processInput();
                if (inp.returnStatus == .) {
                    return inp;
                } else if (inp.returnStatus == .) {
                    if (.) {
                        // parent input is over. flush what we have.
                         = true;
                        startSpill();
                        .info("Spilling last bits.");
                        continue;
                    } else {
                        return ;
                    }
                } else if (inp.returnStatus == .) {
                    continue;
                } else {
                    // add this input to map.
                    Tuple inpTuple = (Tupleinp.result;
                    .attachInput(inpTuple);
                    // evaluate the key
                    Result keyRes = getResult();
                    if (keyRes == ) {
                        return ;
                    }
                    Object key = keyRes.result;
                    .detachInput();
                     += 1;
                    addKeyValToMap(keyinpTuple);
                    if (shouldAggregateFirstLevel()) {
                        aggregateFirstLevel();
                    }
                    if (shouldAggregateSecondLevel()) {
                        aggregateSecondLevel();
                    }
                    if (shouldSpill()) {
                        .info("Starting spill.");
                        startSpill(); // next time around, we'll start emitting.
                    }
                }
            }
        }
    }
    private void estimateMemThresholds() {
        if (!mapAggDisabled()) {
            .info("Getting mem limits; considering " + .size() + " POPArtialAgg objects.");
            float percent = getPercentUsageFromProp();
             = new MemoryLimits(.size(), percent);
            int estTotalMem = 0;
            int estTuples = 0;
            for (Map.Entry<ObjectList<Tuple>> entry : .entrySet()) {
                for (Tuple t : entry.getValue()) {
                    estTuples += 1;
                    int mem = (intt.getMemorySize();
                    estTotalMem += mem;
                    .addNewObjSize(mem);
                }
            }
             = estTotalMem / estTuples;
            int totalTuples = .getCacheLimit();
            .info("Estimated total tuples to buffer, based on " + estTuples + " tuples that took up " + estTotalMem + " bytes: " + totalTuples);
             = (int) (0.5 + totalTuples * (1f - (1f / )));
             = (int) (0.5 + totalTuples *  (1f / ));
            .info("Setting thresholds. Primary: " +  + ". Secondary: " + );
        }
         = true;
    }
    private void checkSizeReduction() throws ExecException {
        int numBeforeReduction =  + ;
        aggregateFirstLevel();
        aggregateSecondLevel();
        int numAfterReduction =  + ;
        .info("After reduction, processed map: " +  + "; raw map: " + );
        int minReduction = getMinOutputReductionFromProp();
        .info("Observed reduction factor: from " + numBeforeReduction +
                " to " + numAfterReduction +
                " => " + numBeforeReduction / numAfterReduction + ".");
        if ( numBeforeReduction / numAfterReduction < minReduction) {
            .info("Disabling in-memory aggregation, since observed reduction is less than " + minReduction);
            disableMapAgg();
        }
         = numBeforeReduction / numAfterReduction;
         = true;
    }
    private void disableMapAgg() throws ExecException {
        startSpill();
         = true;
    }
    private boolean mapAggDisabled() {
        return ;
    }
    private boolean shouldAggregateFirstLevel() {
        if (.isInfoEnabled() &&  > ) {
            .info("Aggregating " +  + " raw records.");
        }
        return ( > );
    }
    private boolean shouldAggregateSecondLevel() {
            .info("Aggregating " +  + " secondary records.");
        }
        return ( > );
    }
    private boolean shouldSpill() {
        // is this always the same as shouldAgg?
        return shouldAggregateSecondLevel();
    }
    private void addKeyValToMap(Map<ObjectList<Tuple>> map,
            Object keyTuple inpTuplethrows ExecException {
        List<Tuplevalue = map.get(key);
        if (value == null) {
            value = new ArrayList<Tuple>();
            map.put(keyvalue);
        }
        value.add(inpTuple);
        if (value.size() >= ) {
            boolean isFirst = (map == );
            if (.isDebugEnabled()){
                .debug("The cache for key " + key + " has grown too large. Aggregating " + ((isFirst) ? "first level." : "second level."));
            }
            if (isFirst) {
                aggregateRawRow(key);
            } else {
                aggregateSecondLevel();
            }
        }
    }
    private void startSpill() throws ExecException {
        // If spillingIterator is null, we are already spilling and don't need to set up.
        if ( != nullreturn;
        if (!.isEmpty()) {
            if (.isInfoEnabled()) {
                .info("In startSpill(), aggregating raw inputs. " +  + " tuples.");
            }
            aggregateFirstLevel();
            if (.isInfoEnabled()) {
                .info("processed inputs: " +  + " tuples.");
            }
        }
        if (!.isEmpty()) {
            if (.isInfoEnabled()) {
                .info("In startSpill(), aggregating processed inputs. " +  + " tuples.");
            }
            aggregateSecondLevel();
            if (.isInfoEnabled()) {
                .info("processed inputs: " +  + " tuples.");
            }
        }
         = true;
    }
    private Result spillResult() throws ExecException {
        // if no more to spill, return EOP_RESULT.
        if (.isEmpty()) {
             = null;
            .info("In spillResults(), processed map is empty -- done spilling.");
            return ;
        } else {
            Map.Entry<ObjectList<Tuple>> entry = .next();
            Tuple valueTuple = createValueTuple(entry.getKey(), entry.getValue());
             -= entry.getValue().size();
            .remove();
            Result res = getOutput(entry.getKey(), valueTuple);
            return res;
        }
    }
    private void aggregateRawRow(Object keythrows ExecException {
        List<Tuplevalue = .get(key);
        Tuple valueTuple = createValueTuple(keyvalue);
        Result res = getOutput(keyvalueTuple);
        .remove(key);
        addKeyValToMap(keygetAggResultTuple(res.result));
         += valueTuple.size() - 1;
    }

    
For each entry in rawInputMap, feed the list of tuples into the aggregator funcs and add the results to processedInputMap. Remove the entries from rawInputMap as we go.

    private int aggregate(Map<ObjectList<Tuple>> fromMapMap<ObjectList<Tuple>> toMapint numEntriesInTargetthrows ExecException {
        Iterator<Map.Entry<ObjectList<Tuple>>> iter = fromMap.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<ObjectList<Tuple>> entry = iter.next();
            Tuple valueTuple = createValueTuple(entry.getKey(), entry.getValue());
            Result res = getOutput(entry.getKey(), valueTuple);
            iter.remove();
            addKeyValToMap(toMapentry.getKey(), getAggResultTuple(res.result));
            numEntriesInTarget += valueTuple.size() - 1;
        }
        return numEntriesInTarget;
    }
    private void aggregateFirstLevel() throws ExecException {
         = 0;
    }
    private void aggregateSecondLevel() throws ExecException {
        Map<ObjectList<Tuple>> newMap = Maps.newHashMapWithExpectedSize(.size());
         = aggregate(newMap, 0);
         = newMap;
    }
    private Tuple createValueTuple(Object keyList<TupleinpTuplesthrows ExecException {
        Tuple valueTuple = .newTuple(.size() + 1);
        valueTuple.set(0, key);
        for (int i = 0; i < .size(); i++) {
            DataBag bag = .newDefaultBag();
            valueTuple.set(i + 1, bag);
        }
        for (Tuple t : inpTuples) {
            for (int i = 1; i < t.size(); i++) {
                DataBag bag = (DataBagvalueTuple.get(i);
                bag.add((Tuplet.get(i));
            }
        }
        return valueTuple;
    }
    private Tuple getAggResultTuple(Object resultthrows ExecException {
        try {
            return (Tupleresult;
        } catch (ClassCastException ex) {
            throw new ExecException("Intermediate Algebraic "
                    + "functions must implement EvalFunc<Tuple>");
        }
    }
    @Override
    public Tuple illustratorMarkup(Object inObject outint eqClassIndex) {
        // combiner optimizer does not get invoked if the plan is being executed
        // under illustrate, so POPartialAgg should not get used in that case
        throw new UnsupportedOperationException();
    }
    @Override
    public void visit(PhyPlanVisitor vthrows VisitorException {
        v.visitPartialAgg(this);
    }
    private int getMinOutputReductionFromProp() {
        int minReduction = ..get().getInt(
        if (minReduction <= 0) {
            .info("Specified reduction is < 0 (" + minReduction + "). Using default " + );
            minReduction = ;
        }
        return minReduction;
    }
    private float getPercentUsageFromProp() {
        float percent = 0.2F;
        if (..get() != null) {
            String usage = ..get().get(
                    .);
            if (usage != null) {
                percent = Float.parseFloat(usage);
            }
        }
        return percent;
    }
    private Result getResult(ExpressionOperator opthrows ExecException {
        Result res = ;
        switch (op.getResultType()) {
        case .:
        case .:
        case .:
        case .:
        case .:
        case .:
        case .:
        case .:
        case .:
        case .:
        case .:
        case .:
        case .:
            res = op.getNext(op.getResultType());
            break;
        default:
            String msg = "Invalid result type: "
                    + DataType.findType(op.getResultType());
            throw new ExecException(msg, 2270, .);
        }
        // allow null as group by key
        if (res.returnStatus == .
                || res.returnStatus == .) {
            return res;
        }
        return ;
    }

    
Runs the provided key-value pair through the aggregator plans.

Parameters:
key
value
Returns:
Result, containing a tuple of form (key, tupleReturnedByPlan1, tupleReturnedByPlan2, ...)
Throws:
org.apache.pig.backend.executionengine.ExecException
    private Result getOutput(Object keyTuple valuethrows ExecException {
        Tuple output = .newTuple(.size() + 1);
        output.set(0, key);
        for (int i = 0; i < .size(); i++) {
            .get(i).attachInput(value);
            Result valRes = getResult(.get(i));
            if (valRes == ) {
                return ;
            }
            output.set(i + 1, valRes.result);
        }
        return new Result(.output);
    }
    @Override
    public boolean supportsMultipleInputs() {
        return false;
    }
    @Override
    public boolean supportsMultipleOutputs() {
        return false;
    }
    @Override
    public String name() {
        return getAliasString() + "Partial Agg" + "["
                + DataType.findTypeName() + "]" + .toString();
    }
    public PhysicalPlan getKeyPlan() {
        return ;
    }
    public void setKeyPlan(PhysicalPlan keyPlan) {
        this. = keyPlan;
         = (ExpressionOperatorkeyPlan.getLeaves().get(0);
    }
    public List<PhysicalPlangetValuePlans() {
        return ;
    }
    public void setValuePlans(List<PhysicalPlanvaluePlans) {
        this. = valuePlans;
         = new ArrayList<ExpressionOperator>();
        for (PhysicalPlan plan : valuePlans) {
            .add((ExpressionOperatorplan.getLeaves().get(0));
        }
    }
    @Override
    public long spill() {
        .info("Spill triggered by SpillableMemoryManager");
         = true;
        return 0;
    }
    @Override
    public long getMemorySize() {
        return  * ( + );
    }
New to GrepCode? Check out our FAQ X