/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
 
package org.apache.pig.builtin;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.mapred.AvroOutputFormat;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.pig.Expression;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.LoadPushDown;
import org.apache.pig.PigWarning;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.StoreFunc;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.avro.AvroArrayReader;
import org.apache.pig.impl.util.avro.AvroRecordReader;
import org.apache.pig.impl.util.avro.AvroRecordWriter;
import org.apache.pig.impl.util.avro.AvroStorageSchemaConversionUtilities;
import org.apache.pig.impl.util.avro.AvroTupleWrapper;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Longs;
 
/**
 * Pig UDF for reading and writing Avro data.
 */
public class AvroStorage extends LoadFunc
    implements StoreFuncInterface, LoadMetadata, LoadPushDown {

  
  /**
   * Creates new instance of Pig Storage function, without specifying
   * the schema. Useful for just loading in data.
   */
  public AvroStorage() {
    this(null, null);
  }

  
  /**
   * Creates new instance of Pig Storage function.
   * @param sn Specifies the input/output schema or record name.
   */
  public AvroStorage(final String sn) {
    this(sn, null);
  }
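
  /*
   * Illustrative sketch (not in the original source): the single argument is
   * first parsed as an Avro schema; if parsing fails it is treated as a
   * record name for the generated output schema (see the two-argument
   * constructor below). Both arguments here are hypothetical.
   *
   *   new AvroStorage("{\"type\":\"record\",\"name\":\"r\",\"fields\":"
   *       + "[{\"name\":\"x\",\"type\":\"int\"}]}");  // schema literal
   *   new AvroStorage("my_record");                   // record name
   */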
  private String schemaName = "pig_output";
  private String schemaNameSpace = null;
  protected boolean allowRecursive = false;
  protected boolean doubleColonsToDoubleUnderscores = false;
  protected Schema schema;
  protected final Log log = LogFactory.getLog(getClass());

  
      /**
       * Creates new instance of AvroStorage function, specifying output
       * schema properties.
       * @param sn Specifies the input/output schema or record name.
       * @param opts Options for AvroStorage:
       * <li><code>-namespace</code> Namespace for an automatically generated
       * output schema.</li>
       * <li><code>-schemafile</code> Specifies URL for avro schema file from
       * which to read the input schema (can be local file, hdfs, url, etc).</li>
       * <li><code>-examplefile</code> Specifies URL for avro data file from
       * which to copy the input schema (can be local file, hdfs, url, etc).</li>
       * <li><code>-allowrecursive</code> Option to allow recursive schema
       * definitions (default is false).</li>
       * <li><code>-doublecolons</code> Option to translate Pig schema names
       * with double colons to names with double underscores (default is
       * false).</li>
       */
      public AvroStorage(final String sn, final String opts) {
        super();
        if (sn != null && sn.length() > 0) {
          try {
            Schema s = (new Schema.Parser()).parse(sn);
            // must be a valid schema
            setInputAvroSchema(s);
            setOutputAvroSchema(s);
          } catch (SchemaParseException e) {
            // not a valid schema, use as a record name
            schemaName = sn;
          }
        }
        if (opts != null) {
          String[] optsArr = opts.split(" ");
          Options validOptions = new Options();
          try {
            CommandLineParser parser = new GnuParser();
            validOptions.addOption("n""namespace"true,
                "Namespace for an automatically generated output schema");
            validOptions.addOption("f""schemafile"true,
                "Specifies URL for avro schema file from which to read "
                + "the input or output schema");
            validOptions.addOption("e""examplefile"true,
                "Specifies URL for avro data file from which to copy "
                + "the output schema");
            validOptions.addOption("r""allowrecursive"false,
                "Option to allow recursive schema definitions (default is false)");
            validOptions.addOption("d""doublecolons"false,
                "Option to translate Pig schema names with double colons "
                + "to names with double underscores (default is false)");
            CommandLine configuredOptions = parser.parse(validOptionsoptsArr);
             = configuredOptions.getOptionValue("namespace"null);
             = configuredOptions.hasOption('r');
             = configuredOptions.hasOption('d');
            if (configuredOptions.hasOption('f')) {
              try {
                Path p = new Path(configuredOptions.getOptionValue('f'));
                Schema s = new Schema.Parser()
                  .parse((FileSystem.get(p.toUri(), new Configuration()).open(p)));  
                setInputAvroSchema(s);
                setOutputAvroSchema(s);
              } catch (FileNotFoundException fnfe) {
                ..printf("file not found exception\n");
                .warn("Schema file not found when instantiating AvroStorage. (If the " + 
                    "schema was described in a local file on the front end, and this message " + 
                    "is in the back end log, you can ignore this mesasge.)"fnfe);
              }
            } else if (configuredOptions.hasOption('e')) {
              setOutputAvroSchema(
                  getAvroSchema(configuredOptions.getOptionValue('e'),
                      new Job(new Configuration())));
            }
          } catch (ParseException e) {
            .error("Exception in AvroStorage"e);
            .error("AvroStorage called with arguments " + sn + ", " + opts);
            warn("ParseException in AvroStorage".);
            HelpFormatter formatter = new HelpFormatter();
            formatter.printHelp("AvroStorage(',', '[options]')", validOptions);
            throw new RuntimeException(e);
          } catch (IOException e) {
            .warn("Exception in AvroStorage"e);
            .warn("AvroStorage called with arguments " + sn + ", " + opts);
            warn("IOException in AvroStorage".);
            throw new RuntimeException(e);
          }
        }
      }
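
      /*
       * Illustrative sketch (not in the original source): the options string
       * is split on spaces and parsed with commons-cli, so a schema name and
       * options combine as below. The namespace and schema file path are
       * hypothetical.
       *
       *   new AvroStorage("my_record", "-namespace com.example -doublecolons");
       *   new AvroStorage(null, "-schemafile hdfs:///schemas/my_record.avsc");
       */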

      
      /**
       * Context signature for this UDF instance.
       */
      protected String udfContextSignature = null;

      public final void setUDFContextSignature(final String signature) {
        udfContextSignature = signature;
        super.setUDFContextSignature(signature);
      }

      
      /**
       * Internal function for getting the Properties object associated with
       * this UDF instance.
       * @return The Properties object associated with this UDF instance
       */
      protected final Properties getProperties() {
        if (udfContextSignature == null) {
          return getProperties(AvroStorage.class, null);
        } else {
          return getProperties(AvroStorage.class, udfContextSignature);
        }
      }

      
      /**
       * Internal function for getting the Properties object associated with
       * this UDF instance.
       * @param c Class of this UDF
       * @param signature Signature string
       * @return The Properties object associated with this UDF instance
       */
      @SuppressWarnings("rawtypes")
      protected final Properties getProperties(final Class c,
          final String signature) {
        UDFContext context = UDFContext.getUDFContext();
        if (signature == null) {
          return context.getUDFProperties(c);
        } else {
          return context.getUDFProperties(c, new String[] {signature});
        }
      }
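
      /*
       * Illustrative sketch (not in the original source): properties written
       * here on the front end travel to backend tasks through the UDFContext,
       * which is how schema strings stored below survive the front-end /
       * back-end split. The key and value are hypothetical.
       *
       *   getProperties().setProperty("example.key", "example.value");
       *   String v = getProperties().getProperty("example.key");
       */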
      /*
       * @see org.apache.pig.LoadMetadata#getSchema(java.lang.String,
       * org.apache.hadoop.mapreduce.Job)
       */
      public final ResourceSchema getSchema(final String location,
          final Job job) throws IOException {
        if (schema == null) {
          Schema s = getAvroSchema(location, job);
          setInputAvroSchema(s);
        }
        ResourceSchema rs = AvroStorageSchemaConversionUtilities
            .avroSchemaToResourceSchema(schema, allowRecursive);
        return rs;
      }

      
      /**
       * Reads the avro schema at the specified location.
       * @param location Location of file
       * @param job Hadoop job object
       * @return an Avro Schema object derived from the specified file
       * @throws IOException
       */
      protected final Schema getAvroSchema(final String location, final Job job)
          throws IOException {
        String[] locations = getPathStrings(location);
        Path[] paths = new Path[locations.length];
        for (int i = 0; i < paths.length; ++i) {
          paths[i] = new Path(locations[i]);
        }
        return getAvroSchema(paths, job);
      }
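
      /*
       * Illustrative call (hypothetical paths): a comma-separated location is
       * split by LoadFunc.getPathStrings into individual glob patterns before
       * the schema is read.
       *
       *   Schema s = getAvroSchema("/data/2014/*.avro,/data/2015/*.avro",
       *       new Job(new Configuration()));
       */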

      
      /**
       * A PathFilter that filters out invisible files.
       */
      protected static final PathFilter VISIBLE_FILES = new PathFilter() {
        @Override
        public boolean accept(final Path p) {
          return (!(p.getName().startsWith("_") || p.getName().startsWith(".")));
        }
      };
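
      /*
       * Example (illustrative): this filter skips bookkeeping outputs such as
       * "_SUCCESS" and ".part-r-00000.avro.crc" while accepting data files
       * such as "part-r-00000.avro".
       */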

      
      /**
       * Reads the avro schemas at the specified location.
       * @param p Location of file
       * @param job Hadoop job object
       * @return an Avro Schema object derived from the specified file
       * @throws IOException
       */
      public Schema getAvroSchema(final Path[] p, final Job job)
          throws IOException {
        GenericDatumReader<Object> avroReader =
            new GenericDatumReader<Object>();
        ArrayList<FileStatus> statusList = new ArrayList<FileStatus>();
        FileSystem fs = FileSystem.get(p[0].toUri(), job.getConfiguration());
        for (Path temp : p) {
          for (FileStatus tempf : fs.globStatus(temp)) {
            statusList.add(tempf);
          }
        }
        FileStatus[] statusArray = (FileStatus[]) statusList
            .toArray(new FileStatus[statusList.size()]);
        if (statusArray == null) {
          throw new IOException("Path " + Arrays.toString(p) + " does not exist.");
        }
        if (statusArray.length == 0) {
          throw new IOException("No path matches pattern " + Arrays.toString(p));
        }
        Path filePath = depthFirstSearchForFile(statusArray, fs);
        if (filePath == null) {
          throw new IOException("No path matches pattern " + Arrays.toString(p));
        }
        InputStream hdfsInputStream = fs.open(filePath);
        DataFileStream<Object> avroDataStream = new DataFileStream<Object>(
            hdfsInputStream, avroReader);
        Schema s = avroDataStream.getSchema();
        avroDataStream.close();
        return s;
      }

      
      /**
       * Finds a valid path for a file from a FileStatus object.
       * @param fileStatus FileStatus object corresponding to a file,
       * or a directory.
       * @param fileSystem FileSystem in which the file should be found.
       * @return The first file found
       * @throws IOException
       */
      private Path depthFirstSearchForFile(final FileStatus fileStatus,
          final FileSystem fileSystem) throws IOException {
        if (fileSystem.isFile(fileStatus.getPath())) {
          return fileStatus.getPath();
        } else {
          return depthFirstSearchForFile(
              fileSystem.listStatus(fileStatus.getPath(), VISIBLE_FILES),
              fileSystem);
        }
      }

      
      /**
       * Finds a valid path for a file from an array of FileStatus objects.
       * @param statusArray Array of FileStatus objects in which to search
       * for the file.
       * @param fileSystem FileSystem in which to search for the first file.
       * @return The first file found.
       * @throws IOException
       */
      protected Path depthFirstSearchForFile(final FileStatus[] statusArray,
          final FileSystem fileSystem) throws IOException {
        // Most recent files first
        Arrays.sort(statusArray,
            new Comparator<FileStatus>() {
              @Override
              public int compare(final FileStatus fs1, final FileStatus fs2) {
                return Longs.compare(fs2.getModificationTime(),
                    fs1.getModificationTime());
                }
              }
        );
        for (FileStatus f : statusArray) {
          Path p = depthFirstSearchForFile(f, fileSystem);
          if (p != null) {
            return p;
          }
        }
        return null;
      }
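
      /*
       * Example (illustrative): given "_SUCCESS", an empty subdirectory, and
       * two data files, the search ignores the invisible file and, because of
       * the sort above, returns the most recently modified data file, so the
       * schema is read from the newest output.
       */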
      /*
       * @see org.apache.pig.LoadMetadata#getStatistics(java.lang.String,
       * org.apache.hadoop.mapreduce.Job)
       */
      public final ResourceStatistics getStatistics(final String location,
          final Job job) throws IOException {
        return null;
      }
      /*
       * @see org.apache.pig.LoadMetadata#getPartitionKeys(java.lang.String,
       * org.apache.hadoop.mapreduce.Job)
       */
      public final String[] getPartitionKeys(final String location,
          final Job job) throws IOException {
        return null;
      }
      /*
       * @see
       * org.apache.pig.LoadMetadata#setPartitionFilter(org.apache.pig.Expression)
       */
      public void setPartitionFilter(final Expression partitionFilter)
          throws IOException {
      }
      /*
       * @see
       * org.apache.pig.StoreFuncInterface#relToAbsPathForStoreLocation(java.lang
       * .String, org.apache.hadoop.fs.Path)
       */
      public final String relToAbsPathForStoreLocation(final String location,
          final Path curDir) throws IOException {
        return LoadFunc.getAbsolutePath(location, curDir);
      }
      /*
       * @see org.apache.pig.StoreFuncInterface#getOutputFormat()
       */
      public OutputFormat<NullWritable, Object> getOutputFormat()
          throws IOException {

        /**
         * Hadoop output format for AvroStorage.
         */
        class AvroStorageOutputFormat
            extends FileOutputFormat<NullWritable, Object> {
          @Override
          public RecordWriter<NullWritable, Object> getRecordWriter(
              final TaskAttemptContext tc) throws IOException,
              InterruptedException {
            return new AvroRecordWriter(
                // avroStorageOutputFormatSchema,
                getDefaultWorkFile(tc, AvroOutputFormat.EXT),
                tc.getConfiguration());
          }
        }
        return new AvroStorageOutputFormat();
      }
      /*
       * @see org.apache.pig.StoreFuncInterface#setStoreLocation(java.lang.String,
       * org.apache.hadoop.mapreduce.Job)
       */
      public final void setStoreLocation(final String location,
          final Job job) throws IOException {
        FileOutputFormat.setOutputPath(job, new Path(location));
      }

      
      /**
       * Pig property name for the output avro schema.
       */
      public static final String OUTPUT_AVRO_SCHEMA =
          "org.apache.pig.builtin.AvroStorage.output.schema";
      /*
       * @see
       * org.apache.pig.StoreFuncInterface#checkSchema(org.apache.pig.ResourceSchema
       * )
       */
      public final void checkSchema(final ResourceSchema rs) throws IOException {
        if (rs == null) {
          throw new IOException("checkSchema: called with null ResourceSchema");
        }
        Schema avroSchema = AvroStorageSchemaConversionUtilities
            .resourceSchemaToAvroSchema(rs,
                (schemaName == null || schemaName.length() == 0)
                    ? "pig_output" : schemaName,
                schemaNameSpace,
                Maps.<String, List<Schema>>newHashMap(),
                doubleColonsToDoubleUnderscores);
        if (avroSchema == null) {
          throw new IOException("checkSchema: could not translate ResourceSchema to Avro Schema");
        }
        setOutputAvroSchema(avroSchema);
      }
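
      /*
       * Illustrative sketch (hypothetical schema): a Pig schema such as
       * "name:chararray, age:int" is translated into an Avro record schema
       * whose name defaults to "pig_output" and whose namespace and field
       * naming are controlled by the -namespace and -doublecolons options.
       */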

      
      /**
       * Sets the output avro schema to {@link #schema}.
       * @param s An Avro schema
       */
      protected final void setOutputAvroSchema(final Schema s) {
        schema = s;
        getProperties().setProperty(OUTPUT_AVRO_SCHEMA, s.toString());
      }

      
      /**
       * Utility function that gets the output schema from the udf properties
       * for this instance of the store function.
       * @return the output schema associated with this UDF
       */
      protected final Schema getOutputAvroSchema() {
        if (schema == null) {
          String schemaString =
              getProperties().getProperty(OUTPUT_AVRO_SCHEMA);
          if (schemaString != null) {
            schema = (new Schema.Parser()).parse(schemaString);
          }
        }
        return schema;
      }
      
      
      /**
       * RecordWriter used by this UDF instance.
       */
      private RecordWriter<NullWritable, Object> writer;

      /*
       * @see
       * org.apache.pig.StoreFuncInterface#prepareToWrite(org.apache.hadoop.mapreduce
       * .RecordWriter)
       */
      @SuppressWarnings({ "unchecked", "rawtypes" })
      public final void prepareToWrite(final RecordWriter w) throws IOException {
        if (this.udfContextSignature == null) {
          throw new IOException(this.getClass().toString()
              + ".prepareToWrite called without setting udf context signature");
        }
        // keep the writer; putNext() writes each tuple through it
        writer = (RecordWriter<NullWritable, Object>) w;
      }
      /*
       * @see org.apache.pig.StoreFuncInterface#putNext(org.apache.pig.data.Tuple)
       */
      public final void putNext(final Tuple t) throws IOException {
        try {
          writer.write(null, t);
        } catch (InterruptedException e) {
          log.error("InterruptedException in putNext");
          throw new IOException(e);
        }
      }
      /*
       * @see
       * org.apache.pig.StoreFuncInterface#setStoreFuncUDFContextSignature(java.
       * lang.String)
       */
      public final void setStoreFuncUDFContextSignature(final String signature) {
        udfContextSignature = signature;
        super.setUDFContextSignature(signature);
      }
      /*
       * @see org.apache.pig.StoreFuncInterface#cleanupOnFailure(java.lang.String,
       * org.apache.hadoop.mapreduce.Job)
       */
      public final void cleanupOnFailure(final String location,
          final Job job) throws IOException {
        StoreFunc.cleanupOnFailureImpl(location, job);
      }

      
      /**
       * Pig property name for the input avro schema.
       */
      public static final String INPUT_AVRO_SCHEMA =
          "org.apache.pig.builtin.AvroStorage.input.schema";
      /*
       * @see org.apache.pig.LoadFunc#setLocation(java.lang.String,
       * org.apache.hadoop.mapreduce.Job)
       */
      public void setLocation(final String location, final Job job)
          throws IOException {
        FileInputFormat.setInputPaths(job, location);
        if (schema == null) {
          schema = getInputAvroSchema();
          if (schema == null) {
            schema = getAvroSchema(location, job);
            if (schema == null) {
              throw new IOException(
                  "Could not determine avro schema for location " + location);
            }
            setInputAvroSchema(schema);
          }
        }
      }

      
      /**
       * Sets the input avro schema to {@link #schema}.
       * @param s The specified schema
       */
      protected final void setInputAvroSchema(final Schema s) {
        schema = s;
        getProperties().setProperty(INPUT_AVRO_SCHEMA, s.toString());
      }

      
      /**
       * Helper function reads the input avro schema from the UDF Properties.
       * @return The input avro schema
       */
      public final Schema getInputAvroSchema() {
        if (schema == null) {
          updateSchemaFromInputAvroSchema();
        }
        return schema;
      }

      
      /**
       * Utility function that gets the input avro schema from the udf
       * properties and updates schema for this instance.
       */
      private final void updateSchemaFromInputAvroSchema() {
        String schemaString = getProperties().getProperty(INPUT_AVRO_SCHEMA);
        if (schemaString != null) {
          Schema s = new Schema.Parser().parse(schemaString);
          schema = s;
        }
      }

      

      /**
       * @see org.apache.pig.LoadFunc#getInputFormat()
       */
      public InputFormat<NullWritable, GenericData.Record> getInputFormat()
          throws IOException {
        return new FileInputFormat<NullWritable, GenericData.Record>() {
          @Override
          public RecordReader<NullWritable, GenericData.Record>
            createRecordReader(final InputSplit is, final TaskAttemptContext tc)
              throws IOException, InterruptedException {
            Schema s = getInputAvroSchema();
            RecordReader<NullWritable, GenericData.Record> rr = null;
            if (s.getType() == Schema.Type.ARRAY) {
              rr = new AvroArrayReader(s);
            } else {
              rr = new AvroRecordReader(s);
            }
            rr.initialize(is, tc);
            tc.setStatus(is.toString());
            return rr;
          }
        };
      }
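
      /*
       * Example (illustrative): a top-level Avro array schema such as
       * {"type":"array","items":"int"} is read with AvroArrayReader, while a
       * record schema is read with AvroRecordReader.
       */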
      @SuppressWarnings("rawtypes"private RecordReader reader;
      /*
       * @see
       * org.apache.pig.LoadFunc#prepareToRead(org.apache.hadoop.mapreduce.RecordReader
       * , org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit)
       */
      @SuppressWarnings("rawtypes")
      public final void prepareToRead(final RecordReader rfinal PigSplit s)
          throws IOException {
         = r;
         = s;
      }
      /*
       * @see org.apache.pig.LoadFunc#getNext()
       */
      public final Tuple getNext() throws IOException {
        try {
          if (reader.nextKeyValue()) {
            return new AvroTupleWrapper<GenericData.Record>(
                (GenericData.Record) reader.getCurrentValue());
          } else {
            return null;
          }
        } catch (InterruptedException e) {
          throw new IOException("Wrapped Interrupted Exception"e);
        }
      }
      public void cleanupOnSuccess(final String location, final Job job)
          throws IOException {
      }
      public List<OperatorSet> getFeatures() {
        return Lists.newArrayList(OperatorSet.PROJECTION);
      }

      
      /**
       * List of required fields passed by pig in a push down projection.
       */
      protected RequiredFieldList requiredFieldList;

      /*
       * @see
       * org.apache.pig.LoadPushDown#pushProjection(org.apache.pig.LoadPushDown.
       * RequiredFieldList)
       */
      public RequiredFieldResponse pushProjection(final RequiredFieldList rfl)
          throws FrontendException {
        requiredFieldList = rfl;

        Schema newSchema = AvroStorageSchemaConversionUtilities
            .newSchemaFromRequiredFieldList(schema, rfl);
        if (newSchema != null) {
          schema = newSchema;
          setInputAvroSchema(schema);
          return new RequiredFieldResponse(true);
        } else {
          log.warn("could not select fields subset " + rfl + "\n");
          warn("could not select fields subset", PigWarning.UDF_WARNING_1);
          return new RequiredFieldResponse(false);
        }
      }
}