/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;

import java.io.IOException;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.impl.util.UriUtil;
import org.apache.pig.impl.util.Utils;

/**
 * Class that estimates the number of reducers based on input size.
 * Number of reducers is based on two properties:
 * <ul>
 *     <li>pig.exec.reducers.bytes.per.reducer - how many bytes of input per reducer (default is 1000*1000*1000)</li>
 *     <li>pig.exec.reducers.max - constrain the maximum number of reducer tasks (default is 999)</li>
 * </ul>
 * If using a loader that implements LoadMetadata the reported input size is used,
 * otherwise attempt to determine size from the filesystem.
 * <p>
 * e.g. the following is your pig script
 * <pre>
 * a = load '/data/a';
 * b = load '/data/b';
 * c = join a by $0, b by $0;
 * store c into '/tmp';
 * </pre>
 * and the size of /data/a is 1000*1000*1000, and the size of /data/b is
 * 2*1000*1000*1000, then the estimated number of reducers to use will be
 * (1000*1000*1000 + 2*1000*1000*1000) / (1000*1000*1000) = 3.
 */
 
public class InputSizeReducerEstimator implements PigReducerEstimator {

    private static final Log log = LogFactory.getLog(InputSizeReducerEstimator.class);

    
    /**
     * Determines the number of reducers to be used.
     *
     * @param job job instance
     * @param mapReduceOper
     * @throws java.io.IOException
     */
    @Override
    public int estimateNumberOfReducers(Job job, MapReduceOper mapReduceOper) throws IOException {
        Configuration conf = job.getConfiguration();

        // Defaults per the class Javadoc: 1 GB of input per reducer, at most 999 reducers.
        long bytesPerReducer = conf.getLong("pig.exec.reducers.bytes.per.reducer", 1000L * 1000 * 1000);
        int maxReducers = conf.getInt("pig.exec.reducers.max", 999);

        List<POLoad> poLoads = PlanHelper.getPhysicalOperators(mapReduceOper.mapPlan, POLoad.class);
        long totalInputFileSize = getTotalInputFileSize(conf, poLoads, job);

        log.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
                + maxReducers + " totalInputFileSize=" + totalInputFileSize);

        // if totalInputFileSize == -1, we couldn't get the input size so we can't estimate.
        if (totalInputFileSize == -1) { return -1; }

        // One reducer per bytesPerReducer of input, clamped to [1, maxReducers].
        int reducers = (int) Math.ceil((double) totalInputFileSize / bytesPerReducer);
        reducers = Math.max(1, reducers);
        reducers = Math.min(maxReducers, reducers);

        return reducers;
    }
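
    // Both properties read in estimateNumberOfReducers are plain Hadoop
    // Configuration settings, so a caller can override them before the job
    // is compiled, e.g. (illustrative values, not the defaults):
    //   conf.setLong("pig.exec.reducers.bytes.per.reducer", 500L * 1000 * 1000);
    //   conf.setInt("pig.exec.reducers.max", 200);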

    
    /**
     * Get the input size for as many inputs as possible. Inputs that do not report
     * their size nor can pig look that up itself are excluded from this size.
     */
    static long getTotalInputFileSize(Configuration conf,
                                      List<POLoad> lds, Job job) throws IOException {
        long totalInputFileSize = 0;
        boolean foundSize = false;
        for (POLoad ld : lds) {
            // Prefer the size reported by the loader itself (see getInputSizeFromLoader).
            long size = getInputSizeFromLoader(ld, job);
            if (size > -1) { foundSize = true; }
            if (size > 0) {
                totalInputFileSize += size;
                continue;
            }
            // the input file location might be a list of comma separated files,
            // separate them out
            for (String location : LoadFunc.getPathStrings(ld.getLFile().getFileName())) {
                if (UriUtil.isHDFSFileOrLocalOrS3N(location)) {
                    Path path = new Path(location);
                    FileSystem fs = path.getFileSystem(conf);
                    FileStatus[] status = fs.globStatus(path);
                    if (status != null) {
                        for (FileStatus s : status) {
                            totalInputFileSize += Utils.getPathLength(fs, s);
                            foundSize = true;
                        }
                    }
                }
            }
        }
        return foundSize ? totalInputFileSize : -1;
    }

    
    /**
     * Get the total input size in bytes by looking at statistics provided by
     * loaders that implement {@link LoadMetadata}.
     *
     * @param ld
     * @param job
     * @return total input size in bytes, or -1 if unknown or incomplete
     * @throws java.io.IOException on error
     */
    static long getInputSizeFromLoader(POLoad ld, Job job) throws IOException {
        if (ld.getLoadFunc() == null
                || !(ld.getLoadFunc() instanceof LoadMetadata)
                || ld.getLFile() == null
                || ld.getLFile().getFileName() == null) {
            return -1;
        }

        ResourceStatistics statistics;
        try {
            statistics = ((LoadMetadata) ld.getLoadFunc())
                    .getStatistics(ld.getLFile().getFileName(), job);
        } catch (Exception e) {
            log.warn("Couldn't get statistics from LoadFunc: " + ld.getLoadFunc(), e);
            return -1;
        }

        if (statistics == null || statistics.getSizeInBytes() == null) {
            return -1;
        }

        return statistics.getSizeInBytes();
    }
}
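
As a sanity check of the formula in the class Javadoc, here is a minimal, self-contained sketch (the class name is illustrative, not part of Pig) that reproduces the estimate for the /data/a + /data/b example above using the default settings:

    public class EstimateSketch {
        public static void main(String[] args) {
            long bytesPerReducer = 1000L * 1000 * 1000;    // pig.exec.reducers.bytes.per.reducer default
            int maxReducers = 999;                         // pig.exec.reducers.max default
            long totalInputFileSize = 1000L * 1000 * 1000  // size of /data/a
                    + 2L * 1000 * 1000 * 1000;             // size of /data/b
            // Same computation as estimateNumberOfReducers: ceil, then clamp to [1, maxReducers].
            int reducers = (int) Math.ceil((double) totalInputFileSize / bytesPerReducer);
            reducers = Math.max(1, reducers);
            reducers = Math.min(maxReducers, reducers);
            System.out.println(reducers); // prints 3
        }
    }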
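
The LoadMetadata fast path in getInputSizeFromLoader can be exercised by a loader that reports its own statistics. A hedged sketch of such a loader, assuming PigStorage as the delegate (the class name, the hard-coded 2 GB figure, and the choice to extend PigStorage are illustrative, not part of this file):

    import java.io.IOException;

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.pig.Expression;
    import org.apache.pig.LoadMetadata;
    import org.apache.pig.ResourceSchema;
    import org.apache.pig.ResourceStatistics;
    import org.apache.pig.builtin.PigStorage;

    public class FixedSizeLoader extends PigStorage implements LoadMetadata {

        @Override
        public ResourceStatistics getStatistics(String location, Job job) throws IOException {
            // Reporting a size here lets InputSizeReducerEstimator use this
            // figure directly instead of globbing the filesystem.
            ResourceStatistics stats = new ResourceStatistics();
            stats.setSizeInBytes(2L * 1000 * 1000 * 1000); // pretend the input is 2 GB
            return stats;
        }

        @Override
        public ResourceSchema getSchema(String location, Job job) throws IOException {
            return null; // schema not known up front
        }

        @Override
        public String[] getPartitionKeys(String location, Job job) throws IOException {
            return null; // no partition support
        }

        @Override
        public void setPartitionFilter(Expression partitionFilter) throws IOException {
            // no-op: this loader does not prune partitions
        }
    }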