// (removed: GrepCode page header text — "Start line / End line / Snippet Preview /
//  Snippet HTML Code / Stack Overflow Questions" — which is not part of the source file)
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.pig.builtin.mock;
 
import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getUniqueFile;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
import org.apache.pig.Expression;
import org.apache.pig.LoadCaster;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.PigServer;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.StoreMetadata;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.NonSpillableDataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.parser.ParserException;
 
/**
 * A convenient mock Storage for unit tests.
 *
 * <pre>
 *  PigServer pigServer = new PigServer(ExecType.LOCAL);
 *  Data data = resetData(pigServer);
 *  data.set("foo",
 *      tuple("a"),
 *      tuple("b"),
 *      tuple("c")
 *      );
 *
 *  pigServer.registerQuery("A = LOAD 'foo' USING mock.Storage();");
 *  pigServer.registerQuery("STORE A INTO 'bar' USING mock.Storage();");
 *
 *  List&lt;Tuple&gt; out = data.get("bar");
 *
 *  assertEquals(tuple("a"), out.get(0));
 *  assertEquals(tuple("b"), out.get(1));
 *  assertEquals(tuple("c"), out.get(2));
 * </pre>
 *
 * With Schema:
 * <pre>
 *  PigServer pigServer = new PigServer(ExecType.LOCAL);
 *  Data data = resetData(pigServer);
 *
 *  data.set("foo", "blah:chararray",
 *      tuple("a"),
 *      tuple("b"),
 *      tuple("c")
 *      );
 *
 *  pigServer.registerQuery("A = LOAD 'foo' USING mock.Storage();");
 *  pigServer.registerQuery("B = FOREACH A GENERATE blah as a, blah as b;");
 *  pigServer.registerQuery("STORE B INTO 'bar' USING mock.Storage();");
 *
 *  assertEquals(schema("a:chararray,b:chararray"), data.getSchema("bar"));
 *
 *  List&lt;Tuple&gt; out = data.get("bar");
 *  assertEquals(tuple("a", "a"), out.get(0));
 *  assertEquals(tuple("b", "b"), out.get(1));
 *  assertEquals(tuple("c", "c"), out.get(2));
 * </pre>
 */
