Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.pig.newplan.logical.rules;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 
    /** Class logger. */
    protected static final Log log = LogFactory.getLog(ColumnPruneVisitor.class);
    // Pruning requirements per load. For each LOLoad, pair.first maps a column
    // index to the set of map keys required from that column (map pruning), and
    // pair.second holds the indexes of columns required in full (column pruning).
    // Replaced by the map passed to the constructor.
    private Map<LOLoad,Pair<Map<Integer,Set<String>>,Set<Integer>>> requiredItems = 
        new HashMap<LOLoad,Pair<Map<Integer,Set<String>>,Set<Integer>>>();
    // Set from the constructor; presumably gates whether structural pruning
    // (foreach insertion / inner-load removal) is performed - confirm.
    private boolean columnPrune;
 
     public ColumnPruneVisitor(OperatorPlan planMap<LOLoad,Pair<Map<Integer,Set<String>>,Set<Integer>>> requiredItems,
             boolean columnPrunethrows FrontendException {
         super(plannew ReverseDependencyOrderWalker(plan));
         this. = columnPrune;
         this. = requiredItems;
     }
 
     public void addRequiredItems(LOLoad loadPair<Map<Integer,Set<String>>,Set<Integer>> requiredItem) {
         .put(loadrequiredItem);
     }
 
     @Override
     public void visit(LOLoad loadthrows FrontendException {
         if(! .containsKeyload ) ) {
             return;
         }
 
         Pair<Map<Integer,Set<String>>,Set<Integer>> required =
             .get(load);
 
         RequiredFieldList requiredFields = new RequiredFieldList();
 
         LogicalSchema s = load.getSchema();
         for (int i=0;i<s.size();i++) {
             RequiredField requiredField = null;
             // As we have done processing ahead, we assume that 
             // a column is not present in both ColumnPruner and 
             // MapPruner
             ifrequired.first != null && required.first.containsKey(i) ) {
                 requiredField = new RequiredField();
                 requiredField.setIndex(i);
                 requiredField.setAlias(s.getField(i).);
                requiredField.setType(s.getField(i).);
                List<RequiredFieldsubFields = new ArrayList<RequiredField>();
                forString key : required.first.get(i) ) {
                    RequiredField subField = new RequiredField(key,-1,null,.);
                    subFields.add(subField);
                }
                requiredField.setSubFields(subFields);
                requiredFields.add(requiredField);
            }
            ifrequired.second != null && required.second.contains(i) ) {
                requiredField = new RequiredField();
                requiredField.setIndex(i);
                requiredField.setAlias(s.getField(i).);
                requiredField.setType(s.getField(i).);
                requiredFields.add(requiredField);
            }
        }
        boolean[] columnRequired = new boolean[s.size()];
        for (RequiredField rf : requiredFields.getFields())
            columnRequired[rf.getIndex()] = true;
        List<Pair<IntegerInteger>> pruneList = new ArrayList<Pair<IntegerInteger>>();
        for (int i=0;i<columnRequired.length;i++)
        {
            if (!columnRequired[i])
                pruneList.add(new Pair<IntegerInteger>(0, i));
        }
        StringBuffer message = new StringBuffer();
        if (pruneList.size()!=0)
        {
            message.append("Columns pruned for " + load.getAlias() + ": ");
            for (int i=0;i<pruneList.size();i++)
            {
                message.append("$"+pruneList.get(i).);
                if (i!=pruneList.size()-1)
                    message.append(", ");
            }
            .info(message);
        }
        message = new StringBuffer();
        for(RequiredField rfrequiredFields.getFields()) {
            List<RequiredFieldsub = rf.getSubFields();
            if (sub != null) {
                message.append("Map key required for " + load.getAlias() + ": $" + rf.getIndex() + "->" + sub + "\n");
            }
        }
        if (message.length()!=0)
            .info(message);
        LoadPushDown.RequiredFieldResponse response = null;
        try {
            LoadFunc loadFunc = load.getLoadFunc();
            if (loadFunc instanceof LoadPushDown) {
                response = ((LoadPushDown)loadFunc).pushProjection(requiredFields);
            }
        } catch (FrontendException e) {
            .warn("pushProjection on "+load+" throw an exception, skip it");
        }
        // Loader does not support column pruning, insert foreach
        if () {
            if (response==null || !response.getRequiredFieldResponse()) {
                LogicalPlan p = (LogicalPlan)load.getPlan();
                Operator next = p.getSuccessors(load).get(0);
                // if there is already a LOForEach after load, we don't need to
                // add another LOForEach
                if (next instanceof LOForEach) {
                    return;
                }
                LOForEach foreach = new LOForEach(load.getPlan());
                // add foreach to the base plan
                p.add(foreach);
                p.insertBetween(loadforeachnext);
                LogicalPlan innerPlan = new LogicalPlan();
                foreach.setInnerPlan(innerPlan);
                // build foreach inner plan
                List<LogicalExpressionPlanexps = new ArrayList<LogicalExpressionPlan>();
                LOGenerate gen = new LOGenerate(innerPlanexpsnew boolean[requiredFields.getFields().size()]);
                innerPlan.add(gen);
                for (int i=0; i<requiredFields.getFields().size(); i++) {
                    LoadPushDown.RequiredField rf = requiredFields.getFields().get(i);
                    LOInnerLoad innerLoad = new LOInnerLoad(innerPlanforeachrf.getIndex());
                    innerPlan.add(innerLoad);
                    innerPlan.connect(innerLoadgen);
                    LogicalExpressionPlan exp = new LogicalExpressionPlan();
                    ProjectExpression prj = new ProjectExpression(expi, -1, gen);
                    exp.add(prj);
                    exps.add(exp);
                }
            } else {
                // columns are pruned, reset schema for LOLoader
                List<IntegerrequiredIndexes = new ArrayList<Integer>();
                List<LoadPushDown.RequiredFieldfieldList = requiredFields.getFields();
                for (int i=0; i<fieldList.size(); i++) {
                    requiredIndexes.add(fieldList.get(i).getIndex());
                }
                load.setRequiredFields(requiredIndexes);
                LogicalSchema newSchema = new LogicalSchema();
                for (int i=0; i<fieldList.size(); i++) {
                    newSchema.addField(s.getField(fieldList.get(i).getIndex()));
                }
                load.setSchema(newSchema);
            }
        }
    }
    @Override
    public void visit(LOFilter filterthrows FrontendException {
    }
    @Override
    public void visit(LOLimit limitthrows FrontendException {
    }
    @Override
    public void visit(LOSplitOutput splitOutputthrows FrontendException {
    }
    @SuppressWarnings("unchecked")
    @Override
    public void visit(LOSplit splitthrows FrontendException {
        List<OperatorbranchOutputs = split.getPlan().getSuccessors(split);
        for (int i=0;i<branchOutputs.size();i++) {
            Operator branchOutput = branchOutputs.get(i);
            Set<LongbranchOutputUids = (Set<Long>)branchOutput.getAnnotation(.);
            if (branchOutputUids!=null) {
                Set<IntegercolumnsToDrop = new HashSet<Integer>();
                for (int j=0;j<split.getSchema().size();j++) {
                    if (!branchOutputUids.contains(split.getSchema().getField(j).))
                        columnsToDrop.add(j);
                }
                if (!columnsToDrop.isEmpty()) {
                    LOForEach foreach = Util.addForEachAfter((LogicalPlan)split.getPlan(), spliticolumnsToDrop);
                    foreach.getSchema();
                }
            }
        }
    }
    @Override
    public void visit(LOSort sortthrows FrontendException {
    }
    @Override
    public void visit(LORank rankthrows FrontendException {
    }
    @Override
    public void visit(LOStore storethrows FrontendException {
    }
    @Override
    public void visitLOCogroup cg ) throws FrontendException {
        addForEachIfNecessary(cg);
    }
    @Override
    public void visit(LOJoin jointhrows FrontendException {
    }
    @Override
    public void visit(LOCross crossthrows FrontendException {
    }
    @Override
    @SuppressWarnings("unchecked")
    public void visit(LOForEach foreachthrows FrontendException {
        if (!) {
            return;
        }
        // get column numbers from input uids
        Set<LonginputUids = (Set<Long>)foreach.getAnnotation(.);
        // Get all top level projects
        LogicalPlan innerPlan = foreach.getInnerPlan();
        List<LOInnerLoadinnerLoadsnew ArrayList<LOInnerLoad>();
        List<Operatorsources = innerPlan.getSources();
        for (Operator s : sources) {
            if (s instanceof LOInnerLoad)
                innerLoads.add((LOInnerLoad)s);
        }
        // If project of the innerLoad is not in INPUTUIDS, remove this innerLoad
        Set<LOInnerLoadinnerLoadsToRemove = new HashSet<LOInnerLoad>();
        for (LOInnerLoad innerLoadinnerLoads) {
            ProjectExpression project = innerLoad.getProjection();
            if (project.isProjectStar()) {
                LogicalSchema.LogicalFieldSchema tupleFS = project.getFieldSchema();
                // Check the first component of the star projection
                long uid = tupleFS.schema.getField(0).;
                if (!inputUids.contains(uid))
                    innerLoadsToRemove.add(innerLoad);
            }
            else {
                if (!inputUids.contains(project.getFieldSchema().))
                    innerLoadsToRemove.add(innerLoad);
            }
        }
        // Find the logical operator immediate precede LOGenerate which should be removed (the whole branch)
        Set<LogicalRelationalOperatorbranchHeadToRemove = new HashSet<LogicalRelationalOperator>();
        for (LOInnerLoad innerLoad : innerLoadsToRemove) {
            Operator op = innerLoad;
            while (!(innerPlan.getSuccessors(op).get(0) instanceof LOGenerate)) {
                op = innerPlan.getSuccessors(op).get(0);
            }
            branchHeadToRemove.add((LogicalRelationalOperator)op);
        }
        // Find the expression plan to remove
        LOGenerate gen = (LOGenerate)innerPlan.getSinks().get(0);
        List<LogicalExpressionPlangenPlansToRemove = new ArrayList<LogicalExpressionPlan>();
        List<LogicalExpressionPlangenPlans = gen.getOutputPlans();
        for (int i=0;i<genPlans.size();i++) {
            LogicalExpressionPlan expPlan = genPlans.get(i);
            List<OperatorexpSources = expPlan.getSinks();
            for (Operator expSrc : expSources) {
                if (expSrc instanceof ProjectExpression) {
                    LogicalRelationalOperator reference = ((ProjectExpression)expSrc).findReferent();
                    if (branchHeadToRemove.contains(reference)) {
                        genPlansToRemove.add(expPlan);
                    }
                }
            }
        }
        // Build the temporary structure based on genPlansToRemove, which include:
        // * flattenList
        // * outputPlanSchemas
        // * uidOnlySchemas
        // * inputsRemoved
        //     We first construct inputsNeeded, and inputsRemoved = (all inputs) - inputsNeeded.
        //     We cannot figure out inputsRemoved directly since the inputs may be used by other output plan.
        //     We can only get inputsRemoved after visiting all output plans.
        List<BooleanflattenList = new ArrayList<Boolean>();
        Set<IntegerinputsNeeded = new HashSet<Integer>();
        Set<IntegerinputsRemoved = new HashSet<Integer>();
        List<LogicalSchemaoutputPlanSchemas = new ArrayList<LogicalSchema>();
        List<LogicalSchemauidOnlySchemas = new ArrayList<LogicalSchema>();
        List<LogicalSchemauserDefinedSchemas = null;
        if (gen.getUserDefinedSchema()!=null)
            userDefinedSchemas = new ArrayList<LogicalSchema>();
        for (int i=0;i<genPlans.size();i++) {
            LogicalExpressionPlan genPlan = genPlans.get(i);
            if (!genPlansToRemove.contains(genPlan)) {
                flattenList.add(gen.getFlattenFlags()[i]);
                outputPlanSchemas.add(gen.getOutputPlanSchemas().get(i));
                uidOnlySchemas.add(gen.getUidOnlySchemas().get(i));
                if (gen.getUserDefinedSchema()!=null) {
                    userDefinedSchemas.add(gen.getUserDefinedSchema().get(i));
                }
                List<Operatorsinks = genPlan.getSinks();
                for(Operator ssinks) {
                    if (s instanceof ProjectExpression) {
                        inputsNeeded.add(((ProjectExpression)s).getInputNum());
                    }
                }
            }
        }
        List<Operatorpreds = innerPlan.getPredecessors(gen);
        if (preds!=null) {  // otherwise, all gen plan are based on constant, no need to adjust
            for (int i=0;i<preds.size();i++) {
                if (!inputsNeeded.contains(i))
                    inputsRemoved.add(i);
            }
        }
        // Change LOGenerate: remove unneeded output expression plan
        // change flatten flag, outputPlanSchema, uidOnlySchemas
        boolean[] flatten = new boolean[flattenList.size()];
        for (int i=0;i<flattenList.size();i++)
            flatten[i] = flattenList.get(i);
        gen.setFlattenFlags(flatten);
        gen.setOutputPlanSchemas(outputPlanSchemas);
        gen.setUidOnlySchemas(uidOnlySchemas);
        gen.setUserDefinedSchema(userDefinedSchemas);
        for (LogicalExpressionPlan genPlanToRemove : genPlansToRemove) {
            genPlans.remove(genPlanToRemove);
        }
        // shift project input
        if (!inputsRemoved.isEmpty()) {
            for (LogicalExpressionPlan genPlan : genPlans) {
                List<Operatorsinks = genPlan.getSinks();
                for(Operator ssinks) {
                    if (s instanceof ProjectExpression) {
                        int input = ((ProjectExpression)s).getInputNum();
                        int numToShift = 0;
                        for (int i :inputsRemoved) {
                            if (i<input)
                                numToShift++;
                        }
                        ((ProjectExpression)s).setInputNum(input-numToShift);
                    }
                }
            }
        }
        // Prune unneeded LOInnerLoad
        List<LogicalRelationalOperatorpredToRemove = new ArrayList<LogicalRelationalOperator>();
        for (int i : inputsRemoved) {
            predToRemove.add((LogicalRelationalOperator)preds.get(i));
        }
        for (LogicalRelationalOperator pred : predToRemove) {
            removeSubTree(pred);
        }
    }
    @Override
    public void visit(LOUnion unionthrows FrontendException {
        // AddForEach before union if necessary.
        List<Operatorpreds = new ArrayList<Operator>();
        preds.addAll(.getPredecessors(union));
        for (Operator pred : preds) {
            addForEachIfNecessary((LogicalRelationalOperator)pred);
        }
    }
    // remove all the operators starting from an operator
    private void removeSubTree(LogicalRelationalOperator opthrows FrontendException {
        LogicalPlan p = (LogicalPlan)op.getPlan();
        List<Operatorll = p.getPredecessors(op);
        if (ll != null) {
            for(Operator predll.toArray(new Operator[ll.size()])) {
                removeSubTree((LogicalRelationalOperator)pred);
            }
        }
        if (p.getSuccessors(op) != null) {
            Operator[] succs = p.getSuccessors(op).toArray(new Operator[0]);
            for(Operator ssuccs) {
                p.disconnect(ops);
            }
        }
        p.remove(op);
    }
    // Add ForEach after op to prune unnecessary columns
    @SuppressWarnings("unchecked")
        Set<LongoutputUids = (Set<Long>)op.getAnnotation(.);
        if (outputUids!=null) {
            LogicalSchema schema = op.getSchema();
            Set<IntegercolumnsToDrop = new HashSet<Integer>();
            for (int i=0;i<schema.size();i++) {
                if (!outputUids.contains(schema.getField(i).))
                    columnsToDrop.add(i);
            }
            if (!columnsToDrop.isEmpty()) {
                LOForEach foreach = Util.addForEachAfter((LogicalPlan)op.getPlan(), op, 0, columnsToDrop);
                foreach.getSchema();
            }
        }
    }
New to GrepCode? Check out our FAQ X