  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
 package org.apache.pig.impl.builtin;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigWarning;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.util.Pair;
 
    /**
     * Partition reducers for skewed keys. This is used in skewed join during
     * the sampling process. It figures out how many reducers are required to
     * process a skewed key without causing spill, and allocates that number of
     * reducers to the key. This UDF outputs a map which contains 2 keys:
     *
     * <ul>
     * <li>"totalreducers": the value is an integer which indicates the total
     * number of reducers for this join job</li>
     * <li>"partition.list": the value is a bag containing a list of tuples,
     * each tuple representing the partitions for a skewed key. The tuple has
     * the format &lt;join key&gt;, &lt;min index of reducer&gt;,
     * &lt;max index of reducer&gt;</li>
     * </ul>
     *
     * For example, a join job configures 10 reducers, and the sampling process
     * finds 2 skewed keys: "swpv" needs 4 reducers and "swps" needs 2 reducers.
     * The output would look like the following:
     *
     * {totalreducers=10, partition.list={(swpv,0,3), (swps,4,5)}}
     *
     * The name of this file is set into the next MR job, which does the actual
     * join. That job uses this information to partition skewed keys properly.
     */
    public class PartitionSkewedKeys extends EvalFunc<Map<String, Object>> {
     
     	public static final String PARTITION_LIST = "partition.list";
     
     	public static final String TOTAL_REDUCERS = "totalreducers";
     
     	public static final float DEFAULT_PERCENT_MEMUSAGE = 0.3f;
     
     	private Log log = LogFactory.getLog(getClass());
     
     	BagFactory mBagFactory = BagFactory.getInstance();
     
     	TupleFactory mTupleFactory = TupleFactory.getInstance();
     
     	private int currentIndex_;
     
     	private int totalReducers_;
     
     	private long totalMemory_;
     
     	private String inputFile_;
     
     	private long totalSampleCount_;
     
     	private double heapPercentage_;
     	
         // specify how many tuple a reducer can hold for a key
         // this is for testing purpose. If not specified, then
         // it is calculated based on memory size and size of tuple
    	private int tupleMCount_;
     
     	public PartitionSkewedKeys() {
     		this(null);
     	}
     
    	public PartitionSkewedKeys(String[] args) {
    		totalReducers_ = -1;
    		currentIndex_ = 0;
    
    		if (args != null && args.length > 0) {
    			heapPercentage_ = Double.parseDouble(args[0]);
    			tupleMCount_ = Integer.parseInt(args[1]);
    			inputFile_ = args[2];
    		} else {
    			heapPercentage_ = DEFAULT_PERCENT_MEMUSAGE;
    		}
    
    		if (log.isDebugEnabled()) {
    			log.debug("pig.skewedjoin.reduce.memusage=" + heapPercentage_);
    			log.debug("input file: " + inputFile_);
    		}
    		log.info("input file: " + inputFile_);
    	}

    	/**
    	 * The first field in the input tuple is the number of reducers.
    	 * The second field is the *sorted* bag of samples.
    	 * This should be called only once.
    	 */
    	public Map<String, Object> exec(Tuple in) throws IOException {
    	    if (in == null || in.size() == 0) {                     
    	        return null;
    	    }
    	    Map<String, Object> output = new HashMap<String, Object>();
    	    totalMemory_ = (long) (Runtime.getRuntime().maxMemory() * heapPercentage_);
    	    log.info("Maximum of available memory is " + totalMemory_);
    	    ArrayList<Tuple> reducerList = new ArrayList<Tuple>();
    	    Tuple currentTuple = null;
    	    long count = 0;
    	    
    	    // total size in memory for tuples in sample 
    	    long totalSampleMSize = 0;
    	    
    	    //total input rows for the join
    	    long totalInputRows = 0;
    	    
    	    try {
    	        totalReducers_ = (Integer) in.get(0);
    	        DataBag samples = (DataBag) in.get(1);
    	        totalSampleCount_ = samples.size();
    	        log.info("totalSample: " + totalSampleCount_);
    	        log.info("totalReducers: " + totalReducers_);
    	        int maxReducers = 0;
    	        // first iterate the samples to find total number of rows
    	        Iterator<Tuple> iter1 = samples.iterator();
    	        while (iter1.hasNext()) {
    	            Tuple t = iter1.next();
    	            totalInputRows += (Long)t.get(t.size() - 1);
    	        }
    	                        
    	        // now iterate samples to do the reducer calculation
    	        Iterator<Tuple> iter2 = samples.iterator();
    	        while (iter2.hasNext()) {
    	            Tuple t = iter2.next();
    	            if (hasSameKey(currentTuple, t) || currentTuple == null) {
    	                count++;
    	                totalSampleMSize += getMemorySize(t);
    	            } else {
    	                Pair<Tuple, Integer> p = calculateReducers(currentTuple,
    	                        count, totalSampleMSize, totalInputRows);
    	                Tuple rt = p.first;
    	                if (rt != null) {
    	                    reducerList.add(rt);
    	                }
    	                if (maxReducers < p.second) {
    	                    maxReducers = p.second;
    	                }
    	                count = 1;
    	                totalSampleMSize = getMemorySize(t);
    	            }
    	            currentTuple = t;
    	        }
    	        // add last key
    	        if (count > 0) {
    	            Pair<Tuple, Integer> p = calculateReducers(currentTuple, count,
    	                    totalSampleMSize, totalInputRows);
    	            Tuple rt = p.first;
    	            if (rt != null) {
    	                reducerList.add(rt);
    	            }
    	            if (maxReducers < p.second) {
    	                maxReducers = p.second;
    	            }
    	        }
    	        if (maxReducers > totalReducers_) {
    	            if (pigLogger != null) {
    	                pigLogger.warn(this, "You need at least " + maxReducers
    	                        + " reducers to avoid spillage and run this job efficiently.",
    	                        PigWarning.REDUCER_COUNT_LOW);
    	            } else {
    	                log.warn("You need at least " + maxReducers
    	                        + " reducers to avoid spillage and run this job efficiently.");
    	            }
    	        }
    	        output.put(PARTITION_LIST, mBagFactory.newDefaultBag(reducerList));
    	        output.put(TOTAL_REDUCERS, Integer.valueOf(totalReducers_));
    	        log.info(output.toString());
    	        if (log.isDebugEnabled()) {
    	            log.debug(output.toString());
    	        }
    	        return output;
    	    } catch (Exception e) {
    	        e.printStackTrace();
    	        throw new RuntimeException(e);
    	    }
    	}
    	private Pair<Tuple, Integer> calculateReducers(Tuple currentTuple,
    	        long count, long totalMSize, long totalTuples) {
    	    // get average memory size per tuple
    	    double avgM = totalMSize / (double) count;
    	    // get the number of tuples that can fit into memory
    	    long tupleMCount = (tupleMCount_ <= 0) ? (long) (totalMemory_ / avgM) : tupleMCount_;
    	    // estimate the number of total tuples for this key
    	    long keyTupleCount = (long) (((double) count / totalSampleCount_) * totalTuples);
    	    int redCount = (int) Math.round(Math.ceil((double) keyTupleCount
    	            / tupleMCount));
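    	    // Illustrative numbers (not from the original source): if this key
    	    // appears in 200 of 1,000 samples and the join has 1,000,000 input
    	    // rows, keyTupleCount is about 200,000; if roughly 50,000 tuples fit
    	    // in a reducer's memory budget, redCount = ceil(200000 / 50000) = 4.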
    	    if (log.isDebugEnabled())
    	    {
    	        log.debug("avgM: " + avgM);
    	        log.debug("tuple count: " + keyTupleCount);
    	        log.debug("count: " + count);
    	        log.debug("A reducer can take " + tupleMCount + " tuples and "
    	                + keyTupleCount + " tuples are found for " + currentTuple);
    	        log.debug("key " + currentTuple + " needs " + redCount + " reducers");
    	    }
    	    // this is not a skewed key
    	    if (redCount <= 1) {
    	        return new Pair<Tuple, Integer>(null, 1);
    	    }
    	    Tuple t = this.mTupleFactory.newTuple(currentTuple.size());
    	    int i = 0;
    	    try {
    	        // set keys
    	        for (; i < currentTuple.size() - 2; i++) {
    	            t.set(i, currentTuple.get(i));
    	        }
    	        int effectiveRedCount = redCount > totalReducers_ ? totalReducers_ : redCount;
    	        // set the min index of reducer for this key
    	        t.set(i++, currentIndex_);
    	        currentIndex_ = (currentIndex_ + effectiveRedCount) % totalReducers_ - 1;
    	        if (currentIndex_ < 0) {
    	            currentIndex_ += totalReducers_;
    	        }
    	        // set the max index of reducer for this key
    	        t.set(i++, currentIndex_);
    	    } catch (ExecException e) {
    	        throw new RuntimeException("Failed to set value to tuple." + e);
    	    }
    	    Pair<Tuple, Integer> p = new Pair<Tuple, Integer>(t, redCount);
    	    return p;
    	}
    	// the last field of the tuple is a tuple for memory size and disk size
    	private long getMemorySize(Tuple t) {
    	    int s = t.size();
    	    try {
    	        return (Long) t.get(s - 2);
    	    } catch (ExecException e) {
    	        throw new RuntimeException(
    	                "Unable to retrieve the size field from tuple.", e);
    	    }
    	}
    	private boolean hasSameKey(Tuple t1, Tuple t2) {
    		// Have to break the tuple down and compare it field to field.
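    		// Only the leading fields are key fields; the last two fields of a
    		// sample tuple carry the memory-size and row-count metadata, so they
    		// are excluded from the comparison below.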
    		int sz1 = t1 == null ? 0 : t1.size();
    		int sz2 = t2 == null ? 0 : t2.size();
    		if (sz2 != sz1) {
    			return false;
    		}
    		for (int i = 0; i < sz1 - 2; i++) {
    			try {
    				int c = DataType.compare(t1.get(i), t2.get(i));
    				if (c != 0) {
    					return false;
    				}
    			} catch (ExecException e) {
    				throw new RuntimeException("Unable to compare tuples", e);
    			}
    		}
    		return true;
    	}
    }
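
For illustration only (not part of the Apache Pig source): a minimal stand-alone sketch of how this UDF could be driven in a test, assuming a Pig/Hadoop classpath is available. The class name PartitionSkewedKeysDemo, the key values, and the sample counts are made up; each sample tuple is built as (join key, in-memory size, row count), which is the layout exec() reads the last two fields from.

    import java.util.Arrays;
    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;
    import org.apache.pig.impl.builtin.PartitionSkewedKeys;

    public class PartitionSkewedKeysDemo {
        public static void main(String[] args) throws Exception {
            TupleFactory tf = TupleFactory.getInstance();
            DataBag samples = BagFactory.getInstance().newDefaultBag();

            // Two sampled keys, already sorted by key. Each sample tuple is
            // (key, in-memory size of the sampled tuple, rows it represents).
            samples.add(tf.newTuple(Arrays.<Object>asList("swps", 64L, 200L)));
            samples.add(tf.newTuple(Arrays.<Object>asList("swpv", 64L, 800L)));

            // Input tuple for exec(): (number of reducers, sorted sample bag).
            Tuple in = tf.newTuple(Arrays.<Object>asList(10, samples));

            // Constructor args: heap percentage, tuples-per-reducer override
            // (a small value forces both keys to be treated as skewed), and a
            // placeholder sample file name (hypothetical, unused here).
            PartitionSkewedKeys udf =
                    new PartitionSkewedKeys(new String[] { "0.3", "100", "dummy" });

            // Prints a map of the form
            // {partition.list={(swps,min,max), (swpv,min,max)}, totalreducers=10}
            System.out.println(udf.exec(in));
        }
    }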