/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
 
 package org.apache.pig.newplan.logical.rules;
 
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
 
 public class MergeFilter extends Rule {
 
     public MergeFilter(String n) {
         super(nfalse);       
     }
 
     @Override
     public Transformer getNewTransformer() {        
         return new MergeFilterTransformer();
     }
 
     public class MergeFilterTransformer extends Transformer {
 
         private OperatorSubPlan subPlan;
 
         @Override
         public boolean check(OperatorPlan matchedthrows FrontendException {           
             LOFilter filter = (LOFilter)matched.getSources().get(0);
             List<Operatorsucceds = .getSuccessors(filter);
             // if this filter is followed by another filter, we should combine them
             if (succeds != null && succeds.size() == 1) {
                 if (succeds.get(0) instanceof LOFilter) {
                     return true;
                 }
             }
             return false;
         }
 
         @Override
         public void transform(OperatorPlan matchedthrows FrontendException {     
              = new OperatorSubPlan();
             
             LOFilter filter = (LOFilter)matched.getSources().get(0);
 
             .add(filter);
             
             List<Operatorsucceds = .getSuccessors(filter);
             if (succeds != null && succeds.size()== 1 && (succeds.get(0) instanceof LOFilter)) {
                 LOFilter next = (LOFilter)succeds.get(0);
                 combineFilterCond(filternext);
                 
                 List<Operatorsuccs = .getSuccessors(next);
                 if (succs!=null && succs.size()>0) {
                     .add(succs.get(0));
                 }
 
                 // Since we remove next, we need to merge soft link into filter
                 List<OperatornextSoftPreds = null;
                 if (.getSoftLinkPredecessors(next)!=null) {
                     nextSoftPreds = new ArrayList<Operator>();
                     nextSoftPreds.addAll(.getSoftLinkPredecessors(next));
                 }
                 
                 if (nextSoftPreds!=null) {
                     for (Operator softPred : nextSoftPreds) {
                         .removeSoftLink(softPrednext);
                         .createSoftLink(softPredfilter);
                     }
                 }
                 .removeAndReconnect(next);
             }
            
            Iterator<Operatoriter = filter.getFilterPlan().getOperators();
            while (iter.hasNext()) {
                Operator oper = iter.next();
                if (oper instanceof ProjectExpression) {
                    ((ProjectExpression)oper).setAttachedRelationalOp(filter);
                }
            }
        }        
        
        @Override
        public OperatorPlan reportChanges() {          
            return ;
        }
        
        // combine the condition of two filters. The condition of second filter
        // is added into the condition of first filter with an AND operator.
        private void combineFilterCond(LOFilter f1LOFilter f2throws FrontendException {
            LogicalExpressionPlan p1 = f1.getFilterPlan();
            LogicalExpressionPlan p2 = f2.getFilterPlan();
            LogicalExpressionPlan andPlan = new LogicalExpressionPlan();
            
            // add existing operators          
            Iterator<Operatoriter = p1.getOperators();
            while(iter.hasNext()) {
                andPlan.add(iter.next());
            }
            
            iter = p2.getOperators();
            while(iter.hasNext()) {
                andPlan.add(iter.next());
            }
            
            // add all connections
            iter = p1.getOperators();
            while(iter.hasNext()) {
                Operator n = iter.next();
                List<Operatorl = p1.getPredecessors(n);
                if (l != null) {
                    for(Operator opl) {
                        andPlan.connect(opn);
                    }
                }
            }
            
            iter = p2.getOperators();
            while(iter.hasNext()) {
                Operator n = iter.next();
                List<Operatorl = p2.getPredecessors(n);
                if (l != null) {
                    for(Operator opl) {
                        andPlan.connect(opn);
                    }
                }
            }          
            
            // create an AND
            new AndExpression(andPlan, (LogicalExpression)p1.getSources().get(0), (LogicalExpression)p2.getSources().get(0));          
            
            f1.setFilterPlan(andPlan);
        }
    }
    @Override
    protected OperatorPlan buildPattern() {        
        // the pattern that this rule looks for
        // is filter operator
        LogicalPlan plan = new LogicalPlan();
        LogicalRelationalOperator op = new LOFilter(plan);
        plan.add(op);        
        
        return plan;
    }
 }