Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
 import java.util.List;
 import java.util.Map;
 
/**
 * The package operator that packages the globally rearranged tuples into the
 * output format required by co-group. This is the last stage of processing a
 * co-group. This operator has a slightly different format than other operators
 * in that it takes two things as input: the key being worked on, and the
 * iterator of bags that contain indexed tuples that just need to be packaged
 * into their appropriate output bags based on the index.
 */
 
 public class POPackage extends PhysicalOperator {
    
 
     private static final long serialVersionUID = 1L;
 
 
     public static enum PackageType { GROUP, JOIN };
 
     //The iterator of indexed Tuples
     //that is typically provided by
     //Hadoop
     transient Iterator<NullableTupletupIter;
 
     //The key being worked on
     Object key;
 
     // marker to indicate if key is a tuple
     protected boolean isKeyTuple = false;
     // marker to indicate if the tuple key is compound in nature
     protected boolean isKeyCompound = false;
     // key as a Tuple object (if the key is a tuple)
     protected Tuple keyAsTuple;
 
     //key's type
     byte keyType;
 
     //The number of inputs to this
     //co-group.  0 indicates a distinct, which means there will only be a
     //key, no value.
     int numInputs;
 
     // If the attaching map-reduce plan use secondary sort key
     boolean useSecondaryKey = false;
 
     //Denotes if inner is specified
    //on a particular input
    boolean[] inner;
    // flag to denote whether there is a distinct
    // leading to this package
    protected boolean distinct = false;
    // A mapping of input index to key information got from LORearrange
    // for that index. The Key information is a pair of boolean, Map.
    // The boolean indicates whether there is a lone project(*) in the
    // cogroup by. If not, the Map has a mapping of column numbers in the
    // "value" to column numbers in the "key" which contain the fields in
    // the "value"
    protected Map<IntegerPair<BooleanMap<IntegerInteger>>> keyInfo;
    protected static final BagFactory mBagFactory = BagFactory.getInstance();
    protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
    private boolean firstTime = true;
    private boolean useDefaultBag = false;
    private PackageType pkgType;
    public POPackage(OperatorKey k) {
        this(k, -1, null);
    }
    public POPackage(OperatorKey kint rp) {
        this(krpnull);
    }
    public POPackage(OperatorKey kList<PhysicalOperatorinp) {
        this(k, -1, inp);
    }
    public POPackage(OperatorKey kint rpList<PhysicalOperatorinp) {
        super(krpinp);
         = -1;
         = new HashMap<IntegerPair<BooleanMap<IntegerInteger>>>();
    }
    @Override
    public String name() {
        return getAliasString() + "Package" + "["
                + DataType.findTypeName() + "]" + "{"
                + DataType.findTypeName() + "}" + " - "
                + .toString();
    }
    @Override
    public boolean supportsMultipleInputs() {
        return false;
    }
    @Override
    public void visit(PhyPlanVisitor vthrows VisitorException {
        v.visitPackage(this);
    }
    @Override
    public boolean supportsMultipleOutputs() {
        return false;
    }

    
Attaches the required inputs

Parameters:
k - the key being worked on
inp - iterator of indexed tuples typically obtained from Hadoop
    public void attachInput(PigNullableWritable kIterator<NullableTupleinp) {
        try {
             = inp;
             = k.getValueAsPigType();
            if () {
                 = ((Tuple)).get(0);
            }
            if() {
                // key is a tuple, cache the key as a
                // tuple for use in the getNext()
                 = (Tuple);
            }
        } catch (Exception e) {
            throw new RuntimeException(
                    "Error attaching input for key " + k +
                    " in " + name() + " at location " + getOriginalLocations(), e);
        }
    }

    
attachInput's better half!
    public void detachInput() {
         = null;
         = null;
    }
    public int getNumInps() {
        return ;
    }
    public void setNumInps(int numInps) {
        this. = numInps;
    }
    public boolean[] getInner() {
        return ;
    }
    public void setInner(boolean[] inner) {
        this. = inner;
    }

    
From the inputs, constructs the output tuple for this co-group in the required format which is (key, {bag of tuples from input 1}, {bag of tuples from input 2}, ...)
    @Override
    public Result getNextTuple() throws ExecException {
        Tuple res;
        if(){
             = false;
            if (..get() != null) {
                String bagType = ..get().get("pig.cachedbag.type");
                if (bagType != null && bagType.equalsIgnoreCase("default")) {
                     = true;
                }
            }
        }
        if() {
            // only set the key which has the whole
            // tuple
            res = .newTuple(1);
            res.set(0, );
        } else {
            //Create numInputs bags
            DataBag[] dbs = null;
            dbs = new DataBag[];
            if (isAccumulative()) {
                // create bag wrapper to pull tuples in many batches
                // all bags have reference to the sample tuples buffer
                // which contains tuples from one batch
                POPackageTupleBuffer buffer = new POPackageTupleBuffer();
                for (int i = 0; i < i++) {
                    dbs[i] = new AccumulativeBag(bufferi);
                }
            } else {
                // create bag to pull all tuples out of iterator
                for (int i = 0; i < i++) {
                    dbs[i] =  ? BagFactory.getInstance().newDefaultBag()
                    // In a very rare case if there is a POStream after this
                    // POPackage in the pipeline and is also blocking the pipeline;
                    // constructor argument should be 2 * numInputs. But for one obscure
                    // case we don't want to pay the penalty all the time.
                            : new InternalCachedBag();
                }
                //For each indexed tup in the inp, sort them
                //into their corresponding bags based
                //on the index
                while (.hasNext()) {
                    NullableTuple ntup = .next();
                    int index = ntup.getIndex();
                    Tuple copy = getValueTuple(ntupindex);
                    if ( == 1) {
                        // this is for multi-query merge where
                        // the numInputs is always 1, but the index
                        // (the position of the inner plan in the
                        // enclosed operator) may not be 1.
                        dbs[0].add(copy);
                    } else {
                        dbs[index].add(copy);
                    }
                    if(getReporter()!=null) {
                        getReporter().progress();
                    }
                }
            }
            //Construct the output tuple by appending
            //the key and all the above constructed bags
            //and return it.
            res = .newTuple(+1);
            res.set(0,);
            int i=-1;
            for (DataBag bag : dbs) {
                i++;
                if([i] && !isAccumulative()){
                    if(bag.size()==0){
                        detachInput();
                        Result r = new Result();
                        r.returnStatus = .;
                        return r;
                    }
                }
                res.set(i+1,bag);
            }
        }
        Result r = new Result();
        r.returnStatus = .;
        if (!isAccumulative())
            r.result = illustratorMarkup(nullres, 0);
        else
            r.result = res;
        detachInput();
        return r;
    }
    protected Tuple getValueTuple(NullableTuple ntupint indexthrows ExecException {
     // Need to make a copy of the value, as hadoop uses the same ntup
        // to represent each value.
        Tuple val = (Tuple)ntup.getValueAsPigType();
        Tuple copy = null;
        // The "value (val)" that we just got may not
        // be the complete "value". It may have some portions
        // in the "key" (look in POLocalRearrange for more comments)
        // If this is the case we need to stitch
        // the "value" together.
        Pair<BooleanMap<IntegerInteger>> lrKeyInfo =
            .get(index);
        boolean isProjectStar = lrKeyInfo.first;
        Map<IntegerIntegerkeyLookup = lrKeyInfo.second;
        int keyLookupSize = keyLookup.size();
        ifkeyLookupSize > 0) {
            // we have some fields of the "value" in the
            // "key".
            int finalValueSize = keyLookupSize + val.size();
            copy = .newTuple(finalValueSize);
            int valIndex = 0; // an index for accessing elements from
                              // the value (val) that we have currently
            for(int i = 0; i < finalValueSizei++) {
                Integer keyIndex = keyLookup.get(i);
                if(keyIndex == null) {
                    // the field for this index is not in the
                    // key - so just take it from the "value"
                    // we were handed
                    copy.set(ival.get(valIndex));
                    valIndex++;
                } else {
                    // the field for this index is in the key
                    if( && ) {
                        // the key is a tuple, extract the
                        // field out of the tuple
                        copy.set(i.get(keyIndex));
                    } else {
                        copy.set(i);
                    }
                }
            }
            copy = illustratorMarkup2(valcopy);
        } else if (isProjectStar) {
            // the whole "value" is present in the "key"
            copy = .newTuple(.getAll());
            copy = illustratorMarkup2(copy);
        } else {
            // there is no field of the "value" in the
            // "key" - so just make a copy of what we got
            // as the "value"
            copy = .newTuple(val.getAll());
            copy = illustratorMarkup2(valcopy);
        }
        return copy;
    }
    public byte getKeyType() {
        return ;
    }
    public void setKeyType(byte keyType) {
        this. = keyType;
    }

    
Make a deep copy of this operator.

    @Override
    public POPackage clone() throws CloneNotSupportedException {
        POPackage clone = (POPackage)super.clone();
        clone.mKey = new OperatorKey(., NodeIdGenerator.getGenerator().getNextNodeId(.));
        clone.requestedParallelism = ;
        clone.resultType = ;
        clone.keyType = ;
        clone.numInputs = ;
        if (!=null)
        {
            clone.inner = new boolean[.];
            for (int i = 0; i < .i++) {
                clone.inner[i] = [i];
            }
        }
        else
            clone.inner = null;
        return clone;
    }

    

Parameters:
keyInfo the keyInfo to set
    public void setKeyInfo(Map<IntegerPair<BooleanMap<IntegerInteger>>> keyInfo) {
        this. = keyInfo;
    }

    

Parameters:
keyTuple the keyTuple to set
    public void setKeyTuple(boolean keyTuple) {
        this. = keyTuple;
    }

    

Parameters:
keyCompound the keyCompound to set
    public void setKeyCompound(boolean keyCompound) {
        this. = keyCompound;
    }

    

Returns:
the keyInfo
    public Map<IntegerPair<BooleanMap<IntegerInteger>>> getKeyInfo() {
        return ;
    }

    

Returns:
the distinct
    public boolean isDistinct() {
        return ;
    }

    

Parameters:
distinct the distinct to set
    public void setDistinct(boolean distinct) {
        this. = distinct;
    }
    public void setUseSecondaryKey(boolean useSecondaryKey) {
        this. = useSecondaryKey;
    }
    public void setPackageType(PackageType type) {
        this. = type;
    }
    public PackageType getPackageType() {
        return ;
    }
    class POPackageTupleBuffer implements AccumulativeTupleBuffer {
        private List<Tuple>[] bags;
        private Iterator<NullableTupleiter;
        private int batchSize;
        private Object currKey;
        @SuppressWarnings("unchecked")
        public POPackageTupleBuffer() {
             = 20000;
            if (..get() != null) {
                String size = ..get().get("pig.accumulative.batchsize");
                if (size != null) {
                     = Integer.parseInt(size);
                }
            }
            this. = new List[];
            for(int i=0; i<i++) {
                this.[i] = new ArrayList<Tuple>();
            }
            this. = ;
            this. = ;
        }
        @Override
        public boolean hasNextBatch() {
            return .hasNext();
        }
        @Override
        public void nextBatch() throws IOException {
            for(int i=0; i<.i++) {
                [i].clear();
            }
             = ;
            for(int i=0; i<i++) {
                if (.hasNext()) {
                     NullableTuple ntup = .next();
                     int index = ntup.getIndex();
                     Tuple copy = getValueTuple(ntupindex);
                     if ( == 1) {
                            // this is for multi-query merge where
                            // the numInputs is always 1, but the index
                            // (the position of the inner plan in the
                            // enclosed operator) may not be 1.
                            [0].add(copy);
                     } else {
                            [index].add(copy);
                     }
                }else{
                    break;
                }
            }
        }
        public void clear() {
            for(int i=0; i<.i++) {
                [i].clear();
            }
             = null;
        }
        public Iterator<TuplegetTuples(int index) {
            return [index].iterator();
        }
        public Tuple illustratorMarkup(Object inObject outint eqClassIndex) {
            return POPackage.this.illustratorMarkup(inouteqClassIndex);
        }
       };
       private Tuple illustratorMarkup2(Object inObject out) {
           if( != null) {
               ExampleTuple tOut = new ExampleTuple((Tupleout);
               .getLineage().insert(tOut);
               tOut.synthetic = ((ExampleTuplein).;
               .getLineage().union(tOut, (Tuplein);
               return tOut;
           } else
               return (Tupleout;
       }
       @Override
       public Tuple illustratorMarkup(Object inObject outint eqClassIndex) {
           if( != null) {
               ExampleTuple tOut = new ExampleTuple((Tupleout);
               LineageTracer lineageTracer = .getLineage();
               lineageTracer.insert(tOut);
               Tuple tmp;
               boolean synthetic = false;
               if (.getEquivalenceClasses() == null) {
                   LinkedList<IdentityHashSet<Tuple>> equivalenceClasses = new LinkedList<IdentityHashSet<Tuple>>();
                   for (int i = 0; i < ; ++i) {
                       IdentityHashSet<TupleequivalenceClass = new IdentityHashSet<Tuple>();
                       equivalenceClasses.add(equivalenceClass);
                   }
                   .setEquivalenceClasses(equivalenceClassesthis);
               }
               if () {
                   int count;
                   for (count = 0; .hasNext(); ++count) {
                       NullableTuple ntp = .next();
                       tmp = (Tuplentp.getValueAsPigType();
                       if (!tmp.equals(tOut))
                           lineageTracer.union(tOuttmp);
                   }
                   if (count > 1) // only non-distinct tuples are inserted into the equivalence class
                       .getEquivalenceClasses().get(eqClassIndex).add(tOut);
                   .addData((TupletOut);
                   return (TupletOut;
               }
               boolean outInEqClass = true;
               try {
                   for (int i = 1; i < +1; i++)
                   {
                       DataBag dbs = (DataBag) ((Tupleout).get(i);
                       Iterator<Tupleiter = dbs.iterator();
                       if (dbs.size() <= 1 && outInEqClass// all inputs have >= 2 records
                           outInEqClass = false;
                       while (iter.hasNext()) {
                           tmp = iter.next();
                           // any of synthetic data in bags causes the output tuple to be synthetic
                           if (!synthetic && ((ExampleTuple)tmp).)
                               synthetic = true;
                           lineageTracer.union(tOuttmp);
                       }
                   }
               } catch (ExecException e) {
                 // TODO better exception handling
                 throw new RuntimeException("Illustrator exception :"+e.getMessage());
               }
               if (outInEqClass)
                   .getEquivalenceClasses().get(eqClassIndex).add(tOut);
               tOut.synthetic = synthetic;
               .addData((TupletOut);
               return tOut;
           } else
               return (Tupleout;
       }
New to GrepCode? Check out our FAQ X