Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans;
 
 import java.util.List;
 import java.util.Map;
 
This visitor visits the MRPlan and does the following for each MROper - visits the POPackage in the reduce plan and finds the corresponding POLocalRearrange(s) (either in the map plan of the same oper OR reduce plan of predecessor MROper). It then annotates the POPackage with information about which columns in the "value" are present in the "key" and will need to stitched in to the "value"
 
 public class POPackageAnnotator extends MROpPlanVisitor {

    

Parameters:
plan MR plan to visit
 
     public POPackageAnnotator(MROperPlan plan) {
         super(plannew DepthFirstWalker<MapReduceOperMROperPlan>(plan));
     }
 
     @Override
     public void visitMROp(MapReduceOper mrthrows VisitorException {
         
         // POPackage OR POJoinPackage could be present in the combine plan
         // OR in the reduce plan. POPostCombinerPackage could
         // be present only in the reduce plan. Search in these two
         // plans accordingly
         if(!mr.combinePlan.isEmpty()) {
             PackageDiscoverer pkgDiscoverer = new PackageDiscoverer(mr.combinePlan);
             pkgDiscoverer.visit();
             POPackage pkg = pkgDiscoverer.getPkg();
             if(pkg != null) {
                 handlePackage(mrpkg);
             }   
         }
         
         if(!mr.reducePlan.isEmpty()) {
             PackageDiscoverer pkgDiscoverer = new PackageDiscoverer(mr.reducePlan);
             pkgDiscoverer.visit();
             POPackage pkg = pkgDiscoverer.getPkg();
             if(pkg != null) {
                 // if the POPackage is actually a POPostCombinerPackage, then we should
                 // just look for the corresponding LocalRearrange(s) in the combine plan
                 if(pkg instanceof POCombinerPackage) {
                     if(patchPackage(mr.combinePlanpkg) != pkg.getNumInps()) {
                         int errCode = 2085;
                         String msg = "Unexpected problem during optimization." +
                         " Could not find LocalRearrange in combine plan.";
                         throw new OptimizerException(msgerrCode.);
                     }
                 } else {
                     handlePackage(mrpkg);
                 }
             }
         }
         
     }
     
     private void handlePackage(MapReduceOper mrPOPackage pkgthrows VisitorException {
         // the LocalRearrange(s) could either be in the map of this MapReduceOper
         // OR in the reduce of predecessor MapReduceOpers
         int lrFound = 0;
        
        lrFound = patchPackage(mr.mapPlanpkg);
        if(lrFound != pkg.getNumInps()) {
            // we did not find the LocalRearrange(s) in the map plan
            // let's look in the predecessors
            List<MapReduceOperpreds = this..getPredecessors(mr);
            for (Iterator<MapReduceOperit = preds.iterator(); it.hasNext();) {
                MapReduceOper mrOper = it.next();
                if (mrOper.isLimitOnly() && !.getPredecessors(mrOper).get(0).isGlobalSort())
                    mrOper = this..getPredecessors(mrOper).get(0);
                lrFound += patchPackage(mrOper.reducePlanpkg);
                if(lrFound == pkg.getNumInps()) {
                    break;
                }     
            }
        }
        if(lrFound != pkg.getNumInps()) {
            int errCode = 2086;
            String msg = "Unexpected problem during optimization. Could not find all LocalRearrange operators.";
            throw new OptimizerException(msgerrCode.);
        }
    }
    private int patchPackage(PhysicalPlan planPOPackage pkgthrows VisitorException {
        LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(planpkg);
        lrDiscoverer.visit();
        // let our caller know if we managed to patch
        // the package
        return lrDiscoverer.getLoRearrangeFound();
    }
    
    
Simple visitor of the "Reduce" physical plan which will get a reference to the POPacakge present in the plan
    static class PackageDiscoverer extends PhyPlanVisitor {
        private POPackage pkg;
        
        public PackageDiscoverer(PhysicalPlan plan) {
            super(plannew DepthFirstWalker<PhysicalOperatorPhysicalPlan>(plan));
        }
        
        /* (non-Javadoc)
         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitStream(org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStream)
         */
        @Override
        public void visitPackage(POPackage pkgthrows VisitorException {
            this. = pkg;
        };
        
        /* (non-Javadoc)
         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitJoinPackage(org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage)
         */
        @Override
        public void visitJoinPackage(POJoinPackage joinPackage)
                throws VisitorException {
            this. = joinPackage;
        }
        
        /* (non-Javadoc)
         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitCombinerPackage(org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPostCombinerPackage)
         */
        @Override
        public void visitCombinerPackage(POCombinerPackage pkg)
                throws VisitorException {
            this. = pkg;
        }

        

Returns:
the pkg
        public POPackage getPkg() {
            return ;
        }
        
    }
    
    
Physical Plan visitor which tries to get the LocalRearrange(s) present in the plan (if any) and annotate the POPackage given to it with the information in the LocalRearrange (regarding columns in the "value" present in the "key")
    static class LoRearrangeDiscoverer extends PhyPlanVisitor {
        
        private int loRearrangeFound = 0;
        private POPackage pkg;
        
        public LoRearrangeDiscoverer(PhysicalPlan planPOPackage pkg) {
            super(plannew DepthFirstWalker<PhysicalOperatorPhysicalPlan>(plan));
            this. = pkg;
        }
        
        /* (non-Javadoc)
         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitStream(org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStream)
         */
        @Override
        public void visitLocalRearrange(POLocalRearrange lrearrangethrows VisitorException {
            ++;
            Map<Integer,Pair<BooleanMap<IntegerInteger>>> keyInfo;
            if ( instanceof POPackageLite) {
                if(lrearrange.getIndex() != 0) {
                    // Throw some exception here
                    throw new RuntimeException("POLocalRearrange for POPackageLite cannot have index other than 0, but has index - "+lrearrange.getIndex());
                }
            }
            // annotate the package with information from the LORearrange
            // update the keyInfo information if already present in the POPackage
            keyInfo = .getKeyInfo();
            if(keyInfo == null)
                keyInfo = new HashMap<IntegerPair<BooleanMap<IntegerInteger>>>();
            
            if(keyInfo.get(Integer.valueOf(lrearrange.getIndex())) != null) {
                // something is wrong - we should not be getting key info 
                // for the same index from two different Local Rearranges
                int errCode = 2087;
                String msg = "Unexpected problem during optimization." +
                " Found index:" + lrearrange.getIndex() + 
                " in multiple LocalRearrange operators.";
                throw new OptimizerException(msgerrCode.);
                
            }
            keyInfo.put(Integer.valueOf(lrearrange.getIndex()), 
                new Pair<BooleanMap<IntegerInteger>>(
                        lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
            .setKeyInfo(keyInfo);
            .setKeyTuple(lrearrange.isKeyTuple());
            .setKeyCompound(lrearrange.isKeyCompound());
        }

        

Returns:
the loRearrangeFound
        public int getLoRearrangeFound() {
            return ;
        }
    }
New to GrepCode? Check out our FAQ X