/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to you under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig.newplan.logical.rules;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.Expression;
import org.apache.pig.Expression.BinaryExpression;
import org.apache.pig.Expression.Column;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.newplan.FilterExtractor;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.OperatorSubPlan;
import org.apache.pig.newplan.PColFilterExtractor;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.relational.LOFilter;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.newplan.optimizer.Rule;
import org.apache.pig.newplan.optimizer.Transformer;
 
 public class PartitionFilterOptimizer extends Rule {
     private String[] partitionKeys;
    
    
    /**
     * a reference to the LoadMetadata implementation
     */
    private LoadMetadata loadMetadata;

    
    /**
     * a reference to the LoadFunc implementation
     */
    private LoadFunc loadFunc;
     
     private LOLoad loLoad;
     private LOFilter loFilter;
    
    
    /**
     * a map between column names as reported in
     * {@link LoadMetadata#getSchema(String, Job)} and as present in
     * {@link LOLoad#getSchema()}. The two will be different when the user
     * has provided a schema in the load statement
     */
    private Map<String, String> colNameMap = new HashMap<String, String>();
    
    
    /**
     * a map between column names as present in {@link LOLoad#getSchema()}
     * and as reported in {@link LoadMetadata#getSchema(String, Job)}.
     * The two will be different when the user has provided a schema in the
     * load statement.
     */
    private Map<String, String> reverseColNameMap = new HashMap<String, String>();
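
    // Illustration (hypothetical names, not from this file): for a statement
    //     a = LOAD 'data' USING SomePartitionedLoader() AS (x, y);
    // where the loader itself reports the columns as (dt, region), these
    // maps would hold colNameMap = {dt=x, region=y} and
    // reverseColNameMap = {x=dt, y=region}.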
 
     public PartitionFilterOptimizer(String name) {
        super( name, false );
     }
 
     @Override
     protected OperatorPlan buildPattern() {
         // match each foreach.
         LogicalPlan plan = new LogicalPlan();
        LogicalRelationalOperator load = new LOLoad( null, plan );
        plan.add( load );
 //        LogicalRelationalOperator filter = new LOFilter( plan );
 //        plan.add( filter );
 //        plan.connect( load, filter );
         return plan;
     }
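
    // Note: the pattern matches a lone LOLoad; as the commented-out lines
    // above suggest, the LOLoad-then-LOFilter shape is not encoded in the
    // pattern itself but is verified in
    // PartitionFilterPushDownTransformer.check() instead.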
 
     @Override
     public Transformer getNewTransformer() {
        if( name.equals("PartitionFilterOptimizer") ) {
            return new PartitionFilterPushDownTransformer();
        } else {
            return new NewPartitionFilterPushDownTransformer();
        }
    }

    class NewPartitionFilterPushDownTransformer extends PartitionFilterPushDownTransformer {
        @Override
        public void transform(OperatorPlan matched) throws FrontendException {
            subPlan = new OperatorSubPlan( currentPlan );

            setupColNameMaps();

            FilterExtractor filterFinder = new FilterExtractor(
                    loFilter.getFilterPlan(), getMappedKeys( partitionKeys ) );
            filterFinder.visit();
            Expression partitionFilter = filterFinder.getPColCondition();

            if(partitionFilter != null) {
                // the column names in the filter may be the ones provided by
                // the user in the schema in the load statement - we may need
                // to replace them with partition column names as given by
                // LoadFunc.getSchema()
                updateMappedColNames(partitionFilter);
                try {
                    loadMetadata.setPartitionFilter(partitionFilter);
                } catch (IOException e) {
                    throw new FrontendException( e );
                }
                if(filterFinder.isFilterRemovable()) {
                    currentPlan.removeAndReconnect( loFilter );
                } else {
                    loFilter.setFilterPlan(filterFinder.getFilteredPlan());
                }
            }
        }
    }

    public class PartitionFilterPushDownTransformer extends Transformer {
        protected OperatorSubPlan subPlan;
        @Override
        public boolean check(OperatorPlan matched) throws FrontendException {
            loLoad = (LOLoad)matched.getSources().get(0);
            // Match filter.
            List<Operator> succeds = currentPlan.getSuccessors( loLoad );
            if( succeds == null || succeds.size() == 0 || !( succeds.get(0) instanceof LOFilter ) )
                return false;
            loFilter = (LOFilter)succeds.get(0);

            // Filter has dependency other than load, skip optimization
            if (currentPlan.getSoftLinkPredecessors(loFilter) != null)
                return false;

            // we have to check more only if LoadFunc implements LoadMetadata
            loadFunc = loLoad.getLoadFunc();
            if(!( loadFunc instanceof LoadMetadata ) ) {
                return false;
            }

            loadMetadata = (LoadMetadata)loadFunc;
            try {
                partitionKeys = loadMetadata.getPartitionKeys(
                        loLoad.getFileSpec().getFileName(), new Job( loLoad.getConfiguration() ) );
            } catch (IOException e) {
                throw new FrontendException( e );
            }
            if( partitionKeys == null || partitionKeys.length == 0 ) {
                return false;
            }
            
            return true;
        }
        @Override
        public OperatorPlan reportChanges() {
            return subPlan;
        }
        @Override
        public void transform(OperatorPlan matched) throws FrontendException {
            subPlan = new OperatorSubPlan( currentPlan );
        	setupColNameMaps();
        	
        	// PIG-1871: Don't throw exception if partition filters cannot be pushed up. 
        	// Perform transformation on a copy of the filter plan, and replace the 
        	// original filter plan only if the transformation is successful 
        	// (i.e. partition filter can be pushed down) 
        	LogicalExpressionPlan filterExpr = loFilter.getFilterPlan();
        	LogicalExpressionPlan filterExprCopy = filterExpr.deepCopy();
        	
        	PColFilterExtractor pColFilterFinder = new PColFilterExtractor(
        	        filterExprCopy, getMappedKeys( partitionKeys ) );
        	pColFilterFinder.visit();
        	Expression partitionFilter = pColFilterFinder.getPColCondition();
        	
        	if(partitionFilter != null) {
        		// the column names in the filter may be the ones provided by
        		// the user in the schema in the load statement - we may need
        		// to replace them with partition column names as given by
        		// LoadFunc.getSchema()
        		updateMappedColNames(partitionFilter);
        		try {
        			loadMetadata.setPartitionFilter(partitionFilter);
        		} catch (IOException e) {
        			throw new FrontendException( e );
        		}
        		if(pColFilterFinder.isFilterRemovable()) {
        			currentPlan.removeAndReconnect( loFilter );
        		} else {
                    loFilter.setFilterPlan(filterExprCopy);
                }
            }
        }
        
        protected void updateMappedColNames(Expression expr) {
            if(expr instanceof BinaryExpression) {
                updateMappedColNames(((BinaryExpression) expr).getLhs());
                updateMappedColNames(((BinaryExpression) expr).getRhs());
            } else if (expr instanceof Column) {
                Column col = (Column) expr;
                col.setName(reverseColNameMap.get(col.getName()));
            }
        }
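
        // Illustration (continuing the hypothetical mapping above, where
        // reverseColNameMap = {x=dt}): a pushed-down condition (x == '20130101')
        // is renamed in place to (dt == '20130101'), the name the loader knows.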

        
        /**
         * The partition keys in the argument are as reported by
         * {@link LoadMetadata#getPartitionKeys(String, Job)}. The user may
         * have renamed these by providing a schema with different names in
         * the load statement - this method will replace the former names
         * with the latter names.
         * @param partitionKeys
         * @return
         */
        protected List<String> getMappedKeys(String[] partitionKeys) {
            List<String> mappedKeys = new ArrayList<String>(partitionKeys.length);
            for (int i = 0; i < partitionKeys.length; i++) {
                mappedKeys.add(colNameMap.get(partitionKeys[i]));
            }
            return mappedKeys;
        }
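
        // Illustration (hypothetical values): partitionKeys = {"dt", "region"}
        // with colNameMap = {dt=x, region=y} yields the mapped list ["x", "y"].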
        protected void setupColNameMaps() throws FrontendException {
            LogicalSchema loLoadSchema = loLoad.getSchema();
            LogicalSchema loadFuncSchema = loLoad.getDeterminedSchema();
            for(int i = 0; i < loadFuncSchema.size(); i++) {
                colNameMap.put(loadFuncSchema.getField(i).alias,
                        (i < loLoadSchema.size() ? loLoadSchema.getField(i).alias :
                            loadFuncSchema.getField(i).alias));

                reverseColNameMap.put((i < loLoadSchema.size() ? loLoadSchema.getField(i).alias :
                            loadFuncSchema.getField(i).alias),
                            loadFuncSchema.getField(i).alias);
            }
        }
    }
}
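
// Illustrative end-to-end sketch (hypothetical loader and script, not part of
// this file): given a loader whose getPartitionKeys() reports {"dt"}, a script
//     a = LOAD 'logs' USING SomePartitionedLoader();
//     b = FILTER a BY dt == '20130101' AND cnt > 5;
// would have the condition (dt == '20130101') handed to
// LoadMetadata.setPartitionFilter(), leaving only (cnt > 5) in the LOFilter;
// if the entire filter references partition keys, the LOFilter is removed
// from the plan altogether.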