Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
 import java.util.List;
 
This operator implements merge join algorithm to do map side joins. Currently, only two-way joins are supported. One input of join is identified as left and other is identified as right. Left input tuples are the input records in map. Right tuples are read from HDFS by opening right stream. This join doesn't support outer join. Data is assumed to be sorted in ascending order. It will fail if data is sorted in descending order.
 
 
 public class POMergeJoin extends PhysicalOperator {
     private static final Log log = LogFactory.getLog(POMergeJoin.class);
 
     private static final long serialVersionUID = 1L;
 
     // flag to indicate when getNext() is called first.
     private boolean firstTime = true;
 
     //The Local Rearrange operators modeling the join key
     private POLocalRearrange[] LRs;
 
     private transient LoadFunc rightLoader;
     private OperatorKey opKey;
 
     private Object prevLeftKey;
 
     private Result prevLeftInp;
 
     private Object prevRightKey = null;
 
     private Result prevRightInp;
 
     //boolean denoting whether we are generating joined tuples in this getNext() call or do we need to read in more data.
     private boolean doingJoin;
 
     private FuncSpec rightLoaderFuncSpec;
 
     private String rightInputFileName;
     
     private String indexFile;
 
     // Buffer to hold accumulated left tuples.
    private transient TuplesToSchemaTupleList leftTuples;
    private boolean noInnerPlanOnRightSide;
    private Object curJoinKey;
    private Tuple curJoiningRightTup;
    private int counter// # of tuples on left side with same key.
    private int leftTupSize = -1;
    private int rightTupSize = -1;
    private int arrayListSize = 1024;
    private LOJoin.JOINTYPE joinType;
    private String signature;
    // This serves as the default TupleFactory
    private transient TupleFactory mTupleFactory;

    
These TupleFactories are used for more efficient Tuple generation. This should decrease the amount of memory needed for a given map task to successfully perform a merge join.
    private transient TupleMaker mergedTupleMaker;
    private transient TupleMaker leftTupleMaker;
    private Schema leftInputSchema;
    private Schema mergedInputSchema;

    

Parameters:
k
rp
inp
inpPlans there can only be 2 inputs each being a List<PhysicalPlan> Ex. join A by ($0,$1), B by ($1,$2);
    public POMergeJoin(OperatorKey kint rpList<PhysicalOperatorinpMultiMap<PhysicalOperatorPhysicalPlaninpPlans,
            List<List<Byte>> keyTypesLOJoin.JOINTYPE joinTypeSchema leftInputSchemaSchema rightInputSchemaSchema mergedInputSchemathrows PlanException{
        super(krpinp);
        this. = k;
        this. = false;
        this. = inpPlans;
         = new POLocalRearrange[2];
        this.createJoinPlans(inpPlans,keyTypes);
        this. = null;
        this. = joinType;  
        this. = leftInputSchema;
        this. = mergedInputSchema;
    }

    
Configures the Local Rearrange operators to get keys out of tuple.

    private void createJoinPlans(MultiMap<PhysicalOperatorPhysicalPlaninpPlansList<List<Byte>> keyTypesthrows PlanException{
        int i=-1;
        for (PhysicalOperator inpPhyOp : inpPlans.keySet()) {
            ++i;
            POLocalRearrange lr = new POLocalRearrange(genKey());
            try {
                lr.setIndex(i);
            } catch (ExecException e) {
                throw new PlanException(e.getMessage(),e.getErrorCode(),e.getErrorSource(),e);
            }
            lr.setResultType(.);
            lr.setKeyType(keyTypes.get(i).size() > 1 ? . : keyTypes.get(i).get(0));
            lr.setPlans(inpPlans.get(inpPhyOp));
            [i]= lr;
        }
    }

    
This is a helper method that sets up all of the TupleFactory members.
    private void prepareTupleFactories() {
         = TupleFactory.getInstance();
        if ( != null) {
             = SchemaTupleBackend.newSchemaTupleFactory(false.);
        }
        if ( == null) {
            .debug("No SchemaTupleFactory available for combined left merge join schema: " + );
             = ;
        } else {
            .debug("Using SchemaTupleFactory for left merge join schema: " + );
        }
        if ( != null) {
             = SchemaTupleBackend.newSchemaTupleFactory(false.);
        }
        if ( == null) {
            .debug("No SchemaTupleFactory available for combined left/right merge join schema: " + );
             = ;
        } else {
            .debug("Using SchemaTupleFactory for left/right merge join schema: " + );
        }
    }

    
This provides a List to store Tuples in. The implementation of that list depends on whether or not there is a TupleFactory available.

Returns:
the list object to store Tuples in
    }

    
This is a class that extends ArrayList, making it easy to provide on the fly conversion from Tuple to SchemaTuple. This is necessary because we are not getting SchemaTuples from the source, though in the future that is what we would like to do.
    protected static class TuplesToSchemaTupleList {
        private List<Tupletuples;
        private SchemaTupleFactory tf;
        protected TuplesToSchemaTupleList(int ctTupleMaker<?> tf) {
             = new ArrayList<Tuple>(ct);
            if (tf instanceof SchemaTupleFactory) {
                this. = (SchemaTupleFactory)tf;
            }
        }
        public static SchemaTuple<?> convert(Tuple tSchemaTupleFactory tf) {
            if (t instanceof SchemaTuple<?>) {
                return (SchemaTuple<?>)t;
            }
            SchemaTuple<?> st = tf.newTuple();
            try {
                return st.set(t);
            } catch (ExecException e) {
                throw new RuntimeException("Unable to set SchemaTuple with schema ["
                        + st.getSchemaString() + "] with given Tuple in merge join.");
            }
        }
        public boolean add(Tuple t) {
            if ( != null) {
                t = convert(t);
            }
            return .add(t);
        }
        public Tuple get(int i) {
            return .get(i);
        }
        public int size() {
            return .size();
        }
        public List<TuplegetList() {
            return ;
        }
    }
    @SuppressWarnings("unchecked")
    @Override
    public Result getNextTuple() throws ExecException {
        Object curLeftKey;
        Result curLeftInp;
        if(){
            prepareTupleFactories();
             = newLeftTupleArray();
            // Do initial setup.
            curLeftInp = processInput();
            if(curLeftInp.returnStatus != .)
                return curLeftInp;       // Return because we want to fetch next left tuple.
            curLeftKey = extractKeysFromTuple(curLeftInp, 0);
            if(null == curLeftKey// We drop the tuples which have null keys.
                return new Result(.null);
            
            try {
                seekInRightStream(curLeftKey);
            } catch (IOException e) {
                throwProcessingException(truee);
            } catch (ClassCastException e) {
                throwProcessingException(truee);
            }
            .add((Tuple)curLeftInp.result);
             = false;
             = curLeftKey;
            return new Result(.null);
        }
        if(){
            // We matched on keys. Time to do the join.
            if( > 0){    // We have left tuples to join with current right tuple.
                Tuple joiningLeftTup = .get(--);
                 = joiningLeftTup.size();
                Tuple joinedTup = .newTuple( + );
                for(int i=0; i<i++) {
                    joinedTup.set(ijoiningLeftTup.get(i));
                }
                for(int i=0; i < i++) {
                    joinedTup.set(i+.get(i));
                }
                return new Result(.joinedTup);
            }
            // Join with current right input has ended. But bag of left tuples
            // may still join with next right tuple.
             = false;
            while(true){
                Result rightInp = getNextRightInp();
                if(rightInp.returnStatus != .){
                     = null;
                    return rightInp;
                }
                else{
                    Object rightKey = extractKeysFromTuple(rightInp, 1);
                    if(null == rightKey// If we see tuple having null keys in stream, we drop them 
                        continue;       // and fetch next tuple.
                    int cmpval = ((Comparable)rightKey).compareTo();
                    if (cmpval == 0){
                        // Matched the very next right tuple.
                         = (Tuple)rightInp.result;
                         = .size();
                         = .size();
                         = true;
                        return this.getNextTuple();
                    }
                    else if(cmpval > 0){    // We got ahead on right side. Store currently read right tuple.
                        if(!this..){
                             = rightKey;
                             = rightInp;
                            // There cant be any more join on this key.
                             = newLeftTupleArray();
                            .add((Tuple).);
                        }
                        else{           // This is end of all input and this is last join output.
                            // Right loader in this case wouldn't get a chance to close input stream. So, we close it ourself.
                            try {
                                ((IndexableLoadFunc)).close();
                            } catch (IOException e) {
                                // Non-fatal error. We can continue.
                                .error("Received exception while trying to close right side file: " + e.getMessage());
                            }
                        }
                        return new Result(.null);
                    }
                    else{   // At this point right side can't be behind.
                        int errCode = 1102;
                        String errMsg = "Data is not sorted on right side. Last two tuples encountered were: \n"+
                        "\n" + (Tuple)rightInp.result ;
                        throw new ExecException(errMsg,errCode);
                    }    
                }
            }
        }
        curLeftInp = processInput();
        switch(curLeftInp.returnStatus){
        case .:
            curLeftKey = extractKeysFromTuple(curLeftInp, 0);
            if(null == curLeftKey// We drop the tuples which have null keys.
                return new Result(.null);
            
            int cmpVal = ((Comparable)curLeftKey).compareTo();
            if(cmpVal == 0){
                // Keep on accumulating.
                .add((Tuple)curLeftInp.result);
                return new Result(.null);
            }
            else if(cmpVal > 0){ // Filled with left bag. Move on.
                 = ;
                break;   
            }
            else{   // Current key < Prev Key
                int errCode = 1102;
                String errMsg = "Data is not sorted on left side. Last two keys encountered were: \n"+
                "\n" + curLeftKey ;
                throw new ExecException(errMsg,errCode);
            }
 
        case .:
            if(this..){
                // We hit the end on left input. 
                // Tuples in bag may still possibly join with right side.
                 = ;
                curLeftKey = null;
                break;                
            }
            else    // Fetch next left input.
                return curLeftInp;
        default:    // If encountered with ERR / NULL on left side, we send it down.
            return curLeftInp;
        }
        if((null != ) && !this.. && ((Comparable)).compareTo(curLeftKey) >= 0){
            // This will happen when we accumulated inputs on left side and moved on, but are still behind the right side
            // In that case, throw away the tuples accumulated till now and add the one we read in this function call.
             = newLeftTupleArray();
            .add((Tuple)curLeftInp.result);
             = curLeftInp;
             = curLeftKey;
            return new Result(.null);
        }
        // Accumulated tuples with same key on left side.
        // But since we are reading ahead we still haven't checked the read ahead right tuple.
        // Accumulated left tuples may potentially join with that. So, lets check that first.
        
        if((null != ) && .equals()){
             = (Tuple).;
             = .size();
             = .size();
             = true;
             = curLeftInp;
             = curLeftKey;
            return this.getNextTuple();
        }
        // We will get here only when curLeftKey > prevRightKey
        boolean slidingToNextRecord = false;
        while(true){
            // Start moving on right stream to find the tuple whose key is same as with current left bag key.
            Result rightInp;
            if (slidingToNextRecord) {
                rightInp = getNextRightInp();
                slidingToNextRecord = false;
            } else
                rightInp = getNextRightInp();
                
            if(rightInp.returnStatus != .)
                return rightInp;
            Object extractedRightKey = extractKeysFromTuple(rightInp, 1);
            
            if(null == extractedRightKey// If we see tuple having null keys in stream, we drop them 
                continue;       // and fetch next tuple.
            
            Comparable rightKey = (Comparable)extractedRightKey;
            
            if != null && rightKey.compareTo() < 0){
                // Sanity check.
                int errCode = 1102;
                String errMsg = "Data is not sorted on right side. Last two keys encountered were: \n"+
                "\n" + rightKey ;
                throw new ExecException(errMsg,errCode);
            }
            int cmpval = rightKey.compareTo();
            if(cmpval < 0) {    // still behind the left side, do nothing, fetch next right tuple.
                slidingToNextRecord = true;
                continue;
            }
            else if (cmpval == 0){  // Found matching tuple. Time to do join.
                 = (Tuple)rightInp.result;
                 = .size();
                 = .size();
                 = true;
                 = curLeftInp;
                 = curLeftKey;
                return this.getNextTuple();
            }
            else{    // We got ahead on right side. Store currently read right tuple.
                 = rightKey;
                 = rightInp;
                // Since we didn't find any matching right tuple we throw away the buffered left tuples and add the one read in this function call. 
                 = newLeftTupleArray();
                .add((Tuple)curLeftInp.result);
                 = curLeftInp;
                 = curLeftKey;
                if(this..){  // This is end of all input and this is last time we will read right input.
                    // Right loader in this case wouldn't get a chance to close input stream. So, we close it ourself.
                    try {
                        ((IndexableLoadFunc)).close();
                    } catch (IOException e) {
                     // Non-fatal error. We can continue.
                        .error("Received exception while trying to close right side file: " + e.getMessage());
                    }
                }
                return new Result(.null);
            }
        }
    }
    
    private void seekInRightStream(Object firstLeftKeythrows IOException{
        
        // check if hadoop distributed cache is used
        if ( != null &&  instanceof DefaultIndexableLoader) {
            DefaultIndexableLoader loader = (DefaultIndexableLoader);
            loader.setIndexFile();
        }
        
        // Pass signature of the loader to rightLoader
        // make a copy of the conf to use in calls to rightLoader.
        Job job = new Job(new Configuration(..get()));
                firstLeftKey instanceof Tuple ? (Tuple)firstLeftKey : .newTuple(firstLeftKey));
    }
    private Result getNextRightInp(Object leftKeythrows ExecException{
        /*
         * Only call seekNear if the merge join is 'merge-sparse'.  DefaultIndexableLoader does not
         * support more than a single call to seekNear per split - so don't call seekNear.
         */
    	if ( == ..) {
    		try {
    			((IndexableLoadFunc)).seekNear(leftKey instanceof Tuple ? (Tuple)leftKey : .newTuple(leftKey));
    			 = null;
    		} catch (IOException e) {
    			throwProcessingException(truee);
    		}
    	}
    	return this.getNextRightInp();
    }
    private Result getNextRightInp() throws ExecException{
        try {
            if(){
                Tuple t = .getNext();
                if(t == null) { // no more data on right side
                    return new Result(.null);
                } else {
                    return new Result(.t);
                }
            } else {
                Result res = .getNextTuple();
                .detachInput();
                switch(res.returnStatus){
                case .:
                    return res;
                case .:
                    Tuple t = .getNext();
                    if(t == null) { // no more data on right side
                        return new Result(.null);
                    } else {
                        // run the tuple through the pipeline
                        .attachInput(t);
                        return this.getNextRightInp();
                        
                    }
                    default// We don't deal with ERR/NULL. just pass them down
                        throwProcessingException(falsenull);
                        
                }
            }
        } catch (IOException e) {
            throwProcessingException(truee);
        }
        // we should never get here!
        return new Result(.null);
    }
    public void throwProcessingException (boolean withCauseExceptionException ethrows ExecException {
        int errCode = 2176;
        String errMsg = "Error processing right input during merge join";
        if(withCauseException) {
            throw new ExecException(errMsgerrCode.e);
        } else {
            throw new ExecException(errMsgerrCode.);
        }
    }
    private Object extractKeysFromTuple(Result inpint lrIdxthrows ExecException{
        //Separate Key & Value of input using corresponding LR operator
        POLocalRearrange lr = [lrIdx];
        lr.attachInput((Tuple)inp.result);
        Result lrOut = lr.getNextTuple();
        lr.detachInput();
        if(lrOut.returnStatus!=.){
            int errCode = 2167;
            String errMsg = "LocalRearrange used to extract keys from tuple isn't configured correctly";
            throw new ExecException(errMsg,errCode,.);
        } 
          
        return ((TuplelrOut.result).get(1);
    }
    public void setupRightPipeline(PhysicalPlan rightPipelinethrows FrontendException{
        if(rightPipeline != null){
            if(rightPipeline.getLeaves().size() != 1 || rightPipeline.getRoots().size() != 1){
                int errCode = 2168;
                String errMsg = "Expected physical plan with exactly one root and one leaf.";
                throw new FrontendException(errMsg,errCode,.);
            }
             = false;
            this. = rightPipeline.getLeaves().get(0);
            this. = rightPipeline.getRoots().get(0);
            this..setInputs(null);            
        }
        else
             = true;
    }
        is.defaultReadObject();
         = TupleFactory.getInstance();
    }
    private OperatorKey genKey(){
        return new OperatorKey(.,NodeIdGenerator.getGenerator().getNextNodeId(.));
    }
    public void setRightLoaderFuncSpec(FuncSpec rightLoaderFuncSpec) {
        this. = rightLoaderFuncSpec;
    }
    public List<PhysicalPlangetInnerPlansOf(int index) {
        return .get(.get(index));
    }
    @Override
    public void visit(PhyPlanVisitor vthrows VisitorException {
        v.visitMergeJoin(this);
    }
    @Override
    public String name() {
        String name = getAliasString() + "MergeJoin";
        if (==..)
            name+="(sparse)";
        name+="[" + DataType.findTypeName() + "]" + " - " + .toString();
        return name;
    }
    /**
     * Merge join consumes two inputs (left and right), so multiple inputs are
     * supported.
     */
    @Override
    public boolean supportsMultipleInputs() {
        return true;
    }
    /**
     * Merge join emits a single joined stream; multiple outputs are not
     * supported.
     *
     * @see org.apache.pig.impl.plan.Operator#supportsMultipleOutputs()
     */
    @Override
    public boolean supportsMultipleOutputs() {
        return false;
    }
    
    

Parameters:
rightInputFileName the rightInputFileName to set
    public void setRightInputFileName(String rightInputFileName) {
        this. = rightInputFileName;
    }
    
    public String getSignature() {
        return ;
    }
    
    public void setSignature(String signature) {
        this. = signature;
    }
    public void setIndexFile(String indexFile) {
        this. = indexFile;
    }
    public String getIndexFile() {
        return ;
    }
    
    @Override
    public Tuple illustratorMarkup(Object inObject outint eqClassIndex) {
        return null;
    }
    
    public LOJoin.JOINTYPE getJoinType() {
        return ;
    }
New to GrepCode? Check out our FAQ X