Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
 
/**
 * The MapReduce Split operator.
 * <p>
 * The assumption here is that the logical to physical translation will create
 * this dummy operator with just the filename using which the input branch will
 * be stored and used for loading. Also, the translation should make sure that
 * appropriate filter operators are configured as outputs of this operator
 * using the conditions specified in the LOSplit. So LOSplit will be converted
 * into:
 * <pre>
 *     |        |           |
 *  Filter1  Filter2 ... Filter3
 *     |        |    ...    |
 *     |        |    ...    |
 *     ---- POSplit -... ----
 * </pre>
 * This is different from the existing implementation, where the POSplit writes
 * to side files after filtering and then loads the appropriate file.
 * <p>
 * The approach followed here is as good as the old approach, if not better in
 * many cases, because of the availability of {@code attachInput}. An
 * optimization that can ensue is if there are multiple loads that load the
 * same file, they can be merged into one and then the operators that take
 * input from the load can be stored. This can be used when the mapPlan
 * executes to read the file only once and attach the resulting tuple as
 * inputs to all the operators that take input from this load. In some cases
 * where the conditions are exclusive and some outputs are ignored, this
 * approach can be worse. But this leads to easier management of the Split and
 * also allows reuse of the data stored from the split job whenever necessary.
 */

 
 public class POSplit extends PhysicalOperator {
 
     private static final long serialVersionUID = 1L;
     
     /*
      * The filespec that is used to store and load the output of the split job
      * which is the job containing the split
      */
     private FileSpec splitStore;
        
     /*
      * The list of sub-plans the inner plan is composed of
      */
     private List<PhysicalPlanmyPlans = new ArrayList<PhysicalPlan>();
     
     private BitSet processedSet = new BitSet();
     
     private static Result empty = new Result(.null);
     
     private boolean inpEOP = false;
    
    
/**
     * Constructs an operator with the specified key.
     * Delegates with a degree of parallelism of {@code -1} and a
     * {@code null} input list.
     *
     * @param k the operator key
     */
    public POSplit(OperatorKey k) {
        this(k, -1, null);
    }

    
/**
     * Constructs an operator with the specified key and degree of parallelism.
     * Delegates with a {@code null} input list.
     *
     * @param k  the operator key
     * @param rp the degree of parallelism requested
     */
    public POSplit(OperatorKey k, int rp) {
        this(k, rp, null);
    }

    
/**
     * Constructs an operator with the specified key and inputs.
     * Delegates with a degree of parallelism of {@code -1}.
     *
     * @param k   the operator key
     * @param inp the inputs that this operator will read data from
     */
    public POSplit(OperatorKey k, List<PhysicalOperator> inp) {
        this(k, -1, inp);
    }

    
/**
     * Constructs an operator with the specified key, degree of parallelism and
     * inputs. The other constructors of this class delegate here.
     *
     * @param k   the operator key
     * @param rp  the degree of parallelism requested
     * @param inp the inputs that this operator will read data from
     */
    public POSplit(OperatorKey k, int rp, List<PhysicalOperator> inp) {
        super(k, rp, inp);
    }
    @Override
    public void visit(PhyPlanVisitor vthrows VisitorException {
        v.visitSplit(this);
    }
    @Override
    public String name() {
        return getAliasString() + "Split - " + .toString();
    }
    @Override
    public boolean supportsMultipleInputs() {
        return false;
    }
    @Override
    public boolean supportsMultipleOutputs() {
        return true;
    }

    
Returns the name of the file associated with this operator

Returns:
the FileSpec associated with this operator
    public FileSpec getSplitStore() {
        return ;
    }

    
Sets the name of the file associated with this operator

Parameters:
splitStore the FileSpec used to store the data
    public void setSplitStore(FileSpec splitStore) {
        this. = splitStore;
    }

    
Returns the list of nested plans.

    public List<PhysicalPlangetPlans() {
        return ;
    }
    
    
Appends the specified plan to the end of the nested input plan list

Parameters:
inPlan plan to be appended to the list
    public void addPlan(PhysicalPlan inPlan) {        
        .add(inPlan);
        .set(.size()-1);
    }

    
Removes plan from the nested input plan list

Parameters:
plan plan to be removed
    public void removePlan(PhysicalPlan plan) {
        .remove(plan);
        .clear(.size());
    }
   
    @Override
    public Result getNextTuple() throws ExecException {
        if (this..) {
            
            return getStreamCloseResult();         
        
        } 
        
        if (.cardinality() == .size()) {
            
            Result inp = processInput();
            
            if (inp.returnStatus == .
                || inp.returnStatus == . ) {
                return inp;
            }
         
            Tuple tuple = (Tuple)inp.result;
            for (PhysicalPlan pl : ) {
                pl.attachInput(tuple);
            }
            
            .clear();
        }
        
        return processPlan();                                       
    }
    private Result processPlan() throws ExecException {
   
        int idx = .nextClearBit(0);
        PhysicalOperator leaf = .get(idx).getLeaves().get(0);
        
        Result res = runPipeline(leaf);
        
        if (res.returnStatus == .) {
            .set(idx++);        
            if (idx < .size()) {
                res = processPlan();
            }
        }
        
        return (res.returnStatus == .) ? res : ;
    }
    
    private Result runPipeline(PhysicalOperator leafthrows ExecException {
       
        Result res = null;
        
        while (true) {
            
            res = leaf.getNextTuple();
            
            if (res.returnStatus == .) {                
                break;
            } else if (res.returnStatus == .) {
                continue;
            } else if (res.returnStatus == .) {
                break;
            } else if (res.returnStatus == .) {
                break;
            }
        }   
        
        return res;
    }
    
    private Result getStreamCloseResult() throws ExecException {
        Result res = null;
        
        while (true) {
            
            if (.cardinality() == .size()) {
                Result inp = processInput();
                if (inp.returnStatus == .) {                
                    Tuple tuple = (Tuple)inp.result;
                    for (PhysicalPlan pl : ) {
                        pl.attachInput(tuple);
                    }
                     = false;
                } else if (inp.returnStatus == .){
                     = true;
                } else if (inp.returnStatus == .) {
                     = false;
                } else if (inp.returnStatus == .) {
                    return inp;
                }
                .clear();
            } 
            
            int idx = .nextClearBit(0);
            if ( ) {
                .get(idx). = true;
            }
            PhysicalOperator leaf = .get(idx).getLeaves().get(0);
            
            res = leaf.getNextTuple();
           
            if (res.returnStatus == .)  {
                .set(idx++);        
                if (idx < .size()) {
                    continue;
                }
            } else {
                break;
            }
            
            if (! && res.returnStatus == .) {                   
                continue;
            } else {
                break;
            }
        }
        
        return res;
                
    }
    
    @Override
    public Tuple illustratorMarkup(Object inObject outint eqClassIndex) {
      // no op  
      return null;
    }
        
New to GrepCode? Check out our FAQ X