Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
 
 package org.apache.hadoop.hive.ql.optimizer.physical;
 
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Stack;
 
 
 
 public class CommonJoinResolver implements PhysicalPlanResolver {
   @Override
 
     // create dispatcher and graph walker
     Dispatcher disp = new CommonJoinTaskDispatcher(pctx);
     TaskGraphWalker ogw = new TaskGraphWalker(disp);
 
     // get all the tasks nodes from root task
     ArrayList<NodetopNodes = new ArrayList<Node>();
     topNodes.addAll(pctx.rootTasks);
 
     // begin to walk through the task tree.
     ogw.startWalking(topNodesnull);
     return pctx;
   }


  
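   /**
    * Iterates over each task. If a task has a local work, creates a new task for
    * that local work, named MapredLocalTask, then makes the newly generated task
    * depend on the current task's parent task and makes the current task depend
    * on the newly generated task.
    *
    * <p>NOTE(review): the code below never creates a MapredLocalTask; it wraps a
    * common-join task in a ConditionalTask of map-join candidates. This
    * description looks inherited from another resolver -- confirm and update.
    */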
 
   class CommonJoinTaskDispatcher implements Dispatcher {
 
     private final PhysicalContext physicalContext;
 
     public CommonJoinTaskDispatcher(PhysicalContext context) {
       super();
        = context;
     }
 
     private ConditionalTask processCurrentTask(MapRedTask currTaskConditionalTask conditionalTask)
         throws SemanticException {
 
       // whether it contains common join op; if contains, return this common join op
       JoinOperator joinOp = getJoinOp(currTask);
       if (joinOp == null) {
         return null;
       }
       currTask.setTaskTag(.);
 
       MapredWork currWork = currTask.getWork();
       // create conditional work list and task list
       List<SerializablelistWorks = new ArrayList<Serializable>();
       List<Task<? extends Serializable>> listTasks = new ArrayList<Task<? extends Serializable>>();
 
       // create alias to task mapping and alias to input file mapping for resolver
      HashMap<StringTask<? extends Serializable>> aliasToTask = new HashMap<StringTask<? extends Serializable>>();
      HashMap<StringStringaliasToPath = new HashMap<StringString>();
      HashMap<StringArrayList<String>> pathToAliases = currTask.getWork().getPathToAliases();
      // get parseCtx for this Join Operator
      ParseContext parseCtx = .getParseContext();
      QBJoinTree joinTree = parseCtx.getJoinContext().get(joinOp);
      // start to generate multiple map join tasks
      JoinDesc joinDesc = joinOp.getConf();
      Byte[] order = joinDesc.getTagOrder();
      int numAliases = order.length;
      try {
        HashSet<IntegersmallTableOnlySet = MapJoinProcessor.getSmallTableOnlySet(joinDesc
            .getConds());
        // no table could be the big table; there is no need to convert
        if (smallTableOnlySet == null) {
          return null;
        }
        currWork.setOpParseCtxMap(parseCtx.getOpParseCtx());
        currWork.setJoinTree(joinTree);
        String xml = currWork.toXML();
        String bigTableAlias = null;
        if(smallTableOnlySet.size() == numAliases) {
          return null;
        }
        for (int i = 0; i < numAliasesi++) {
          // this table cannot be big table
          if (smallTableOnlySet.contains(i)) {
            continue;
          }
          // create map join task and set big table as i
          // deep copy a new mapred work from xml
          InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8"));
          MapredWork newWork = Utilities.deserializeMapRedWork(in.getConf());
          // create a mapred task for this work
          MapRedTask newTask = (MapRedTask) TaskFactory.get(newWork
              .getParseContext().getConf());
          JoinOperator newJoinOp = getJoinOp(newTask);
          // optimize this newWork and assume big table position is i
          bigTableAlias = MapJoinProcessor.genMapJoinOpAndLocalWork(newWorknewJoinOpi);
          // add into conditional task
          listWorks.add(newWork);
          listTasks.add(newTask);
          newTask.setTaskTag(.);
          //set up backup task
          newTask.setBackupTask(currTask);
          newTask.setBackupChildrenTasks(currTask.getChildTasks());
          // put the mapping alias to task
          aliasToTask.put(bigTableAliasnewTask);
          // set alias to path
          for (Map.Entry<StringArrayList<String>> entry : pathToAliases.entrySet()) {
            String path = entry.getKey();
            ArrayList<StringaliasList = entry.getValue();
            if (aliasList.contains(bigTableAlias)) {
              aliasToPath.put(bigTableAliaspath);
            }
          }
        }
      } catch (Exception e) {
        e.printStackTrace();
        throw new SemanticException("Generate Map Join Task Error: " + e.getMessage());
      }
      // insert current common join task to conditional task
      listWorks.add(currTask.getWork());
      listTasks.add(currTask);
      // clear JoinTree and OP Parse Context
      currWork.setOpParseCtxMap(null);
      currWork.setJoinTree(null);
      // create conditional task and insert conditional task into task tree
      ConditionalWork cndWork = new ConditionalWork(listWorks);
      ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWorkparseCtx.getConf());
      cndTsk.setListTasks(listTasks);
      // set resolver and resolver context
      cndTsk.setResolver(new ConditionalResolverCommonJoin());
      resolverCtx.setAliasToPath(aliasToPath);
      resolverCtx.setAliasToTask(aliasToTask);
      resolverCtx.setCommonJoinTask(currTask);
      cndTsk.setResolverCtx(resolverCtx);
      //replace the current task with the new generated conditional task
      this.replaceTaskWithConditionalTask(currTaskcndTsk);
      return cndTsk;
    }
    private void replaceTaskWithConditionalTask(Task<? extends SerializablecurrTaskConditionalTask cndTskPhysicalContext physicalContext) {
      // add this task into task tree
      // set all parent tasks
      List<Task<? extends Serializable>> parentTasks = currTask.getParentTasks();
      currTask.setParentTasks(null);
      if (parentTasks != null) {
        for (Task<? extends Serializabletsk : parentTasks) {
          // make new generated task depends on all the parent tasks of current task.
          tsk.addDependentTask(cndTsk);
          // remove the current task from its original parent task's dependent task
          tsk.removeDependentTask(currTask);
        }
      } else {
        // remove from current root task and add conditional task to root tasks
        physicalContext.removeFromRootTask(currTask);
        physicalContext.addToRootTask(cndTsk);
      }
      // set all child tasks
      List<Task<? extends Serializable>> oldChildTasks = currTask.getChildTasks();
      if (oldChildTasks != null) {
        for (Task<? extends Serializabletsk : cndTsk.getListTasks()) {
          if (tsk.equals(currTask)) {
            continue;
          }
          for (Task<? extends SerializableoldChild : oldChildTasks) {
            tsk.addDependentTask(oldChild);
          }
        }
      }
    }
    @Override
    public Object dispatch(Node ndStack<NodestackObject... nodeOutputs)
        throws SemanticException {
      if (nodeOutputs == null || nodeOutputs.length == 0) {
        throw new SemanticException("No Dispatch Context");
      }
      TaskGraphWalkerContext walkerCtx = (TaskGraphWalkerContextnodeOutputs[0];
      Task<? extends SerializablecurrTask = (Task<? extends Serializable>) nd;
      // not map reduce task or not conditional task, just skip
      if (currTask.isMapRedTask()) {
        if (currTask instanceof ConditionalTask) {
          // get the list of task
          List<Task<? extends Serializable>> taskList = ((ConditionalTaskcurrTask).getListTasks();
          for (Task<? extends Serializabletsk : taskList) {
            if (tsk.isMapRedTask()) {
              ConditionalTask cndTask = this.processCurrentTask((MapRedTasktsk,
                  ((ConditionalTaskcurrTask));
              walkerCtx.addToDispatchList(cndTask);
            }
          }
        } else {
          ConditionalTask cndTask = this.processCurrentTask((MapRedTaskcurrTasknull);
          walkerCtx.addToDispatchList(cndTask);
        }
      }
      return null;
    }
    private JoinOperator getJoinOp(MapRedTask taskthrows SemanticException {
      if (task.getWork() == null) {
        return null;
      }
      Operator<? extends SerializablereducerOp = task.getWork().getReducer();
      if (reducerOp instanceof JoinOperator) {
        return (JoinOperatorreducerOp;
      } else {
        return null;
      }
    }
  }
New to GrepCode? Check out our FAQ X