Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
 
 package org.apache.hadoop.mrunit;
 
 
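 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mrunit.types.Pair;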
 
Harness that allows you to test a dataflow through a set of Mappers and Reducers. You provide a set of (Mapper, Reducer) "jobs" that make up a workflow, as well as a set of (key, value) pairs to pass in to the first Mapper. You can also specify the outputs you expect the final Reducer in the pipeline to emit.

When you call runTest(), the harness delivers the input to the first Mapper, feeds the intermediate results to the first Reducer (without checking them), and forwards each job's output to the next Mapper/Reducer job in the pipeline until the final Reducer. The last Reducer's outputs are checked against the expected results.

This is designed for slightly more complicated integration tests than the MapReduceDriver, which is for smaller unit tests.

(K1, V1) in the type signature refer to the types associated with the inputs to the first Mapper. (K2, V2) refer to the types associated with the final Reducer's output. No intermediate types are specified.
 
 public class PipelineMapReduceDriver<K1, V1, K2, V2>
     extends TestDriver<K1, V1, K2, V2> {
 
   public static final Log LOG = LogFactory.getLog(PipelineMapReduceDriver.class);
 
   private List<Pair<Mapper, Reducer>> mapReducePipeline;
   private List<Pair<K1, V1>> inputList;
   private Counters counters;
 
   public PipelineMapReduceDriver(final List<Pair<Mapper, Reducer>> pipeline) {
     this.mapReducePipeline = copyMapReduceList(pipeline);
     this.inputList = new ArrayList<Pair<K1, V1>>();
     this.counters = new Counters();
   }
 
   public PipelineMapReduceDriver() {
     this.mapReducePipeline = new ArrayList<Pair<Mapper, Reducer>>();
     this.inputList = new ArrayList<Pair<K1, V1>>();
     this.counters = new Counters();
   }
 
   private List<Pair<Mapper, Reducer>> copyMapReduceList(List<Pair<Mapper, Reducer>> lst) {
     List<Pair<Mapper, Reducer>> outList = new ArrayList<Pair<Mapper, Reducer>>();
     for (Pair<Mapper, Reducer> p : lst) {
       // Take advantage of the fact that Pair is immutable.
       outList.add(p);
     }
 
     return outList;
   }

  

Returns:
the counters used in this test
 
   public Counters getCounters() {
     return counters;
   }

  
Sets the counters object to use for this test.

Parameters:
ctrs The counters object to use.
 
   public void setCounters(final Counters ctrs) {
     this.counters = ctrs;
   }

  
Sets the counters to use and returns self for fluent style
 
   public PipelineMapReduceDriver<K1, V1, K2, V2> withCounters(final Counters ctrs) {
     setCounters(ctrs);
     return this;
   }


  
Add a Mapper and Reducer instance to the pipeline to use with this test driver

Parameters:
m The Mapper instance to add to the pipeline
r The Reducer instance to add to the pipeline
  public void addMapReduce(Mapper m, Reducer r) {
    Pair<Mapper, Reducer> p = new Pair<Mapper, Reducer>(m, r);
    this.mapReducePipeline.add(p);
  }

  
Add a Mapper and Reducer instance to the pipeline to use with this test driver

Parameters:
p The Mapper and Reducer instances to add to the pipeline
  public void addMapReduce(Pair<Mapper, Reducer> p) {
    this.mapReducePipeline.add(p);
  }

  
Add a Mapper and Reducer instance to the pipeline to use with this test driver using fluent style

Parameters:
m The Mapper instance to use
r The Reducer instance to use
  public PipelineMapReduceDriver<K1, V1, K2, V2> withMapReduce(Mapper m, Reducer r) {
    addMapReduce(m, r);
    return this;
  }

  
Add a Mapper and Reducer instance to the pipeline to use with this test driver using fluent style

Parameters:
p The Mapper and Reducer instances to add to the pipeline
  public PipelineMapReduceDriver<K1, V1, K2, V2> withMapReduce(Pair<Mapper, Reducer> p) {
    addMapReduce(p);
    return this;
  }

  

Returns:
A copy of the list of Mapper and Reducer objects under test
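  public List<Pair<Mapper, Reducer>> getMapReducePipeline() {
    return copyMapReduceList(this.mapReducePipeline);
  }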
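
The getter above returns a copy rather than the internal list. As a minimal sketch of why that matters, the following stand-alone class (the name `DefensiveCopySketch` and its `String` stage names are hypothetical stand-ins, not MRUnit types) shows that changes made to a returned copy do not reach the list held inside the object:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a driver that owns a list of pipeline stages.
class DefensiveCopySketch {
  private final List<String> stages = new ArrayList<>();

  void addStage(String name) {
    stages.add(name);
  }

  // Returns a shallow copy; the elements are shared, the list is not.
  List<String> getStages() {
    return new ArrayList<>(stages);
  }

  public static void main(String[] args) {
    DefensiveCopySketch driver = new DefensiveCopySketch();
    driver.addStage("job-1");

    List<String> copy = driver.getStages();
    copy.add("job-2"); // only the copy grows

    System.out.println(driver.getStages().size()); // prints 1
    System.out.println(copy.size());               // prints 2
  }
}
```

A shallow copy is enough here only because the elements themselves are treated as immutable, which is the same assumption the comment in copyMapReduceList() states about Pair.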

  
Adds an input to send to the mapper

Parameters:
key
val
  public void addInput(K1 key, V1 val) {
    inputList.add(new Pair<K1, V1>(key, val));
  }

  
Identical to addInput() but returns self for fluent programming style

Parameters:
key
val
Returns:
this
  public PipelineMapReduceDriver<K1, V1, K2, V2> withInput(K1 key, V1 val) {
    addInput(key, val);
    return this;
  }

  
Adds an input to send to the Mapper

Parameters:
input The (k, v) pair to add to the input list.
  public void addInput(Pair<K1, V1> input) {
    if (null == input) {
      throw new IllegalArgumentException("Null input in addInput()");
    }
    inputList.add(input);
  }

  
Identical to addInput() but returns self for fluent programming style

Parameters:
input The (k, v) pair to add
Returns:
this
  public PipelineMapReduceDriver<K1, V1, K2, V2> withInput(
      Pair<K1, V1> input) {
    addInput(input);
    return this;
  }

  
Adds an output (k, v) pair we expect from the Reducer

Parameters:
outputRecord The (k, v) pair to add
  public void addOutput(Pair<K2, V2> outputRecord) {
    if (null != outputRecord) {
      expectedOutputs.add(outputRecord);
    } else {
      throw new IllegalArgumentException("Tried to add null outputRecord");
    }
  }

  
Works like addOutput(), but returns self for fluent style

Parameters:
outputRecord
Returns:
this
  public PipelineMapReduceDriver<K1, V1, K2, V2> withOutput(
          Pair<K2, V2> outputRecord) {
    addOutput(outputRecord);
    return this;
  }

  
Adds a (k, v) pair we expect as output from the Reducer

Parameters:
key
val
  public void addOutput(K2 key, V2 val) {
    addOutput(new Pair<K2, V2>(key, val));
  }

  
Functions like addOutput() but returns self for fluent programming style

Parameters:
key
val
Returns:
this
  public PipelineMapReduceDriver<K1, V1, K2, V2> withOutput(K2 key, V2 val) {
    addOutput(key, val);
    return this;
  }

  
Expects an input of the form "key \t val". Forces the Mapper input types to Text.

Parameters:
input A string of the form "key \t val". Trims any whitespace.
  public void addInputFromString(String input) {
    if (null == input) {
      throw new IllegalArgumentException("null input given to setInput");
    } else {
      Pair<Text, Text> inputPair = parseTabbedPair(input);
      if (null != inputPair) {
        // I know this is not type-safe, but I don't
        // know a better way to do this.
        addInput((Pair<K1, V1>) inputPair);
      } else {
        throw new IllegalArgumentException("Could not parse input pair in addInput");
      }
    }
  }
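
To illustrate the "key \t val" format described above, here is a minimal stand-alone sketch. It is not the parseTabbedPair() implementation from MRUnit: the class `TabbedPairSketch`, the method `parseTabbed`, and the use of plain `String` values instead of Text are all assumptions made for illustration. It splits at the first tab, trims both halves, and returns null when no tab is found, which mirrors the null check the method above performs on its parse result.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

// Hypothetical helper; not part of MRUnit.
class TabbedPairSketch {

  // Splits "key \t val" at the first tab and trims whitespace from both
  // halves. Returns null for null input or input without a tab.
  static Map.Entry<String, String> parseTabbed(String line) {
    if (line == null) {
      return null;
    }
    int split = line.indexOf('\t');
    if (split < 0) {
      return null;
    }
    String key = line.substring(0, split).trim();
    String val = line.substring(split + 1).trim();
    return new SimpleImmutableEntry<>(key, val);
  }

  public static void main(String[] args) {
    Map.Entry<String, String> pair = parseTabbed("foo \t bar");
    System.out.println(pair.getKey() + " -> " + pair.getValue()); // prints "foo -> bar"
    System.out.println(parseTabbed("no tab here"));               // prints "null"
  }
}
```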

  
Identical to addInputFromString, but with a fluent programming style

Parameters:
input A string of the form "key \t val". Trims any whitespace.
Returns:
this
  public PipelineMapReduceDriver<K1, V1, K2, V2> withInputFromString(String input) {
    addInputFromString(input);
    return this;
  }

  
Expects an input of the form "key \t val". Forces the Reducer output types to Text.

Parameters:
output A string of the form "key \t val". Trims any whitespace.
  public void addOutputFromString(String output) {
    if (null == output) {
      throw new IllegalArgumentException("null input given to setOutput");
    } else {
      Pair<Text, Text> outputPair = parseTabbedPair(output);
      if (null != outputPair) {
        // I know this is not type-safe,
        // but I don't know a better way to do this.
        addOutput((Pair<K2, V2>) outputPair);
      } else {
        throw new IllegalArgumentException(
            "Could not parse output pair in setOutput");
      }
    }
  }

  
Identical to addOutputFromString, but with a fluent programming style

Parameters:
output A string of the form "key \t val". Trims any whitespace.
Returns:
this
  public PipelineMapReduceDriver<K1, V1, K2, V2> withOutputFromString(String output) {
    addOutputFromString(output);
    return this;
  }
  public List<Pair<K2, V2>> run() throws IOException {
    // inputs starts with the user-provided inputs.
    List inputs = this.inputList;
    if (mapReducePipeline.size() == 0) {
      LOG.warn("No Mapper or Reducer instances in pipeline; this is a trivial test.");
    }
    if (inputs.size() == 0) {
      LOG.warn("No inputs configured to send to MapReduce pipeline; this is a trivial test.");
    }
    for (Pair<Mapper, Reducer> job : mapReducePipeline) {
      // Create a MapReduceDriver to run this phase of the pipeline.
      MapReduceDriver mrDriver = new MapReduceDriver(job.getFirst(), job.getSecond());
      mrDriver.setCounters(getCounters());
      // Add the inputs from the user, or from the previous stage of the pipeline.
      for (Object input : inputs) {
        mrDriver.addInput((Pair) input);
      }
      // Run the MapReduce "job". The output of this job becomes
      // the input to the next job.
      inputs = mrDriver.run();
    }
    // The last list of values stored in "inputs" is actually the outputs.
    // Unfortunately, due to the variable-length list of MR passes the user 
    // can test, this is not type-safe.
    return (List<Pair<K2, V2>>) inputs;
  }
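
The chaining performed by run() can be sketched without any Hadoop types. In the stand-alone example below, each (Mapper, Reducer) job is replaced by a hypothetical `Function` from a list of records to a list of records; the class `PipelineChainingSketch` and the two toy stages are illustrative assumptions, not MRUnit code. The point is only the loop shape: the output of one stage is reassigned as the input of the next, and whatever remains after the last stage is the result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of a multi-stage pipeline; not an MRUnit class.
class PipelineChainingSketch {

  // Runs each stage in order, feeding one stage's output to the next.
  static List<String> runStages(
      List<Function<List<String>, List<String>>> stages, List<String> userInputs) {
    // "inputs" starts as the user-provided inputs.
    List<String> inputs = new ArrayList<>(userInputs);
    for (Function<List<String>, List<String>> stage : stages) {
      // The output of this stage becomes the input to the next stage.
      inputs = stage.apply(inputs);
    }
    // After the last stage, "inputs" holds the final outputs.
    return inputs;
  }

  // Toy stage 1: upper-case every record.
  static List<String> upperCase(List<String> in) {
    List<String> out = new ArrayList<>();
    for (String s : in) {
      out.add(s.toUpperCase());
    }
    return out;
  }

  // Toy stage 2: drop duplicate records, keeping first-seen order.
  static List<String> dropDuplicates(List<String> in) {
    return new ArrayList<>(new LinkedHashSet<>(in));
  }

  public static void main(String[] args) {
    List<Function<List<String>, List<String>>> stages = new ArrayList<>();
    stages.add(PipelineChainingSketch::upperCase);
    stages.add(PipelineChainingSketch::dropDuplicates);

    System.out.println(runStages(stages, Arrays.asList("a", "b", "a"))); // prints [A, B]
  }
}
```

With an empty stage list the sketch returns the inputs unchanged, which corresponds to the "trivial test" case that run() warns about.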
  public void runTest() throws RuntimeException {
    List<Pair<K2, V2>> outputs = null;
    boolean succeeded;
    try {
      outputs = run();
      validate(outputs);
    } catch (IOException ioe) {
      LOG.error("IOException: " + ioe.toString());
      LOG.debug("Setting success to false based on IOException");
      throw new RuntimeException();
    }
  }