Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright 2009-2013 by The Regents of the University of California
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * you may obtain a copy of the License from
   * 
   *     http://www.apache.org/licenses/LICENSE-2.0
   * 
   * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package edu.uci.ics.hivesterix.logical.plan.visitor;
 
 import java.util.List;
 
 import  org.apache.hadoop.hive.conf.HiveConf;
 import  org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import  org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import  org.apache.hadoop.hive.ql.exec.Operator;
 import  org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import  org.apache.hadoop.hive.ql.plan.AggregationDesc;
 import  org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import  org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import  org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import  org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import  org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import  org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import  org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
 import  org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
 import  org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import  org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
 
 @SuppressWarnings({ "rawtypes""unchecked" })
 public class GroupByVisitor extends DefaultVisitor {
 
     private List<IFunctionInfolocalAggs = new ArrayList<IFunctionInfo>();
     private boolean isDistinct = false;
     private boolean gbyKeyNotRedKey = false;
 
     /**
      * Translates a Hive group-by operator. Depending on the group-by mode, the
      * number of aggregators and two conditions whose operands are missing from
      * this copy of the file, it takes one of two paths:
      * <ul>
      * <li>build and return a new logical operator -- a distinct operator when
      * there are no aggregators, otherwise a group-by operator carrying a nested
      * aggregate sub-plan; or</li>
      * <li>update previously recorded aggregate expressions in place (step-one /
      * step-two function infos) and return {@code null}.</li>
      * </ul>
      *
      * NOTE(review): this method does not compile as shown. Several tokens were
      * lost (separators in the signature and in calls, and the names of the
      * fields being read or written). Each such site is marked below; restore
      * them from the upstream source before relying on this code.
      *
      * @param operator the Hive group-by operator being visited
      * @param AlgebricksParentOperatorRef reference to the operator that becomes
      *        the input of the operator built here (parameter name recovered
      *        from the garbled signature and from its uses in the body)
      * @param t translator used to build assign operators, translate
      *        expressions, allocate variables and rewrite output schemas
      * @return a reference to the new distinct or group-by operator, or
      *         {@code null} from the second path
      * @throws AlgebricksException declared by the signature; isSerializable,
      *         called below, throws it
      */
     @Override
     // NOTE(review): signature is garbled -- "> " is missing before "visit" and
     // before the second parameter name, as are ", " before "Translator" and ") "
     // before "throws".
     public Mutable<ILogicalOperatorvisit(GroupByOperator operator,
             Mutable<ILogicalOperatorAlgebricksParentOperatorRefTranslator tthrows AlgebricksException {
 
         // get descriptors
         GroupByDesc desc = (GroupByDesc) operator.getConf();
         GroupByDesc.Mode mode = desc.getMode();
 
         List<ExprNodeDesc> keys = desc.getKeys();
         List<AggregationDesc> aggregators = desc.getAggregators();
 
         // only the first child operator is inspected
         Operator child = operator.getChildOperators().get(0);
 
         // when the child is a reduce sink, compare the number of group-by keys
         // with the number of its partition columns
         if (child.getType() == OperatorType.REDUCESINK) {
             List<ExprNodeDesc> partKeys = ((ReduceSinkDesc) child.getConf()).getPartitionCols();
             if (keys.size() != partKeys.size())
                 // NOTE(review): assignment target is missing; presumably the
                 // gbyKeyNotRedKey field -- confirm.
                  = true;
         }
 
         // NOTE(review): two operands are missing in the condition below (the left
         // side of "== false" and the final disjunct); presumably the two boolean
         // fields of this class -- confirm.
         if (mode == GroupByDesc.Mode.PARTIAL1 || mode == GroupByDesc.Mode.HASH || mode == GroupByDesc.Mode.COMPLETE
                 || (aggregators.size() == 0 &&  == false) || ) {
             // NOTE(review): receiver of clear() is missing.
             .clear();
             // add an assign operator if the key is not a column expression
             // NOTE(review): "> " is missing before keyVariables, and the commas
             // between the three getAssignOperator arguments are missing.
             ArrayList<LogicalVariablekeyVariables = new ArrayList<LogicalVariable>();
             ILogicalOperator currentOperator = null;
             ILogicalOperator assignOperator = t.getAssignOperator(AlgebricksParentOperatorRefkeyskeyVariables);
             if (assignOperator != null) {
                 // the assign operator becomes the input of whatever is built next
                 currentOperator = assignOperator;
                 AlgebricksParentOperatorRef = new MutableObject<ILogicalOperator>(currentOperator);
             }
 
            // get key variable expression list
            List<Mutable<ILogicalExpression>> keyExprs = new ArrayList<Mutable<ILogicalExpression>>();
            for (LogicalVariable var : keyVariables) {
                // NOTE(review): a comma is missing between "" and false.
                keyExprs.add(t.translateScalarFucntion(new ExprNodeColumnDesc(TypeInfoFactory.intTypeInfo, var
                        .toString(), ""false)));
            }
            // no aggregators: emit a distinct operator over the key variables and
            // return early
            if (aggregators.size() == 0) {
                List<Mutable<ILogicalExpression>> distinctExprs = new ArrayList<Mutable<ILogicalExpression>>();
                for (LogicalVariable var : keyVariables) {
                    // NOTE(review): "> " is missing before varExpr.
                    Mutable<ILogicalExpressionvarExpr = new MutableObject<ILogicalExpression>(
                            new VariableReferenceExpression(var));
                    distinctExprs.add(varExpr);
                }
                // NOTE(review): comma missing between the two arguments.
                t.rewriteOperatorOutputSchema(keyVariablesoperator);
                // NOTE(review): assignment target is missing; presumably the
                // isDistinct field -- confirm.
                  = true;
                ILogicalOperator lop = new DistinctOperator(distinctExprs);
                lop.getInputs().add(AlgebricksParentOperatorRef);
                return new MutableObject<ILogicalOperator>(lop);
            }
            // get the pair<LogicalVariable, ILogicalExpression> list
            // NOTE(review): ", " is missing inside both Pair<...> type arguments.
            List<Pair<LogicalVariableMutable<ILogicalExpression>>> keyParameters = new ArrayList<Pair<LogicalVariableMutable<ILogicalExpression>>>();
            // keyVariables is refilled with the variables obtained for each key
            // expression
            keyVariables.clear();
            // NOTE(review): "> " is missing before expr, and ", " is missing in
            // the Pair constructor call.
            for (Mutable<ILogicalExpressionexpr : keyExprs) {
                LogicalVariable keyVar = t.getVariable(expr.getValue().toString(), TypeInfoFactory.unknownTypeInfo);
                keyParameters.add(new Pair(keyVarexpr));
                keyVariables.add(keyVar);
            }
            // get the parameters for the aggregator operator
            // NOTE(review): "> " is missing before aggVariables.
            ArrayList<LogicalVariableaggVariables = new ArrayList<LogicalVariable>();
            ArrayList<Mutable<ILogicalExpression>> aggExprs = new ArrayList<Mutable<ILogicalExpression>>();
            // get the type of each aggregation function
            // the output columns after the first keys.size() entries are paired
            // positionally with the aggregators
            HashMap<AggregationDesc, TypeInfo> aggToType = new HashMap<AggregationDesc, TypeInfo>();
            List<ColumnInfo> columns = operator.getSchema().getSignature();
            int offset = keys.size();
            // NOTE(review): "; " is missing between "offset" and the loop
            // condition.
            for (int i = offseti < columns.size(); i++) {
                aggToType.put(aggregators.get(i - offset), columns.get(i).getType());
            }
            // NOTE(review): receiver of clear() is missing; presumably the
            // localAggs field, given what is added a few lines below -- confirm.
            .clear();
            // rewrite parameter expressions for all aggregators
            for (AggregationDesc aggregator : aggregators) {
                for (ExprNodeDesc parameter : aggregator.getParameters()) {
                    t.rewriteExpression(parameter);
                }
                // NOTE(review): "> " is missing before aggExpr; ") " is missing
                // after the cast type on the next line; the receiver of add(...)
                // is missing on the line after that.
                Mutable<ILogicalExpressionaggExpr = t.translateAggregation(aggregator);
                AbstractFunctionCallExpression localAggExpr = (AbstractFunctionCallExpressionaggExpr.getValue();
                .add(localAggExpr.getFunctionInfo());
                // copy of the aggregator descriptor with its mode replaced by
                // Mode.COMPLETE, translated separately
                AggregationDesc logicalAgg = new AggregationDesc(aggregator.getGenericUDAFName(),
                        aggregator.getGenericUDAFEvaluator(), aggregator.getParameters(), aggregator.getDistinct(),
                        Mode.COMPLETE);
                // NOTE(review): "> " is missing before logicalAggExpr; the
                // receiver of add(...) and the operand of "!" are missing.
                Mutable<ILogicalExpressionlogicalAggExpr = t.translateAggregation(logicalAgg);
                .add(logicalAggExpr);
                if (!)
                    aggExprs.add(logicalAggExpr);
                else
                    aggExprs.add(aggExpr);
                aggVariables.add(t.getVariable(aggregator.getExprString() + aggregator.getMode(),
                        aggToType.get(aggregator)));
            }
            if (child.getType() != OperatorType.REDUCESINK)
                // NOTE(review): assignment target is missing.
                 = false;
            // get the sub plan list
            // NOTE(review): ", " is missing between the two constructor arguments.
            AggregateOperator aggOperator = new AggregateOperator(aggVariablesaggExprs);
            // the nested tuple source is created with an empty reference that is
            // pointed at the group-by operator once that operator exists
            NestedTupleSourceOperator nestedTupleSource = new NestedTupleSourceOperator(
                    new MutableObject<ILogicalOperator>());
            aggOperator.getInputs().add(new MutableObject<ILogicalOperator>(nestedTupleSource));
            List<Mutable<ILogicalOperator>> subRoots = new ArrayList<Mutable<ILogicalOperator>>();
            subRoots.add(new MutableObject<ILogicalOperator>(aggOperator));
            ILogicalPlan subPlan = new ALogicalPlanImpl(subRoots);
            // NOTE(review): "> " is missing before subPlans.
            List<ILogicalPlansubPlans = new ArrayList<ILogicalPlan>();
            subPlans.add(subPlan);
            // create the group by operator
            // NOTE(review): ", " is missing after keyParameters and inside the
            // Pair<...> type arguments.
            currentOperator = new edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator(
                    keyParametersnew ArrayList<Pair<LogicalVariableMutable<ILogicalExpression>>>(), subPlans);
            currentOperator.getInputs().add(AlgebricksParentOperatorRef);
            nestedTupleSource.getDataSourceReference().setValue(currentOperator);
            // output schema: key variables first, then aggregate variables
            // NOTE(review): "> " is missing before outputVariables, and ", " is
            // missing in the rewriteOperatorOutputSchema call.
            List<LogicalVariableoutputVariables = new ArrayList<LogicalVariable>();
            outputVariables.addAll(keyVariables);
            outputVariables.addAll(aggVariables);
            t.rewriteOperatorOutputSchema(outputVariablesoperator);
            // NOTE(review): the condition and both put(...) arguments are missing
            // here; which annotation is set cannot be told from this copy.
            if () {
                currentOperator.getAnnotations().put(..);
            }
            HiveConf conf = ConfUtil.getHiveConf();
            // NOTE(review): ", " is missing between the key and the default value.
            Boolean extGby = conf.getBoolean("hive.algebricks.groupby.external"false);
            // a second annotation is added only when the configuration flag is on
            // and isSerializable accepts every aggregator
            // NOTE(review): put(...) arguments are missing.
            if (extGby && isSerializable(aggregators)) {
                currentOperator.getAnnotations().put(..);
            }
            return new MutableObject<ILogicalOperator>(currentOperator);
        } else {
            // NOTE(review): assignment target is missing.
             = false;
            // rewrite parameter expressions for all aggregators
            // i indexes the previously recorded entries, in the same order as
            // the aggregators of this operator
            int i = 0;
            for (AggregationDesc aggregator : aggregators) {
                for (ExprNodeDesc parameter : aggregator.getParameters()) {
                    t.rewriteExpression(parameter);
                }
                // NOTE(review): "> " is missing before agg; the cast on the next
                // statement lacks ") " and the name of the collection that
                // get(i) is called on; setStepOneAggregate's argument also lacks
                // its receiver.
                Mutable<ILogicalExpressionagg = t.translateAggregation(aggregator);
                AggregateFunctionCallExpression originalAgg = (AggregateFunctionCallExpression.get(i)
                        .getValue();
                originalAgg.setStepOneAggregate(.get(i));
                // NOTE(review): ") " is missing after the cast type.
                AggregateFunctionCallExpression currentAgg = (AggregateFunctionCallExpressionagg.getValue();
                // the recorded aggregate becomes two-step only when the newly
                // translated aggregate carries a function info
                if (currentAgg.getFunctionInfo() != null) {
                    originalAgg.setTwoStep(true);
                    originalAgg.setStepTwoAggregate(currentAgg.getFunctionInfo());
                }
                i++;
            }
            // this path builds no new operator
            return null;
        }
    }
    @Override
    public Mutable<ILogicalOperatorvisit(ReduceSinkOperator operator,
            Mutable<ILogicalOperatorAlgebricksParentOperatorRefTranslator t) {
        Operator downStream = (Operator) operator.getChildOperators().get(0);
        if (!(downStream instanceof GroupByOperator)) {
            return null;
        }
        ReduceSinkDesc desc = (ReduceSinkDesc) operator.getConf();
        List<ExprNodeDesc> keys = desc.getKeyCols();
        List<ExprNodeDesc> values = desc.getValueCols();
        // insert assign for keys
        ArrayList<LogicalVariablekeyVariables = new ArrayList<LogicalVariable>();
        t.getAssignOperator(AlgebricksParentOperatorRefkeyskeyVariables);
        // insert assign for values
        ArrayList<LogicalVariablevalueVariables = new ArrayList<LogicalVariable>();
        t.getAssignOperator(AlgebricksParentOperatorRefvaluesvalueVariables);
        ArrayList<LogicalVariablecolumns = new ArrayList<LogicalVariable>();
        columns.addAll(keyVariables);
        columns.addAll(valueVariables);
        t.rewriteOperatorOutputSchema(columnsoperator);
        return null;
    }
    private boolean isSerializable(List<AggregationDesc> descsthrows AlgebricksException {
        try {
            for (AggregationDesc desc : descs) {
                GenericUDAFEvaluator udaf = desc.getGenericUDAFEvaluator();
                AggregationBuffer buf = udaf.getNewAggregationBuffer();
                Class<?> bufferClass = buf.getClass();
                Field[] fields = bufferClass.getDeclaredFields();
                for (Field field : fields) {
                    field.setAccessible(true);
                    String type = field.getType().toString();
                    if (!(type.equals("int") || type.equals("long") || type.equals("float") || type.equals("double") || type
                            .equals("boolean"))) {
                        return false;
                    }
                }
            }
            return true;
        } catch (Exception e) {
            throw new AlgebricksException(e);
        }
    }
New to GrepCode? Check out our FAQ X