Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
 
 package org.apache.hadoop.hive.ql.ppd;
 
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.Stack;

import org.apache.hadoop.mapred.JobConf;

Operator factory for predicate pushdown processing of operator graph. Each operator determines the pushdown predicates by walking the expression tree. Each operator merges its own pushdown predicates with those of its children. Finally the TableScan operator gathers all the predicates and inserts a filter operator after itself. TODO: Further optimizations: 1) Multi-insert case; 2) Create a filter operator for those predicates that couldn't be pushed to the previous operators in the data flow; 3) Merge multiple sequential filter predicates into one so that plans are more readable; 4) Remove predicates from filter operators that have been pushed. Currently these pushed predicates are evaluated twice.
 
 public final class OpProcFactory {
 
   protected static final Log LOG = LogFactory.getLog(OpProcFactory.class
     .getName());

  
Processor for Script Operator Prevents any predicates being pushed.
 
   public static class ScriptPPD extends DefaultPPD implements NodeProcessor {
 
     @Override
     public Object process(Node ndStack<NodestackNodeProcessorCtx procCtx,
         Object... nodeOutputsthrows SemanticException {
       .info("Processing for " + nd.getName() + "("
           + ((Operatornd).getIdentifier() + ")");
       // script operator is a black-box to hive so no optimization here
       // assuming that nothing can be pushed above the script op
       // same with LIMIT op
       return null;
     }
 
   }
 
   public static class LateralViewForwardPPD extends DefaultPPD implements NodeProcessor {
 
     @Override
     public Object process(Node ndStack<NodestackNodeProcessorCtx procCtx,
        Object... nodeOutputsthrows SemanticException {
      .info("Processing for " + nd.getName() + "("
          + ((Operatornd).getIdentifier() + ")");
      OpWalkerInfo owi = (OpWalkerInfoprocCtx;
      ExprWalkerInfo childPreds = owi
      .getPrunedPreds((Operator<? extends Serializable>) nd.getChildren()
      .get(0));
      owi.putPrunedPreds((Operator<? extends Serializable>) ndchildPreds);
      return null;
    }
  }

  
Combines predicates of its child into a single expression and adds a filter op as new child.
  public static class TableScanPPD extends DefaultPPD implements NodeProcessor {
    @Override
    public Object process(Node ndStack<NodestackNodeProcessorCtx procCtx,
        Object... nodeOutputsthrows SemanticException {
      .info("Processing for " + nd.getName() + "("
          + ((Operatornd).getIdentifier() + ")");
      OpWalkerInfo owi = (OpWalkerInfoprocCtx;
      TableScanOperator tsOp = (TableScanOperatornd;
      mergeWithChildrenPred(tsOpowinullnullfalse);
      ExprWalkerInfo pushDownPreds = owi.getPrunedPreds(tsOp);
      return createFilter(tsOppushDownPredsowi);
    }
  }

  
Determines the push down predicates in its where expression and then combines it with the push down predicates that are passed from its children.
  public static class FilterPPD extends DefaultPPD implements NodeProcessor {
    @Override
    public Object process(Node ndStack<NodestackNodeProcessorCtx procCtx,
        Object... nodeOutputsthrows SemanticException {
      .info("Processing for " + nd.getName() + "("
          + ((Operatornd).getIdentifier() + ")");
      OpWalkerInfo owi = (OpWalkerInfoprocCtx;
      Operator<? extends Serializableop = (Operator<? extends Serializable>) nd;
      ExprNodeDesc predicate = (((FilterOperatornd).getConf()).getPredicate();
      // get pushdown predicates for this operator's predicate
      ExprWalkerInfo ewi = ExprWalkerProcFactory.extractPushdownPreds(owiop,
          predicate);
      if (!ewi.isDeterministic()) {
        /* predicate is not deterministic */
        if (op.getChildren() != null && op.getChildren().size() == 1) {
          createFilter(opowi
              .getPrunedPreds((Operator<? extends Serializable>) (op
              .getChildren().get(0))), owi);
        }
        return null;
      }
      logExpr(ndewi);
      owi.putPrunedPreds(opewi);
      // merge it with children predicates
      mergeWithChildrenPred(opowiewinullfalse);
      return null;
    }
  }

  
Determines predicates for which alias can be pushed to it's parents. See the comments for getQualifiedAliases function.
  public static class JoinPPD extends DefaultPPD implements NodeProcessor {
    @Override
    public Object process(Node ndStack<NodestackNodeProcessorCtx procCtx,
        Object... nodeOutputsthrows SemanticException {
      .info("Processing for " + nd.getName() + "("
          + ((Operatornd).getIdentifier() + ")");
      OpWalkerInfo owi = (OpWalkerInfoprocCtx;
      Set<Stringaliases = getQualifiedAliases((JoinOperatorndowi
          .getRowResolver(nd));
      mergeWithChildrenPred(ndowinullaliasesfalse);
      return null;
    }

    
Figures out the aliases for whom it is safe to push predicates based on ANSI SQL semantics For inner join, all predicates for all aliases can be pushed For full outer join, none of the predicates can be pushed as that would limit the number of rows for join For left outer join, all the predicates on the left side aliases can be pushed up For right outer join, all the predicates on the right side aliases can be pushed up Joins chain containing both left and right outer joins are treated as full outer join. TODO: further optimization opportunity for the case a.c1 = b.c1 and b.c2 = c.c2 a and b are first joined and then the result with c. But the second join op currently treats a and b as separate aliases and thus disallowing predicate expr containing both tables a and b (such as a.c3 + a.c4 > 20). Such predicates also can be pushed just above the second join and below the first join

Parameters:
op Join Operator
rr Row resolver
Returns:
set of qualified aliases
    private Set<StringgetQualifiedAliases(JoinOperator opRowResolver rr) {
      Set<Stringaliases = new HashSet<String>();
      int loj = .;
      int roj = -1;
      boolean oj = false;
      JoinCondDesc[] conds = op.getConf().getConds();
      Map<IntegerSet<String>> posToAliasMap = op.getPosToAliasMap();
      for (JoinCondDesc jc : conds) {
        if (jc.getType() == .) {
          oj = true;
          break;
        } else if (jc.getType() == .) {
          if (jc.getLeft() < loj) {
            loj = jc.getLeft();
          }
        } else if (jc.getType() == .) {
          if (jc.getRight() > roj) {
            roj = jc.getRight();
          }
        }
      }
      if (oj || (loj != . && roj != -1)) {
        return aliases;
      }
      for (Entry<IntegerSet<String>> pa : posToAliasMap.entrySet()) {
        if (loj != .) {
          if (pa.getKey() <= loj) {
            aliases.addAll(pa.getValue());
          }
        } else if (roj != -1) {
          if (pa.getKey() >= roj) {
            aliases.addAll(pa.getValue());
          }
        } else {
          aliases.addAll(pa.getValue());
        }
      }
      Set<Stringaliases2 = rr.getTableNames();
      aliases.retainAll(aliases2);
      return aliases;
    }
  }

  
Processor for ReduceSink operator.
  public static class ReduceSinkPPD extends DefaultPPD implements NodeProcessor {
    @Override
    public Object process(Node ndStack<NodestackNodeProcessorCtx procCtx,
        Object... nodeOutputsthrows SemanticException {
      .info("Processing for " + nd.getName() + "("
          + ((Operatornd).getIdentifier() + ")");
      OpWalkerInfo owi = (OpWalkerInfoprocCtx;
      Set<Stringaliases = owi.getRowResolver(nd).getTableNames();
      boolean ignoreAliases = false;
      if (aliases.size() == 1 && aliases.contains("")) {
        // Reduce sink of group by operator
        ignoreAliases = true;
      }
      mergeWithChildrenPred(ndowinullaliasesignoreAliases);
      return null;
    }
  }

  
Default processor which just merges its children.
  public static class DefaultPPD implements NodeProcessor {
    @Override
    public Object process(Node ndStack<NodestackNodeProcessorCtx procCtx,
        Object... nodeOutputsthrows SemanticException {
      .info("Processing for " + nd.getName() + "("
          + ((Operatornd).getIdentifier() + ")");
      mergeWithChildrenPred(nd, (OpWalkerInfoprocCtxnullnullfalse);
      return null;
    }

    

Parameters:
nd
ewi
    protected void logExpr(Node ndExprWalkerInfo ewi) {
      for (Entry<StringList<ExprNodeDesc>> e : ewi.getFinalCandidates()
          .entrySet()) {
        .info("Pushdown Predicates of " + nd.getName() + " For Alias : "
            + e.getKey());
        for (ExprNodeDesc n : e.getValue()) {
          .info("\t" + n.getExprString());
        }
      }
    }

    
Take current operators pushdown predicates and merges them with children's pushdown predicates.

Parameters:
nd current operator
owi operator context during this walk
ewi pushdown predicates (part of expression walker info)
aliases aliases that this operator can pushdown. null means that all aliases can be pushed down
ignoreAliases
Throws:
SemanticException
    protected void mergeWithChildrenPred(Node ndOpWalkerInfo owi,
        ExprWalkerInfo ewiSet<Stringaliasesboolean ignoreAliases)
        throws SemanticException {
      if (nd.getChildren() == null || nd.getChildren().size() > 1) {
        // ppd for multi-insert query is not yet implemented
        // no-op for leafs
        return;
      }
      Operator<? extends Serializableop = (Operator<? extends Serializable>) nd;
      ExprWalkerInfo childPreds = owi
          .getPrunedPreds((Operator<? extends Serializable>) nd.getChildren()
          .get(0));
      if (childPreds == null) {
        return;
      }
      if (ewi == null) {
        ewi = new ExprWalkerInfo();
      }
      for (Entry<StringList<ExprNodeDesc>> e : childPreds
          .getFinalCandidates().entrySet()) {
        if (ignoreAliases || aliases == null || aliases.contains(e.getKey())
            || e.getKey() == null) {
          // e.getKey() (alias) can be null in case of constant expressions. see
          // input8.q
          ExprWalkerInfo extractPushdownPreds = ExprWalkerProcFactory
              .extractPushdownPreds(owiope.getValue());
          ewi.merge(extractPushdownPreds);
          logExpr(ndextractPushdownPreds);
        }
      }
      owi.putPrunedPreds((Operator<? extends Serializable>) ndewi);
    }
  }
  protected static Object createFilter(Operator op,
      ExprWalkerInfo pushDownPredsOpWalkerInfo owi) {
    if (pushDownPreds == null || pushDownPreds.getFinalCandidates() == null
        || pushDownPreds.getFinalCandidates().size() == 0) {
      return null;
    }
    RowResolver inputRR = owi.getRowResolver(op);
    // combine all predicates into a single expression
    List<ExprNodeDescpreds = null;
    ExprNodeDesc condn = null;
    Iterator<List<ExprNodeDesc>> iterator = pushDownPreds.getFinalCandidates()
        .values().iterator();
    while (iterator.hasNext()) {
      preds = iterator.next();
      int i = 0;
      if (condn == null) {
        condn = preds.get(0);
        i++;
      }
      for (; i < preds.size(); i++) {
        List<ExprNodeDescchildren = new ArrayList<ExprNodeDesc>(2);
        children.add(condn);
        children.add(preds.get(i));
            FunctionRegistry.getGenericUDFForAnd(), children);
      }
    }
    if (condn == null) {
      return null;
    }
    
    if (op instanceof TableScanOperator) {
      boolean pushFilterToStorage;
      HiveConf hiveConf = owi.getParseContext().getConf();
      pushFilterToStorage =
      if (pushFilterToStorage) {
        condn = pushFilterToStorageHandler(
          (TableScanOperatorop,
          condn,
          owi,
          hiveConf);
        if (condn == null) {
          // we pushed the whole thing down
          return null;
        }
      }
    }
    // add new filter op
    List<Operator<? extends Serializable>> originalChilren = op
        .getChildOperators();
    op.setChildOperators(null);
    Operator<FilterDescoutput = OperatorFactory.getAndMakeChild(
        new FilterDesc(condnfalse), new RowSchema(inputRR.getColumnInfos()),
        op);
    output.setChildOperators(originalChilren);
    for (Operator<? extends Serializablech : originalChilren) {
      List<Operator<? extends Serializable>> parentOperators = ch
          .getParentOperators();
      int pos = parentOperators.indexOf(op);
      assert pos != -1;
      parentOperators.remove(pos);
      parentOperators.add(posoutput); // add the new op as the old
    }
    OpParseContext ctx = new OpParseContext(inputRR);
    owi.put(outputctx);
    return output;
  }

  
Attempts to push a predicate down into a storage handler. For native tables, this is a no-op.

Parameters:
tableScanOp table scan against which predicate applies
originalPredicate predicate to be pushed down
owi object walk info
hiveConf Hive configuration
Returns:
portion of predicate which needs to be evaluated by Hive as a post-filter, or null if it was possible to push down the entire predicate
    TableScanOperator tableScanOp,
    ExprNodeDesc originalPredicate,
    OpWalkerInfo owi,
    HiveConf hiveConf) {
    TableScanDesc tableScanDesc = tableScanOp.getConf();
    Table tbl = owi.getParseContext().getTopToTable().get(tableScanOp);
    if (!tbl.isNonNative()) {
      return originalPredicate;
    }
    HiveStorageHandler storageHandler = tbl.getStorageHandler();
    if (!(storageHandler instanceof HiveStoragePredicateHandler)) {
      // The storage handler does not provide predicate decomposition
      // support, so we'll implement the entire filter in Hive.  However,
      // we still provide the full predicate to the storage handler in
      // case it wants to do any of its own prefiltering.
      tableScanDesc.setFilterExpr(originalPredicate);
      return originalPredicate;
    }
    HiveStoragePredicateHandler predicateHandler =
      (HiveStoragePredicateHandlerstorageHandler;
    JobConf jobConf = new JobConf(owi.getParseContext().getConf());
    Utilities.setColumnNameList(jobConftableScanOp);
      Utilities.getTableDesc(tbl),
      jobConf);
    Deserializer deserializer = tbl.getDeserializer();
      predicateHandler.decomposePredicate(
        jobConf,
        deserializer,
        originalPredicate);
    if (decomposed == null) {
      // not able to push anything down
      if (.isDebugEnabled()) {
        .debug("No pushdown possible for predicate:  "
          + originalPredicate.getExprString());
      }
      return originalPredicate;
    }
    if (.isDebugEnabled()) {
      .debug("Original predicate:  "
        + originalPredicate.getExprString());
      if (decomposed.pushedPredicate != null) {
        .debug(
          "Pushed predicate:  "
          + decomposed.pushedPredicate.getExprString());
      }
      if (decomposed.residualPredicate != null) {
        .debug(
          "Residual predicate:  "
          + decomposed.residualPredicate.getExprString());
      }
    }
    tableScanDesc.setFilterExpr(decomposed.pushedPredicate);
    return decomposed.residualPredicate;
  }
  
  /** @return processor for Filter operators. */
  public static NodeProcessor getFilterProc() {
    return new FilterPPD();
  }
  /** @return processor for Join operators. */
  public static NodeProcessor getJoinProc() {
    return new JoinPPD();
  }
  /** @return processor for ReduceSink operators. */
  public static NodeProcessor getRSProc() {
    return new ReduceSinkPPD();
  }
  /** @return processor for TableScan operators. */
  public static NodeProcessor getTSProc() {
    return new TableScanPPD();
  }
  /** @return default processor for all other operators. */
  public static NodeProcessor getDefaultProc() {
    return new DefaultPPD();
  }
  /** @return processor for Script operators (blocks all pushdown). */
  public static NodeProcessor getSCRProc() {
    return new ScriptPPD();
  }
  /** @return processor for Limit operators; reuses ScriptPPD to block pushdown. */
  public static NodeProcessor getLIMProc() {
    return new ScriptPPD();
  }
  /** @return processor for UDTF operators; reuses ScriptPPD to block pushdown. */
  public static NodeProcessor getUDTFProc() {
    return new ScriptPPD();
  }
  /** @return processor for LateralViewForward operators. */
  public static NodeProcessor getLVFProc() {
    return new LateralViewForwardPPD();
  }
  /** Static factory class; not meant to be instantiated. */
  private OpProcFactory() {
    // prevent instantiation
  }
New to GrepCode? Check out our FAQ X