/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
 
 
package org.apache.hadoop.hive.ql.optimizer.physical;

import java.io.ByteArrayInputStream;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ConditionalResolverSkewJoin;
import org.apache.hadoop.hive.ql.plan.ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx;
import org.apache.hadoop.hive.ql.plan.ConditionalWork;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.JoinDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
/**
 * GenMRSkewJoinProcessor.
 */
public final class GenMRSkewJoinProcessor {

  private GenMRSkewJoinProcessor() {
    // prevent instantiation
  }

  
  /**
   * Create tasks for processing skew joins. The idea (HIVE-964) is to use
   * separate jobs and map-joins to handle skew joins.
   * <p>
   * <ul>
   * <li>The number of MR jobs needed to handle skew keys is the number of
   * tables minus 1 (we can stream the last table, so big keys in the last
   * table will not be a problem).
   * <li>At runtime in Join, we output the big keys of one table into one
   * corresponding directory, and the matching keys of all the other tables
   * into different dirs (one per table). The directories will look like:
   * <ul>
   * <li>dir-T1-bigkeys (big keys in T1), dir-T2-keys (keys in T2 that are big
   * in T1), dir-T3-keys (keys in T3 that are big in T1), ...
   * <li>dir-T1-keys (keys in T1 that are big in T2), dir-T2-bigkeys (big keys
   * in T2), dir-T3-keys (keys in T3 that are big in T2), ...
   * <li>dir-T1-keys (keys in T1 that are big in T3), dir-T2-keys (keys in T2
   * that are big in T3), dir-T3-bigkeys (big keys in T3), ...
   * </ul>
   * </ul>
   * For each table, we launch one map-join job, taking the directory
   * containing the big keys of that table and the corresponding dirs of the
   * other tables as input. (Actually, one job per row of the list above; see
   * also the illustrative comment in the method body.)
   * <p>
   * For more discussion, please check
   * https://issues.apache.org/jira/browse/HIVE-964.
   */

  public static void processSkewJoin(JoinOperator joinOp,
      Task<? extends Serializable> currTask, ParseContext parseCtx)
      throws SemanticException {
    // We are trying to add map joins to handle skew keys, and map join
    // currently does not work with outer joins
    if (!GenMRSkewJoinProcessor.skewJoinEnabled(parseCtx.getConf(), joinOp)) {
      return;
    }
    String baseTmpDir = parseCtx.getContext().getMRTmpFileURI();
    JoinDesc joinDescriptor = joinOp.getConf();
    Map<Byte, List<ExprNodeDesc>> joinValues = joinDescriptor.getExprs();
    int numAliases = joinValues.size();
    Map<Byte, String> bigKeysDirMap = new HashMap<Byte, String>();
    Map<Byte, Map<Byte, String>> smallKeysDirMap = new HashMap<Byte, Map<Byte, String>>();
    Map<Byte, String> skewJoinJobResultsDir = new HashMap<Byte, String>();
    Byte[] tags = joinDescriptor.getTagOrder();
    for (int i = 0; i < numAliases; i++) {
      Byte alias = tags[i];
      String bigKeysDir = getBigKeysDir(baseTmpDir, alias);
      bigKeysDirMap.put(alias, bigKeysDir);
      Map<Byte, String> smallKeysMap = new HashMap<Byte, String>();
      smallKeysDirMap.put(alias, smallKeysMap);
      for (Byte src2 : tags) {
        if (!src2.equals(alias)) {
          smallKeysMap.put(src2, getSmallKeysDir(baseTmpDir, alias, src2));
        }
      }
      skewJoinJobResultsDir.put(alias, getBigKeysSkewJoinResultDir(baseTmpDir,
          alias));
    }
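
    // Illustrative example of the layout built above (assuming a three-way
    // join with tags 0, 1, 2; dir names come from the helpers at the bottom
    // of this class):
    //   bigKeysDirMap.get(0)          -> <tmpdir>/hive_skew_join_bigkeys_0
    //   smallKeysDirMap.get(0).get(1) -> <tmpdir>/hive_skew_join_smallkeys_0_1
    //   smallKeysDirMap.get(0).get(2) -> <tmpdir>/hive_skew_join_smallkeys_0_2
    // i.e. one big-key dir per table plus, for each table, one small-key dir
    // per other table, matching the scheme described in the javadoc.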
    joinDescriptor.setHandleSkewJoin(true);
    joinDescriptor.setBigKeysDirMap(bigKeysDirMap);
    joinDescriptor.setSmallKeysDirMap(smallKeysDirMap);
    joinDescriptor.setSkewKeyDefinition(HiveConf.getIntVar(parseCtx.getConf(),
        HiveConf.ConfVars.HIVESKEWJOINKEY));
    HashMap<String, Task<? extends Serializable>> bigKeysDirToTaskMap =
        new HashMap<String, Task<? extends Serializable>>();
    List<Serializable> listWorks = new ArrayList<Serializable>();
    List<Task<? extends Serializable>> listTasks = new ArrayList<Task<? extends Serializable>>();
    MapredWork currPlan = (MapredWork) currTask.getWork();
    TableDesc keyTblDesc = (TableDesc) currPlan.getKeyDesc().clone();
    List<String> joinKeys = Utilities
        .getColumnNames(keyTblDesc.getProperties());
    List<String> joinKeyTypes = Utilities.getColumnTypes(keyTblDesc
        .getProperties());
    Map<Byte, TableDesc> tableDescList = new HashMap<Byte, TableDesc>();
    Map<Byte, List<ExprNodeDesc>> newJoinValues = new HashMap<Byte, List<ExprNodeDesc>>();
    Map<Byte, List<ExprNodeDesc>> newJoinKeys = new HashMap<Byte, List<ExprNodeDesc>>();
    // used to create the MapJoinDesc; must be in tag order
    List<TableDesc> newJoinValueTblDesc = new ArrayList<TableDesc>();
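    // pre-filled with null placeholders below; each alias then fills its own
    // slot via set() in the per-alias loop that follows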
    for (Byte tag : tags) {
      newJoinValueTblDesc.add(null);
    }
    for (int i = 0; i < numAliases; i++) {
      Byte alias = tags[i];
      List<ExprNodeDesc> valueCols = joinValues.get(alias);
      String colNames = "";
      String colTypes = "";
      int columnSize = valueCols.size();
      List<ExprNodeDesc> newValueExpr = new ArrayList<ExprNodeDesc>();
      List<ExprNodeDesc> newKeyExpr = new ArrayList<ExprNodeDesc>();
      boolean first = true;
      for (int k = 0; k < columnSize; k++) {
        TypeInfo type = valueCols.get(k).getTypeInfo();
        String newColName = i + "_VALUE_" + k; // any name, it does not matter.
        newValueExpr
            .add(new ExprNodeColumnDesc(type, newColName, "" + i, false));
        if (!first) {
          colNames = colNames + ",";
          colTypes = colTypes + ",";
        }
        first = false;
        colNames = colNames + newColName;
        colTypes = colTypes + valueCols.get(k).getTypeString();
      }
      // we put the join keys in the last part of the spilled table row
      for (int k = 0; k < joinKeys.size(); k++) {
        if (!first) {
          colNames = colNames + ",";
          colTypes = colTypes + ",";
        }
        first = false;
        colNames = colNames + joinKeys.get(k);
        colTypes = colTypes + joinKeyTypes.get(k);
        newKeyExpr.add(new ExprNodeColumnDesc(TypeInfoFactory
            .getPrimitiveTypeInfo(joinKeyTypes.get(k)), joinKeys.get(k),
            "" + i, false));
      }
      newJoinValues.put(alias, newValueExpr);
      newJoinKeys.put(alias, newKeyExpr);
      tableDescList.put(alias, Utilities.getTableDesc(colNames, colTypes));
      // construct value table Desc
      String valueColNames = "";
      String valueColTypes = "";
      first = true;
      for (int k = 0; k < columnSize; k++) {
        String newColName = i + "_VALUE_" + k; // any name, it does not matter.
        if (!first) {
          valueColNames = valueColNames + ",";
          valueColTypes = valueColTypes + ",";
        }
        valueColNames = valueColNames + newColName;
        valueColTypes = valueColTypes + valueCols.get(k).getTypeString();
        first = false;
      }
      newJoinValueTblDesc.set(Byte.valueOf((byte) i), Utilities.getTableDesc(
          valueColNames, valueColTypes));
    }
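
    // At this point every alias has a synthetic schema for its spilled rows:
    // value columns (i_VALUE_k) first, the join keys last, mirroring the
    // order in which the two loops above appended columns.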
    joinDescriptor.setSkewKeysValuesTables(tableDescList);
    joinDescriptor.setKeyTableDesc(keyTblDesc);
    for (int i = 0; i < numAliases - 1; i++) {
      Byte src = tags[i];
      MapredWork newPlan = PlanUtils.getMapRedWork();
      // This code has only been added for testing
      boolean mapperCannotSpanPartns =
          parseCtx.getConf().getBoolVar(
              HiveConf.ConfVars.HIVE_MAPPER_CANNOT_SPAN_MULTIPLE_PARTITIONS);
      newPlan.setMapperCannotSpanPartns(mapperCannotSpanPartns);
      MapredWork clonePlan = null;
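      // deep-copy the current plan by round-tripping it through its XML
      // serialization, so this skew-join job gets an independent operator tree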
      try {
        String xmlPlan = currPlan.toXML();
        StringBuilder sb = new StringBuilder(xmlPlan);
        ByteArrayInputStream bis;
        bis = new ByteArrayInputStream(sb.toString().getBytes("UTF-8"));
        clonePlan = Utilities.deserializeMapRedWork(bis, parseCtx.getConf());
      } catch (UnsupportedEncodingException e) {
        throw new SemanticException(e);
      }
      Operator<? extends Serializable>[] parentOps = new TableScanOperator[tags.length];
      for (int k = 0; k < tags.length; k++) {
        Operator<? extends Serializable> ts = OperatorFactory.get(
            TableScanDesc.class, (RowSchema) null);
        ((TableScanOperator) ts).setTableDesc(tableDescList.get((byte) k));
        parentOps[k] = ts;
      }
      Operator<? extends Serializable> tblScan_op = parentOps[i];
      ArrayList<String> aliases = new ArrayList<String>();
      String alias = src.toString();
      aliases.add(alias);
      String bigKeyDirPath = bigKeysDirMap.get(src);
      newPlan.getPathToAliases().put(bigKeyDirPath, aliases);
      newPlan.getAliasToWork().put(alias, tblScan_op);
      PartitionDesc part = new PartitionDesc(tableDescList.get(src), null);
      newPlan.getPathToPartitionInfo().put(bigKeyDirPath, part);
      newPlan.getAliasToPartnInfo().put(alias, part);
      Operator<? extends Serializable> reducer = clonePlan.getReducer();
      assert reducer instanceof JoinOperator;
      JoinOperator cloneJoinOp = (JoinOperator) reducer;
      MapJoinDesc mapJoinDescriptor = new MapJoinDesc(newJoinKeys, keyTblDesc,
          newJoinValues, newJoinValueTblDesc, newJoinValueTblDesc,
          joinDescriptor.getOutputColumnNames(), i, joinDescriptor.getConds(),
          joinDescriptor.getFilters(), joinDescriptor.getNoOuterJoin());
      mapJoinDescriptor.setTagOrder(tags);
      mapJoinDescriptor.setHandleSkewJoin(false);
      MapredLocalWork localPlan = new MapredLocalWork(
          new LinkedHashMap<String, Operator<? extends Serializable>>(),
          new LinkedHashMap<String, FetchWork>());
      Map<Byte, String> smallTblDirs = smallKeysDirMap.get(src);
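      // wire every other table's scan into the local work, and point its
      // fetch work at that table's matching-key dir; these feed the in-memory
      // hash tables of the map join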
      for (int j = 0; j < numAliases; j++) {
        if (j == i) {
          continue;
        }
        Byte small_alias = tags[j];
        Operator<? extends Serializable> tblScan_op2 = parentOps[j];
        localPlan.getAliasToWork().put(small_alias.toString(), tblScan_op2);
        Path tblDir = new Path(smallTblDirs.get(small_alias));
        localPlan.getAliasToFetchWork().put(small_alias.toString(),
            new FetchWork(tblDir.toString(), tableDescList.get(small_alias)));
      }
      newPlan.setMapLocalWork(localPlan);
      // construct a map join and set it as the child operator of tblScan_op
      MapJoinOperator mapJoinOp = (MapJoinOperator) OperatorFactory
          .getAndMakeChild(mapJoinDescriptor, (RowSchema) null, parentOps);
      // change the children of the original join operator to point to the map
      // join operator
      List<Operator<? extends Serializable>> childOps = cloneJoinOp
          .getChildOperators();
      for (Operator<? extends Serializable> childOp : childOps) {
        childOp.replaceParent(cloneJoinOp, mapJoinOp);
      }
      mapJoinOp.setChildOperators(childOps);
      HiveConf jc = new HiveConf(parseCtx.getConf(),
          GenMRSkewJoinProcessor.class);
      newPlan.setNumMapTasks(HiveConf
          .getIntVar(jc, HiveConf.ConfVars.HIVESKEWJOINMAPJOINNUMMAPTASK));
      newPlan
          .setMinSplitSize(HiveConf.getLongVar(jc,
              HiveConf.ConfVars.HIVESKEWJOINMAPJOINMINSPLIT));
      newPlan.setInputformat(HiveInputFormat.class.getName());
      Task<? extends Serializable> skewJoinMapJoinTask = TaskFactory.get(
          newPlan, jc);
      bigKeysDirToTaskMap.put(bigKeyDirPath, skewJoinMapJoinTask);
      listWorks.add(skewJoinMapJoinTask.getWork());
      listTasks.add(skewJoinMapJoinTask);
    }
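
    // Wrap the skew-handling map-join tasks in a conditional task: at runtime
    // only the tasks whose big-key directories actually received rows need to
    // run (resolved via ConditionalResolverSkewJoin below).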
    ConditionalWork cndWork = new ConditionalWork(listWorks);
    ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork,
        parseCtx.getConf());
    cndTsk.setListTasks(listTasks);
    cndTsk.setResolver(new ConditionalResolverSkewJoin());
    cndTsk
        .setResolverCtx(new ConditionalResolverSkewJoinCtx(
            bigKeysDirToTaskMap));
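
    // splice the conditional task between the current task and its old
    // children, so the skew-join jobs run before any downstream work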
    List<Task<? extends Serializable>> oldChildTasks = currTask.getChildTasks();
    currTask.setChildTasks(new ArrayList<Task<? extends Serializable>>());
    currTask.addDependentTask(cndTsk);
    if (oldChildTasks != null) {
      for (Task<? extends Serializable> tsk : cndTsk.getListTasks()) {
        for (Task<? extends Serializable> oldChild : oldChildTasks) {
          tsk.addDependentTask(oldChild);
        }
      }
    }
    return;
  }

  public static boolean skewJoinEnabled(HiveConf conf, JoinOperator joinOp) {
    if (conf != null && !conf.getBoolVar(HiveConf.ConfVars.HIVESKEWJOIN)) {
      return false;
    }
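
    // map-join based skew handling does not currently work with outer joins
    // (see the comment at the top of processSkewJoin)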
    if (!joinOp.getConf().isNoOuterJoin()) {
      return false;
    }
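
    // the rewrite assumes the tags are sequential from 0; bail out otherwise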
    byte pos = 0;
    for (Byte tag : joinOp.getConf().getTagOrder()) {
      if (tag != pos) {
        return false;
      }
      pos++;
    }
    return true;
  }

  private static String skewJoinPrefix = "hive_skew_join";
  private static String UNDERLINE = "_";
  private static String BIGKEYS = "bigkeys";
  private static String SMALLKEYS = "smallkeys";
  private static String RESULTS = "results";

  static String getBigKeysDir(String baseDir, Byte srcTbl) {
    return baseDir + Path.SEPARATOR + skewJoinPrefix + UNDERLINE + BIGKEYS
        + UNDERLINE + srcTbl;
  }

  static String getBigKeysSkewJoinResultDir(String baseDir, Byte srcTbl) {
    return baseDir + Path.SEPARATOR + skewJoinPrefix + UNDERLINE + BIGKEYS
        + UNDERLINE + RESULTS + UNDERLINE + srcTbl;
  }

  static String getSmallKeysDir(String baseDir, Byte srcTblBigTbl,
      Byte srcTblSmallTbl) {
    return baseDir + Path.SEPARATOR + skewJoinPrefix + UNDERLINE + SMALLKEYS
        + UNDERLINE + srcTblBigTbl + UNDERLINE + srcTblSmallTbl;
  }
}