Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
  
  package org.apache.pig.backend.hadoop.hbase;
  
  import java.io.DataInput;
  import java.util.Arrays;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
  
  import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
  import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
  import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
  import org.apache.hadoop.hbase.mapreduce.TableSplit;
  
An HBase implementation of LoadFunc and StoreFunc.

Below is an example showing how to load data from HBase:

raw = LOAD 'hbase://SampleTable'
       USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
       'info:first_name info:last_name friends:* info:*', '-loadKey true -limit 5')
       AS (id:bytearray, first_name:chararray, last_name:chararray, friends_map:map[], info_map:map[]);
 
This example loads data redundantly from the info column family just to illustrate usage. Note that the row key is inserted first in the result schema. To load only column names that start with a given prefix, specify the column name with a trailing '*'. For example, passing friends:bob_* to the constructor in the above example would cause only columns that start with bob_ to be loaded.
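
The same prefix syntax can also be exercised programmatically. The following is a minimal sketch, assuming it runs on the Pig front end (where UDFContext client properties are available) and using made-up column names:

import java.util.List;

import org.apache.pig.backend.hadoop.hbase.HBaseStorage;

public class PrefixColumnsSketch {
    public static void main(String[] args) throws Exception {
        // 'friends:bob_*' restricts the friends family to qualifiers starting with bob_;
        // the prefixed token still occupies exactly one (map) field in the result tuple.
        HBaseStorage loader = new HBaseStorage(
                "info:first_name friends:bob_*", "-loadKey true -limit 5");

        // One ColumnInfo per token in the column list.
        List<?> columns = loader.getColumnInfoList();
        System.out.println("Configured " + columns.size() + " column descriptors: " + columns);
    }
}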

Note that when using a prefix like friends:bob_*, explicit HBase filters are set for all columns and prefixes specified. Querying HBase with many filters can cause performance degradation. This is typically seen when mixing one or more prefixed descriptors with a large list of columns. In that case better performance will be seen by either loading the entire family via friends:* or by specifying explicit column descriptor names.
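
For reference, the two shapes differ at the HBase Scan level roughly as sketched below (illustrative family and qualifier names; the actual filter construction used by this class appears in addFiltersWithColumnPrefix further down):

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanShapeSketch {
    public static void main(String[] args) {
        // friends:* -- the whole family is added directly, no filters involved.
        Scan wholeFamily = new Scan();
        wholeFamily.addFamily(Bytes.toBytes("friends"));

        // friends:bob_* mixed with an explicit descriptor -- every column needs a filter.
        FilterList columnFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE);
        columnFilters.addFilter(new ColumnPrefixFilter(Bytes.toBytes("bob_")));
        columnFilters.addFilter(new QualifierFilter(CompareOp.EQUAL,
                new BinaryComparator(Bytes.toBytes("first_name"))));

        FilterList familyAndColumns = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        familyAndColumns.addFilter(new FamilyFilter(CompareOp.EQUAL,
                new BinaryComparator(Bytes.toBytes("friends"))));
        familyAndColumns.addFilter(columnFilters);

        Scan prefixed = new Scan();
        prefixed.setFilter(familyAndColumns);
        System.out.println("filters applied: " + familyAndColumns.getFilters().size());
    }
}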

Below is an example showing how to store data into HBase:

copy = STORE raw INTO 'hbase://SampleTableCopy'
       USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
       'info:first_name info:last_name friends:* info:*');
 
Note that STORE will expect the first value in the tuple to be the row key. Scalar values need to map to an explicit column descriptor and maps need to map to a column family name. In the above example, the friends column family data from SampleTable will be written to the friends column family in the SampleTableCopy table.
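
For reference, a tuple handed to STORE for the column list above would be shaped roughly as follows (a sketch with made-up values; the field order is what matters):

import java.util.HashMap;
import java.util.Map;

import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class StoreTupleSketch {
    public static void main(String[] args) throws Exception {
        // Column list: 'info:first_name info:last_name friends:* info:*'
        Tuple row = TupleFactory.getInstance().newTuple(5);
        row.set(0, "row-0001");                    // row key always comes first
        row.set(1, "Ada");                         // info:first_name  (scalar -> column descriptor)
        row.set(2, "Lovelace");                    // info:last_name   (scalar -> column descriptor)

        Map<String, Object> friends = new HashMap<String, Object>();
        friends.put("bob_smith", "coworker");      // qualifier -> value
        row.set(3, friends);                       // friends:*        (map -> column family)
        row.set(4, new HashMap<String, Object>()); // info:*           (map -> column family)

        System.out.println(row);
    }
}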
 
 public class HBaseStorage extends LoadFunc implements StoreFuncInterface, LoadPushDown, OrderedLoadFunc {
 
     private static final Log LOG = LogFactory.getLog(HBaseStorage.class);
 
     private final static String STRING_CASTER = "UTF8StorageConverter";
     private final static String BYTE_CASTER = "HBaseBinaryConverter";
     private final static String CASTER_PROPERTY = "pig.hbase.caster";
     private final static String ASTERISK = "*";
     private final static String COLON = ":";
     private final static String HBASE_SECURITY_CONF_KEY = "hbase.security.authentication";
     private final static String HBASE_CONFIG_SET = "hbase.config.set";
     private final static String HBASE_TOKEN_SET = "hbase.token.set";
 
     private List<ColumnInfo> columnInfo_ = Lists.newArrayList();
 
     //Use JobConf to store hbase delegation token
     private JobConf m_conf;
     private RecordReader reader;
     private RecordWriter writer;
     private TableOutputFormat outputFormat = null;
     private Scan scan;
     private String contextSignature = null;
 
     private final CommandLine configuredOptions_;
     private final static Options validOptions_ = new Options();
     private final static CommandLineParser parser_ = new GnuParser();
 
     private boolean loadRowKey_;
     private String delimiter_;
     private boolean ignoreWhitespace_;
     private final long limit_;
     private final int caching_;
     private final boolean noWAL_;
     private final long minTimestamp_;
     private final long maxTimestamp_;
     private final long timestamp_;
 
     protected transient byte[] gt_;
     protected transient byte[] gte_;
     protected transient byte[] lt_;
     protected transient byte[] lte_;
 
     private LoadCaster caster_;
 
     private ResourceSchema schema_;
     private RequiredFieldList requiredFieldList;
 
     private static void populateValidOptions() {
         validOptions_.addOption("loadKey", false, "Load Key");
         validOptions_.addOption("gt", true, "Records must be greater than this value " +
                 "(binary, double-slash-escaped)");
         validOptions_.addOption("lt", true, "Records must be less than this value (binary, double-slash-escaped)");
         validOptions_.addOption("gte", true, "Records must be greater than or equal to this value");
         validOptions_.addOption("lte", true, "Records must be less than or equal to this value");
         validOptions_.addOption("caching", true, "Number of rows scanners should cache");
         validOptions_.addOption("limit", true, "Per-region limit");
         validOptions_.addOption("delim", true, "Column delimiter");
         validOptions_.addOption("ignoreWhitespace", true, "Ignore spaces when parsing columns");
         validOptions_.addOption("caster", true, "Caster to use for converting values. A class name, " +
                 "HBaseBinaryConverter, or Utf8StorageConverter. For storage, casters must implement LoadStoreCaster.");
         validOptions_.addOption("noWAL", false, "Sets the write ahead to false for faster loading. To be used with extreme caution, since this could result in data loss (see http://hbase.apache.org/book.html#perf.hbase.client.putwal).");
         validOptions_.addOption("minTimestamp", true, "Record must have timestamp greater or equal to this value");
         validOptions_.addOption("maxTimestamp", true, "Record must have timestamp less than this value");
         validOptions_.addOption("timestamp", true, "Record must have timestamp equal to this value");
     }

    
Constructor. Construct an HBase Table LoadFunc and StoreFunc to load or store the cells of the provided columns.

Parameters:
columnList a list of columns, presented as a single string delimited by spaces and/or commas. To retrieve all columns in a column family Foo, specify a column as either Foo: or Foo:*. To fetch only columns in the CF that start with bar, specify Foo:bar*. The resulting tuple will always be the size of the number of tokens in columnList. Items in the tuple will be scalar values when a full column descriptor is specified, or a map of column descriptors to values when a column family is specified.
Throws:
ParseException when unable to parse arguments
IOException
 
     public HBaseStorage(String columnList) throws ParseException, IOException {
         this(columnList,"");
     }

    
Constructor. Construct an HBase Table LoadFunc and StoreFunc to load or store.

Parameters:
columnList
optString Loader options. Known options:
  • -loadKey=(true|false) Load the row key as the first column
  • -gt=minKeyVal
  • -lt=maxKeyVal
  • -gte=minKeyVal
  • -lte=maxKeyVal
  • -limit=numRowsPerRegion max number of rows to retrieve per region
  • -delim=char delimiter to use when parsing column names (default is space or comma)
  • -ignoreWhitespace=(true|false) ignore spaces when parsing column names (default true)
  • -caching=numRows number of rows to cache (faster scans, more memory).
  • -noWAL=(true|false) Sets the write ahead to false for faster loading. To be used with extreme caution, since this could result in data loss (see http://hbase.apache.org/book.html#perf.hbase.client.putwal).
  • -minTimestamp= Scan's timestamp for min timeRange
  • -maxTimestamp= Scan's timestamp for max timeRange
  • -timestamp= Scan's specified timestamp
  • -caster=(HBaseBinaryConverter|Utf8StorageConverter) Utf8StorageConverter is the default.
Throws:
ParseException
IOException
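
As a rough usage sketch of these options (illustrative table, row keys, and timestamp; assumes the constructor runs on the Pig front end where UDFContext client properties are available):

import org.apache.pig.backend.hadoop.hbase.HBaseStorage;

public class LoaderOptionsSketch {
    public static void main(String[] args) throws Exception {
        // Load the row key, scan only rows in [row_100, row_200), cache 500 rows per RPC,
        // and restrict results to cells written at or after the given timestamp.
        HBaseStorage loader = new HBaseStorage(
                "info:first_name info:last_name friends:*",
                "-loadKey true -gte row_100 -lt row_200 -caching 500 -minTimestamp 1388534400000");
        System.out.println("Loader configured: " + loader);
    }
}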
 
     public HBaseStorage(String columnList, String optString) throws ParseException, IOException {
         populateValidOptions();
         String[] optsArr = optString.split(" ");
         try {
             configuredOptions_ = parser_.parse(validOptions_, optsArr);
         } catch (ParseException e) {
             HelpFormatter formatter = new HelpFormatter();
             formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-columnPrefix] [-caching] [-caster] [-noWAL] [-limit] [-delim] [-ignoreWhitespace] [-minTimestamp] [-maxTimestamp] [-timestamp]", validOptions_ );
             throw e;
         }

         loadRowKey_ = configuredOptions_.hasOption("loadKey");

         delimiter_ = ",";
         if (configuredOptions_.getOptionValue("delim") != null) {
             delimiter_ = configuredOptions_.getOptionValue("delim");
         }

         ignoreWhitespace_ = true;
         if (configuredOptions_.hasOption("ignoreWhitespace")) {
             String value = configuredOptions_.getOptionValue("ignoreWhitespace");
             if (!"true".equalsIgnoreCase(value)) {
                 ignoreWhitespace_ = false;
             }
         }

         columnInfo_ = parseColumnList(columnList, delimiter_, ignoreWhitespace_);

         String defaultCaster = UDFContext.getUDFContext().getClientSystemProps()
                 .getProperty(CASTER_PROPERTY, STRING_CASTER);
         String casterOption = configuredOptions_.getOptionValue("caster", defaultCaster);
         if (STRING_CASTER.equalsIgnoreCase(casterOption)) {
             caster_ = new Utf8StorageConverter();
         } else if (BYTE_CASTER.equalsIgnoreCase(casterOption)) {
             caster_ = new HBaseBinaryConverter();
         } else {
             try {
                 caster_ = (LoadCaster) PigContext.instantiateFuncFromSpec(casterOption);
             } catch (ClassCastException e) {
                 LOG.error("Configured caster does not implement LoadCaster interface.");
                 throw new IOException(e);
             } catch (RuntimeException e) {
                 LOG.error("Configured caster class not found.", e);
                 throw new IOException(e);
             }
         }
         LOG.debug("Using caster " + caster_.getClass());

         caching_ = Integer.valueOf(configuredOptions_.getOptionValue("caching", "100"));
         limit_ = Long.valueOf(configuredOptions_.getOptionValue("limit", "-1"));
         noWAL_ = configuredOptions_.hasOption("noWAL");

         if (configuredOptions_.hasOption("minTimestamp")) {
             minTimestamp_ = Long.parseLong(configuredOptions_.getOptionValue("minTimestamp"));
         } else {
             minTimestamp_ = 0;
         }

         if (configuredOptions_.hasOption("maxTimestamp")) {
             maxTimestamp_ = Long.parseLong(configuredOptions_.getOptionValue("maxTimestamp"));
         } else {
             maxTimestamp_ = Long.MAX_VALUE;
         }

         if (configuredOptions_.hasOption("timestamp")) {
             timestamp_ = Long.parseLong(configuredOptions_.getOptionValue("timestamp"));
         } else {
             timestamp_ = 0;
         }

         initScan();
     }

    
Returns UDFProperties based on contextSignature.
 
     private Properties getUDFProperties() {
         return UDFContext.getUDFContext()
             .getUDFProperties(this.getClass(), new String[] {contextSignature});
     }

    

Returns:
contextSignature + "_projectedFields"
 
     private String projectedFieldsName() {
         return contextSignature + "_projectedFields";
     }

    

Parameters:
columnList
delimiter
ignoreWhitespace
Returns:
 
     private List<ColumnInfo> parseColumnList(String columnList,
                                              String delimiter,
                                              boolean ignoreWhitespace) {
         List<ColumnInfo> columnInfo = new ArrayList<ColumnInfo>();
 
         // Default behavior is to allow combinations of spaces and delimiter
         // which defaults to a comma. Setting to not ignore whitespace will
         // include the whitespace in the columns names
         String[] colNames = columnList.split(delimiter);
         if(ignoreWhitespace) {
             List<String> columns = new ArrayList<String>();
 
             for (String colName : colNames) {
                 String[] subColNames = colName.split(" ");
 
                 for (String subColName : subColNames) {
                     subColName = subColName.trim();
                     if (subColName.length() > 0) columns.add(subColName);
                 }
             }
 
             colNames = columns.toArray(new String[columns.size()]);
         }
 
         for (String colName : colNames) {
             columnInfo.add(new ColumnInfo(colName));
         }
 
         return columnInfo;
     }
 
     private void initScan() throws IOException {
         scan = new Scan();

         // Map-reduce jobs should not run with cacheBlocks
         scan.setCacheBlocks(false);
         scan.setCaching(caching_);

         // Set filters, if any.
         if (configuredOptions_.hasOption("gt")) {
             gt_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("gt")));
             addRowFilter(CompareOp.GREATER, gt_);
             scan.setStartRow(gt_);
         }
         if (configuredOptions_.hasOption("lt")) {
             lt_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("lt")));
             addRowFilter(CompareOp.LESS, lt_);
             scan.setStopRow(lt_);
         }
         if (configuredOptions_.hasOption("gte")) {
             gte_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("gte")));
             scan.setStartRow(gte_);
         }
         if (configuredOptions_.hasOption("lte")) {
             lte_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("lte")));
             byte[] lt = increment(lte_);
             if (LOG.isDebugEnabled()) {
                 LOG.debug(String.format("Incrementing lte value of %s from bytes %s to %s to set stop row",
                           Bytes.toString(lte_), toString(lte_), toString(lt)));
             }

             if (lt != null) {
                 scan.setStopRow(increment(lte_));
             }

             // The WhileMatchFilter will short-circuit the scan after we no longer match. The
             // setStopRow call will limit the number of regions we need to scan
             addFilter(new WhileMatchFilter(new RowFilter(CompareOp.LESS_OR_EQUAL, new BinaryComparator(lte_))));
         }
         if (configuredOptions_.hasOption("minTimestamp") || configuredOptions_.hasOption("maxTimestamp")) {
             scan.setTimeRange(minTimestamp_, maxTimestamp_);
         }
         if (configuredOptions_.hasOption("timestamp")) {
             scan.setTimeStamp(timestamp_);
         }

         // if the group of columnInfos for this family doesn't contain a prefix, we don't need
         // to set any filters, we can just call addColumn or addFamily. See javadocs below.
         boolean columnPrefixExists = false;
         for (ColumnInfo columnInfo : columnInfo_) {
             if (columnInfo.getColumnPrefix() != null) {
                 columnPrefixExists = true;
                 break;
             }
         }

         if (!columnPrefixExists) {
             addFiltersWithoutColumnPrefix(columnInfo_);
         }
         else {
             addFiltersWithColumnPrefix(columnInfo_);
         }
     }

    
If there is no column with a prefix, we don't need filters, we can just call addColumn and addFamily on the scan
 
     private void addFiltersWithoutColumnPrefix(List<ColumnInfo> columnInfos) {
         // Need to check for mixed types in a family, so we don't call addColumn
         // after addFamily on the same family
         Map<String, List<ColumnInfo>> groupedMap = groupByFamily(columnInfos);
         for (Entry<String, List<ColumnInfo>> entrySet : groupedMap.entrySet()) {
             boolean onlyColumns = true;
             for (ColumnInfo columnInfo : entrySet.getValue()) {
                 if (columnInfo.isColumnMap()) {
                     onlyColumns = false;
                     break;
                 }
             }
             if (onlyColumns) {
                 for (ColumnInfo columnInfo : entrySet.getValue()) {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Adding column to scan via addColumn with cf:name = "
                                 + Bytes.toString(columnInfo.getColumnFamily()) + ":"
                                 + Bytes.toString(columnInfo.getColumnName()));
                     }
                     scan.addColumn(columnInfo.getColumnFamily(), columnInfo.getColumnName());
                 }
             } else {
                 String family = entrySet.getKey();
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Adding column family to scan via addFamily with cf:name = "
                             + family);
                 }
                 scan.addFamily(Bytes.toBytes(family));
             }
         }
     }

    
If we have a qualifier with a prefix and a wildcard (i.e. cf:foo*), we need a filter on every possible column to be returned as shown below. This will become very inefficient for long lists of columns mixed with a prefixed wildcard.

FilterList - must pass ALL of
  • FamilyFilter
  • AND a must-pass-ONE FilterList of
    • either QualifierFilter
    • or ColumnPrefixFilter

If we have only column family filters (i.e. cf:*) or explicit column descriptors (i.e., cf:foo) or a mix of both then we don't need filters, since the scan will take care of that.
 
     private void addFiltersWithColumnPrefix(List<ColumnInfo> columnInfos) {
         // we need to apply a CF AND column list filter for each family
         FilterList allColumnFilters = null;
         Map<String, List<ColumnInfo>> groupedMap = groupByFamily(columnInfos);
         for (String cfString : groupedMap.keySet()) {
             List<ColumnInfo> columnInfoList = groupedMap.get(cfString);
             byte[] cf = Bytes.toBytes(cfString);

             // all filters roll up to one parent OR filter
             if (allColumnFilters == null) {
                 allColumnFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE);
             }

             // each group contains a column family filter AND (all) and an OR (one of) of
             // the column filters
             FilterList thisColumnGroupFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL);
             thisColumnGroupFilter.addFilter(new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(cf)));
             FilterList columnFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE);
             for (ColumnInfo colInfo : columnInfoList) {
                 if (colInfo.isColumnMap()) {

                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Adding family:prefix filters with values " +
                                 Bytes.toString(colInfo.getColumnFamily()) + COLON +
                                 Bytes.toString(colInfo.getColumnPrefix()));
                     }

                     // add a PrefixFilter to the list of column filters
                     if (colInfo.getColumnPrefix() != null) {
                         columnFilters.addFilter(new ColumnPrefixFilter(
                             colInfo.getColumnPrefix()));
                     }
                 }
                 else {

                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Adding family:descriptor filters with values " +
                                 Bytes.toString(colInfo.getColumnFamily()) + COLON +
                                 Bytes.toString(colInfo.getColumnName()));
                     }

                     // add a QualifierFilter to the list of column filters
                     columnFilters.addFilter(new QualifierFilter(CompareOp.EQUAL,
                             new BinaryComparator(colInfo.getColumnName())));
                 }
             }
             thisColumnGroupFilter.addFilter(columnFilters);
             allColumnFilters.addFilter(thisColumnGroupFilter);
         }
         if (allColumnFilters != null) {
             addFilter(allColumnFilters);
         }
     }
 
     private void addRowFilter(CompareOp op, byte[] val) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Adding filter " + op.toString() +
                     " with value " + Bytes.toStringBinary(val));
         }
         addFilter(new RowFilter(op, new BinaryComparator(val)));
     }

     private void addFilter(Filter filter) {
         FilterList scanFilter = (FilterList) scan.getFilter();
         if (scanFilter == null) {
             scanFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL);
         }
         scanFilter.addFilter(filter);
         scan.setFilter(scanFilter);
     }

   
Returns the ColumnInfo list so external objects can inspect it.

Returns:
List of ColumnInfo objects
 
     public List<ColumnInfo> getColumnInfoList() {
         return columnInfo_;
     }

   
Updates the ColumnInfo List. Use this if you need to implement custom projections
 
     protected void setColumnInfoList(List<ColumnInfo> columnInfoList) {
         this.columnInfo_ = columnInfoList;
     }

   
Stores the requiredFieldList as a serialized object so it can be fetched on the cluster. If you plan to override pushProjection, you need to call this with the requiredFieldList so that it can be accessed on the cluster.
 
     protected void storeProjectedFieldNames(RequiredFieldList requiredFieldList) throws FrontendException {
         try {
             getUDFProperties().setProperty(projectedFieldsName(),
               ObjectSerializer.serialize(requiredFieldList));
         } catch (IOException e) {
             throw new FrontendException(e);
         }
     }
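
Below is a hypothetical sketch of the override pattern described above: a subclass whose pushProjection adjusts the columns and still calls storeProjectedFieldNames so the projection reaches the cluster (the class name and pruning logic are placeholders):

import java.io.IOException;

import org.apache.commons.cli.ParseException;
import org.apache.pig.LoadPushDown.RequiredFieldList;
import org.apache.pig.LoadPushDown.RequiredFieldResponse;
import org.apache.pig.backend.hadoop.hbase.HBaseStorage;
import org.apache.pig.impl.logicalLayer.FrontendException;

public class ProjectingHBaseStorage extends HBaseStorage {

    public ProjectingHBaseStorage(String columnList, String optString)
            throws ParseException, IOException {
        super(columnList, optString);
    }

    @Override
    public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList)
            throws FrontendException {
        // ... custom pruning would adjust the ColumnInfo list here,
        //     e.g. via setColumnInfoList(prunedColumns) ...
        storeProjectedFieldNames(requiredFieldList); // make the projection visible on the cluster
        return new RequiredFieldResponse(true);
    }
}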
 
     @Override
     public Tuple getNext() throws IOException {
         try {
             if (reader.nextKeyValue()) {
                 ImmutableBytesWritable rowKey = (ImmutableBytesWritable) reader
                         .getCurrentKey();
                 Result result = (Result) reader.getCurrentValue();

                 int tupleSize = columnInfo_.size();

                 // use a map of families -> qualifiers with the most recent
                 // version of the cell. Fetching multiple versions could be a
                 // useful feature.
                 NavigableMap<byte[], NavigableMap<byte[], byte[]>> resultsMap =
                         result.getNoVersionMap();

                 if (loadRowKey_) {
                     tupleSize++;
                 }
                 Tuple tuple = TupleFactory.getInstance().newTuple(tupleSize);

                 int startIndex = 0;
                 if (loadRowKey_) {
                     tuple.set(0, new DataByteArray(rowKey.get()));
                     startIndex++;
                 }
                 for (int i = 0; i < columnInfo_.size(); ++i) {
                     int currentIndex = startIndex + i;

                     ColumnInfo columnInfo = columnInfo_.get(i);
                     if (columnInfo.isColumnMap()) {
                         // It's a column family so we need to iterate and set all
                         // values found
                         NavigableMap<byte[], byte[]> cfResults =
                                 resultsMap.get(columnInfo.getColumnFamily());
                         Map<String, DataByteArray> cfMap =
                                 new HashMap<String, DataByteArray>();

                         if (cfResults != null) {
                             for (byte[] quantifier : cfResults.keySet()) {
                                 // We need to check against the prefix filter to
                                 // see if this value should be included. We can't
                                 // just rely on the server-side filter, since a
                                 // user could specify multiple CF filters for the
                                 // same CF.
                                 if (columnInfo.getColumnPrefix() == null ||
                                         columnInfo.hasPrefixMatch(quantifier)) {

                                     byte[] cell = cfResults.get(quantifier);
                                     DataByteArray value =
                                             cell == null ? null : new DataByteArray(cell);
                                     cfMap.put(Bytes.toString(quantifier), value);
                                 }
                             }
                         }
                         tuple.set(currentIndex, cfMap);
                     } else {
                         // It's a column so set the value
                         byte[] cell = result.getValue(columnInfo.getColumnFamily(),
                                                       columnInfo.getColumnName());
                         DataByteArray value =
                                 cell == null ? null : new DataByteArray(cell);
                         tuple.set(currentIndex, value);
                     }
                 }

                 if (LOG.isDebugEnabled()) {
                     for (int i = 0; i < tuple.size(); i++) {
                         LOG.debug("tuple value:" + tuple.get(i));
                     }
                 }

                 return tuple;
             }
         } catch (InterruptedException e) {
             throw new IOException(e);
         }
         return null;
     }
 
     @Override
     public InputFormat getInputFormat() {
         TableInputFormat inputFormat = new HBaseTableIFBuilder()
                 .withLimit(limit_)
                 .withGt(gt_)
                 .withGte(gte_)
                 .withLt(lt_)
                 .withLte(lte_)
                 .withConf(m_conf)
                 .build();
         inputFormat.setScan(scan);
         return inputFormat;
     }
 
     @Override
     public void prepareToRead(RecordReader reader, PigSplit split) {
         this.reader = reader;
     }
 
     @Override
     public void setUDFContextSignature(String signature) {
         this.contextSignature = signature;
     }
 
     @Override
     public void setLocation(String location, Job job) throws IOException {
         Properties udfProps = getUDFProperties();
         job.getConfiguration().setBoolean("pig.noSplitCombination", true);

         m_conf = initializeLocalJobConfig(job);
         String delegationTokenSet = udfProps.getProperty(HBASE_TOKEN_SET);
         if (delegationTokenSet == null) {
             addHBaseDelegationToken(m_conf, job);
             udfProps.setProperty(HBASE_TOKEN_SET, "true");
         }

         String tablename = location;
         if (location.startsWith("hbase://")) {
             tablename = location.substring(8);
         }

         m_conf.set(TableInputFormat.INPUT_TABLE, tablename);

         String projectedFields = udfProps.getProperty( projectedFieldsName() );
         if (projectedFields != null) {
             // update columnInfo_
             pushProjection((RequiredFieldList) ObjectSerializer.deserialize(projectedFields));
         }

         if (requiredFieldList != null) {
             Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(),
                     new String[] {contextSignature});
             p.setProperty(contextSignature + "_projectedFields",
                     ObjectSerializer.serialize(requiredFieldList));
         }
     }
 
     private void initializeHBaseClassLoaderResources(Job job) throws IOException {
         // Depend on HBase to do the right thing when available, as of HBASE-9165
         try {
             Method addHBaseDependencyJars =
               TableMapReduceUtil.class.getMethod("addHBaseDependencyJars", Configuration.class);
             if (addHBaseDependencyJars != null) {
                 addHBaseDependencyJars.invoke(null, job.getConfiguration());
                 return;
             }
         } catch (NoSuchMethodException e) {
             LOG.debug("TableMapReduceUtils#addHBaseDependencyJars not available."
               + " Falling back to previous logic.", e);
         } catch (IllegalAccessException e) {
             LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation"
               + " not permitted. Falling back to previous logic.", e);
         } catch (InvocationTargetException e) {
             LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation"
               + " failed. Falling back to previous logic.", e);
         }
         // fall back to manual class handling.
         // Make sure the HBase, ZooKeeper, and Guava jars get shipped.
         TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
             org.apache.hadoop.hbase.client.HTable.class, // main hbase jar or hbase-client
             org.apache.hadoop.hbase.mapreduce.TableSplit.class, // main hbase jar or hbase-server
             com.google.common.collect.Lists.class, // guava
             org.apache.zookeeper.ZooKeeper.class); // zookeeper
         // Additional jars that are specific to v0.95.0+
         addClassToJobIfExists(job, "org.cloudera.htrace.Trace"); // htrace
         addClassToJobIfExists(job, "org.apache.hadoop.hbase.protobuf.generated.HBaseProtos"); // hbase-protocol
         addClassToJobIfExists(job, "org.apache.hadoop.hbase.TableName"); // hbase-common
         addClassToJobIfExists(job, "org.apache.hadoop.hbase.CompatibilityFactory"); // hbase-hadoop-compat
         addClassToJobIfExists(job, "org.jboss.netty.channel.ChannelFactory"); // netty
     }
 
     private void addClassToJobIfExists(Job job, String className) throws IOException {
       Class klass = null;
       try {
           klass = Class.forName(className);
       } catch (ClassNotFoundException e) {
           LOG.debug("Skipping adding jar for class: " + className);
           return;
       }

       TableMapReduceUtil.addDependencyJars(job.getConfiguration(), klass);
     }
 
     private JobConf initializeLocalJobConfig(Job job) {
         Properties udfProps = getUDFProperties();
         Configuration jobConf = job.getConfiguration();
         JobConf localConf = new JobConf(jobConf);
         if (udfProps.containsKey(HBASE_CONFIG_SET)) {
             for (Entry<Object, Object> entry : udfProps.entrySet()) {
                 localConf.set((String) entry.getKey(), (String) entry.getValue());
             }
         } else {
             Configuration hbaseConf = HBaseConfiguration.create();
             for (Entry<String, String> entry : hbaseConf) {
                 // JobConf may have some conf overriding ones in hbase-site.xml
                 // So only copy hbase config not in job config to UDFContext
                 // Also avoids copying core-default.xml and core-site.xml
                 // props in hbaseConf to UDFContext which would be redundant.
                 if (jobConf.get(entry.getKey()) == null) {
                     udfProps.setProperty(entry.getKey(), entry.getValue());
                     localConf.set(entry.getKey(), entry.getValue());
                 }
             }
             udfProps.setProperty(HBASE_CONFIG_SET, "true");
         }
         return localConf;
     }

    
Get a delegation token from HBase and add it to the Job.
 
     @SuppressWarnings({ "rawtypes""unchecked" })
     private void addHBaseDelegationToken(Configuration hbaseConfJob job) {
 
         if (!UDFContext.getUDFContext().isFrontend()) {
             return;
         }
 
         if ("kerberos".equalsIgnoreCase(hbaseConf.get())) {
             // Will not be entering this block for 0.20.2 as it has no security.
             try {
                 // getCurrentUser method is not public in 0.20.2
                 Method m1 = UserGroupInformation.class.getMethod("getCurrentUser");
                 UserGroupInformation currentUser = (UserGroupInformationm1.invoke(null,(Object[]) null);
                 // hasKerberosCredentials method not available in 0.20.2
                 Method m2 = UserGroupInformation.class.getMethod("hasKerberosCredentials");
                 boolean hasKerberosCredentials = (Booleanm2.invoke(currentUser, (Object[]) null);
                 if (hasKerberosCredentials) {
                     // Class and method are available only from 0.92 security release
                     Class tokenUtilClass = Class
                             .forName("org.apache.hadoop.hbase.security.token.TokenUtil");
                     Method m3 = tokenUtilClass.getMethod("obtainTokenForJob"new Class[] {
                             Configuration.classUserGroupInformation.classJob.class });
                     m3.invoke(nullnew Object[] { hbaseConfcurrentUserjob });
                 } else {
                     .info("Not fetching hbase delegation token as no Kerberos TGT is available");
                 }
             } catch (ClassNotFoundException cnfe) {
                 throw new RuntimeException("Failure loading TokenUtil class, "
                         + "is secure RPC available?"cnfe);
             } catch (RuntimeException re) {
                 throw re;
             } catch (Exception e) {
                 throw new UndeclaredThrowableException(e,
                         "Unexpected error calling TokenUtil.obtainTokenForJob()");
             }
         }
     }
 
     @Override
     public String relativeToAbsolutePath(String location, Path curDir)
     throws IOException {
         return location;
     }

    
Set up the caster to use for reading values out of, and writing to, HBase.
 
     @Override
     public LoadCaster getLoadCaster() throws IOException {
         return caster_;
     }
 
     /*
      * StoreFunc Methods
      * @see org.apache.pig.StoreFuncInterface#getOutputFormat()
      */
 
     @Override
     public OutputFormat getOutputFormat() throws IOException {
         if (outputFormat == null) {
             if (m_conf == null) {
                 throw new IllegalStateException("setStoreLocation has not been called");
             } else {
                 this.outputFormat = new TableOutputFormat();
                 this.outputFormat.setConf(m_conf);
             }
         }
         return outputFormat;
     }
 
     @Override
     public void checkSchema(ResourceSchema s) throws IOException {
         if (! (caster_ instanceof LoadStoreCaster)) {
             LOG.error("Caster must implement LoadStoreCaster for writing to HBase.");
             throw new IOException("Bad Caster " + caster_.getClass());
         }
         schema_ = s;
         getUDFProperties().setProperty(contextSignature + "_schema",
                                        ObjectSerializer.serialize(schema_));
     }
 
     // Suppressing unchecked warnings for RecordWriter, which is not parameterized by StoreFuncInterface
     @Override
     public void prepareToWrite(@SuppressWarnings("rawtypes"RecordWriter writerthrows IOException {
         this. = writer;
     }
 
     // Suppressing unchecked warnings for RecordWriter, which is not parameterized by StoreFuncInterface
     @SuppressWarnings("unchecked")
     @Override
     public void putNext(Tuple t) throws IOException {
         ResourceFieldSchema[] fieldSchemas = (schema_ == null) ? null : schema_.getFields();
         byte type = (fieldSchemas == null) ? DataType.findType(t.get(0)) : fieldSchemas[0].getType();
         long ts = System.currentTimeMillis();

         Put put = createPut(t.get(0), type);

         if (LOG.isDebugEnabled()) {
             LOG.debug("putNext -- WAL disabled: " + noWAL_);
             for (ColumnInfo columnInfo : columnInfo_) {
                 LOG.debug("putNext -- col: " + columnInfo);
             }
         }

         for (int i = 1; i < t.size(); ++i) {
             ColumnInfo columnInfo = columnInfo_.get(i - 1);
             if (LOG.isDebugEnabled()) {
                 LOG.debug("putNext - tuple: " + i + ", value=" + t.get(i) +
                         ", cf:column=" + columnInfo);
             }

             if (!columnInfo.isColumnMap()) {
                 put.add(columnInfo.getColumnFamily(), columnInfo.getColumnName(),
                         ts, objToBytes(t.get(i), (fieldSchemas == null) ?
                         DataType.findType(t.get(i)) : fieldSchemas[i].getType()));
             } else {
                 Map<String, Object> cfMap = (Map<String, Object>) t.get(i);
                 for (String colName : cfMap.keySet()) {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("putNext - colName=" + colName +
                                   ", class: " + colName.getClass());
                     }
                     // TODO deal with the fact that maps can have types now. Currently we detect types at
                     // runtime in the case of storing to a cf, which is suboptimal.
                     put.add(columnInfo.getColumnFamily(), Bytes.toBytes(colName.toString()), ts,
                             objToBytes(cfMap.get(colName), DataType.findType(cfMap.get(colName))));
                 }
             }
         }

         try {
             writer.write(null, put);
         } catch (InterruptedException e) {
             throw new IOException(e);
         }
     }

    
Public method to initialize a Put. Used to allow assertions of how Puts are initialized by unit tests.

Parameters:
key
type
Returns:
new put
Throws:
IOException
 
     public Put createPut(Object key, byte type) throws IOException {
         Put put = new Put(objToBytes(key, type));

         if (noWAL_) {
             put.setWriteToWAL(false);
         }
 
         return put;
     }
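
As the javadoc above notes, unit tests can assert how Puts are initialized. A hedged, test-style sketch (assumes a Pig front-end context where UDFContext client properties are available and an HBase client version that still exposes Put.getWriteToWAL()):

import org.apache.hadoop.hbase.client.Put;
import org.apache.pig.backend.hadoop.hbase.HBaseStorage;
import org.apache.pig.data.DataType;

public class CreatePutSketch {
    public static void main(String[] args) throws Exception {
        HBaseStorage store = new HBaseStorage("info:first_name", "-noWAL true");
        Put put = store.createPut("row-0001", DataType.CHARARRAY);
        // With -noWAL the Put is created with the write-ahead log disabled.
        System.out.println("writeToWAL = " + put.getWriteToWAL());
    }
}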
 
     @SuppressWarnings("unchecked")
     private byte[] objToBytes(Object o, byte type) throws IOException {
         LoadStoreCaster caster = (LoadStoreCaster) caster_;
         if (o == null) return null;
         switch (type) {
         case DataType.BYTEARRAY: return ((DataByteArray) o).get();
         case DataType.BAG: return caster.toBytes((DataBag) o);
         case DataType.CHARARRAY: return caster.toBytes((String) o);
         case DataType.DOUBLE: return caster.toBytes((Double) o);
         case DataType.FLOAT: return caster.toBytes((Float) o);
         case DataType.INTEGER: return caster.toBytes((Integer) o);
         case DataType.LONG: return caster.toBytes((Long) o);
         case DataType.BIGINTEGER: return caster.toBytes((BigInteger) o);
         case DataType.BIGDECIMAL: return caster.toBytes((BigDecimal) o);
         case DataType.BOOLEAN: return caster.toBytes((Boolean) o);
         case DataType.DATETIME: return caster.toBytes((DateTime) o);

         // The type conversion here is unchecked.
         // Relying on DataType.findType to do the right thing.
         case DataType.MAP: return caster.toBytes((Map<String, Object>) o);

         case DataType.NULL: return null;
         case DataType.TUPLE: return caster.toBytes((Tuple) o);
         case DataType.ERROR: throw new IOException("Unable to determine type of " + o.getClass());
         default: throw new IOException("Unable to find a converter for tuple field " + o);
         }
     }
 
     @Override
     public String relToAbsPathForStoreLocation(String location, Path curDir)
     throws IOException {
         return location;
     }
 
     @Override
     public void setStoreFuncUDFContextSignature(String signature) {
         this.contextSignature = signature;
     }
 
     @Override
     public void setStoreLocation(String location, Job job) throws IOException {
         if (location.startsWith("hbase://")) {
             job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, location.substring(8));
         } else {
             job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, location);
         }
         String serializedSchema = getUDFProperties().getProperty(contextSignature + "_schema");
         if (serializedSchema != null) {
             schema_ = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema);
         }
         m_conf = initializeLocalJobConfig(job);
         // Not setting a udf property and getting the hbase delegation token
         // only once like in setLocation as setStoreLocation gets different Job
         // objects for each call and the last Job passed is the one that is
         // launched. So we end up getting multiple hbase delegation tokens.
         addHBaseDelegationToken(m_conf, job);
    }
    @Override
    public void cleanupOnFailure(String location, Job job) throws IOException {
    }
    @Override
    public void cleanupOnSuccess(String location, Job job) throws IOException {
    }
    /*
     * LoadPushDown Methods.
     */
    @Override
    public List<OperatorSet> getFeatures() {
        return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
    }
    @Override
    public RequiredFieldResponse pushProjection(
            RequiredFieldList requiredFieldList) throws FrontendException {
        List<RequiredField> requiredFields = requiredFieldList.getFields();
        List<ColumnInfo> newColumns = Lists.newArrayListWithExpectedSize(requiredFields.size());

        if (this.requiredFieldList != null) {
            // in addition to PIG, this is also called by this.setLocation().
            LOG.debug("projection is already set. skipping.");
            return new RequiredFieldResponse(true);
        }

        /* How projection is handled :
         *  - pushProjection() is invoked by PIG on the front end
         *  - pushProjection here both stores serialized projection in the
         *    context and adjusts columnInfo_.
         *  - setLocation() is invoked on the backend and it reads the
         *    projection from context. setLocation invokes this method again
         *    so that columnInfo_ is adjusted.
         */

        // colOffset is the offset in our columnList that we need to apply to indexes we get from requiredFields
        // (row key is not a real column)
        int colOffset = loadRowKey_ ? 1 : 0;
        // projOffset is the offset to the requiredFieldList we need to apply when figuring out which columns to prune.
        // (if key is pruned, we should skip row key's element in this list when trimming colList)
        int projOffset = colOffset;
        this.requiredFieldList = requiredFieldList;

        if (requiredFieldList != null && requiredFields.size() > (columnInfo_.size() + colOffset)) {
            throw new FrontendException("The list of columns to project from HBase is larger than HBaseStorage is configured to load.");
        }

        // remember the projection
        storeProjectedFieldNames(requiredFieldList);

        if (loadRowKey_ &&
                ( requiredFields.size() < 1 || requiredFields.get(0).getIndex() != 0)) {
            loadRowKey_ = false;
            projOffset = 0;
        }

        for (int i = projOffset; i < requiredFields.size(); i++) {
            int fieldIndex = requiredFields.get(i).getIndex();
            newColumns.add(columnInfo_.get(fieldIndex - colOffset));
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("pushProjection After Projection: loadRowKey is " + loadRowKey_);
            for (ColumnInfo colInfo : newColumns) {
                LOG.debug("pushProjection -- col: " + colInfo);
            }
        }
        setColumnInfoList(newColumns);
        return new RequiredFieldResponse(true);
    }