Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.pig.newplan.logical.rules;
 
 import java.util.List;
 import java.util.Map;
 
 /**
  * It's generally a good idea to do flattens as late as possible, as they tend to generate more
  * rows (and so more I/O). This optimization moves FOREACH..GENERATE..FLATTENs below the SORTs,
  * CROSSes and JOINs that follow them. FILTERs are re-ordered by the FilterAboveForeach rule, so
  * they are ignored here.
  */
 
 public class PushDownForEachFlatten extends Rule {
 
     public PushDownForEachFlatten(String name) {
         supernamefalse );
     }
 
     @Override
     protected OperatorPlan buildPattern() {
         LogicalPlan plan = new LogicalPlan();
         LogicalRelationalOperator foreach = new LOForEach(plan);
         plan.addforeach );
         return plan;
     }
 
     @Override
     public Transformer getNewTransformer() {
         return new PushDownForEachFlattenTransformer();
     }
     
     class PushDownForEachFlattenTransformer extends Transformer {
         private OperatorSubPlan subPlan;
 
         /**
          * Decides whether the matched FOREACH can be pushed below its single successor.
          *
          * <p>The rule fires only when the FOREACH's generate has a flatten (per
          * {@code OptimizerUtils.hasFlatten}), none of its output plans contains a
          * nondeterministic UDF, the FOREACH has exactly one successor, and that successor is
          * an LOSort, LOJoin or LOCross that also passes the operator-specific checks below.
          *
          * @param matched the matched pattern; its first source is the FOREACH
          * @return {@code true} if {@code transform} may be applied
          * @throws FrontendException declared by the signature; e.g. from
          *         {@code getNonFlattenFieldUids}
          */
         @Override
         public boolean check(OperatorPlan matchedthrows FrontendException {
             // NOTE(review): this listing lost delimiters and identifiers during extraction
             // (e.g. "matchedthrows", "findGenerateforeach )", a missing receiver before
             // ".getSuccessorsforeach )", a missing member after "getFieldSchema()."). It does
             // not compile as shown; restore it from upstream Pig before building.

             // the foreach with flatten can be swapped with an order by
             // as the order by will have fewer records to sort;
             // also the sort does not alter the records that are processed
             
             // the foreach with flatten can be pushed down a cross or a join
             // for the same reason. In this case the foreach has to be first
             // unflattened and then a new foreach has to be inserted after
             // the cross or join. In both cross and join the actual columns
             // from the foreach are not altered but positions might be changed
             
             // in the case of union the column is transformed and as a result
             // the foreach flatten cannot be pushed down
             
             // for distinct the output before flattening and the output
             // after flattening might be different. For example, consider
             // {(1), (1)}. Distinct of this bag is still {(1), (1)}.
             // (presumably {(1), (1)} is the bag field of a single row, so a
             // row-level DISTINCT leaves it intact)
             // distinct(flatten({(1), (1)})) is (1). However,
             // flatten(distinct({(1), (1)})) is (1), (1)
             
             // in both cases correctness is not affected: union and distinct
             // successors are rejected by the operator-type check below, so the
             // rule simply does not fire
             
             LOForEach foreach = (LOForEach)matched.getSources().get(0);
             LOGenerate gen = OptimizerUtils.findGenerateforeach );
            
            if( !OptimizerUtils.hasFlattengen ) )
                return false;
            
            // If a foreach contains a nondeterministic udf, we shouldn't push it down.
            for (LogicalExpressionPlan p : gen.getOutputPlans()) {
                if (OptimizerUtils.planHasNonDeterministicUdf(p))
                    return false;
            }
            
            List<Operatorsuccs = .getSuccessorsforeach );
            ifsuccs == null || succs.size() != 1 )
                return false;
            
            // Values for the generate outputs that are NOT flattened; successor
            // references must stay within these for the push-down to be valid.
            List<Longuids = getNonFlattenFieldUidsgen );
            Operator succ = succs.get( 0  );
            if( !( succ instanceof LOSort || succ instanceof LOJoin || succ instanceof LOCross ) )
                return false;
            
            ifsucc instanceof LOSort ) {
                // Check if the expressions for the foreach generate are purely projection including flatten fields.
                List<LogicalExpressionPlanexprs = gen.getOutputPlans();
                forLogicalExpressionPlan expr : exprs ) {
                    if( !isPureProjectionexpr ) )
                        return false;
                }
                // Check if flatten fields are required by the successor: every sort
                // column must refer to a non-flattened output, otherwise reject.
                LOSort sort = (LOSort)succ;
                List<LogicalExpressionPlanexps = sort.getSortColPlans();
                forint i = 0; i < exps.size(); i++ ) {
                    LogicalExpressionPlan exp = exps.geti );
                    // NOTE(review): assumes the first operator of each sort column plan is a
                    // ProjectExpression; anything else would throw ClassCastException — confirm.
                    ProjectExpression proj = (ProjectExpression)exp.getOperators().next();
                    if( !uids.containsproj.getFieldSchema(). ) )
                        return false;
                }
                return true;
            } else {
                // Successor is an LOJoin or LOCross.
                List<Operatorpreds = .getPredecessorssucc );
                
                // We do not optimize if peer is ForEach with flatten. This is 
                // a simplification, may change in the future.
                forOperator op : preds ) {
                    ifop == foreach )
                        continue;
                    else ifop instanceof LOForEach && 
                            OptimizerUtils.hasFlatten( OptimizerUtils.findGenerate( (LOForEach)op ) ) )
                        return false;
                }
                
                // The successor's schema must be known.
                if( ( (LogicalRelationalOperator)succ ).getSchema() == null )
                    return false;
                
                ifsucc instanceof LOCross ) {
                    return true;
                } else {
                    // For a join, the join keys taken from the FOREACH's input must only
                    // reference non-flattened outputs of the FOREACH.
                    LOJoin join = (LOJoin)succ;
                    forint i = 0; i < preds.size(); i++ ) {
                        Operator op = preds.geti );
                        ifop == foreach ) {
                            // presumably getJoinPlan(i) holds the join keys of the i-th input,
                            // indexed in predecessor order — confirm against LOJoin
                            Collection<LogicalExpressionPlanexprs = join.getJoinPlani );
                            forLogicalExpressionPlan expr : exprs ) {
                                List<ProjectExpressionprojs = getProjectExpressionsexpr );
                                forProjectExpression proj : projs ) {
                                    if( !uids.containsproj.getFieldSchema(). ) ) {
                                        return false;
                                    }
                                }
                            }
                            break;
                        }
                    }
                    return true;
                }
            }
        } 
        
            List<Operatorops = expr.getSinks();
            List<ProjectExpressionprojs = new ArrayList<ProjectExpression>( ops.size() );
            forOperator op : ops ) {
                ifop instanceof ProjectExpression ) {
                    projs.add( (ProjectExpression)op );
                }
            }
            return projs;
        }
        /**
         * Collects one entry per non-flattened output of {@code gen}.
         *
         * <p>Outputs whose flatten flag is set are skipped. For every other output plan, the
         * field schema of the plan's first source (its root expression) contributes one
         * {@code Long}, presumably its uid (see the NOTE in the body).
         *
         * @param gen the generate whose outputs are inspected
         * @return the collected values, in output order
         * @throws FrontendException declared by the signature; presumably from getFieldSchema()
         */
        private List<LonggetNonFlattenFieldUids(LOGenerate genthrows FrontendException {
            List<Longuids = new ArrayList<Long>();
            
            List<LogicalExpressionPlanexprs = gen.getOutputPlans();
            forint i = 0; i < exprs.size(); i++ ) {
                LogicalExpressionPlan expr = exprs.geti );
                ifgen.getFlattenFlags()[i] )
                    continue;
                LogicalExpression e = (LogicalExpression)expr.getSources().get( 0 );
                // NOTE(review): the member read after getFieldSchema() was lost in extraction;
                // the list name "uids" suggests the field schema's uid — restore from upstream.
                uids.adde.getFieldSchema(). );
            }
            
            return uids;
        }
        
        
        /**
         * Checks whether the given expression is a pure projection. For instance, $0 and f1 are
         * legal, but 5 + $2 and (int)f1 are not.
         */
        private boolean isPureProjection(LogicalExpressionPlan expr) {
            if (expr.size()!=1)
                return false;
            if (!(expr.getSinks().get(0) instanceof ProjectExpression))
                return false;
            return true;
        }
        
        @Override
        public OperatorPlan reportChanges() {
            return ;
        }
        /**
         * Pushes the matched FOREACH below its successor.
         *
         * <p>For an LOSort successor, the FOREACH and the sort swap places. For an LOCross or
         * LOJoin successor, a new FOREACH is inserted after it that projects every output column
         * and flattens exactly the columns that came from flattened outputs of the original
         * FOREACH; the flatten flags of the original FOREACH are then cleared. User-defined
         * schemas of those flattened outputs are moved to the new FOREACH.
         *
         * @param matched the matched pattern; its first source is the FOREACH
         * @throws FrontendException declared by the signature
         */
        @Override
        public void transform(OperatorPlan matchedthrows FrontendException {
            // NOTE(review): extraction dropped the receivers and delimiters of most plan calls
            // below (e.g. " = new OperatorSubPlan );", ".getSuccessorsforeach )",
            // ".add(foreach);"). Given the transformer's field declaration, the sub-plan
            // receiver is presumably subPlan; the plan receiver is not recoverable from this
            // listing — restore from upstream Pig.
             = new OperatorSubPlan );
            
            LOForEach foreach = (LOForEach)matched.getSources().get(0);
            Operator next = .getSuccessorsforeach ).get(0);
            ifnext instanceof LOSort ) {
                // Swap FOREACH and SORT: pred -> foreach -> next -> succs becomes
                // pred -> next -> foreach -> succs.
                Operator pred = .getPredecessorsforeach ).get( 0 );
                List<Operatorsuccs = new ArrayList<Operator>();
                // NOTE(review): the cross/join branch below treats getSuccessors(...) as possibly
                // null; if it can be null here, addAll(null) throws NullPointerException.
                succs.addAll(.getSuccessorsnext ));
                Pair<IntegerIntegerpos1 = .disconnectpredforeach );
                Pair<IntegerIntegerpos2 = .disconnectforeachnext );
                .connectpredpos1.firstnextpos2.second );
                // NOTE(review): succs is never null (it was created with new ArrayList above), so
                // the else branch is unreachable. If next has no successors, the loop does nothing
                // and foreach stays disconnected from both pred and next. The intended test is
                // probably !succs.isEmpty() — confirm.
                ifsuccs != null ) {
                    forOperator succ : succs ) {
                        Pair<IntegerIntegerpos = .disconnectnextsucc );
                        .connectnextpos.firstforeach, 0 );
                        .connectforeach, 0, succpos.second );
                    }
                } else {
                    .connectnextforeach );
                }
                .add(foreach);
                .add(next);
            } else ifnext instanceof LOCross || next instanceof LOJoin ) {
                // Count the output columns of next (one per generate output for the FOREACH
                // input, schema width for every other input) and remember which ones came from
                // flattened FOREACH outputs. NOTE(review): this assumes next's output columns are
                // its inputs' columns concatenated in predecessor order — confirm.
                List<Operatorpreds = .getPredecessorsnext );
                List<IntegerfieldsToBeFlattaned = new ArrayList<Integer>();
                Map<IntegerLogicalSchemacachedUserDefinedSchema = new HashMap<IntegerLogicalSchema>();
                boolean[] flags = null;
                int fieldCount = 0;
                forOperator op : preds ) {
                    ifop == foreach ) {
                        LOGenerate gen = OptimizerUtils.findGenerateforeach );
                        flags = gen.getFlattenFlags();
                        forint i = 0; i < flags.lengthi++ ) {
                            ifflags[i] ) {
                                fieldsToBeFlattaned.add(fieldCount);
                                // Move any user-defined schema of this flattened output from the
                                // original generate to the cache (it goes to the new generate).
                                if (gen.getUserDefinedSchema()!=null && gen.getUserDefinedSchema().get(i)!=null) {
                                    cachedUserDefinedSchema.put(fieldCountgen.getUserDefinedSchema().get(i));
                                    gen.getUserDefinedSchema().set(inull);
                                }
                                fieldCount++;
                            } else {
                                // Both branches advance fieldCount by one per generate output.
                                fieldCount++;
                            }
                        }
                    } else {
                        fieldCount += ( (LogicalRelationalOperator)op ).getSchema().size();
                    }
                }
                
                
                boolean[] flattenFlags = new boolean[fieldCount];
                List<LogicalSchemamUserDefinedSchema = null;
                // NOTE(review): cachedUserDefinedSchema is never null (it was created above), so
                // this always builds an all-null list and the new generate always receives a
                // user-defined schema. Probably meant !cachedUserDefinedSchema.isEmpty() — confirm
                // which form the schema code expects.
                if (cachedUserDefinedSchema!=null) {
                    mUserDefinedSchema = new ArrayList<LogicalSchema>();
                    for (int i=0;i<fieldCount;i++)
                        mUserDefinedSchema.add(null);
                }
                forInteger i : fieldsToBeFlattaned ) {
                    flattenFlags[i] = true;
                    if (cachedUserDefinedSchema.containsKey(i)) {
                        mUserDefinedSchema.set(icachedUserDefinedSchema.get(i));
                    }
                }
                
                // Now create a new foreach after cross/join and insert it into the plan.
                // It projects every column i in [0, fieldCount) through an LOInnerLoad and
                // flattens exactly the columns marked in flattenFlags.
                LOForEach newForeach = new LOForEach );
                LogicalPlan innerPlan = new LogicalPlan();
                List<LogicalExpressionPlanexprs = new ArrayList<LogicalExpressionPlan>( fieldCount );
                LOGenerate gen = new LOGenerateinnerPlanexprsflattenFlags );
                if (mUserDefinedSchema!=null)
                    gen.setUserDefinedSchema(mUserDefinedSchema);
                innerPlan.addgen );
                newForeach.setInnerPlaninnerPlan );
                forint i = 0; i < fieldCounti++ ) {
                    LogicalExpressionPlan expr = new LogicalExpressionPlan();
                    expr.addnew ProjectExpressionexpri, -1, gen ) );
                    exprs.addexpr );
                    
                    LOInnerLoad innerLoad = new LOInnerLoad(innerPlannewForeachi);
                    innerPlan.add(innerLoad);
                    innerPlan.connect(innerLoadgen);
                }
                
                // The new FOREACH takes over the alias of the cross/join.
                newForeach.setAlias(((LogicalRelationalOperator)next).getAlias());
                
                // NOTE(review): only the first successor of next gets newForeach inserted in front
                // of it; any other successors keep reading next's unflattened output. check() only
                // requires the FOREACH (not next) to have a single successor — confirm this is safe.
                Operator opAfterX = null;
                List<Operatorsuccs = .getSuccessorsnext );
                ifsuccs == null || succs.size() == 0 ) {
                    .addnewForeach );
                    .connectnextnewForeach );
                } else {
                    opAfterX = succs.get( 0 );
                    .insertBetween(nextnewForeachopAfterX);
                }
                
                // Finally remove flatten flags from the original foreach and regenerate schemas for those impacted.
                // NOTE(review): no schema is regenerated in this method; presumably the caller or
                // optimizer framework does it — confirm. Clearing only takes effect if
                // getFlattenFlags() returned the generate's live array — confirm.
                forint i = 0; i < flags.lengthi++ ) {
                    flags[i] = false;
                }
                
                .add(foreach);
                .add(next);
                .add(newForeach);
            }
        }
    }
New to GrepCode? Check out our FAQ X