Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
 
 package org.apache.hadoop.hive.ql.exec;
 
 import java.io.File;
 import java.util.List;
 import java.util.Map;
 
 import  org.apache.hadoop.conf.Configuration;
 import  org.apache.hadoop.fs.FileSystem;
 import  org.apache.hadoop.fs.Path;
 import  org.apache.hadoop.util.ReflectionUtils;
 
 
 public class HashTableSinkOperator extends TerminalOperator<HashTableSinkDescimplements
     Serializable {
   private static final long serialVersionUID = 1L;
   private static final Log LOG = LogFactory.getLog(HashTableSinkOperator.class.getName());
 
   // from abstract map join operator
   
The expressions for join inputs's join keys.
 
   protected transient Map<ByteList<ExprNodeEvaluator>> joinKeys;
  
The ObjectInspectors for the join inputs's join keys.
 
   protected transient Map<ByteList<ObjectInspector>> joinKeysObjectInspectors;
  
The standard ObjectInspectors for the join inputs's join keys.
 
 
   protected transient int posBigTableTag = -1; // one of the tables that is not in memory
   protected transient int posBigTableAlias = -1; // one of the tables that is not in memory
   transient int mapJoinRowsKey// rows for a given key
 
   protected transient RowContainer<ArrayList<Object>> emptyList = null;
 
   transient int numMapRowsRead;
   protected transient int totalSz// total size of the composite object
   transient boolean firstRow;
  
The filters for join
 
   protected transient Map<ByteList<ExprNodeEvaluator>> joinFilters;
 
   protected transient int numAliases// number of aliases
   
The expressions for join outputs.
 
   protected transient Map<ByteList<ExprNodeEvaluator>> joinValues;
  
The ObjectInspectors for the join inputs.
 
   protected transient Map<ByteList<ObjectInspector>> joinValuesObjectInspectors;
  
The ObjectInspectors for join filters.
 
  protected transient Map<ByteList<ObjectInspector>> joinFilterObjectInspectors;
  
The standard ObjectInspectors for the join inputs.
  protected transient Byte[] order// order in which the results should
  Configuration hconf;
  protected transient Byte alias;
  protected transient Map<ByteTableDescspillTableDesc// spill tables are
  protected transient boolean noOuterJoin;
  private long rowNumber = 0;
  protected transient LogHelper console;
  private long hashTableScale;
  private boolean isAbort = false;
  public static class HashTableSinkObjectCtx {
    SerDe serde;
    Configuration conf;

    

Parameters:
standardOI
serde
    public HashTableSinkObjectCtx(ObjectInspector standardOISerDe serdeTableDesc tblDesc,
        Configuration conf) {
      this. = standardOI;
      this. = serde;
      this. = tblDesc;
      this. = conf;
    }

    

Returns:
the standardOI
    public ObjectInspector getStandardOI() {
      return ;
    }

    

Returns:
the serde
    public SerDe getSerDe() {
      return ;
    }
    public TableDesc getTblDesc() {
      return ;
    }
    public Configuration getConf() {
      return ;
    }
  }
  private static final transient String[] FATAL_ERR_MSG = {
      null// counter value 0 means no error
      "Mapside join size exceeds hive.mapjoin.maxsize. "
          + "Please increase that or remove the mapjoin hint."};
  private final int metadataKeyTag = -1;
  transient int[] metadataValueTag;
  transient int maxMapJoinSize;
  public HashTableSinkOperator() {
  }
    this. = new HashTableSinkDesc(mjop.getConf());
  }
  protected void initializeOp(Configuration hconfthrows HiveException {
    boolean isSilent = HiveConf.getBoolVar(hconf..);
     = new LogHelper(isSilent);
     = 0;
     = true;
    // for small tables only; so get the big table position first
     = .getTagOrder();
    // initialize some variables, which used to be initialized in CommonJoinOperator
     = .getExprs().size();
    this. = hconf;
     = 0;
    // process join keys
    // process join values
    // process join filters
    if () {
    } else {
      Map<ByteList<ObjectInspector>> rowContainerObjectInspectors = new HashMap<ByteList<ObjectInspector>>();
      for (Byte alias : ) {
        if (alias == ) {
          continue;
        }
        ArrayList<ObjectInspectorrcOIs = new ArrayList<ObjectInspector>();
        rcOIs.addAll(.get(alias));
        // for each alias, add object inspector for boolean as the last element
        rowContainerObjectInspectors.put(aliasrcOIs);
      }
       = getStandardObjectInspectors(rowContainerObjectInspectors);
    }
     = new int[];
    for (int pos = 0; pos < pos++) {
      [pos] = -1;
    }
    int hashTableThreshold = HiveConf.getIntVar(hconf..);
    float hashTableLoadFactor = HiveConf.getFloatVar(hconf,
    float hashTableMaxMemoryUsage = this.getConf().getHashtableMemoryUsage();
    if ( <= 0) {
       = 1;
    }
    // initialize the hash tables for other tables
    for (Byte pos : ) {
      if (pos == ) {
        continue;
      }
          hashTableThresholdhashTableLoadFactorhashTableMaxMemoryUsage);
      .put(poshashTable);
    }
  }
      Map<ByteList<ObjectInspector>> aliasToObjectInspectors) {
    HashMap<ByteList<ObjectInspector>> result = new HashMap<ByteList<ObjectInspector>>();
    for (Entry<ByteList<ObjectInspector>> oiEntry : aliasToObjectInspectors.entrySet()) {
      Byte alias = oiEntry.getKey();
      List<ObjectInspectoroiList = oiEntry.getValue();
      ArrayList<ObjectInspectorfieldOIList = new ArrayList<ObjectInspector>(oiList.size());
      for (int i = 0; i < oiList.size(); i++) {
        fieldOIList.add(ObjectInspectorUtils.getStandardObjectInspector(oiList.get(i),
            .));
      }
      result.put(aliasfieldOIList);
    }
    return result;
  }
  private void setKeyMetaData() throws SerDeException {
    TableDesc keyTableDesc = .getKeyTblDesc();
    SerDe keySerializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(),
        null);
    keySerializer.initialize(nullkeyTableDesc.getProperties());
    MapJoinMetaData.clear();
    MapJoinMetaData.put(Integer.valueOf(), new HashTableSinkObjectCtx(
        ObjectInspectorUtils.getStandardObjectInspector(keySerializer.getObjectInspector(),
            .), keySerializerkeyTableDesc));
  }
  /*
   * This operator only process small tables Read the key/value pairs Load them into hashtable
   */
  public void processOp(Object rowint tagthrows HiveException {
    // let the mapJoinOp process these small tables
    try {
      if () {
        // generate the map metadata
        setKeyMetaData();
         = false;
      }
       = [tag];
      // alias = (byte)tag;
      // compute keys and values as StandardObjects
      AbstractMapJoinKey keyMap = JoinUtil.computeMapJoinKeys(row.get(),
          .get());
      Object[] value = JoinUtil.computeMapJoinValues(row.get(),
              .get(), );
          .get((bytetag);
      MapJoinObjectValue o = hashTable.get(keyMap);
      MapJoinRowContainer<Object[]> res = null;
      boolean needNewKey = true;
      if (o == null) {
        res = new MapJoinRowContainer<Object[]>();
        res.add(value);
        if ([tag] == -1) {
          [tag] = [tag];
          setValueMetaData(tag);
        }
        // Construct externalizable objects for key and value
        if (needNewKey) {
          MapJoinObjectValue valueObj = new MapJoinObjectValue([tag], res);
          ++;
          if ( >  &&  %  == 0) {
             = hashTable.isAbort();
            if () {
              throw new HiveException("RunOutOfMeomoryUsage");
            }
          }
          hashTable.put(keyMapvalueObj);
        }
      } else {
        res = o.getObj();
        res.add(value);
      }
    } catch (SerDeException e) {
      throw new HiveException(e);
    }
  }
  private void setValueMetaData(int tagthrows SerDeException {
    TableDesc valueTableDesc = .getValueTblFilteredDescs().get(tag);
    SerDe valueSerDe = (SerDe) ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(),
        null);
    valueSerDe.initialize(nullvalueTableDesc.getProperties());
    int length = newFields.size();
    List<StringnewNames = new ArrayList<String>(length);
    for (int i = 0; i < lengthi++) {
      String tmp = new String("tmp_" + i);
      newNames.add(tmp);
    }
    StandardStructObjectInspector standardOI = ObjectInspectorFactory
        .getStandardStructObjectInspector(newNamesnewFields);
    MapJoinMetaData.put(Integer.valueOf([tag]), new HashTableSinkObjectCtx(
        standardOIvalueSerDevalueTableDesc));
  }
  public void closeOp(boolean abortthrows HiveException {
    try {
      if ( != null) {
        // get tmp file URI
        String tmpURI = this.getExecContext().getLocalWork().getTmpFileURI();
        .info("Get TMP URI: " + tmpURI);
        long fileLength;
            .entrySet()) {
          // get the key and value
          Byte tag = hashTables.getKey();
          HashMapWrapper<AbstractMapJoinKeyMapJoinObjectValuehashTable = hashTables.getValue();
          // get current input file name
          String bigBucketFileName = this.getExecContext().getCurrentBigBucketFile();
          if (bigBucketFileName == null || bigBucketFileName.length() == 0) {
            bigBucketFileName = "-";
          }
          // get the tmp URI path; it will be a hdfs path if not local mode
          String tmpURIPath = Utilities.generatePath(tmpURItagbigBucketFileName);
          hashTable.isAbort();
          .printInfo(Utilities.now() + "\tDump the hashtable into file: " + tmpURIPath);
          // get the hashtable file and path
          Path path = new Path(tmpURIPath);
          FileSystem fs = path.getFileSystem();
          File file = new File(path.toUri().getPath());
          fs.create(path);
          fileLength = hashTable.flushMemoryCacheToPersistent(file);
          .printInfo(Utilities.now() + "\tUpload 1 File to: " + tmpURIPath + " File size: "
              + fileLength);
          hashTable.close();
        }
      }
      super.closeOp(abort);
    } catch (Exception e) {
      .error("Generate Hashtable error");
      e.printStackTrace();
    }
  }

  
Implements the getName function for the Node Interface.

Returns:
the name of the operator
  public String getName() {
    return "HASHTABLESINK";
  }
  public OperatorType getType() {
  }
New to GrepCode? Check out our FAQ X