public class Storage extends LoadFunc implements StoreFuncInterfaceLoadMetadataStoreMetadata {
  private static final String PIG_CONTEXT_KEY = "pig.mock.storage.id";
  private static final Logger LOG = Logger.getLogger(Storage.class);
  private static Map<IntegerDataidToData = new HashMap<IntegerData>();
  private static TupleFactory TF = TupleFactory.getInstance();
  private static int nextId;

  

Parameters:
objects
Returns:
a tuple containing the provided objects
  public static Tuple tuple(Object... objects) {
    return .newTuple(Arrays.asList(objects));
  }

  

Parameters:
tuples
Returns:
a bag containing the provided objects
  public static DataBag bag(Tuple... tuples) {
    return new NonSpillableDataBag(Arrays.asList(tuples));
  }
  
  

Parameters:
schema
Returns:
the schema represented by the string
Throws:
org.apache.pig.parser.ParserException if the schema is invalid
  public static Schema schema(String schemathrows ParserException {
	  return Utils.getSchemaFromString(schema);
  }

  
reset the store and get the Data object to access it

Parameters:
pigServer
Returns:
Data
  public static Data resetData(PigServer pigServer) {
    return resetData(pigServer.getPigContext());
  }

  
reset the store and get the Data object to access it

Parameters:
context
Returns:
data as Data
  public static Data resetData(PigContext context) {
    Properties properties = context.getProperties();
    // cleaning up previous data
    try {
      if (properties.contains()) {
        Integer previousId = new Integer(properties.getProperty());
        .remove(previousId);
      }
    } catch (RuntimeException e) {
      .warn("invalid id in context properties for "+e);
    }
    // setting new Store
    int id = ++;
    properties.setProperty(, String.valueOf(id));
    Data data = new Data();
    .put(iddata);
    return data;
  }
  private Data getData(Job jobthrows IOException {
    String stringId = job.getConfiguration().get();
    if (stringId == null) {
      throw new IOException("no Data prepared for this Script. " +
      		"You need to call Storage.resetData(pigServer.getPigContext()) first");
    }
    Data data = .get(new Integer(stringId));
    if (data == null) {
      throw new IOException("no Data anymore for this Script. " +
          "Has data been reset by another Storage.resetData(pigServer.getPigContext()) ?");
    }
    return data;
  }
  private static class Parts {
    final String location;
    final Map<StringCollection<Tuple>> parts = new HashMap<StringCollection<Tuple>>();
    public Parts(String location) {
      super();
      this. = location;
    }
    public void set(String partFileCollection<Tupledata) {
      if (.put(partFiledata) != null) {
        throw new RuntimeException("the part " + partFile + " for location " +  + " already exists");
      }
    }
    public List<TuplegetAll() {
        List<Tupleall = new ArrayList<Tuple>();
        Set<Entry<StringCollection<Tuple>>> entrySet = .entrySet();
        for (Entry<StringCollection<Tuple>> entry : entrySet) {
            all.addAll(entry.getValue());
        }
        return all;
    }
  }
  
  
An isolated data store to avoid side effects
  public static class Data implements Serializable {
    private static final long serialVersionUID = 1L;
    private Map<StringPartslocationToData = new HashMap<StringParts>();
    private Map<StringSchemalocationToSchema = new HashMap<StringSchema>();

    
to set the data in a location with a known schema

Parameters:
location "where" to store the tuples
schema the schema of the data
data the tuples to store
Throws:
org.apache.pig.parser.ParserException if schema is invalid
    public void set(String locationString schemaCollection<Tupledatathrows ParserException {
      set(location, Utils.getSchemaFromString(schema), data);
    }

    
to set the data in a location with a known schema

Parameters:
location "where" to store the tuples
schema
data the tuples to store
Throws:
org.apache.pig.parser.ParserException if schema is invalid
    public void set(String locationString schemaTuple... datathrows ParserException {
      set(location, Utils.getSchemaFromString(schema), Arrays.asList(data));
    }
    
    
to set the data in a location with a known schema

Parameters:
location "where" to store the tuples
schema
data the tuples to store
    public void set(String locationSchema schemaCollection<Tupledata) {
      set(locationdata);
      if (.put(locationschema) != null) {
          throw new RuntimeException("schema already set for location "+location);
      }
    }

    
to set the data in a location with a known schema

Parameters:
location "where" to store the tuples
schema
data the tuples to store
    public void set(String locationSchema schemaTuple... data) {
      set(locationschema, Arrays.asList(data));
    }

    
to set the data in a location

Parameters:
location "where" to store the tuples
data the tuples to store
    private void setInternal(String locationString partIDCollection<Tupledata) {
        Parts parts = .get(location);
        if (partID == null) {
            if (parts == null) {
                partID = "mock";
            } else {
                throw new RuntimeException("Can not set location " + location + " twice");
            }
        }
        if (parts == null) {
            parts = new Parts(location);
            .put(locationparts);
        }
        parts.set(partIDdata);
    }

    
to set the data in a location

Parameters:
location "where" to store the tuples
data the tuples to store
    public void set(String locationCollection<Tupledata) {
      setInternal(locationnulldata);
    }

    
to set the data in a location

Parameters:
location "where" to store the tuples
data the tuples to store
    public void set(String locationTuple... data) {
        set(location, Arrays.asList(data));
    }
    
    

Parameters:
location
Returns:
the data in this location
    public List<Tupleget(String location) {
      if (!.containsKey(location)) {
        throw new RuntimeException("No data for location '" + location + "'");
      }
      return .get(location).getAll();
    }

    

Parameters:
location
Returns:
the schema stored in this location
	public Schema getSchema(String location) {
		return .get(location);
	}

to set the schema for a given location

Parameters:
location
schema
	public void setSchema(String locationSchema schema) {
		.put(locationschema);
	}
  }
  private String location;
  private Data data;
  
  private Schema schema;
  private void init(String locationJob jobthrows IOException {
	  this. = getData(job);
	  this. = location;
	  this. = .getSchema(location);
  }
  // LoadFunc
  public String relativeToAbsolutePath(String locationPath curDirthrows IOException {
	this. = location;
    return location;
  }
  public void setLocation(String locationJob jobthrows IOException {
    init(locationjob);
    this. = .get(location).iterator();
  }
  public InputFormat<?, ?> getInputFormat() throws IOException {
    return new MockInputFormat();
  }
  public LoadCaster getLoadCaster() throws IOException {
    return super.getLoadCaster();
  }
  public void prepareToRead(@SuppressWarnings("rawtypes"RecordReader readerPigSplit splitthrows IOException {
  }
  public Tuple getNext() throws IOException {
    if ( == null) {
      throw new IOException("data was not correctly initialized in MockLoader");
    }
    return .hasNext() ? .next() : null;
  }
  public void setUDFContextSignature(String signature) {
    super.setUDFContextSignature(signature);
  }
  
  // LoadMetaData
  
  public ResourceSchema getSchema(String locationJob jobthrows IOException {
	init(locationjob);
  	return  == null ? null : new ResourceSchema();
  }
  public ResourceStatistics getStatistics(String locationJob job)
  		throws IOException {
	init(locationjob);
  	return null;
  }
  public String[] getPartitionKeys(String locationJob jobthrows IOException {
	init(locationjob);
  	return null;
  }
  public void setPartitionFilter(Expression partitionFilterthrows IOException {
  }
  // StoreFunc
  public String relToAbsPathForStoreLocation(String locationPath curDirthrows IOException {
    this. = location;
    return location;
  }
  public OutputFormat<?, ?> getOutputFormat() throws IOException {
    return new MockOutputFormat();
  }
  public void setStoreLocation(String locationJob jobthrows IOException {
	init(locationjob);
  }
  public void checkSchema(ResourceSchema sthrows IOException {
  }
  public void prepareToWrite(@SuppressWarnings("rawtypes"RecordWriter writerthrows IOException {
       = (MockRecordWriterwriter;
  }
  public void putNext(Tuple tthrows IOException {
  }
  public void setStoreFuncUDFContextSignature(String signature) {
  }
  public void cleanupOnFailure(String locationJob jobthrows IOException {
	init(locationjob);
  }
  public void cleanupOnSuccess(String locationJob jobthrows IOException {
	init(locationjob);
  }
  // StoreMetaData
  
  public void storeStatistics(ResourceStatistics statsString locationJob job)
  		throws IOException {
	init(locationjob);
  }
  public void storeSchema(ResourceSchema schemaString locationJob job)
  		throws IOException {
	init(locationjob);
	.setSchema(location, Schema.getPigSchema(schema));
  }
  
  // Mocks for LoadFunc
  private static class MockRecordReader extends RecordReader<ObjectObject> {
    @Override
    public void close() throws IOException {
    }
    @Override
    public Object getCurrentKey() throws IOExceptionInterruptedException {
      return "mockKey";
    }
    @Override
      return "mockValue";
    }
    @Override
    public float getProgress() throws IOExceptionInterruptedException {
      return 0.5f;
    }
    @Override
    public void initialize(InputSplit splitTaskAttemptContext arg1throws IOException,
        InterruptedException {
    }
    @Override
    public boolean nextKeyValue() throws IOExceptionInterruptedException {
      return true;
    }
  }
  private static class MockInputSplit extends InputSplit implements Writable  {
    private String location;
    // used through reflection by Hadoop
    @SuppressWarnings("unused")
    public MockInputSplit() {
    }
    public MockInputSplit(String location) {
      this. = location;
    }
    @Override
    public String[] getLocations() throws IOExceptionInterruptedException {
      return new String[] {  };
    }
    @Override
    public long getLength() throws IOExceptionInterruptedException {
      return 10000000;
    }
    @Override
    public boolean equals(Object arg0) {
      return arg0==this;
    }
    @Override
    public int hashCode() {
      return .hashCode();
    }
    @Override
    public void readFields(DataInput arg0throws IOException {
       = arg0.readUTF();
    }
    @Override
    public void write(DataOutput arg0throws IOException {
      arg0.writeUTF();
    }
  }
  private static class MockInputFormat extends InputFormat<ObjectObject> {
    private final String location;
    public MockInputFormat(String location) {
      this. = location;
    }
    @Override
        throws IOExceptionInterruptedException {
      return new MockRecordReader();
    }
    @Override
    public List<InputSplitgetSplits(JobContext jobContextthrows IOExceptionInterruptedException {
      return Arrays.<InputSplit>asList(new MockInputSplit());
    }
  }
  // mocks for StoreFunc
  private static final class MockRecordWriter extends RecordWriter<ObjectObject> {
    private final List<TupledataBeingWritten = new ArrayList<Tuple>();
    private final String partID;
    public MockRecordWriter(String partID) {
        super();
        this. = partID;
    }
    @Override
    public void close(TaskAttemptContext taskAttemptContextthrows IOExceptionInterruptedException {
    }
    @Override
    public void write(Object arg0Object arg1throws IOExceptionInterruptedException {
    }
  }
  private static class MockOutputCommitter extends OutputCommitter {
    @Override
    public void abortTask(TaskAttemptContext arg0throws IOException {
    }
    @Override
    public void commitTask(TaskAttemptContext arg0throws IOException {
    }
    @Override
    public boolean needsTaskCommit(TaskAttemptContext arg0throws IOException {
      return true;
    }
    @Override
    public void setupJob(JobContext arg0throws IOException {
    }
    @Override
    public void setupTask(TaskAttemptContext arg0throws IOException {
    }
  }
  private static final class MockOutputFormat extends OutputFormat<ObjectObject> {
    @Override
    public void checkOutputSpecs(JobContext arg0throws IOExceptionInterruptedException {
    }
    @Override
      return new MockOutputCommitter();
    }
    @Override
      return new MockRecordWriter(getUniqueFile(arg0"part"".mock"));
    }
  }
// (removed: GrepCode page footer text — "New to GrepCode? Check out our FAQ" — not part of the source file)