/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;

import java.util.BitSet;
import java.util.LinkedList;
import java.util.List;

import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.IdentityHashSet;
import org.apache.pig.pen.util.ExampleTuple;
 
/**
 * The union operator that combines the two inputs into a single stream.
 * Note that this doesn't eliminate duplicate tuples. The operator will
 * also be added to every map plan which processes more than one input.
 * This just pulls out data from the pipeline using the proposed single
 * threaded shared execution model. By shared execution I mean, one input
 * to the Union operator is called once and the execution moves to the
 * next non-drained input till all the inputs are drained.
 */
public class POUnion extends PhysicalOperator {
    
 
     private static final long serialVersionUID = 1L;
 
     //Used for efficiently shifting between non-drained
     //inputs
     BitSet done;
 
     boolean nextReturnEOP = false ;
    private static Result eopResult = new Result(POStatus.STATUS_EOP, null);
     
     //The index of the last input that was read
     int lastInd = 0;
 
     public POUnion(OperatorKey k) {
         this(k, -1, null);
     }
 
    public POUnion(OperatorKey k, int rp) {
        this(k, rp, null);
    }

    public POUnion(OperatorKey k, List<PhysicalOperator> inp) {
        this(k, -1, inp);
    }

    public POUnion(OperatorKey k, int rp, List<PhysicalOperator> inp) {
        super(k, rp, inp);
    }
 
     @Override
    public void setInputs(List<PhysicalOperator> inputs) {
        super.setInputs(inputs);
        if (inputs != null) {
            done = new BitSet(inputs.size());
        }
        else {
            done = new BitSet(0);
        }
    }
 
     @Override
    public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitUnion(this);
     }
 
     @Override
     public String name() {
        return getAliasString() + "Union" + "[" + DataType.findTypeName(resultType)
                + "]" + " - " + mKey.toString();
     }
    @Override
    public boolean supportsMultipleInputs() {
        return true;
    }

    @Override
    public boolean supportsMultipleOutputs() {
        return false;
    }

    public void clearDone() {
        done.clear();
    }

    
    // The code below tries to follow our single threaded shared execution
    // model, with execution being passed around each non-drained input.
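    // For example, with three inputs none of which is drained, successive
    // calls return one result from input 0, then input 1, then input 2,
    // and then wrap back to input 0, skipping any input whose bit in
    // 'done' is already set. Once every input has hit EOP, 'done' is
    // cleared and EOP is returned to the caller.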
    @Override
    public Result getNextTuple() throws ExecException {
        if (nextReturnEOP) {
            nextReturnEOP = false;
            return eopResult;
        }
        // Case 1 : Normal connected plan
        if (!isInputAttached()) {
            
            if (inputs == null || inputs.size() == 0) {
                // Neither does this Union have predecessors nor
                // was any input attached! This can happen when we have
                // a plan like below
                // POUnion
                // |
                // |--POLocalRearrange
                // |    |
                // |    |-POUnion (root 2)--> This union's getNext() can lead the code here
                // |
                // |--POLocalRearrange (root 1)
                
                // The inner POUnion above is a root in the plan which has 2 roots.
                // So these 2 roots would have input coming from different input
                // sources (dfs files). So certain maps would be working on input only
                // meant for "root 1" above and some maps would work on input
                // meant only for "root 2". In the former case, "root 2" would
                // neither get input attached to it nor does it have predecessors
                // which is the case which can lead us here.
                return eopResult;
            }
          
            while(true){
                if (done.nextClearBit(0) >= inputs.size()) {
                    clearDone();
                    return eopResult;
                }
                if (lastInd >= inputs.size() || done.nextClearBit(lastInd) >= inputs.size())
                    lastInd = 0;
                int ind = done.nextClearBit(lastInd);
                Result res;
                while(true){
                    if(getReporter()!=null) {
                        getReporter().progress();
                    }
                    res = inputs.get(ind).getNextTuple();
                    lastInd = ind + 1;
                    if (res.returnStatus == POStatus.STATUS_OK ||
                            res.returnStatus == POStatus.STATUS_NULL || res.returnStatus == POStatus.STATUS_ERR) {
                        illustratorMarkup(res.result, res.result, ind);
                        return res;
                    }
                    if (res.returnStatus == POStatus.STATUS_EOP) {
                        done.set(ind);
                        break;
                    }
                }
            }
        }
        // Case 2 : Input directly injected
        else {
            res.result = input;
            res.returnStatus = POStatus.STATUS_OK;
            detachInput();
            nextReturnEOP = true;
            illustratorMarkup(res.result, res.result, 0);
            return res;
        }
    }
    
    @Override
    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
        if (illustrator != null) {
            if (illustrator.getEquivalenceClasses() == null) {
                int size = (inputs == null ? 1 : inputs.size());
                LinkedList<IdentityHashSet<Tuple>> equivalenceClasses = new LinkedList<IdentityHashSet<Tuple>>();
                for (int i = 0; i < size; ++i) {
                    IdentityHashSet<Tuple> equivalenceClass = new IdentityHashSet<Tuple>();
                    equivalenceClasses.add(equivalenceClass);
                }
                illustrator.setEquivalenceClasses(equivalenceClasses, this);
            }
            ExampleTuple tIn = (ExampleTuple) in;
            illustrator.getEquivalenceClasses().get(eqClassIndex).add(tIn);
            illustrator.addData((Tuple) out);
        }
        return null;
    }
}
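For reference, here is a minimal, illustrative sketch (not part of the file above) of driving the "input directly injected" path, i.e. Case 2 of getNextTuple(): a tuple is attached to the operator, the first call returns it with STATUS_OK, and the second call returns STATUS_EOP because nextReturnEOP was set. The class name POUnionSketch, the scope name and the operator id are made up for the example; the standard Pig classes OperatorKey, TupleFactory and POStatus are assumed to be on the classpath.

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.plan.OperatorKey;

public class POUnionSketch {
    public static void main(String[] args) throws Exception {
        // A union with no predecessors; the operator key values are arbitrary.
        POUnion union = new POUnion(new OperatorKey("scope", 1L));

        Tuple t = TupleFactory.getInstance().newTuple(1);
        t.set(0, "hello");

        // Attach a tuple directly, the way a map task does when this
        // operator is a root of its plan.
        union.attachInput(t);

        Result first = union.getNextTuple();   // STATUS_OK, first.result == t
        Result second = union.getNextTuple();  // STATUS_EOP, since nextReturnEOP was set

        System.out.println(first.returnStatus == POStatus.STATUS_OK);   // true
        System.out.println(second.returnStatus == POStatus.STATUS_EOP); // true
    }
}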