Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.pig.newplan.logical.visitor;
 
 import java.util.List;
 import java.util.Map;
 
 
/**
 * A visitor that walks operators containing a nested plan and translates
 * project(*) operators into a list of projection operators, i.e.
 * project(*) -> project(0), project(1), ..., project(n-2), project(n-1).
 * If the input schema is null, project(*) is not expanded.
 * It also expands project-range operators (e.g. $1 .. $5), but it won't expand
 * a project-range-to-end (e.g. $3 ..) if the input schema is null.
 */
 
 
     public ProjectStarExpander(OperatorPlan plan)
     throws FrontendException {
         super(plannew DependencyOrderWalker(plan));
     }
 
     @Override
     public void visit(LOSort sortthrows FrontendException{
         List<LogicalExpressionPlanexpPlans = sort.getSortColPlans();
         List<BooleanascOrder = sort.getAscendingCols();
 
         // new expressionplans and sort order list after star expansion
         List<LogicalExpressionPlannewExpPlans =
             new ArrayList<LogicalExpressionPlan>();
         List<BooleannewAscOrder =  new ArrayList<Boolean>();
 
         if(expPlans.size() != ascOrder.size()){
             throw new AssertionError("Size of expPlans and ascorder should be same");
         }            
         
         for(int i=0; i < expPlans.size(); i++){
             //expand the plan
             LogicalExpressionPlan ithExpPlan = expPlans.get(i);
             List<LogicalExpressionPlanexpandedPlans = expandPlan(ithExpPlan, 0);
             newExpPlans.addAll(expandedPlans);
 
             //add corresponding isAsc flags
             Boolean isAsc = ascOrder.get(i);
             for(int j=0; j < expandedPlans.size(); j++){
                 newAscOrder.add(isAsc);
             }
         }
 
         //check if there is a project-star-to-end followed by another sort plan
         // in the expanded plans (can happen if there is no input schema)
        for(int i=0; i < newExpPlans.size(); i++){
            ProjectExpression proj = getProjectStar(newExpPlans.get(i));
            if(proj != null && 
                    proj.isRangeProject() && proj.getEndCol() == -1 &&
                    i != newExpPlans.size() -1
            ){
                //because of order by sampler logic limitation, this is not
                //supported right now
                String msg = "Project-range to end (eg. x..)" +
                " is supported in order-by only as last sort column";
                throw new FrontendException(
                        msg,
                        1128,
                        .
                );
            }
        }
        sort.setSortColPlans(newExpPlans);
        sort.setAscendingCols(newAscOrder);
    }
    @Override
    public void visit(LORank rankthrows FrontendException {
        List<LogicalExpressionPlanexpPlans = rank.getRankColPlans();
        List<BooleanascOrder = rank.getAscendingCol();
        List<LogicalExpressionPlannewExpPlans = new ArrayList<LogicalExpressionPlan>();
        List<BooleannewAscOrder = new ArrayList<Boolean>();
        if (expPlans.size() != ascOrder.size()) {
            throw new AssertionError(
                    "Size of expPlans and ascorder should be same");
        }
        for (int i = 0; i < expPlans.size(); i++) {
            // expand the plan
            LogicalExpressionPlan ithExpPlan = expPlans.get(i);
            List<LogicalExpressionPlanexpandedPlans = expandPlan(ithExpPlan,
                    0);
            newExpPlans.addAll(expandedPlans);
            // add corresponding isAsc flags
            Boolean isAsc = ascOrder.get(i);
            for (int j = 0; j < expandedPlans.size(); j++) {
                newAscOrder.add(isAsc);
            }
        }
        // check if there is a project-star-to-end followed by another sort plan
        // in the expanded plans (can happen if there is no input schema)
        for (int i = 0; i < newExpPlans.size(); i++) {
            ProjectExpression proj = getProjectStar(newExpPlans.get(i));
            if (proj != null && proj.isRangeProject() && proj.getEndCol() == -1
                    && i != newExpPlans.size() - 1) {
                String msg = "Project-range to end (eg. x..)"
                        + " is supported in rank-by only as last rank column";
                throw new FrontendException(msg, 1128, .);
            }
        }
        rank.setRankColPlan(newExpPlans);
        rank.setAscendingCol(newAscOrder);
    }

    
    /**
     * Expands the plan into multiple plans if it contains a project-star; if there
     * is no project-star, the returned list contains just the plan argument.
     */

    private List<LogicalExpressionPlanexpandPlan(LogicalExpressionPlan planint inputNum)
    throws FrontendException {
        List<LogicalExpressionPlanexpandedPlans;
        ProjectExpression projStar = getProjectStar(plan);
        if(projStar != null){
            // expand the plan into multiple plans
            return expandPlan(planprojStarinputNum);
        }else{
            //no project star to expand
            expandedPlans = new ArrayList<LogicalExpressionPlan>();
            expandedPlans.add(plan);
        }
        return expandedPlans;
    }
    @Override
    public void visit(LOCogroup cgthrows FrontendException{
        
        MultiMap<IntegerLogicalExpressionPlaninpExprPlans = 
            cg.getExpressionPlans();
 
        //modify the plans if they have project-star
        expandPlans(inpExprPlans);
        
        
        //do some validations -
        List<Operatorinputs = cg.getInputs((LogicalPlancg.getPlan());
        // check if after translation none of group by plans in a cogroup
        // have a project(*) - if they still do it's because the input
        // for the project(*) did not have a schema - in this case, we should
        // error out since we could have different number/types of 
        // cogroup keys
        if(inputs.size() > 1) { // only for cogroups
            for(int i=0; i<inputs.size(); i++)
                for(LogicalExpressionPlan lpinpExprPlans.get(i)) {
                    if(getProjectStar(lp) != null) {
                        String msg = "Cogroup/Group by '*' or 'x..' " +
                        "(range of columns to the end) " +
                        "is only allowed if the input has a schema";
                        throw new VisitorExceptioncg,
                                msg,
                                1123,
                                .
                        );
                    }
                }
        }
        // check if after translation all group by plans have same arity
        int arity = inpExprPlans.get(0).size();
        for(int i=1; i<inputs.size(); i++){
            if(arity != inpExprPlans.get(i).size()) {
                String msg = "The arity of cogroup/group by columns " +
                "do not match";
                throw new VisitorException(cg,
                        msg,
                        1122,
                        .
                );
            }
        }
    }
    @Override
    public void visit(LOCube cuthrows FrontendException {
	// modify the plans if they have project-star
	expandPlans(inpExprPlans);
    }
    @Override
    public void visit(LOJoin jointhrows FrontendException{
        expandPlans(join.getExpressionPlans());
    }
    @Override
    public void visit(LOForEach foreachthrows FrontendException{
        //in case of LOForeach , expand when inner plan has a single project-star
        // and its input LOInnerLoad also is a project-star
        // then Reset the input number in project expressions
        
        LogicalPlan innerPlan = foreach.getInnerPlan();
        
        //visit the inner plan first
        PlanWalker newWalker = .spawnChildWalker(innerPlan);
        pushWalker(newWalker);
        .walk(this);
        popWalker();
        
        //get the LOGenerate
        List<OperatorfeOutputs = innerPlan.getSinks();
        LOGenerate gen = null;
        forOperator op  : feOutputs){
            if(op instanceof LOGenerate){
                if(gen != null){
                    String msg = "Expected single LOGenerate output in innerplan of foreach";
                    throw new VisitorException(foreach,
                            msg,
                            2266,
                            .
                    );
                }
                gen = (LOGenerateop;
            }
        }
        
        //work on the generate plan, flatten and user schema
        List<LogicalExpressionPlanexpPlans = gen.getOutputPlans();
        List<LogicalExpressionPlannewExpPlans = new ArrayList<LogicalExpressionPlan>();
        
        List<OperatorloGenPreds = innerPlan.getPredecessors(gen);
        
        if(loGenPreds == null){
            // there are no LOInnerLoads , must be working on just constants
            // no project-star expansion to be done
            return;
        }
        
        List<LogicalSchemauserSchema = gen.getUserDefinedSchema();
        List<LogicalSchemanewUserSchema = null;
        if(userSchema != null){
            newUserSchema = new ArrayList<LogicalSchema>();
        }
        
        boolean[] flattens = gen.getFlattenFlags();
        List<BooleannewFlattens = new ArrayList<Boolean>(flattens.length);
        //get mapping of LOGenerate predecessor current position to object
        Map<IntegerLogicalRelationalOperatoroldPos2Rel =
            new HashMap<IntegerLogicalRelationalOperator>();
        
        for(int i=0; i<loGenPreds.size(); i++){
            oldPos2Rel.put(i, (LogicalRelationalOperatorloGenPreds.get(i));
        }
        
        //get schema of predecessor, project-star expansion needs a schema
        LogicalRelationalOperator pred =
            (LogicalRelationalOperatorforeach.getPlan().getPredecessors(foreach).get(0);
        LogicalSchema inpSch = pred.getSchema();
 
        //store mapping between the projection in inner plans of
        // of LOGenerate to the input relation object
        Map<ProjectExpressionLogicalRelationalOperatorproj2InpRel =
            new HashMap<ProjectExpressionLogicalRelationalOperator>();
        
        
        for(int i=0; i<expPlans.size(); i++){
            LogicalExpressionPlan expPlan = expPlans.get(i);
            ProjectExpression projStar = getProjectLonelyStar(expPlanoldPos2Rel);
            boolean foundExpandableProject = false;
            if(projStar != null){              
                //there is a project-star to be expanded
                LogicalSchema userStarSch = null;
                if(userSchema != null && userSchema.get(i) != null){
                    userStarSch = userSchema.get(i);
                }
                //the range values are set in the project in LOInnerLoad
                ProjectExpression loInnerProj = ((LOInnerLoad)oldPos2Rel.get(projStar.getInputNum())).getProjection();
                int firstProjCol = 0;
                int lastProjCol = 0;
                
                if(loInnerProj.isRangeProject()){
                    loInnerProj.setColumnNumberFromAlias();
                    firstProjCol = loInnerProj.getStartCol();
                    lastProjCol = loInnerProj.getEndCol();
                }
                
                boolean isProjectToEnd = loInnerProj.isProjectStar() || 
                    (loInnerProj.isRangeProject() && lastProjCol == -1); 
                
                //can't expand if there is no input schema, and this is
                // as project star or project-range-to-end
                if( !(inpSch == null && isProjectToEnd) ){
                    
                    foundExpandableProject = true;
                    if(isProjectToEnd)
                        lastProjCol = inpSch.size() - 1;
                    //replacing the existing project star with new ones
                    expPlan.remove(projStar);
                    //remove the LOInnerLoad with star
                    LOInnerLoad oldLOInnerLoad = (LOInnerLoad)oldPos2Rel.get(projStar.getInputNum());
                    innerPlan.disconnect(oldLOInnerLoadgen);
                    innerPlan.remove(oldLOInnerLoad);
                    //generate new exp plan, inner load for each field in schema
                    for(int j = firstProjColj <= lastProjColj++){
                        //add new LOInnerLoad
                        LOInnerLoad newInLoad = new LOInnerLoad(innerPlanforeachj);
                        innerPlan.add(newInLoad);
                        innerPlan.connect(newInLoadgen);
                        // new expression plan and proj
                        LogicalExpressionPlan newExpPlan = new LogicalExpressionPlan();
                        newExpPlans.add(newExpPlan);
                        ProjectExpression newProj =
                            new ProjectExpression(newExpPlan, -2, -1, gen);
                        proj2InpRel.put(newProjnewInLoad);
                        newFlattens.add(flattens[i]);
                        if(newUserSchema != null ){
                            //index into user specified schema
                            int schIdx = j - firstProjCol;
                            if(userStarSch != null 
                                    && userStarSch.getFields().size() > schIdx
                                    && userStarSch.getField(schIdx) != null){
                                //if the project-star field has user specified schema, use the
                                // j'th field for this column
                                LogicalSchema sch = new LogicalSchema();
                                sch.addField(new LogicalFieldSchema(userStarSch.getField(schIdx)));
                                newUserSchema.add(sch);
                            }
                            else{
                                newUserSchema.add(null);
                            }
                        }
                    }
                }
            }
            if(!foundExpandableProject){ //no project-star that could be expanded
                //get all projects in here 
                FindProjects findProjs = new FindProjects(expPlan);
                findProjs.visit();
                List<ProjectExpressionprojs = findProjs.getProjs();
                //create a mapping of project expression to their inputs
                for(ProjectExpression proj : projs){
                    proj2InpRel.put(projoldPos2Rel.get(proj.getInputNum()));
                }
                newExpPlans.add(expPlan);
                newFlattens.add(flattens[i]);
                if(newUserSchema != null)
                    newUserSchema.add(userSchema.get(i));
            }
        }
        //get mapping of LoGenerate input relation to current position
        List<OperatornewGenPreds = innerPlan.getPredecessors(gen);
        int numNewGenPreds = 0;
        if(newGenPreds != null)
            numNewGenPreds = newGenPreds.size();
            
        for(int i=0; i<numNewGenPredsi++){
            rel2pos.put((LogicalRelationalOperatornewGenPreds.get(i),i);
        }
        
        //correct the input num for projects
        for(Entry<ProjectExpressionLogicalRelationalOperatorprojAndInp : proj2InpRel.entrySet()){
           ProjectExpression proj = projAndInp.getKey();
           LogicalRelationalOperator rel = projAndInp.getValue();
           proj.setInputNum(rel2pos.get(rel));
        }
        
        // set the new lists
        gen.setOutputPlans(newExpPlans);
        gen.setFlattenFlags(Booleans.toArray(newFlattens));
        gen.setUserDefinedSchema(newUserSchema);
        
        gen.resetSchema();
        foreach.resetSchema();
        
    }
    
    
    static class FindProjects extends LogicalExpressionVisitor{
        private List<ProjectExpressionprojs = new ArrayList<ProjectExpression>();
        protected FindProjects(LogicalExpressionPlan plan)
                throws FrontendException {
            super(plannew DepthFirstWalker(plan));
        }
        
        @Override
        public void visit(ProjectExpression proj){
           .add(proj);
        }
        public List<ProjectExpressiongetProjs(){
            return ;
        }
    }

    
    /**
     * Finds a project-star in a foreach statement. The LOInnerLoad corresponding
     * to the project-star must itself be a project-star (or a project-range).
     *
     * @param expPlan expression plan
     * @param oldPos2Rel map from LOGenerate input position to the corresponding
     *        relational operator of the foreach inner plan
     * @return the ProjectExpression that satisfies the conditions, or null
     * @throws org.apache.pig.impl.logicalLayer.FrontendException
     */
            Map<IntegerLogicalRelationalOperatoroldPos2Relthrows FrontendException {
        // NOTE(review): the beginning of this method's signature was lost when this
        // copy of the file was extracted, and several tokens ("> ", ", ", "(") are
        // missing below. From the body and its call in visit(LOForEach) it looks like
        // ProjectExpression getProjectLonelyStar(LogicalExpressionPlan expPlan,
        // Map<Integer, LogicalRelationalOperator> oldPos2Rel) -- restore from upstream.
        //the expression plan should have just a single project
        if(expPlan.size() == 0 || expPlan.size() > 1){
            return null;
        }
        // the plan has exactly one operator; take it
        Operator outputOp = expPlan.getOperators().next();
        if(outputOp instanceof ProjectExpression){
            ProjectExpression proj = (ProjectExpression)outputOp;
            //check if ProjectExpression is projectStar
            if(proj.isProjectStar()){
                //now check if its input is a LOInnerLoad and it is projectStar
                // or range project
                // (the input operator is looked up by the project's input number)
                LogicalRelationalOperator inputRel = oldPos2Rel.get(proj.getInputNum());
                if(! (inputRel  instanceof LOInnerLoad)){
                    return null;
                }
                ProjectExpression innerProj = ((LOInnerLoadinputRel).getProjection(); 
                ifinnerProj.isRangeOrStarProject()){
                    return proj;
                }
            }
        }
        // not a lone project-star over a star/range LOInnerLoad
        return null;
    }
    private void expandPlans(
            MultiMap<IntegerLogicalExpressionPlaninpExprPlans)
    throws FrontendException {
 
        //for each input relation, expand any logical plan that has project-star
        for(int i=0; iinpExprPlans.size() ; i++){
            List<LogicalExpressionPlanplans = inpExprPlans.get(i);
            List<LogicalExpressionPlannewPlans =
                new ArrayList<LogicalExpressionPlan>();
            for(LogicalExpressionPlan plan : plans){
                newPlans.addAll(expandPlan(plani));
            }
            inpExprPlans.removeKey(i);
            inpExprPlans.put(inewPlans);
        }
   
    }

    
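    /**
     * Expands this plan, which contains a project-star, into multiple plans,
     * each projecting a single column.
     *
     * @param expPlan the expression plan containing the project-star
     * @param proj the project-star (or project-range) expression in expPlan
     * @param inputNum input number used by the new projections
     * @return one plan per projected column, or a list containing only expPlan
     *         if the column range cannot be determined
     * @throws org.apache.pig.impl.logicalLayer.FrontendException
     */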
            LogicalExpressionPlan expPlanProjectExpression projint inputNum)
            throws FrontendException {
        // NOTE(review): the beginning of this method's signature was lost when this
        // copy of the file was extracted (and ", " / "> " tokens are missing below).
        // Its callers use it as List<LogicalExpressionPlan> expandPlan(
        // LogicalExpressionPlan expPlan, ProjectExpression proj, int inputNum) --
        // restore from upstream.

        // start/end columns covered by the star/range; null means they could not
        // be determined (presumably unknown input schema -- confirm in
        // ProjectStarExpanderUtil.getProjectStartEndCols)
        Pair<IntegerIntegerstartAndEndProjs =
            ProjectStarExpanderUtil.getProjectStartEndCols(expPlanproj);  
        List<LogicalExpressionPlannewPlans = new ArrayList<LogicalExpressionPlan>();
        if(startAndEndProjs == null){
            // can't expand this project
            newPlans.add(expPlan);
            return newPlans;
        }
        //expand from firstProjCol to lastProjCol (both inclusive)
        int firstProjCol = startAndEndProjs.first;
        int lastProjCol = startAndEndProjs.second;
        // every new single-column project is attached to the same relational
        // operator as the original project-star
        LogicalRelationalOperator relOp = proj.getAttachedRelationalOp();
        for(int i = firstProjColi <= lastProjColi++){
            newPlans.add(createExpPlanWithProj(relOpinputNumi));
        }
        return newPlans;
    }

    
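    /**
     * Creates a new logical expression plan with a project that is attached to
     * the relational operator attachRel and projects column colNum of input
     * inputNum.
     *
     * @param attachRel relational operator the new project is attached to
     * @param inputNum input number of the new project
     * @param colNum column number to project
     * @return the new expression plan
     */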
            LogicalRelationalOperator attachRel
            int inputNumint colNum) {
        // NOTE(review): the beginning of this method's signature was lost when this
        // copy of the file was extracted (and ", " separators are missing below).
        // Callers use it as LogicalExpressionPlan createExpPlanWithProj(
        // LogicalRelationalOperator attachRel, int inputNum, int colNum) -- restore.
        LogicalExpressionPlan newExpPlan = new LogicalExpressionPlan();
        // single project of column colNum from input inputNum, attached to attachRel
        ProjectExpression newProj = 
            new ProjectExpression(newExpPlaninputNumcolNumattachRel);
        // NOTE(review): visit(LOForEach) builds a ProjectExpression without calling
        // add(), which suggests the constructor already registers it with the plan;
        // confirm this explicit add() cannot create a duplicate entry.
        newExpPlan.add(newProj);
        return newExpPlan;
    }

    
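    /**
     * Returns the project-star output of the LogicalExpressionPlan argument if it
     * has one; otherwise returns null.
     *
     * @param expPlan the expression plan to inspect
     * @return the project-star (or project-range) expression, or null
     * @throws org.apache.pig.impl.logicalLayer.FrontendException if a
     *         project-star/project-range is not the only source operator of the plan
     */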
    throws FrontendException {
        // NOTE(review): the method header was lost when this copy of the file was
        // extracted. Callers use it as ProjectExpression getProjectStar(
        // LogicalExpressionPlan expPlan); restore modifiers/signature from upstream.

        // only the plan's source operators are inspected (the variable is named
        // "outputs" but holds expPlan.getSources())
        List<Operatoroutputs = expPlan.getSources();
        ProjectExpression projStar = null;
        for(Operator outputOp : outputs){
            if(outputOp instanceof ProjectExpression){
                ProjectExpression proj = (ProjectExpression)outputOp;
                if(proj.isRangeOrStarProject()){
                    // a star/range project must be the only source; anything else
                    // is reported as internal error 2264
                    if(outputs.size() > 1){
                        String msg = "More than one operator in an expression plan" +
                        " containing project star(*)/project-range (..)";
                        // NOTE(review): the error-source argument below was lost in
                        // extraction (upstream presumably PigException.BUG for a
                        // 2xxx code) -- restore it.
                        throw new VisitorException(proj,
                                msg,
                                2264,
                                .
                        );
                    }
                    projStar = proj;
                }
            }
        }
        return projStar;
    }
New to GrepCode? Check out our FAQ X