Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
 
 package org.apache.hadoop.mrunit;
 
 
 import java.util.List;
 
Harness that allows you to test a Mapper and a Reducer instance together. You provide the input key and value that should be sent to the Mapper, and the outputs you expect to be sent by the Reducer to the collector for those inputs. By calling runTest(), the harness will deliver the input to the Mapper, feed the intermediate results to the Reducer (without checking them), and check the Reducer's outputs against the expected results. This is designed to handle a single (k, v)* -> (k, v)* case from the Mapper/Reducer pair, representing a single unit test.
 
 public abstract class MapReduceDriverBase<K1, V1, K2 extends Comparable, V2, K3, V3>
     extends TestDriver<K1, V1, K3, V3> {
 
   /** Class-wide logger for this driver base class. */
   public static final Log LOG = LogFactory.getLog(MapReduceDriverBase.class);
 
   /**
    * The (k, v) input pairs for the Mapper.
    * NOTE(review): presumably the list the addInput() overloads append to;
    * the receivers are missing in this copy of the source, so confirm.
    */
   protected List<Pair<K1, V1>> inputList;
  
  
Key group comparator
 
   // Comparator over map-output keys (K2), intended to decide which keys
   // belong to the same group during shuffle().
   protected Comparator<K2> keyGroupComparator;
  
  
Key value order comparator
 
   // Comparator over map-output keys (K2), intended to order the pairs inside
   // one key group; shuffle() documents the sort as applied only "if set".
   protected Comparator<K2> keyValueOrderComparator;
 
   public MapReduceDriverBase() {
      = new ArrayList<Pair<K1, V1>>();
     
     // create a simple comparator for key grouping and ordering
     Comparator<K2> simpleComparator = new Comparator<K2>() {
       @Override
       public int compare(K2 o1, K2 o2) {
         return o1.compareTo(o2);
       }
     };
     
     // assign simple key grouping and ordering comparator
      = simpleComparator;
      = null;
   }

  
Adds an input to send to the mapper

Parameters:
key
val
 
   public void addInput(K1 key, V1 val) {
     .add(new Pair<K1, V1>(keyval));
   }

  
Adds an input to send to the Mapper

Parameters:
input The (k, v) pair to add to the input list.
 
   public void addInput(Pair<K1, V1> input) {
     if (null == input) {
       throw new IllegalArgumentException("Null input in addInput()");
     }
 
     .add(input);
   }

  
Adds an output (k, v) pair we expect from the Reducer

Parameters:
outputRecord The (k, v) pair to add
 
  public void addOutput(Pair<K3, V3> outputRecord) {
    if (null != outputRecord) {
      .add(outputRecord);
    } else {
      throw new IllegalArgumentException("Tried to add null outputRecord");
    }
  }

  
Adds a (k, v) pair we expect as output from the Reducer

Parameters:
key
val
  public void addOutput(K3 key, V3 val) {
    addOutput(new Pair<K3, V3>(keyval));
  }

  
Expects an input of the form "key \t val". Forces the Mapper input types to Text.

Parameters:
input A string of the form "key \t val". Trims any whitespace.
  public void addInputFromString(String input) {
    if (null == input) {
      throw new IllegalArgumentException("null input given to setInput");
    } else {
      Pair<TextTextinputPair = parseTabbedPair(input);
      if (null != inputPair) {
        // I know this is not type-safe, but I don't
        // know a better way to do this.
        addInput((Pair<K1, V1>) inputPair);
      } else {
        throw new IllegalArgumentException("Could not parse input pair in addInput");
      }
    }
  }

  
Expects an input of the form "key \t val". Forces the Reducer output types to Text.

Parameters:
output A string of the form "key \t val". Trims any whitespace.
  public void addOutputFromString(String output) {
    if (null == output) {
      throw new IllegalArgumentException("null input given to setOutput");
    } else {
      Pair<TextTextoutputPair = parseTabbedPair(output);
      if (null != outputPair) {
        // I know this is not type-safe,
        // but I don't know a better way to do this.
        addOutput((Pair<K3, V3>) outputPair);
      } else {
        throw new IllegalArgumentException(
            "Could not parse output pair in setOutput");
      }
    }
  }
  /**
   * Produces the list of (k, v) output pairs that runTest() hands to
   * validate(). Presumably runs the Mapper and Reducer over the configured
   * inputs -- the implementation lives in subclasses.
   *
   * @return the output pairs to validate
   * @throws IOException if the run fails; runTest() converts this into a
   *         RuntimeException
   */
  public abstract List<Pair<K3, V3>> run() throws IOException;
  public void runTest() throws RuntimeException {
    List<Pair<K3, V3>> reduceOutputs = null;
    boolean succeeded;
    try {
      reduceOutputs = run();
      validate(reduceOutputs);
    } catch (IOException ioe) {
      .error("IOException: " + ioe.toString());
      .debug("Setting success to false based on IOException");
      throw new RuntimeException();
    }
  }

  
Take the outputs from the Mapper, combine all values for the same key, and sort them by key.

Parameters:
mapOutputs An unordered list of (key, val) pairs from the mapper
Returns:
the sorted list of (key, list(val))'s to present to the reducer
  public List<Pair<K2, List<V2>>> shuffle(List<Pair<K2, V2>> mapOutputs) {
    // step 1 - use the key group comparator to organise map outputs
    final TreeMap<K2, List<Pair<K2,V2>>> groupedByKey = 
        new TreeMap<K2, List<Pair<K2,V2>>>();
    
    List<Pair<K2,V2>> groupedKeyList;
    for (Pair<K2, V2> mapOutput : mapOutputs) {
      groupedKeyList = groupedByKey.get(mapOutput.getFirst());
      
      if (groupedKeyList == null) {
        groupedKeyList = new ArrayList<Pair<K2,V2>>();
        groupedByKey.put(mapOutput.getFirst(), groupedKeyList);
      }
      
      groupedKeyList.add(mapOutput);
    }
    
    // step 2 - sort each key group using the key order comparator (if set)
    Comparator<Pair<K2,V2>> pairKeyComparator = new Comparator<Pair<K2, V2>>() {
      @Override
      public int compare(Pair<K2, V2> o1Pair<K2, V2> o2) {
        return .compare(o1.getFirst(), o2.getFirst());
      }
    };
    
    // create shuffle stage output list
    List<Pair<K2, List<V2>>> outputKeyValuesList = 
        new ArrayList<Pair<K2,List<V2>>>();
    
    // populate output list
    for (Entry<K2, List<Pair<K2, V2>>> groupedByKeyEntry : 
          groupedByKey.entrySet()) {
      if ( != null) {
        // sort the key/value pairs using the key order comparator (if set)
        Collections.sort(groupedByKeyEntry.getValue(), pairKeyComparator);
      }
      
      // create list to hold values for the grouped key
      List<V2> valuesList = new ArrayList<V2>();
      for (Pair<K2, V2> pair : groupedByKeyEntry.getValue()) {
        valuesList.add(pair.getSecond());
      }
      
      // add key and values to output list
      outputKeyValuesList.add(
          new Pair<K2,List<V2>>(groupedByKeyEntry.getKey(), valuesList));
    }
    
    // return output list
    return outputKeyValuesList;
  }
  
  
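Set the key grouping comparator, similar to calling the following API calls but passing a real instance rather than just the class: JobConf.setOutputValueGroupingComparator(Class) (pre-0.20.1 API) and Job.setGroupingComparatorClass(Class) (0.20.1+ API).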

Parameters:
groupingComparator
  public void setKeyGroupingComparator(
        RawComparator<K2> groupingComparator) {
     = groupingComparator;
  }
  
  
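Set the key value order comparator, similar to calling the following API calls but passing a real instance rather than just the class: JobConf.setOutputKeyComparatorClass(Class) (pre-0.20.1 API) and Job.setSortComparatorClass(Class) (0.20.1+ API).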

Parameters:
orderComparator
  public void setKeyOrderComparator(
        RawComparator<K2> orderComparator) {
     = orderComparator;
  }
New to GrepCode? Check out our FAQ X