/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
 
 package org.apache.pig.newplan.logical.rules;
 
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
/**
 * Helper class used by ColumnMapKeyPrune to figure out which columns can be
 * pruned. It only records its findings as operator annotations; it does not
 * change the structure of the operator plan.
 */
 
 public class ColumnPruneHelper {
     protected static final String INPUTUIDS = "ColumnPrune:InputUids";
     public static final String OUTPUTUIDS = "ColumnPrune:OutputUids";
     protected static final String REQUIREDCOLS = "ColumnPrune:RequiredColumns";
 
     private OperatorPlan currentPlan;
     private OperatorSubPlan subPlan;
 
     public ColumnPruneHelper(OperatorPlan currentPlan) {
         this. = currentPlan;
     }
 
     private OperatorSubPlan getSubPlan() throws FrontendException {
         OperatorSubPlan p = null;
         if ( instanceof OperatorSubPlan) {
             p = new OperatorSubPlan(((OperatorSubPlan)).getBasePlan());
         } else {
             p = new OperatorSubPlan();
         }
         Iterator<Operatoriter = .getOperators();
 
         while(iter.hasNext()) {
             Operator op = iter.next();
             if (op instanceof LOForEach) {
                 addOperator(opp);
             }
         }
 
         return p;
     }
 
     private void addOperator(Operator opOperatorSubPlan subplanthrows FrontendException {
         if (op == null) {
             return;
         }
 
        subplan.add(op);
        List<Operatorll = .getPredecessors(op);
        if (ll == null) {
            return;
        }
        for(Operator predll) {
            addOperator(predsubplan);
        }
    }
    @SuppressWarnings("unchecked")
    public boolean check() throws FrontendException {
        List<Operatorsources = .getSources();
        // if this rule has run before, just return false
        if (sources.size() > 1 && sources.get(0).getAnnotation() != null) {
            clearAnnotation();
            return false;
        }
        // create sub-plan that ends with foreach
         = getSubPlan();
        if (.size() == 0) {
            clearAnnotation();
            return false;
        }
        try {
            v.visit();
        }catch(SchemaNotDefinedException e) {
            // if any operator has an unknown schema, just return false
            clearAnnotation();
            return false;
        }
        List<Operatorll = .getSources();
        boolean found = false;
        for(Operator opll) {
            if (op instanceof LOLoad) {
                Set<Longuids = (Set<Long>)op.getAnnotation();
                LogicalSchema s = ((LOLoadop).getSchema();
                Set<Integerrequired = getColumns(suids);
                if (required.size() < s.size()) {
                    op.annotate(required);
                    found = true;
                }
            }
        }
        if (!found)
            clearAnnotation();
        return found;
    }
    private void clearAnnotation() {
        Iterator<Operatoriter = .getOperators();
        while (iter.hasNext()) {
            Operator op = iter.next();
            op.removeAnnotation();
            op.removeAnnotation();
            op.removeAnnotation();
        }
    }
    // get a set of column indexes from a set of uids
    protected Set<IntegergetColumns(LogicalSchema schemaSet<Longuidsthrows FrontendException {
        if (schema == null) {
            throw new SchemaNotDefinedException("Schema is not defined.");
        }
        Set<Integercols = new HashSet<Integer>();
        Iterator<Longiter = uids.iterator();
        while(iter.hasNext()) {
            long uid = iter.next();
            int index = schema.findField(uid);
            if (index == -1) {
                throw new FrontendException("UID " + uid + " is not found in the schema " + schema, 2241);
            }
            cols.add(index);
        }
        return cols;
    }
    public OperatorPlan reportChanges() {
        return ;
    }
    // Visitor to calculate the input and output uids for each operator
    // It doesn't change the plan, only put calculated info as annotations
    // The input and output uids are not necessarily the top level uids of
    // a schema. They may be the uids of lower level fields of complex fields
    // that have their own schema.
    static private class ColumnDependencyVisitor extends LogicalRelationalNodesVisitor {
        public ColumnDependencyVisitor(OperatorPlan planthrows FrontendException {
            super(plannew ReverseDependencyOrderWalker(plan));
        }
        @Override
        public void visit(LOLoad loadthrows FrontendException {
            Set<Longoutput = setOutputUids(load);
            // for load, input uids are same as output uids
            load.annotate(output);
        }
        @Override
        public void visit(LOFilter filterthrows FrontendException {
            Set<Longoutput = setOutputUids(filter);
            // the input uids contains all the output uids and
            // projections in filter conditions
            Set<Longinput = new HashSet<Long>(output);
            LogicalExpressionPlan exp = filter.getFilterPlan();
            collectUids(filterexpinput);
            filter.annotate(input);
        }
        @Override
        public void visit(LOStore storethrows FrontendException {
            Set<Longoutput = setOutputUids(store);
            if (output.isEmpty()) {
                // to deal with load-store-load-store case
                LogicalSchema s = store.getSchema();
                if (s == null) {
                    throw new SchemaNotDefinedException("Schema for " + store.getName() + " is not defined.");
                }
                for(int i=0; i<s.size(); i++) {
                    output.add(s.getField(i).);
                }
            }
            // for store, input uids are same as output uids
            store.annotate(output);
        }
        @Override
        public void visit(LOJoin jointhrows FrontendException {
            Set<Longoutput = setOutputUids(join);
            // the input uids contains all the output uids and
            // projections in join expressions
            Set<Longinput = new HashSet<Long>(output);
            Collection<LogicalExpressionPlanexps = join.getExpressionPlanValues();
            Iterator<LogicalExpressionPlaniter = exps.iterator();
            while(iter.hasNext()) {
                LogicalExpressionPlan exp = iter.next();
                collectUids(joinexpinput);
            }
            join.annotate(input);
        }
        @Override
        public void visit(LOCogroup cgthrows FrontendException {
            Set<Longoutput = setOutputUids(cg);
            // the input uids contains all the output uids and
            // projections in join expressions
            Set<Longinput = new HashSet<Long>();
            // Add all the uids required for doing cogroup. As in all the
            // keys on which the cogroup is done.
            forLogicalExpressionPlan plan : cg.getExpressionPlans().values() ) {
                collectUids(cgplaninput);
            }
            // Now check for the case where the output uid is a generated one
            // If that is the case we need to add the uids which generated it in
            // the input
            long firstUid=-1;
            Map<Integer,LonggeneratedInputUids = cg.getGeneratedInputUids();
            forMap.Entry<IntegerLongentry : generatedInputUids.entrySet() ) {
                Long uid = entry.getValue();
                LogicalRelationalOperator pred =
                    (LogicalRelationalOperatorcg.getPlan().getPredecessors(cg).get(entry.getKey());
                ifoutput.contains(uid) ) {
                    // Hence we need to all the full schema of the bag
                    input.addAllgetAllUidspred.getSchema() ) );
                }
                if (pred.getSchema()!=null)
                    firstUid = pred.getSchema().getField(0).;
            }
            if (input.isEmpty() && firstUid!=-1) {
                input.add(firstUid);
            }
            cg.annotate(input);
        }
        @Override
        public void visit(LOLimit limitthrows FrontendException {
            Set<Longoutput = setOutputUids(limit);
            // the input uids contains all the output uids and
            // projections in limit expression
            Set<Longinput = new HashSet<Long>(output);
            LogicalExpressionPlan exp = limit.getLimitPlan();
            if (exp != null)
                collectUids(limitexpinput);
            limit.annotate(input);
        }
        @Override
        public void visit(LOStream streamthrows FrontendException {
            // output is not used, setOutputUids is used to check if it has output schema
            Set<Longoutput = setOutputUids(stream);
            // Every field is required
            LogicalRelationalOperator pred = (LogicalRelationalOperator).getPredecessors(stream).get(0);
            Set<Longinput = getAllUids(pred.getSchema());
            stream.annotate(input);
        }
        @Override
        public void visit(LODistinct distinctthrows FrontendException {
            setOutputUids(distinct);
            
            Set<Longinput = new HashSet<Long>();
            // Every field is required
            LogicalSchema s = distinct.getSchema();
            if (s == null) {
                throw new SchemaNotDefinedException("Schema for " + distinct.getName() + " is not defined.");
            }
            for(int i=0; i<s.size(); i++) {
                input.add(s.getField(i).);
            }
            distinct.annotate(input);
        }
        @Override
        public void visit(LOCross crossthrows FrontendException {
            Set<Longoutput = setOutputUids(cross);
            // Since we do not change the topology of the plan, we keep
            // at least one input for each predecessor.
            List<Operatorpreds = .getPredecessors(cross);
            for (Operator pred : preds) {
                LogicalSchema schema = ((LogicalRelationalOperator)pred).getSchema();
                Set<Longuids = getAllUids(schema);
                boolean allPruned = true;
                for (Long uid : uids) {
                    if (output.contains(uid))
                        allPruned = false;
                }
                if (allPruned)
                    output.add(schema.getField(0).);
            }
            cross.annotate(output);
        }
        @Override
        public void visit(LOUnion unionthrows FrontendException {
            Set<Longoutput = setOutputUids(union);
            Set<Longinput = new HashSet<Long>();
            for (long uid : output) {
                input.addAll(union.getInputUids(uid));
            }
            union.annotate(input);
        }
        @Override
        public void visit(LOSplit splitthrows FrontendException {
            Set<Longoutput = setOutputUids(split);
            split.annotate(output);
        }
        @Override
        public void visit(LOSplitOutput splitOutputthrows FrontendException {
            Set<Longoutput = setOutputUids(splitOutput);
            // the input uids contains all the output uids and
            // projections in splitOutput conditions
            Set<Longinput = new HashSet<Long>();
            for (long uid : output) {
                input.add(splitOutput.getInputUids(uid));
            }
            LogicalExpressionPlan exp = splitOutput.getFilterPlan();
            collectUids(splitOutputexpinput);
            splitOutput.annotate(input);
        }
        @Override
        public void visit(LOSort sortthrows FrontendException {
            Set<Longoutput = setOutputUids(sort);
            Set<Longinput = new HashSet<Long>(output);
            for (LogicalExpressionPlan exp : sort.getSortColPlans()) {
                collectUids(sortexpinput);
            }
            sort.annotate(input);
        }
        @Override
        public void visit(LORank rankthrows FrontendException {
            Set<Longoutput = setOutputUids(rank);
            Set<Longinput = new HashSet<Long>(output);
            for (LogicalExpressionPlan exp : rank.getRankColPlans()) {
                collectUids(rankexpinput);
            }
            rank.annotate(input);
        }
        /*
         * This function returns all uids present in the given schema
         */
        private Set<LonggetAllUidsLogicalSchema schema ) {
            Set<Longuids = new HashSet<Long>();
            ifschema == null ) {
                return uids;
            }
            forLogicalFieldSchema field : schema.getFields() ) {
                if( ( field.type == . || field.type == . )
                        && field.schema != null ) {
                   uids.addAllgetAllUidsfield.schema ) );
                }
                uids.addfield.uid );
            }
            return uids;
        }
        @SuppressWarnings("unchecked")
        @Override
        public void visit(LOForEach foreachthrows FrontendException {
            Set<Longoutput = setOutputUids(foreach);
            LOGenerate gen = OptimizerUtils.findGenerate(foreach);
            gen.annotate(output);
            visit(gen);
            Set<Longinput = (Set<Long>)gen.getAnnotation();
            // Make sure at least one column will retain
            if (input.isEmpty()) {
                LogicalRelationalOperator pred = (LogicalRelationalOperator).getPredecessors(foreach).get(0);
                if (pred.getSchema()!=null)
                    input.add(pred.getSchema().getField(0).);
            }
            foreach.annotate(input);
        }
        @Override
        @SuppressWarnings("unchecked")
        public void visit(LOGenerate genthrows FrontendException {
             Set<Longoutput = (Set<Long>)gen.getAnnotation();
             Set<Longinput = new HashSet<Long>();
             List<LogicalExpressionPlanll = gen.getOutputPlans();
             Iterator<Longiter = output.iterator();
             while(iter.hasNext()) {
                 long uid = iter.next();
                 for(int i=0; i<ll.size(); i++) {
                     LogicalExpressionPlan exp = ll.get(i);
                     boolean found = false;
                     LogicalSchema planSchema = gen.getOutputPlanSchemas().get(i);
                     for (LogicalFieldSchema fs : planSchema.getFields()) {
                         if (fs.uid == uid) {
                             found = true;
                             break;
                         }
                     }
                     if (found) {
                         List<Operatorsrcs = exp.getSinks();
                         for (Operator src : srcs) {
                             if (src instanceof ProjectExpression) {
                                 List<LOInnerLoadinnerLoads = LOForEach.findReacheableInnerLoadFromBoundaryProject((ProjectExpression)src).;
                                 for (LOInnerLoad innerLoad : innerLoads) {
                                     ProjectExpression prj = innerLoad.getProjection();
                                     if (prj.isProjectStar()) {
                                         if (prj.findReferent().getSchema()!=null) {
                                             for (LogicalSchema.LogicalFieldSchema fs : prj.findReferent().getSchema().getFields()) {
                                                 input.add(fs.uid);
                                             }
                                         }
                                     }
                                     else {
                                         if (prj.findReferent().getSchema()!=null) {
                                             LogicalSchema.LogicalFieldSchema fs = prj.findReferent().getSchema().getField(prj.getColNum()); 
                                             input.add(fs.uid);
                                         }
                                     }
                                 }
                             }
                         }
                     }
                 }
             }
             // for the flatten bag, we need to make sure at least one field is in the input
             for(int i=0; i<ll.size(); i++) {
                 if (!gen.getFlattenFlags()[i]) {
                     continue;
                 }
                 LogicalExpressionPlan exp = ll.get(i);
                 LogicalExpression sink = (LogicalExpression)exp.getSources().get(0);
                 if (sink.getFieldSchema().!=. && sink.getFieldSchema().!=.)
                     continue;
                 List<Operatorsrcs = exp.getSinks();
                 for (Operator src : srcs) {
                     if (!(src instanceof ProjectExpression))
                         continue;
                     List<LOInnerLoadinnerLoads = LOForEach.findReacheableInnerLoadFromBoundaryProject((ProjectExpression)src).;
                     for (LOInnerLoad innerLoad : innerLoads) {
                         ProjectExpression prj = innerLoad.getProjection();
                         if (prj.isProjectStar()) {
                             if (prj.findReferent().getSchema()!=null) {
                                 for (LogicalSchema.LogicalFieldSchema fs : prj.findReferent().getSchema().getFields()) {
                                     input.add(fs.uid);
                                 }
                             }
                         }
                         else {
                             if (prj.findReferent().getSchema()!=null) {
                                 LogicalSchema.LogicalFieldSchema fs = prj.findReferent().getSchema().getField(prj.getColNum());
                                 input.add(fs.uid);
                             }
                         }
                     }
                 }
             }
             gen.annotate(input);
        }
        @Override
        public void visit(LOInnerLoad loadthrows FrontendException {
            Set<Longoutput = setOutputUids(load);
            load.annotate(output);
        }
        private void collectUids(LogicalRelationalOperator currentOpLogicalExpressionPlan expSet<Longuidsthrows FrontendException {
            List<Operatorll = exp.getSinks();
            for(Operator opll) {
                if (op instanceof ProjectExpression) {
                    if (!((ProjectExpression)op).isRangeOrStarProject()) {
                        long uid = ((ProjectExpression)op).getFieldSchema().;
                        uids.add(uid);
                    } else {
                        LogicalRelationalOperator ref = ((ProjectExpression)op).findReferent();
                        LogicalSchema s = ref.getSchema();
                        if (s == null) {
                            throw new SchemaNotDefinedException("Schema not defined for " + ref.getAlias());
                        }
                        for(LogicalFieldSchema fs.getFields()) {
                            uids.add(f.uid);
                        }
                    }
                }
            }
        }
        @SuppressWarnings("unchecked")
        // Get output uid from output schema. If output schema does not exist,
        // throw exception
        private Set<LongsetOutputUids(LogicalRelationalOperator opthrows FrontendException {
            List<Operatorll = .getSuccessors(op);
            Set<Longuids = new HashSet<Long>();
            LogicalSchema s = op.getSchema();
            if (s == null) {
                throw new SchemaNotDefinedException("Schema for " + op.getName() + " is not defined.");
            }
            if (ll != null) {
                // if this is not sink, the output uids are union of input uids of its successors
                for(Operator succll) {
                    Set<LonginputUids = (Set<Long>)succ.getAnnotation();
                    if (inputUids != null) {
                        Iterator<Longiter = inputUids.iterator();
                        while(iter.hasNext()) {
                            long uid = iter.next();
                            if (s.findField(uid) != -1) {
                                uids.add(uid);
                            }
                        }
                    }
                }
            } else {
                // if  it's leaf, set to its schema
                for(int i=0; i<s.size(); i++) {
                    uids.add(s.getField(i).);
                }
            }
            op.annotate(uids);
            return uids;
        }
    }
// NOTE(review): the closing brace of class ColumnPruneHelper appears to be missing from this copy of the file.