Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
 import java.util.*;
 
An optimizer that removes unnecessary temp stores (Stores generated by the MRCompiler to bridge different jobs - even though a real store is produced from the same data). The pattern looks like this: ------------- Split --------- | | Store(InterStorage) Store(StoreFunc) Followed by a load of the tmp store in a dependent MapReduceOper. This optmizer removes the store, collapses the split if only one branch remains and adjusts the loads to load from the real store. The situation is produced by something we do to the logical plan in PigServer. There we change: PreOp | Store Load | PostOp To: PreOp | / \ / \ PostOp Store If there is a job boundary between pre and post we will end up in this case.
 
 class NoopStoreRemover extends MROpPlanVisitor {
     
     private Log log = LogFactory.getLog(getClass());
 
     private Map<StringFileSpecreplacementMap;
     private List<RemovableStoreremovalQ;
     private List<POStorestoreQ;
     
     NoopStoreRemover(MROperPlan plan) {
         super(plannew DependencyOrderWalker<MapReduceOperMROperPlan>(plan));
          = new HashMap<StringFileSpec>();
     }
 
     @Override
     public void visitMROp(MapReduceOper mrthrows VisitorException {
          = new LinkedList<RemovableStore>();
          = new LinkedList<POStore>();
         
         // This situation can happen in map and reduce (not combine)
         new PhysicalRemover(mr.mapPlan).visit();
         new PhysicalRemover(mr.reducePlan).visit();
         
         for (RemovableStore st) {
             removeStore(st);
         }
         
        for (POStore st) {
            // don't need the input filespec anymore; and we don't
            // want to serialize it in the job control compiler.
            st.setInputSpec(null);
        }
    }   
    private void removeStore(RemovableStore rem) {
        try {
            // Remove the store plan from the nested split plan.
            rem.split.removePlan(rem.storePlan);
            // Collapse split if only one nested plan remains.
            if (rem.split.getPlans().size() == 1) {
                PhysicalPlan plan = rem.split.getPlans().get(0);
                POStore store = (POStore)plan.getRoots().get(0);
                plan.remove(store);
                store.setInputs(rem.split.getInputs());
                rem.plan.replace(rem.splitstore);
            } 
        } catch(PlanException pe) {
            .info("failed to remove unnecessary store from plan: "+pe.getMessage());
        }
    }
    private static class RemovableStore {
        public PhysicalPlan storePlan;
        public PhysicalPlan plan;
        public POSplit split;
    }
    private class PhysicalRemover extends PhyPlanVisitor {
        PhysicalRemover(PhysicalPlan plan) {
            super(plannew DependencyOrderWalker<PhysicalOperatorPhysicalPlan>(plan));
        }
        @Override
        public void visit() throws VisitorException {
            super.visit();
        }
        @Override
        public void visitLoad(POLoad load) {
            // As we go through update the load ops of the tmp stores
            // that we removed with the resulting other stores output.
            FileSpec spec = .get(load.getLFile().getFileName());
            if (spec != null) {
                load.setLFile(spec);
            }
        }
        @Override
        public void visitStore(POStore store) {
            // remember these. We will remove the input spec once
            // we're done.
            .add(store);
        }
        
        @Override
        public void visitSplit(POSplit splitthrows VisitorException {
            super.visitSplit(split);
            FileSpec lFile = null;
            FileSpec sFile = null;
            PhysicalPlan tmpStore = null;
            for (PhysicalPlan plansplit.getPlans()) {
                if (plan.size() == 1) {
                    PhysicalOperator op = plan.getRoots().get(0);
                    if (op instanceof POStore) {
                        POStore store = (POStore)op;
                        if (store.isTmpStore()) { 
                            // tmp store means introduced by the
                            // MRCompiler. User didn't ask for
                            // those. There can be at most one per
                            // split. (Though there can be nested
                            // splits.)
                            tmpStore = plan;
                            sFile = store.getSFile();
                        } else if (store.getInputSpec() != null) {
                            // We set the input spec for store
                            // operators that had a corresponding load
                            // but we eliminated it in the
                            // PigServer. There could be multiple of
                            // those, but they are all reversible, so
                            // any of them will do.
                            lFile = store.getInputSpec();
                        }
                    }
                }
            }
            if (tmpStore != null && lFile != null) {
                // schedule removal (happens tuesdays and
                // thursdays. don't park your car on the street on
                // those days..
                RemovableStore rem = new RemovableStore();
                rem.storePlan = tmpStore;
                rem.plan = .getPlan();
                rem.split = split;
                .add(rem);
                .put(sFile.getFileName(), lFile);
            }
        }            
    }
New to GrepCode? Check out our FAQ X