/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
 
 package org.apache.pig.backend.hadoop.executionengine.util;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.builtin.PartitionSkewedKeys;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.io.ReadToEndLoader;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.Utils;
 
/**
 * A class of utility static methods to be used in the hadoop map reduce backend
 */
 
 public class MapRedUtil {
 
     private static Log log = LogFactory.getLog(MapRedUtil.class);
          
     public static final String FILE_SYSTEM_NAME = "fs.default.name";

    
    /**
     * Loads the key distribution sampler file.
     *
     * (An illustrative usage sketch follows this method.)
     *
     * @param keyDistFile the name for the distribution file
     * @param totalReducers gets set to the total number of reducers as found in the dist file
     * @param keyType Type of the key to be stored in the return map. It currently treats
     *                Tuple as a special case.
     * @param mapConf the job configuration, used to pick up security and file system settings
     */
    @SuppressWarnings("unchecked")
    public static <E> Map<E, Pair<Integer, Integer>> loadPartitionFileFromLocalCache(
            String keyDistFile, Integer[] totalReducers, byte keyType, Configuration mapConf)
            throws IOException {

        Map<E, Pair<Integer, Integer>> reducerMap = new HashMap<E, Pair<Integer, Integer>>();
 
         // use local file system to get the keyDistFile
         Configuration conf = new Configuration(false);            
         
         if (mapConf.get("yarn.resourcemanager.principal")!=null) {
             conf.set("yarn.resourcemanager.principal"mapConf.get("yarn.resourcemanager.principal"));
         }
 
         if (..get().get("fs.file.impl")!=null)
             conf.set("fs.file.impl"..get().get("fs.file.impl"));
         if (..get().get("fs.hdfs.impl")!=null)
             conf.set("fs.hdfs.impl"..get().get("fs.hdfs.impl"));
 
 
        conf.set(."file:///");
        ReadToEndLoader loader = new ReadToEndLoader(Utils.getTmpFileStorageObject(conf),
                conf, keyDistFile, 0);
        DataBag partitionList;
        Tuple t = loader.getNext();
        if (t == null) {
            // this could happen if the input directory for sampling is empty
            .warn("Empty dist file: " + keyDistFile);
            return reducerMap;
        }
        // The keydist file is structured as (key, min, max)
        // min, max being the index of the reducers
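        // Illustrative (hypothetical) entry: a tuple such as ("apple", 2, 5) would mean
        // that the sampled key "apple" is spread across reducers 2 through 5; min and
        // max are read off as the last two fields of each tuple below.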
        Map<String, Object> distMap = (Map<String, Object>) t.get(0);
        partitionList = (DataBag) distMap.get(PartitionSkewedKeys.PARTITION_LIST);
        totalReducers[0] = Integer.valueOf("" + distMap.get(PartitionSkewedKeys.TOTAL_REDUCERS));
        Iterator<Tuple> it = partitionList.iterator();
        while (it.hasNext()) {
            Tuple idxTuple = it.next();
            Integer maxIndex = (Integer) idxTuple.get(idxTuple.size() - 1);
            Integer minIndex = (Integer) idxTuple.get(idxTuple.size() - 2);
            // Used to replace the maxIndex with the number of reducers
            if (maxIndex < minIndex) {
                maxIndex = totalReducers[0] + maxIndex;
            }
            E keyT;
            // if the join is on more than 1 key
            if (idxTuple.size() > 3) {
                // remove the last 2 fields of the tuple, i.e: minIndex and maxIndex and store
                // it in the reducer map
                Tuple keyTuple = TupleFactory.getInstance().newTuple();
                for (int i=0; i < idxTuple.size() - 2; i++) {
                    keyTuple.append(idxTuple.get(i));	
                }
                keyT = (E) keyTuple;
            } else {
                if (keyType == DataType.TUPLE) {
                    keyT = (E)TupleFactory.getInstance().newTuple(1);
                    ((Tuple)keyT).set(0,idxTuple.get(0));
                } else {
                    keyT = (E) idxTuple.get(0);
                }
            }
            // number of reducers
            Integer cnt = maxIndex - minIndex;
            reducerMap.put(keyT, new Pair(minIndex, cnt)); // 1 is added to account for the 0 index
        }
        return reducerMap;
    }
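    /* Illustrative usage sketch (hypothetical file name and conf objects): a caller
     * such as a skewed-join partitioner could load the sampled distribution along
     * these lines; in practice the dist file path comes from the job configuration
     * set up by the skewed-join compiler.
     *
     *   Integer[] totalReducers = new Integer[1];
     *   Map<Tuple, Pair<Integer, Integer>> reducerMap =
     *           MapRedUtil.loadPartitionFileFromLocalCache(
     *                   "/tmp/keydist", totalReducers, DataType.TUPLE, jobConf);
     *   // reducerMap: sampled key -> (first reducer index, number of reducers)
     */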
    public static void copyTmpFileConfigurationValues(Configuration fromConf, Configuration toConf) {
        // Currently these are used only by loaders (and not storers), so we do not need to copy
        // mapred properties that are required by {@link SequenceFileInterStorage}.
        // (A usage sketch follows this method.)
        if (fromConf.getBoolean("pig.tmpfilecompression", false)) {
            toConf.setBoolean("pig.tmpfilecompression", true);
            if (fromConf.get("pig.tmpfilecompression.codec") != null) {
                toConf.set("pig.tmpfilecompression.codec",
                        fromConf.get("pig.tmpfilecompression.codec"));
            }
            if (fromConf.get("pig.tmpfilecompression.storage") != null) {
                toConf.set("pig.tmpfilecompression.storage",
                        fromConf.get("pig.tmpfilecompression.storage"));
            }
        }
    }
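    /* Illustrative usage sketch (hypothetical configuration objects): copy the
     * tmp-file compression settings into a fresh Configuration before reading an
     * intermediate (tmp) file with a loader.
     *
     *   Configuration localConf = new Configuration(false);
     *   MapRedUtil.copyTmpFileConfigurationValues(jobConf, localConf);
     */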
    public static void setupUDFContext(Configuration job) throws IOException {
        UDFContext udfc = UDFContext.getUDFContext();
        udfc.addJobConf(job);
        // don't deserialize in front-end 
        if (udfc.isUDFConfEmpty()) {
            udfc.deserialize();
        }
    }
    
    public static FileSpec checkLeafIsStore(
            PhysicalPlan plan,
            PigContext pigContext) throws ExecException {
        try {
            PhysicalOperator leaf = plan.getLeaves().get(0);
            FileSpec spec = null;
            if(!(leaf instanceof POStore)){
                String scope = leaf.getOperatorKey().getScope();
                POStore str = new POStore(new OperatorKey(scope,
                    NodeIdGenerator.getGenerator().getNextNodeId(scope)));
                spec = new FileSpec(FileLocalizer.getTemporaryPath(
                    pigContext).toString(),
                    new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
                str.setSFile(spec);
                plan.addAsLeaf(str);
            } else{
                spec = ((POStore)leaf).getSFile();
            }
            return spec;
        } catch (Exception e) {
            int errCode = 2045;
            String msg = "Internal error. Not able to check if the leaf node is a store operator.";
            throw new ExecException(msg, errCode, PigException.BUG, e);
        }
    }

    
    /**
     * Get all files recursively from the given list of files.
     *
     * (An illustrative usage sketch follows this method.)
     *
     * @param files a list of FileStatus
     * @param conf the configuration object
     * @return the list of FileStatus that contains all the files in the given
     *         list and, recursively, all the files inside the directories in
     *         the given list
     * @throws IOException
     */
    public static List<FileStatus> getAllFileRecursively(
            List<FileStatus> files, Configuration conf) throws IOException {
        List<FileStatus> result = new ArrayList<FileStatus>();
        int len = files.size();
        for (int i = 0; i < len; ++i) {
            FileStatus file = files.get(i);
            if (file.isDir()) {
                Path p = file.getPath();
                FileSystem fs = p.getFileSystem(conf);
                addInputPathRecursively(result, fs, p, hiddenFileFilter);
            } else {
                result.add(file);
            }
        }
        .info("Total input paths to process : " + result.size()); 
        return result;
    }
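    /* Illustrative usage sketch (hypothetical input path): expand a listing that
     * may contain directories into the full set of files underneath them.
     *
     *   FileSystem fs = FileSystem.get(conf);
     *   List<FileStatus> top = Arrays.asList(fs.listStatus(new Path("/data/input")));
     *   List<FileStatus> allFiles = MapRedUtil.getAllFileRecursively(top, conf);
     */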
    
    private static void addInputPathRecursively(List<FileStatus> result,
            FileSystem fs, Path path, PathFilter inputFilter)
            throws IOException {
        for (FileStatus stat : fs.listStatus(path, inputFilter)) {
            if (stat.isDir()) {
                addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
            } else {
                result.add(stat);
            }
        }
    }          
    private static final PathFilter hiddenFileFilter = new PathFilter(){
        public boolean accept(Path p){
            String name = p.getName(); 
            return !name.startsWith("_") && !name.startsWith("."); 
        }
    };    
    /* The following codes are for split combination: see PIG-1518
     * 
     */
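    /*
     * How the combination below proceeds: splits that are already at least
     * maxCombinedSplitSize long are emitted on their own. Smaller splits are
     * indexed per host (Node) and, host by host, greedily packed into combined
     * splits, using a binary search (via DummySplit) to find the largest
     * remaining split that still fits in the space left. A packed group is only
     * emitted once it exceeds half of maxCombinedSplitSize; whatever cannot be
     * packed on a single host is collected as leftovers and combined across
     * hosts at the end.
     */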
    private static Comparator<Node> nodeComparator = new Comparator<Node>() {
        @Override
        public int compare(Node o1, Node o2) {
            long cmp = o1.length - o2.length;
            return cmp == 0 ? 0 : cmp < 0 ? -1 : 1;
        }
    };
    
    private static final class ComparableSplit implements Comparable<ComparableSplit> {
        private InputSplit rawInputSplit;
        private HashSet<Node> nodes;
        // id used as a tie-breaker when two splits are of equal size.
        private long id;

        ComparableSplit(InputSplit split, long id) {
            rawInputSplit = split;
            nodes = new HashSet<Node>();
            this.id = id;
        }

        void add(Node node) {
            nodes.add(node);
        }

        void removeFromNodes() {
            for (Node node : nodes)
                node.remove(this);
        }

        public InputSplit getSplit() {
            return rawInputSplit;
        }

        @Override
        public boolean equals(Object other) {
            if (other == null || !(other instanceof ComparableSplit))
                return false;
            return (compareTo((ComparableSplit) other) == 0);
        }

        @Override
        public int hashCode() {
            return 41;
        }

        @Override
        public int compareTo(ComparableSplit other) {
            try {
                long cmp = rawInputSplit.getLength() - other.rawInputSplit.getLength();
                // in descending order
                return cmp == 0 ? (id == other.id ? 0 : id < other.id ? -1 : 1) : cmp < 0 ? 1 : -1;
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
      
    private static class DummySplit extends InputSplit {
        private long length;
        
        @Override
        public String[] getLocations() {
            return null;
        }
        
        @Override
        public long getLength() {
            return length;
        }
        
        public void setLength(long length) {
            this.length = length;
        }
    }
    
    private static class Node {
        private long length = 0;
        private ArrayList<ComparableSplit> splits;
        private boolean sorted;

        Node() throws IOException, InterruptedException {
            length = 0;
            splits = new ArrayList<ComparableSplit>();
            sorted = false;
        }

        void add(ComparableSplit split) throws IOException, InterruptedException {
            splits.add(split);
            length++;
        }

        void remove(ComparableSplit split) {
            if (!sorted)
                sort();
            int index = Collections.binarySearch(splits, split);
            if (index >= 0) {
                splits.remove(index);
                length--;
            }
        }

        void sort() {
            if (!sorted) {
                Collections.sort(splits);
                sorted = true;
            }
        }

        ArrayList<ComparableSplit> getSplits() {
            return splits;
        }

        public long getLength() {
            return length;
        }
    }
  
    public static List<List<InputSplit>> getCombinePigSplits(List<InputSplit>
            oneInputSplits, long maxCombinedSplitSize, Configuration conf)
            throws IOException, InterruptedException {
        ArrayList<Node> nodes = new ArrayList<Node>();
        HashMap<String, Node> nodeMap = new HashMap<String, Node>();
        List<List<InputSplit>> result = new ArrayList<List<InputSplit>>();
        List<Long> resultLengths = new ArrayList<Long>();
        long comparableSplitId = 0;
        
        int size = 0, nSplits = oneInputSplits.size();
        InputSplit lastSplit = null;
        int emptyCnt = 0;
        for (InputSplit split : oneInputSplits) {
            if (split.getLength() == 0) {
                emptyCnt++; 
                continue;
            }
            if (split.getLength() >= maxCombinedSplitSize) {
                comparableSplitId++;
                ArrayList<InputSplit> combinedSplits = new ArrayList<InputSplit>();
                combinedSplits.add(split);
                result.add(combinedSplits);
                resultLengths.add(split.getLength());
            } else {
                ComparableSplit csplit = new ComparableSplit(split, comparableSplitId++);
                String[] locations = split.getLocations();
                // sort the locations to stabilize the number of maps: PIG-1757
                Arrays.sort(locations);
                HashSet<String> locationSeen = new HashSet<String>();
                for (String location : locations)
                {
                    if (!locationSeen.contains(location)) 
                    {
                        Node node = nodeMap.get(location);
                        if (node == null) {
                            node = new Node();
                            nodes.add(node);
                            nodeMap.put(location, node);
                        }
                        node.add(csplit);
                        csplit.add(node);
                        locationSeen.add(location);
                    }
                }
                lastSplit = split;
                size++;
            }
        }
        /* verification code: debug purpose
        {
          ArrayList<ComparableSplit> leftoverSplits = new ArrayList<ComparableSplit>();
          HashSet<InputSplit> seen = new HashSet<InputSplit>();
          for (Node node : nodes) {
            if (node.getLength() > 0)
            {
              ArrayList<ComparableSplit> splits = node.getSplits();
              for (ComparableSplit split : splits) {
                if (!seen.contains(split.getSplit())) {
                  // remove duplicates. The set has to be on the raw input split not the 
                  // comparable input split as the latter overrides the compareTo method
                  // so its equality semantics is changed and not we want here
                  seen.add(split.getSplit());
                  leftoverSplits.add(split);
                }
              }
            }
          }
          
          int combinedSplitLen = 0;
          for (PigSplit split : result)
            combinedSplitLen += split.getNumPaths();
          if (combinedSplitLen + leftoverSplits.size()!= nSplits-emptyCnt) {
            throw new AssertionError("number of combined splits {"+combinedSplitLen+"+"+leftoverSplits.size()+"-"+size+"} does not match the number of original splits ["+nSplits+"].");
          }
        }
        */
        if (nSplits > 0 && emptyCnt == nSplits)
        {
            // if all splits are empty, add a single empty split as currently an empty directory is
            // not properly handled somewhere
            ArrayList<InputSplit> combinedSplits = new ArrayList<InputSplit>();
            combinedSplits.add(oneInputSplits.get(0));
            result.add(combinedSplits);
        }
        else if (size == 1) {
            ArrayList<InputSplit> combinedSplits = new ArrayList<InputSplit>();
            combinedSplits.add(lastSplit);
            result.add(combinedSplits);
        } else if (size > 1) {
            // combine small splits
            Collections.sort(nodes);
            DummySplit dummy = new DummySplit();
            // dummy is used to search for next split of suitable size to be combined
            ComparableSplit dummyComparableSplit = new ComparableSplit(dummy, -1);
            for (Node node : nodes) {
                // sort the splits on this node in descending order
                node.sort();
                long totalSize = 0;
                ArrayList<ComparableSplit> splits = node.getSplits();
                int idx;
                int lenSplits;
                ArrayList<InputSplit> combinedSplits = new ArrayList<InputSplit>();
                ArrayList<ComparableSplit> combinedComparableSplits = new ArrayList<ComparableSplit>();
                while (!splits.isEmpty()) {
                    combinedSplits.add(splits.get(0).getSplit());
                    combinedComparableSplits.add(splits.get(0));
                    int startIdx = 1;
                    lenSplits = splits.size();
                    totalSize += splits.get(0).getSplit().getLength();
                    long spaceLeft = maxCombinedSplitSize - totalSize;
                    dummy.setLength(spaceLeft);
                    idx = Collections.binarySearch(node.getSplits().subList(startIdx, lenSplits), dummyComparableSplit);
                    idx = -idx-1+startIdx;
                    while (idx < lenSplits)
                    {
                        long thisLen = splits.get(idx).getSplit().getLength();
                        combinedSplits.add(splits.get(idx).getSplit());
                        combinedComparableSplits.add(splits.get(idx));
                        totalSize += thisLen;
                        spaceLeft -= thisLen;
                        if (spaceLeft <= 0)
                            break;
                        // find next combinable chunk
                        startIdx = idx + 1;
                        if (startIdx >= lenSplits)
                            break;
                        dummy.setLength(spaceLeft);
                        idx = Collections.binarySearch(node.getSplits().subList(startIdx, lenSplits), dummyComparableSplit);
                        idx = -idx-1+startIdx;
                    }
                    if (totalSize > maxCombinedSplitSize/2) {
                        result.add(combinedSplits);
                        resultLengths.add(totalSize);
                        removeSplits(combinedComparableSplits);
                        totalSize = 0;
                        combinedSplits = new ArrayList<InputSplit>();
                        combinedComparableSplits.clear();
                        splits = node.getSplits();
                    } else {
                        if (combinedSplits.size() != lenSplits)
                            throw new AssertionError("Combined split logic error!");
                        break;
                    }
                }
            }
            // handle leftovers
            ArrayList<ComparableSplit> leftoverSplits = new ArrayList<ComparableSplit>();
            HashSet<InputSplit> seen = new HashSet<InputSplit>();
            for (Node node : nodes) {
                for (ComparableSplit split : node.getSplits()) {
                    if (!seen.contains(split.getSplit())) {
                        // remove duplicates. The set has to be on the raw input split, not the
                        // comparable input split, as the latter overrides the compareTo method
                        // so its equality semantics are changed, which is not what we want here
                        seen.add(split.getSplit());
                        leftoverSplits.add(split);
                    }
                }
            }
            
            /* verification code
            int combinedSplitLen = 0;
            for (PigSplit split : result)
              combinedSplitLen += split.getNumPaths();
            if (combinedSplitLen + leftoverSplits.size()!= nSplits-emptyCnt)
              throw new AssertionError("number of combined splits ["+combinedSplitLen+"+"+leftoverSplits.size()+"] does not match the number of original splits ["+nSplits+"].");
            */
            if (!leftoverSplits.isEmpty())
            {
                long totalSize = 0;
                ArrayList<InputSplit> combinedSplits = new ArrayList<InputSplit>();
                ArrayList<ComparableSplit> combinedComparableSplits = new ArrayList<ComparableSplit>();
                
                int splitLen = leftoverSplits.size();
                for (int i = 0; i < splitLen; i++)
                {
                    ComparableSplit split = leftoverSplits.get(i);
                    long thisLen = split.getSplit().getLength();
                    if (totalSize + thisLen >= maxCombinedSplitSize) {
                        removeSplits(combinedComparableSplits);
                        result.add(combinedSplits);
                        resultLengths.add(totalSize);
                        combinedSplits = new ArrayList<InputSplit>();
                        combinedComparableSplits.clear();
                        totalSize = 0;
                    }
                    combinedSplits.add(split.getSplit());
                    combinedComparableSplits.add(split);
                    totalSize += split.getSplit().getLength();
                    if (i == splitLen - 1) {
                        // last piece: it could be very small, try to see if it can be squeezed into any existing splits
                        for (int j =0; j < result.size(); j++)
                        {
                            if (resultLengths.get(j) + totalSize <= maxCombinedSplitSize)
                            {
                                List<InputSplit> isList = result.get(j);
                                for (InputSplit csplit : combinedSplits) {
                                    isList.add(csplit);
                                }
                                removeSplits(combinedComparableSplits);
                                combinedSplits.clear();
                                break;
                            }
                        }
                        if (!combinedSplits.isEmpty()) {
                            // last piece can not be squeezed in, create a new combined split for them.
                            removeSplits(combinedComparableSplits);
                            result.add(combinedSplits);
                        }
                    }
                }
            }
        }
        /* verification codes
        int combinedSplitLen = 0;
        for (PigSplit split : result)
          combinedSplitLen += split.getNumPaths();
        if (combinedSplitLen != nSplits-emptyCnt)
          throw new AssertionError("number of combined splits ["+combinedSplitLen+"] does not match the number of original splits ["+nSplits+"].");
        
        long totalLen = 0;
        for (PigSplit split : result)
          totalLen += split.getLength();
        
        long origTotalLen = 0;
        for (InputSplit split : oneInputSplits)
          origTotalLen += split.getLength();
        if (totalLen != origTotalLen)
          throw new AssertionError("The total length ["+totalLen+"] does not match the original ["+origTotalLen+"]");
        */ 
        .info("Total input paths (combined) to process : " + result.size());
        return result;
    }
    
    private static void removeSplits(List<ComparableSplit> splits) {
        for (ComparableSplit split : splits)
            split.removeFromNodes();
    }
    
    public String inputSplitToString(InputSplit[] splits) throws IOException, InterruptedException {
        // debugging purpose only
        StringBuilder st = new StringBuilder();
        st.append("Number of splits :" + splits.length + "\n");
        long len = 0;
        for (InputSplit split : splits)
            len += split.getLength();
        st.append("Total Length = " + len + "\n");
        for (int i = 0; i < splits.length; i++) {
            st.append("Input split[" + i + "]:\n   Length = " + splits[i].getLength() + "\n  Locations:\n");
            for (String location :  splits[i].getLocations())
                st.append("    "+location+"\n");
            st.append("\n-----------------------\n"); 
        }
        return st.toString();
    }
    
    /* verification code: debug purpose only
    public String inputSplitToString(ArrayList<ComparableSplit> splits) throws IOException, InterruptedException {
      StringBuilder st = new StringBuilder();
      st.append("Number of splits :" + splits.size()+"\n");
      long len = 0;
      for (ComparableSplit split: splits)
        len += split.getSplit().getLength();
      st.append("Total Length = "+ len +"\n");
      for (int i = 0; i < splits.size(); i++) {
        st.append("Input split["+i+"]:\n   Length = "+ splits.get(i).getSplit().getLength()+"\n  Locations:\n");
        for (String location :  splits.get(i).getSplit().getLocations())
          st.append("    "+location+"\n");
        st.append("\n-----------------------\n"); 
      }
      return st.toString();
    }
    */
}