Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
 package org.apache.hadoop.hive.ql.optimizer.physical;
 import java.util.List;
 import java.util.Map;
 // NOTE(review): this snippet is a damaged web extraction. The statements below
 // clearly belong inside a method body -- presumably
 // resolve(PhysicalContext pctx) from the PhysicalPlanResolver interface, since
 // they reference a parameter `pctx` and return it -- but the method signature,
 // the walker invocation over the root tasks, and the closing braces were lost.
 // Restore this class from the upstream Apache Hive source before compiling.
 public class CommonJoinResolver implements PhysicalPlanResolver {
     // create dispatcher and graph walker
     Dispatcher disp = new CommonJoinTaskDispatcher(pctx);
     TaskGraphWalker ogw = new TaskGraphWalker(disp);
     // get all the tasks nodes from root task
     // NOTE(review): generic type is garbled -- was presumably
     // ArrayList<Node> topNodes = new ArrayList<Node>(); (TODO confirm upstream)
     ArrayList<NodetopNodes = new ArrayList<Node>();
     // begin to walk through the task tree.
     // NOTE(review): the ogw.startWalking(...) call that should precede this
     // return appears to have been lost in extraction.
     return pctx;
Iterate over each task. If a task has local work, create a new task (a MapredLocalTask) for that local work; then make the newly generated task depend on the current task's parent tasks, and make the current task depend on the new task.
   // Dispatcher that walks the task tree and, for each map-reduce task whose
   // reducer is a common (shuffle) join, generates alternative map-join tasks
   // wrapped in a ConditionalTask (see processCurrentTask below).
   class CommonJoinTaskDispatcher implements Dispatcher {
     private final PhysicalContext physicalContext;
     public CommonJoinTaskDispatcher(PhysicalContext context) {
        // NOTE(review): the left-hand side of this assignment was lost in
        // extraction -- presumably `physicalContext = context;` (TODO confirm
        // against upstream); the constructor's closing brace is also missing.
        = context;
     // Converts a common-join MapRedTask into a ConditionalTask that holds one
     // candidate map-join task per possible "big table" position, plus the
     // original common-join task as a fallback; returns null when the task has
     // no common join or no table can serve as the big table.
     //
     // NOTE(review): extraction damage throughout this method -- fused tokens
     // (e.g. `currTaskConditionalTask` was `currTask, ConditionalTask`),
     // dropped `>`/`,` in generics, missing closing braces, and several lost
     // statements (the bodies that add to listWorks/listTasks/aliasToTask and
     // the interior of the pathToAliases loop). Restore from upstream Hive.
     private ConditionalTask processCurrentTask(MapRedTask currTaskConditionalTask conditionalTask)
         throws SemanticException {
       // whether it contains common join op; if contains, return this common join op
       JoinOperator joinOp = getJoinOp(currTask);
       if (joinOp == null) {
         return null;
       MapredWork currWork = currTask.getWork();
       // create conditional work list and task list
       List<SerializablelistWorks = new ArrayList<Serializable>();
       List<Task<? extends Serializable>> listTasks = new ArrayList<Task<? extends Serializable>>();
       // create alias to task mapping and alias to input file mapping for resolver
      HashMap<StringTask<? extends Serializable>> aliasToTask = new HashMap<StringTask<? extends Serializable>>();
      HashMap<StringStringaliasToPath = new HashMap<StringString>();
      HashMap<StringArrayList<String>> pathToAliases = currTask.getWork().getPathToAliases();
      // get parseCtx for this Join Operator
      // NOTE(review): the receiver of .getParseContext() was lost -- presumably
      // `physicalContext.getParseContext()` (TODO confirm upstream).
      ParseContext parseCtx = .getParseContext();
      QBJoinTree joinTree = parseCtx.getJoinContext().get(joinOp);
      // start to generate multiple map join tasks
      JoinDesc joinDesc = joinOp.getConf();
      Byte[] order = joinDesc.getTagOrder();
      int numAliases = order.length;
      try {
        HashSet<IntegersmallTableOnlySet = MapJoinProcessor.getSmallTableOnlySet(joinDesc
        // no table could be the big table; there is no need to convert
        if (smallTableOnlySet == null) {
          return null;
        // serialized copy of the work, used below to deep-copy one MapredWork
        // per big-table candidate
        String xml = currWork.toXML();
        String bigTableAlias = null;
        // every table is small-table-only: nothing can be the big table
        if(smallTableOnlySet.size() == numAliases) {
          return null;
        for (int i = 0; i < numAliasesi++) {
          // this table cannot be big table
          if (smallTableOnlySet.contains(i)) {
          // create map join task and set big table as i
          // deep copy a new mapred work from xml
          InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8"));
          // NOTE(review): `in.getConf()` looks like two fused arguments --
          // presumably deserializeMapRedWork(in, <conf>) (TODO confirm).
          MapredWork newWork = Utilities.deserializeMapRedWork(in.getConf());
          // create a mapred task for this work
          MapRedTask newTask = (MapRedTask) TaskFactory.get(newWork
          JoinOperator newJoinOp = getJoinOp(newTask);
          // optimize this newWork and assume big table position is i
          bigTableAlias = MapJoinProcessor.genMapJoinOpAndLocalWork(newWorknewJoinOpi);
          // add into conditional task
          //set up backup task
          // put the mapping alias to task
          // set alias to path
          for (Map.Entry<StringArrayList<String>> entry : pathToAliases.entrySet()) {
            String path = entry.getKey();
            ArrayList<StringaliasList = entry.getValue();
            if (aliasList.contains(bigTableAlias)) {
      } catch (Exception e) {
        // NOTE(review): wrapping only e.getMessage() drops the cause's stack
        // trace; upstream fix would pass `e` as the cause.
        throw new SemanticException("Generate Map Join Task Error: " + e.getMessage());
      // insert current common join task to conditional task
      // clear JoinTree and OP Parse Context
      // create conditional task and insert conditional task into task tree
      ConditionalWork cndWork = new ConditionalWork(listWorks);
      ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWorkparseCtx.getConf());
      // set resolver and resolver context
      cndTsk.setResolver(new ConditionalResolverCommonJoin());
      //replace the current task with the new generated conditional task
      return cndTsk;
    // Splices the generated ConditionalTask into the task DAG in place of the
    // current task: re-points the current task's parents at the conditional
    // task (or swaps it into the root-task list when there are no parents),
    // then re-attaches the current task's children under each task in the
    // conditional task's list.
    //
    // NOTE(review): most of this method's body was lost in extraction -- only
    // the loop shells survive; the statements that actually add/remove
    // dependencies are missing. Restore from upstream Hive.
    private void replaceTaskWithConditionalTask(Task<? extends SerializablecurrTaskConditionalTask cndTskPhysicalContext physicalContext) {
      // add this task into task tree
      // set all parent tasks
      List<Task<? extends Serializable>> parentTasks = currTask.getParentTasks();
      if (parentTasks != null) {
        for (Task<? extends Serializabletsk : parentTasks) {
          // make new generated task depends on all the parent tasks of current task.
          // remove the current task from its original parent task's dependent task
      } else {
        // remove from current root task and add conditional task to root tasks
      // set all child tasks
      List<Task<? extends Serializable>> oldChildTasks = currTask.getChildTasks();
      if (oldChildTasks != null) {
        for (Task<? extends Serializabletsk : cndTsk.getListTasks()) {
          // skip the fallback entry that is the current task itself
          if (tsk.equals(currTask)) {
          for (Task<? extends SerializableoldChild : oldChildTasks) {
    // Dispatcher callback invoked for each task node during the graph walk.
    // Map-reduce tasks (and the map-reduce members of a ConditionalTask's
    // list) are handed to processCurrentTask; everything else is skipped.
    // Always returns null -- the conversion happens via side effects on the
    // task tree.
    //
    // NOTE(review): extraction damage -- fused tokens (`Node ndStack<Nodestack`
    // was `Node nd, Stack<Node> stack`), and the statements following each
    // processCurrentTask call (presumably replaceTaskWithConditionalTask and
    // the closing braces) are missing. Restore from upstream Hive.
    public Object dispatch(Node ndStack<NodestackObject... nodeOutputs)
        throws SemanticException {
      if (nodeOutputs == null || nodeOutputs.length == 0) {
        throw new SemanticException("No Dispatch Context");
      // walker context carried in the first node output
      TaskGraphWalkerContext walkerCtx = (TaskGraphWalkerContextnodeOutputs[0];
      Task<? extends SerializablecurrTask = (Task<? extends Serializable>) nd;
      // not map reduce task or not conditional task, just skip
      if (currTask.isMapRedTask()) {
        if (currTask instanceof ConditionalTask) {
          // get the list of task
          List<Task<? extends Serializable>> taskList = ((ConditionalTaskcurrTask).getListTasks();
          for (Task<? extends Serializabletsk : taskList) {
            if (tsk.isMapRedTask()) {
              ConditionalTask cndTask = this.processCurrentTask((MapRedTasktsk,
        } else {
          ConditionalTask cndTask = this.processCurrentTask((MapRedTaskcurrTasknull);
      return null;
    private JoinOperator getJoinOp(MapRedTask taskthrows SemanticException {
      if (task.getWork() == null) {
        return null;
      Operator<? extends SerializablereducerOp = task.getWork().getReducer();
      if (reducerOp instanceof JoinOperator) {
        return (JoinOperatorreducerOp;
      } else {
        return null;
New to GrepCode? Check out our FAQ X