Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
 import java.util.List;
 import java.util.Map;
 
The partition rearrange operator is a part of the skewed join implementation. It has an embedded physical plan that generates tuples of the form (inpKey,reducerIndex,(indxed inp Tuple)).
 
 public class POPartitionRearrange extends POLocalRearrange {

    
 
     private static final long serialVersionUID = 1L;
 
     private Integer totalReducers = -1;
     // ReducerMap will store the tuple, max reducer index & min reducer index
     private static Map<ObjectPair<IntegerInteger> > reducerMap = new HashMap<ObjectPair<IntegerInteger> >();
     private boolean loaded;
 
     protected static final BagFactory mBagFactory = BagFactory.getInstance();
     private PigContext pigContext;
 
     public POPartitionRearrange(OperatorKey k) {
         this(k, -1, null);
     }
 
     public POPartitionRearrange(OperatorKey kint rp) {
         this(krpnull);
     }
 
     public POPartitionRearrange(OperatorKey kList<PhysicalOperatorinp) {
         this(k, -1, inp);
     }
 
     public POPartitionRearrange(OperatorKey kint rpList<PhysicalOperatorinp) {
         super(krpinp);
          = -1;
          = new ArrayList<ExpressionOperator>();
     }
 
     /* Loads the key distribution file obtained from the sampler */
     private void loadPartitionFile() throws RuntimeException {
         String keyDistFile = ..get().get("pig.keyDistFile""");
         if (keyDistFile.isEmpty()) {
             throw new RuntimeException(
             "Internal error: missing key distribution file property.");
         }
 
         try {
         } catch (IOException ie) {
             throw new RuntimeException(ie);
         }
         try {
 
             Integer [] redCnt = new Integer[1];
 
             = MapRedUtil.loadPartitionFileFromLocalCache(
                    keyDistFileredCnt...get());
            // check if the partition file is empty
             = (redCnt[0] == null) ? -1 : redCnt[0];
             = true;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    @Override
    public String name() {
        return getAliasString() + "Partition rearrange " + "["
                + DataType.findTypeName() + "]" + "{"
                + DataType.findTypeName() + "}" + "(" + 
                + ") - " + .toString();
    }

    
Calls getNext on the generate operator inside the nested physical plan. Converts the generated tuple into the proper format, i.e, (key,indexedTuple(value))
    @Override
    public Result getNextTuple() throws ExecException {
        Result inp = null;
        Result res = null;
        // Load the skewed join key partitioning file
        if (!) {
        	try {
        		loadPartitionFile();
        	} catch (Exception e) {
        		throw new RuntimeException(e);
        	}
        }
        while (true) {
            inp = processInput();
            if (inp.returnStatus == . || inp.returnStatus == .) {
                break;
            }
            if (inp.returnStatus == .) {
                continue;
            }
            for (PhysicalPlan ep : ) {
                ep.attachInput((Tuple)inp.result);
            }
            List<ResultresLst = new ArrayList<Result>();
            for (ExpressionOperator op : ){
                res = op.getNext(op.getResultType());
                if(res.returnStatus!=.) {
                    return new Result();
                }
                resLst.add(res);
            }
            res.result = constructPROutput(resLst,(Tuple)inp.result);
            return res;
        }
        return inp;
    }
	// Returns bag of tuples
    protected DataBag constructPROutput(List<ResultresLstTuple valuethrows ExecException{
		Tuple t = super.constructLROutput(resLstnullvalue);
        //Construct key
        Object key = t.get(1);
		// Construct an output bag and feed in the tuples
		//Put the index, key, and value
		//in a tuple and return
		Pair <IntegerIntegerindexes = .get(key);	// first -> min, second ->max
		// For non skewed keys, we set the partition index to be -1
		if (indexes == null) {
			indexes = new Pair <IntegerInteger>(-1,0);
		}
		for (Integer reducerIdx=indexes.firstcnt=0; cnt <= indexes.secondreducerIdx++, cnt++) {
			if (reducerIdx >= ) {
				reducerIdx = 0;
			}
			Tuple opTuple = .newTuple(4);
			opTuple.set(0, t.get(0));
			// set the partition index
			opTuple.set(1, reducerIdx.intValue());
			opTuple.set(2, key);
			opTuple.set(3, t.get(2));
			opBag.add(opTuple);
		}
		return opBag;
    }

    

Parameters:
pigContext the pigContext to set
    public void setPigContext(PigContext pigContext) {
        this. = pigContext;
    }

    

Returns:
the pigContext
    public PigContext getPigContext() {
        return ;
    }

    
Make a deep copy of this operator.

    @Override
	    //clone.reducerMap = (HashMap)reducerMap.clone();
        return clone;
    }
New to GrepCode? Check out our FAQ X