  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
 package org.apache.pig;
 
 import java.io.IOException;
 
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.tools.counters.PigCounterHelper;
 
 
This class is used to provide a free implementation of the Accumulator interface and EvalFunc class in the case of an Algebraic function. Instead of having to provide redundant implementations for Accumulator and EvalFunc, implementing the getInitial, getIntermed, and getFinal methods (which implies implementing the static classes they reference) will give you an implementation of each of those for free.

One key thing to note is that if a subclass of AlgebraicEvalFunc wishes to use any constructor arguments, it MUST call super(args).

IMPORTANT: the implementation of the Accumulator interface that this class provides is good, but it is simulated. For maximum efficiency, it is important to manually implement the accumulator interface. See Accumulator for more information on how to do so.
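
To make the three-stage contract concrete, here is a minimal sketch of an algebraic average written in plain Java. It does not use any Pig classes: `AlgebraicAvgSketch`, `initial`, `intermediate`, `finalStage`, and `average` are hypothetical names chosen for illustration, and the `{sum, count}` array merely stands in for the partial-result Tuple that the real Initial and Intermediate classes would return.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java stand-in for an algebraic AVG; none of these names are Pig APIs. */
final class AlgebraicAvgSketch {
    /** Initial stage: one input value becomes a partial {sum, count}. */
    static double[] initial(double value) {
        return new double[] {value, 1.0};
    }

    /** Intermediate stage: many partials collapse into a single partial. */
    static double[] intermediate(List<double[]> partials) {
        double sum = 0.0;
        double count = 0.0;
        for (double[] p : partials) {
            sum += p[0];
            count += p[1];
        }
        return new double[] {sum, count};
    }

    /** Final stage: collapse the remaining partials and produce the answer. */
    static double finalStage(List<double[]> partials) {
        double[] total = intermediate(partials);
        return total[0] / total[1];
    }

    /** Feeds the input through the stages in chunks of chunkSize values. */
    static double average(List<Double> values, int chunkSize) {
        List<double[]> combined = new ArrayList<>();
        for (int start = 0; start < values.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, values.size());
            List<double[]> partials = new ArrayList<>();
            for (double v : values.subList(start, end)) {
                partials.add(initial(v));
            }
            combined.add(intermediate(partials));
        }
        return finalStage(combined);
    }

    public static void main(String[] args) {
        List<Double> values = Arrays.asList(1.0, 2.0, 3.0, 4.0, 6.0);
        // The result should not depend on how often the intermediate stage runs.
        System.out.println(average(values, 2));
        System.out.println(average(values, 5));
    }
}
```

The point of the sketch is that the intermediate stage can run any number of times, on any grouping of partials, without changing the final answer. That property is what lets this class derive an Accumulator from the Algebraic stages.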
 
 public abstract class AlgebraicEvalFunc<T> extends AccumulatorEvalFunc<T> implements Algebraic {
     private EvalFunc<Tuple> initEvalFunc;
     private EvalFunc<Tuple> intermedEvalFunc;
     private EvalFunc<T> finalEvalFunc;
 
     private static final BagFactory mBagFactory = BagFactory.getInstance();
     private static final TupleFactory mTupleFactory = TupleFactory.getInstance();
 
     private DataBag intermediateDB;
     private DataBag wrapDB;
     private DataBag accumDB;
 
     private Tuple argTuple;
     private Tuple accumTup;

    
This represents the number of elements an intermediate bag must have in order for the intermediate EvalFunc to be used on it.
 
     private int bagCombineThreshold = 1000;

    
This represents the factor by which the result of applying the intermediate EvalFunc must shrink the data in order for combining at this stage to be deemed "worth it."
 
     private int combineFactor = 2;
 
     private PigCounterHelper pigCounterHelper = new PigCounterHelper();
 
     private boolean combine;
 
     private String[] constructorArgs;

    
If a subclass declares a constructor, that constructor must call super(args...); otherwise this class will not instantiate properly.
 
     public AlgebraicEvalFunc(String... constructorArgs) { this.constructorArgs = constructorArgs; }

    
This must be implemented as per a normal Algebraic interface. See Algebraic for more information.
 
     @Override
     public abstract String getFinal();

    
This must be implemented as per a normal Algebraic interface. See Algebraic for more information.
 
     @Override
     public abstract String getInitial();

    
This must be implemented as per a normal Algebraic interface. See Algebraic for more information.
    @Override
    public abstract String getIntermed();
    private boolean init = false;

    
This helper function instantiates an EvalFunc given its String class name.
    private EvalFunc<?> makeEvalFunc(String base) {
        StringBuffer sb = new StringBuffer();
        sb.append(base).append("(");
        boolean first = true;
        for (String s : constructorArgs) {
            if (!first) sb.append(",");
            else first = false;
            sb.append("'").append(s).append("'");
        }
        sb.append(")");
        return (EvalFunc<?>)PigContext.instantiateFuncFromSpec(sb.toString());
    }
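
As an illustration of the spec string this helper assembles, the following standalone sketch reproduces only the string building, without calling into Pig. `FuncSpecSketch`, `buildSpec`, and the class name `com.example.MyAvg$Initial` are hypothetical and exist only for this example.

```java
/** Standalone sketch of the spec-string format; not part of Pig. */
final class FuncSpecSketch {
    /** Builds "ClassName('arg1','arg2',...)" from a class name and String arguments. */
    static String buildSpec(String className, String... constructorArgs) {
        StringBuilder sb = new StringBuilder(className).append('(');
        for (int i = 0; i < constructorArgs.length; i++) {
            if (i > 0) {
                sb.append(',');
            }
            // Each argument is wrapped in single quotes, as in makeEvalFunc.
            sb.append('\'').append(constructorArgs[i]).append('\'');
        }
        return sb.append(')').toString();
    }

    public static void main(String[] args) {
        // Hypothetical class name, used only to show the resulting format.
        System.out.println(buildSpec("com.example.MyAvg$Initial", "a", "b"));
        System.out.println(buildSpec("com.example.MyAvg$Initial"));
    }
}
```

Because the same constructor arguments are appended for the Initial, Intermediate, and Final classes, each of those static classes needs a constructor that accepts them.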

    
This is the free accumulate implementation, built on the static classes that the Algebraic interface requires. It works by leveraging the Initial, Intermediate, and Final classes. The exec function of the Initial EvalFunc is called on every Tuple of the input, and the output is collected in an intermediate state. Periodically, the Intermediate EvalFunc is called on this intermediate state one or more times. The Final EvalFunc is not called until getValue() is called.
    @SuppressWarnings("unchecked")
    @Override
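    public void accumulate(Tuple input) throws IOException {
        if (!init) {
            intermediateDB = mBagFactory.newDefaultBag();
            wrapDB = mBagFactory.newDefaultBag();
            accumDB = mBagFactory.newDefaultBag();
            argTuple = mTupleFactory.newTuple(1);
            argTuple.set(0, wrapDB);
            accumTup = mTupleFactory.newTuple(1);
            accumTup.set(0,accumDB);
            initEvalFunc = (EvalFunc<Tuple>)makeEvalFunc(getInitial());
            intermedEvalFunc = (EvalFunc<Tuple>)makeEvalFunc(getIntermed());
            finalEvalFunc = (EvalFunc<T>)makeEvalFunc(getFinal());
            init = true;
            combine = true;
        }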
        accumDB.clear();
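        for (Tuple t : (DataBag)input.get(0)) {
            wrapDB.clear();
            wrapDB.add(t);
            intermediateDB.add(initEvalFunc.exec(argTuple));
        }
        if (combine && intermediateDB.size() > bagCombineThreshold) {
            long initialSizeEstimate = intermediateDB.getMemorySize();
            DataBag newIntermediateDB = mBagFactory.newDefaultBag();
            Tuple t = mTupleFactory.newTuple(1);
            t.set(0, intermediateDB);
            newIntermediateDB.add(intermedEvalFunc.exec(t));
            intermediateDB = newIntermediateDB;
            long newSizeEstimate = intermediateDB.getMemorySize();
            pigCounterHelper.incrCounter("AlgebraicEvalFunc", "InitialSizeEst", initialSizeEstimate);
            pigCounterHelper.incrCounter("AlgebraicEvalFunc", "PostCombineSizeEst", newSizeEstimate);
            pigCounterHelper.incrCounter("AlgebraicEvalFunc", "CombineApply", 1L);
            // Stop combining if it shrank the data by less than combineFactor.
            if (combineFactor * newSizeEstimate > initialSizeEstimate) {
                combine = false;
                pigCounterHelper.incrCounter("AlgebraicEvalFunc", "CombineShutoff", 1L);
            }
        }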
    }
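
The shutoff rule at the end of accumulate can be checked with a little arithmetic. The sketch below isolates that one comparison in plain Java; `CombineShutoffSketch` and `keepCombining` are hypothetical names, and the byte counts are made-up inputs rather than measurements from a real job.

```java
/** Standalone sketch of the combine shutoff test; not part of Pig. */
final class CombineShutoffSketch {
    /** Mirrors the default value of combineFactor in this class. */
    static final int COMBINE_FACTOR = 2;

    /**
     * Returns true if combining stays enabled after one combine pass,
     * that is, if the pass shrank the data by at least COMBINE_FACTOR.
     */
    static boolean keepCombining(long sizeBefore, long sizeAfter) {
        // accumulate() switches combining off when factor * after > before.
        return COMBINE_FACTOR * sizeAfter <= sizeBefore;
    }

    public static void main(String[] args) {
        // Made-up size estimates in bytes, for illustration only.
        System.out.println(keepCombining(8000L, 1000L)); // shrank 8x
        System.out.println(keepCombining(8000L, 5000L)); // shrank 1.6x
    }
}
```

With the default factor of 2, a pass that reduces 8000 bytes to 1000 keeps combining enabled, while a pass that only reaches 5000 disables it for the rest of the accumulation.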

    
Per the Accumulator interface, this clears all of the variables used in the implementation.
    @Override
    public void cleanup() {
        intermediateDB = null;
        wrapDB = null;
        accumDB = null;
        argTuple = null;
        accumTup = null;
        initEvalFunc = null;
        intermedEvalFunc = null;
        finalEvalFunc = null;
        init = false;
    }

    
This function returns the ultimate result. It is when getValue() is called that the Final EvalFunc's exec function is called on the accumulated data.
    @Override
    public T getValue() {
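        try {
            return finalEvalFunc.exec(mTupleFactory.newTuple(intermediateDB));
        } catch (IOException e) {
            // Keep the original exception as the cause so the failure can be traced.
            throw new RuntimeException("Error in AlgebraicEvalFunc evaluating final method", e);
        }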
    }
 }