Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
 import java.util.List;
 
This implementation is applicable for both the physical plan and for the local backend, as the conversion of physical to mapreduce would see the SORT operator and take necessary steps to convert it to a quantile and a sort job. This is a blocking operator. The sortedDataBag accumulates Tuples and sorts them only when there an iterator is started. So all the tuples from the input operator should be accumulated and filled into the dataBag. The attachInput method is not applicable here.
 
 
 //We intentionally skip type checking in backend for performance reasons
 @SuppressWarnings("unchecked")
 public class POSort extends PhysicalOperator {
     private static final Log log = LogFactory.getLog(POSort.class);

 
     private static final long serialVersionUID = 1L;
     //private List<Integer> mSortCols;
 	private List<ByteExprOutputTypes;
 	private List<BooleanmAscCols;
 
 	private boolean inputsAccumulated = false;
 	private long limit;
 	public boolean isUDFComparatorUsed = false;
 	private DataBag sortedBag;
 	transient Iterator<Tupleit;
 
 	public POSort(
             OperatorKey k,
             int rp,
             List inp,
             List<PhysicalPlansortPlans,
 			List<BooleanmAscCols,
             POUserComparisonFunc mSortFunc) {
 		super(krpinp);
 		//this.mSortCols = mSortCols;
 		this. = sortPlans;
 		this. = mAscCols;
         this. = -1;
 		this. = mSortFunc;
 		if (mSortFunc == null) {
              = new SortComparator();
 			/*sortedBag = BagFactory.getInstance().newSortedBag(
 					new SortComparator());*/
 			 = new ArrayList<Byte>(sortPlans.size());
			for(PhysicalPlan plan : sortPlans) {
			}
else {
			/*sortedBag = BagFactory.getInstance().newSortedBag(
					new UDFSortComparator());*/
             = new UDFSortComparator();
		}
	}
	public POSort(OperatorKey kint rpList inp) {
		super(krpinp);
	}
	public POSort(OperatorKey kint rp) {
		super(krp);
	}
	public POSort(OperatorKey kList inp) {
		super(kinp);
	}
	public POSort(OperatorKey k) {
		super(k);
	}
	public class SortComparator implements Comparator<Tuple>,Serializable {
        private static final long serialVersionUID = 1L;
        @Override
        public int compare(Tuple o1Tuple o2) {
			int count = 0;
			int ret = 0;
			if( == null || .size() == 0) {
                return 0;
            }
			for(PhysicalPlan plan : ) {
				try {
					plan.attachInput(o1);
					Result res1 = getResult(plan.get(count));
					plan.attachInput(o2);
					Result res2 = getResult(plan.get(count));
					if(res1.returnStatus != . || res2.returnStatus != .) {
						.error("Error processing the input in the expression plan : " + plan.toString());
else {
						if(.get(count++)) {
							ret = DataType.compare(res1.resultres2.result);
                            // If they are not equal, return
                            // Otherwise, keep comparing the next one
                            if (ret != 0) {
                                return ret ;
                            }
                        }
                        else {
                            ret = DataType.compare(res2.resultres1.result);
                            if (ret != 0) {
                                return ret ;
                            }
                        }
					}
catch (ExecException e) {
					.error("Invalid result while executing the expression plan : " + plan.toString() + "\n" + e.getMessage());
				}
			}
			return ret;
		}
		private Result getResult(PhysicalPlan planbyte resultTypethrows ExecException {
			Result res = null;
			switch (resultType) {
            case .:
            case .:
            case .:
            case .:
            case .:
            case .:
            case .:
            case .:
            case .:
            case .:
            case .:
                res = Op.getNext(resultType);
                break;
            default: {
                int errCode = 2082;
                String msg = "Did not expect result of type: " +
                        DataType.findTypeName(resultType);
                    throw new ExecException(msgerrCode.);
            }
            }
			return res;
		}
	}
	public class UDFSortComparator implements Comparator<Tuple>,Serializable {

        private static final long serialVersionUID = 1L;
        @Override
        public int compare(Tuple t1Tuple t2) {
			Integer i = null;
			Result res = null;
			try {
catch (ExecException e) {
				.error("Input not ready. Error on reading from input. "
e.getMessage());
			}
			if (res != null) {
                return (Integerres.result;
            } else {
                return 0;
            }
		}
	}
    @Override
    public String name() {
        return getAliasString() + "POSort" + "["
                + DataType.findTypeName() + "]" + "("
                + ( != null ? .getFuncSpec() : "") + ")"
                + " - " + .toString();
    }
	public boolean isBlocking() {
		return true;
	}
	public Result getNextTuple() throws ExecException {
		Result res = new Result();
			res = processInput();
			// by default, we create InternalSortedBag, unless user configures
			// explicitly to use old bag
			String bagType = null;
	        if (..get() != null) {
	   			bagType = ..get().get("pig.cachedbag.sort.type");
	   	    }
            if (bagType != null && bagType.equalsIgnoreCase("default")) {
            	 = BagFactory.getInstance().newSortedBag();
       	    } else {
    	    	 = new InternalSortedBag(3, );
    	    }
			while (res.returnStatus != .) {
				if (res.returnStatus == .) {
					.error("Error in reading from the inputs");
					return res;
					//continue;
else if (res.returnStatus == .) {
                    // ignore the null, read the next tuple.
                    res = processInput();
					continue;
				}
				.add((Tupleres.result);
				res = processInput();
			}
		}
		if ( == null) {
             = .iterator();
        }
        if (.hasNext()) {
            res.result = .next();
            illustratorMarkup(res.resultres.result, 0);
            res.returnStatus = .;
        } else {
            res.returnStatus = .;
            reset();
        }
		return res;
	}
	public boolean supportsMultipleInputs() {
		return false;
	}
	public boolean supportsMultipleOutputs() {
		return false;
	}
	public void visit(PhyPlanVisitor vthrows VisitorException {
		v.visitSort(this);
	}
    @Override
    public void reset() {
         = false;
         = null;
         = null;
    }
    public List<PhysicalPlangetSortPlans() {
        return ;
    }
    public void setSortPlans(List<PhysicalPlansortPlans) {
        this. = sortPlans;
    }
        return ;
    }
    public void setMSortFunc(POUserComparisonFunc sortFunc) {
         = sortFunc;
    }
    public List<BooleangetMAscCols() {
        return ;
    }
    public void setLimit(long l)
    {
    	 = l;
    }
    public long getLimit()
    {
    	return ;
    }
    public boolean isLimited()
    {
    	return (!=-1);
    }
    @Override
    public POSort clone() throws CloneNotSupportedException {
        List<PhysicalPlanclonePlans = new
            ArrayList<PhysicalPlan>(.size());
        for (PhysicalPlan plan : ) {
            clonePlans.add(plan.clone());
        }
        List<BooleancloneAsc = new ArrayList<Boolean>(.size());
        for (Boolean b : ) {
            cloneAsc.add(b);
        }
        POUserComparisonFunc cloneFunc = null;
        if ( != null) {
            cloneFunc = .clone();
        }
        // Don't set inputs as PhysicalPlan.clone will take care of that
        return new POSort(new OperatorKey(.,
            NodeIdGenerator.getGenerator().getNextNodeId(.)),
            nullclonePlanscloneAsccloneFunc);
    }
    public Tuple illustratorMarkup(Object inObject outint eqClassIndex) {
        if( != null) {
          .getEquivalenceClasses().get(eqClassIndex).add((Tuplein);
            .addData((Tupleout);
        }
        return (Tupleout;
    }
New to GrepCode? Check out our FAQ X