Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
 
 
 package org.apache.hadoop.hive.ql.optimizer.physical;
 
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Stack;
 
/**
 * An implementation of PhysicalPlanResolver. It iterates over each MapRedTask to check whether the
 * task has local map work. If it does, that local work is moved into a new local map join task.
 * The newly generated task is then made to depend on the current task's parent tasks, and the
 * current task is made to depend on the newly generated task.
 */
 
 public class MapJoinResolver implements PhysicalPlanResolver {
   @Override
 
     // create dispatcher and graph walker
     Dispatcher disp = new LocalMapJoinTaskDispatcher(pctx);
     TaskGraphWalker ogw = new TaskGraphWalker(disp);
 
     // get all the tasks nodes from root task
     ArrayList<NodetopNodes = new ArrayList<Node>();
     topNodes.addAll(pctx.rootTasks);
 
     // begin to walk through the task tree.
     ogw.startWalking(topNodesnull);
     return pctx;
   }

  
   /**
    * Iterates over each task. If a task has local work, creates a new task of type
    * MapredLocalTask for that local work. It then makes this newly generated task depend on the
    * current task's parent tasks, and makes the current task depend on the newly generated task.
    */
 
   class LocalMapJoinTaskDispatcher implements Dispatcher {
 
     private PhysicalContext physicalContext;
 
     public LocalMapJoinTaskDispatcher(PhysicalContext context) {
       super();
        = context;
     }
 
     private void processCurrentTask(Task<? extends SerializablecurrTask,
         ConditionalTask conditionalTaskthrows SemanticException {
       // get current mapred work and its local work
       MapredWork mapredWork = (MapredWorkcurrTask.getWork();
       MapredLocalWork localwork = mapredWork.getMapLocalWork();
       if (localwork != null) {
        // get the context info and set up the shared tmp URI
        Context ctx = .getContext();
        String tmpFileURI = Utilities.generateTmpURI(ctx.getLocalTmpFileURI(), currTask.getId());
        localwork.setTmpFileURI(tmpFileURI);
        String hdfsTmpURI = Utilities.generateTmpURI(ctx.getMRTmpFileURI(), currTask.getId());
        mapredWork.setTmpHDFSFileURI(hdfsTmpURI);
        // create a task for this local work; right now, this local work is shared
        // by the original MapredTask and this new generated MapredLocalTask.
        MapredLocalTask localTask = (MapredLocalTask) TaskFactory.get(localwork
            .getParseContext().getConf());
        // set the backup task from curr task
        localTask.setBackupTask(currTask.getBackupTask());
        localTask.setBackupChildrenTasks(currTask.getBackupChildrenTasks());
        currTask.setBackupChildrenTasks(null);
        currTask.setBackupTask(null);
        if (currTask.getTaskTag() == .) {
          localTask.setTaskTag(.);
        } else {
          localTask.setTaskTag(.);
        }
        // replace the map join operator to local_map_join operator in the operator tree
        // and return all the dummy parent
        LocalMapJoinProcCtx  localMapJoinProcCtxadjustLocalTask(localTask);
        List<Operator<? extends Serializable>> dummyOps = localMapJoinProcCtx.getDummyParentOp();
        // create new local work and setup the dummy ops
        MapredLocalWork newLocalWork = new MapredLocalWork();
        newLocalWork.setDummyParentOp(dummyOps);
        newLocalWork.setTmpFileURI(tmpFileURI);
        newLocalWork.setInputFileChangeSensitive(localwork.getInputFileChangeSensitive());
        mapredWork.setMapLocalWork(newLocalWork);
        // get all parent tasks
        List<Task<? extends Serializable>> parentTasks = currTask.getParentTasks();
        currTask.setParentTasks(null);
        if (parentTasks != null) {
          for (Task<? extends Serializabletsk : parentTasks) {
            // make new generated task depends on all the parent tasks of current task.
            tsk.addDependentTask(localTask);
            // remove the current task from its original parent task's dependent task
            tsk.removeDependentTask(currTask);
          }
        } else {
          // in this case, current task is in the root tasks
          // so add this new task into root tasks and remove the current task from root tasks
          if (conditionalTask == null) {
            .addToRootTask(localTask);
            .removeFromRootTask(currTask);
          } else {
            // set list task
            List<Task<? extends Serializable>> listTask = conditionalTask.getListTasks();
            ConditionalWork conditionalWork = conditionalTask.getWork();
            int index = listTask.indexOf(currTask);
            listTask.set(indexlocalTask);
            // set list work
            List<SerializablelistWork = (List<Serializable>) conditionalWork.getListWorks();
            index = listWork.indexOf(mapredWork);
            listWork.set(index, (Serializablelocalwork);
            conditionalWork.setListWorks(listWork);
            ConditionalResolver resolver = conditionalTask.getResolver();
            if (resolver instanceof ConditionalResolverSkewJoin) {
              // get bigKeysDirToTaskMap
              ConditionalResolverSkewJoinCtx context = (ConditionalResolverSkewJoinCtxconditionalTask
                  .getResolverCtx();
              HashMap<StringTask<? extends Serializable>> bigKeysDirToTaskMap = context
                  .getDirToTaskMap();
              // to avoid concurrent modify the hashmap
              HashMap<StringTask<? extends Serializable>> newbigKeysDirToTaskMap = new HashMap<StringTask<? extends Serializable>>();
              // reset the resolver
              for (Map.Entry<StringTask<? extends Serializable>> entry : bigKeysDirToTaskMap
                  .entrySet()) {
                Task<? extends Serializabletask = entry.getValue();
                String key = entry.getKey();
                if (task.equals(currTask)) {
                  newbigKeysDirToTaskMap.put(keylocalTask);
                } else {
                  newbigKeysDirToTaskMap.put(keytask);
                }
              }
              context.setDirToTaskMap(newbigKeysDirToTaskMap);
              conditionalTask.setResolverCtx(context);
            } else if (resolver instanceof ConditionalResolverCommonJoin) {
              // get bigKeysDirToTaskMap
              ConditionalResolverCommonJoinCtx context = (ConditionalResolverCommonJoinCtxconditionalTask
                  .getResolverCtx();
              HashMap<StringTask<? extends Serializable>> aliasToWork = context.getAliasToTask();
              // to avoid concurrent modify the hashmap
              HashMap<StringTask<? extends Serializable>> newAliasToWork = new HashMap<StringTask<? extends Serializable>>();
              // reset the resolver
              for (Map.Entry<StringTask<? extends Serializable>> entry : aliasToWork.entrySet()) {
                Task<? extends Serializabletask = entry.getValue();
                String key = entry.getKey();
                if (task.equals(currTask)) {
                  newAliasToWork.put(keylocalTask);
                } else {
                  newAliasToWork.put(keytask);
                }
              }
              context.setAliasToTask(newAliasToWork);
              conditionalTask.setResolverCtx(context);
            }
          }
        }
        // make current task depends on this new generated localMapJoinTask
        // now localTask is the parent task of the current task
        localTask.addDependentTask(currTask);
      }
    }
    @Override
    public Object dispatch(Node ndStack<NodestackObject... nodeOutputs)
        throws SemanticException {
      Task<? extends SerializablecurrTask = (Task<? extends Serializable>) nd;
      // not map reduce task or not conditional task, just skip
      if (currTask.isMapRedTask()) {
        if (currTask instanceof ConditionalTask) {
          // get the list of task
          List<Task<? extends Serializable>> taskList = ((ConditionalTaskcurrTask).getListTasks();
          for (Task<? extends Serializabletsk : taskList) {
            if (tsk.isMapRedTask()) {
              this.processCurrentTask(tsk, ((ConditionalTaskcurrTask));
            }
          }
        } else {
          this.processCurrentTask(currTasknull);
        }
      }
      return null;
    }
    // replace the map join operator to local_map_join operator in the operator tree
        throws SemanticException {
      LocalMapJoinProcCtx localMapJoinProcCtx = new LocalMapJoinProcCtx(task
          .getParseContext());
      Map<RuleNodeProcessoropRules = new LinkedHashMap<RuleNodeProcessor>();
      opRules.put(new RuleRegExp("R1""MAPJOIN%"), LocalMapJoinProcFactory.getJoinProc());
      // The dispatcher fires the processor corresponding to the closest
      // matching rule and passes the context along
      Dispatcher disp = new DefaultRuleDispatcher(LocalMapJoinProcFactory.getDefaultProc(),
          opRuleslocalMapJoinProcCtx);
      GraphWalker ogw = new DefaultGraphWalker(disp);
      // iterator the reducer operator tree
      ArrayList<NodetopNodes = new ArrayList<Node>();
      topNodes.addAll(task.getWork().getAliasToWork().values());
      ogw.startWalking(topNodesnull);
      return localMapJoinProcCtx;
    }
      return ;
    }
    public void setPhysicalContext(PhysicalContext physicalContext) {
      this. = physicalContext;
    }
  }

  
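  /**
   * A container for the current task and the parse context. It is used as the processor context
   * while walking a local task's operator tree. It also collects dummy parent operators and
   * carries a followed-by-group-by flag.
   */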
  public static class LocalMapJoinProcCtx implements NodeProcessorCtx {
    private Task<? extends SerializablecurrentTask;
    private ParseContext parseCtx;
    private List<Operator<? extends Serializable>> dummyParentOp = null;
    private boolean isFollowedByGroupBy;
    public LocalMapJoinProcCtx(Task<? extends SerializabletaskParseContext parseCtx) {
       = task;
      this. = parseCtx;
       = new ArrayList<Operator<? extends Serializable>>();
       = false;
    }
    public Task<? extends SerializablegetCurrentTask() {
      return ;
    }
    public void setCurrentTask(Task<? extends SerializablecurrentTask) {
      this. = currentTask;
    }
    public boolean isFollowedByGroupBy() {
      return ;
    }
    public void setFollowedByGroupBy(boolean isFollowedByGroupBy) {
      this. = isFollowedByGroupBy;
    }
    public ParseContext getParseCtx() {
      return ;
    }
    public void setParseCtx(ParseContext parseCtx) {
      this. = parseCtx;
    }
    public void setDummyParentOp(List<Operator<? extends Serializable>> op) {
      this. = op;
    }
    public List<Operator<? extends Serializable>> getDummyParentOp() {
      return this.;
    }
    public void addDummyParentOp(Operator<? extends Serializableop) {
      this..add(op);
    }
  }
New to GrepCode? Check out our FAQ X