Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
   /*
    * Licensed to the Apache Software Foundation (ASF) under one
    * or more contributor license agreements.  See the NOTICE file
    * distributed with this work for additional information
    * regarding copyright ownership.  The ASF licenses this file
    * to you under the Apache License, Version 2.0 (the
    * "License"); you may not use this file except in compliance
    * with the License.  You may obtain a copy of the License at
    *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  package org.apache.pig.newplan.logical.visitor;
  
  import java.util.List;
  
  
  /*
  All the getType() of these operators always return BAG.
  We just have to :-
  1) Check types of inputs, expression plans
  2) Compute output schema with type information
     (At the moment, the parser does only return GetSchema with correct aliases)
  3) Insert casting if necessary
  
   */
  
  
      public TypeCheckingRelVisitor(OperatorPlan planCompilationMessageCollector msgCollector)
      throws FrontendException {
          super(plannew DependencyOrderWalker(plan));
          this. = msgCollector;
  
      }
  
      @Override
      public void visit(LOLoad load){
          // do nothing
      }
  
      @Override
      public void visit(LOStore store)
      throws FrontendException {
          store.resetSchema();
          store.getSchema();
      }

    
The schema of filter output will be the same as filter input

  
      @Override
      public void visit(LOFilter filterthrows FrontendException {
          filter.resetSchema();
         LogicalExpressionPlan comparisonPlan = filter.getFilterPlan() ;
 
         // Check that the inner plan has only 1 output port
         if (comparisonPlan.getSources().size() > 1) {
             int errCode = 1057;
             String msg = "Filter's cond plan can only have one output" ;
             .collect(msg.) ;
             throwTypeCheckerException(filtermsgerrCode.null) ;
         }
 
         // visit the filter expression
         visitExpressionPlan(comparisonPlanfilter);
 
 
         //check filter expression type
         byte innerCondType = ((LogicalExpression)comparisonPlan.getSources().get(0)).getType();
         if (innerCondType != .) {
             int errCode = 1058;
             String msg = "Filter's condition must evaluate to boolean. Found: " + 
                          DataType.findTypeName(innerCondType);
             .collect(msg.) ;
             throwTypeCheckerException(filtermsgerrCode.null) ;
         }       
 
         try {
             // re-compute the schema
             filter.resetSchema();
             filter.getSchema() ;
         } 
         catch (FrontendException fe) {
             int errCode = 1059;
             String msg = "Problem while reconciling output schema of Filter" ;
             .collect(msg.);
             throwTypeCheckerException(filtermsgerrCode.fe) ;
         }
     }
     
     private void throwTypeCheckerException(Operator opString msg,
             int errCodebyte inputFrontendException fethrows TypeCheckerException {
         iffe == null ) {
             throw new TypeCheckerException(opmsgerrCode.);
         }
         throw new TypeCheckerException(opmsgerrCode.fe);
     }
 
     public void visit(LOGenerate genthrows FrontendException {
         for(int i=0; i < gen.getOutputPlans().size(); i++) {
             LogicalExpressionPlan expPlan = gen.getOutputPlans().get(i);
             // Check that the inner plan has only 1 output port
             if (expPlan.getSources().size() > 1) {
                 int errCode = 1057;
                 String msg = "LOGenerate expression plan can only have one output" ;
                 .collect(msg.) ;
                 throwTypeCheckerExceptiongenmsgerrCode.null) ;
             }
             // visit the filter expression
             visitExpressionPlanexpPlangen );
 
        }
         gen.resetSchema();
         gen.getSchema();
     }
 
     @Override
     public void visit(LOInnerLoad innerLoadthrows FrontendException{
         innerLoad.resetSchema();
         innerLoad.getSchema();
     }
     
     @Override
     public void visit(LOForEach forEachthrows FrontendException {
         try {
             // visit inner plan
             new TypeCheckingRelVisitorforEach.getInnerPlan(),  ).visit();
             // re-compute the schema
             forEach.resetSchema();
             forEach.getSchema() ;
         }  catch (FrontendException fe) {
             int errCode = 1059;
             String msg = "Problem while reconciling output schema of ForEach" ;
             .collect(msg.);
             throwTypeCheckerException(forEachmsgerrCode.fe) ;
         }
     }
 
     private void visitExpressionPlan(LogicalExpressionPlan explPlan,
             LogicalRelationalOperator relOp)
     throws FrontendException {
         TypeCheckingExpVisitor expTypeCheck =
             new TypeCheckingExpVisitor(explPlanrelOp);
         expTypeCheck.visit();
 
     }
 
     /* (non-Javadoc)
      * @see org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor#visit(org.apache.pig.newplan.logical.relational.LOUnion)
      *     The output schema of LOUnion is the merge of all input schemas.
      *     Operands on left side always take precedance on aliases.
      *     We allow type promotion here
      */
     @Override
     public void visit(LOUnion uthrows FrontendException {
         u.resetSchema();
         // Have to make a copy, because as we insert operators, this list will
         // change under us.
         List<Operatorinputs = new ArrayList<Operator>(u.getInputs());
 
         // There is no point to union only one operand
         // it should be a problem in the parser
         if (inputs.size() < 2) {
             throw new AssertionError("Union with Count(Operand) < 2") ;
         }
 
         LogicalSchema schema = null ;
         try {
             // Compute the schema
             schema = u.getSchema() ;
 
         }
         catch (FrontendException fee) {
             int errCode = 1055;
             String msg = "Problem while reading schemas from inputs of Union" ;
             .collect(msg.) ;
             throwTypeCheckerException(umsgerrCode.fee) ;
         }
 
         // Do cast insertion only if we are typed 
         // and if its not union-onschema. In case of union-onschema the
         // foreach with cast is added in UnionOnSchemaSetter
         if (schema != null && !u.isOnSchema()) {
             // Insert casting to inputs if necessary
             for (int i=0; iinputs.size() ;i++) {
                 LOForEach insertedOp
                 = insertCastForEachInBetweenIfNecessary((LogicalRelationalOperator)inputs.get(i), u) ;
 
                 // We may have to compute the schema of the input again
                 // because we have just inserted
                 if (insertedOp != null) {
                     if(insertedOp.getAlias()==null){
                         insertedOp.setAlias(((LogicalRelationalOperator)inputs.get(i)).getAlias());
                     }
                     try {
                         this.visit(insertedOp);
                     }
                     catch (FrontendException fee) {
                         int errCode = 1056;
                         String msg = "Problem while casting inputs of Union" ;
                         .collect(msg.) ;
                         throwTypeCheckerException(umsgerrCode.fee) ;
                     }
                 }
             }
         }
         u.resetSchema();
         u.getSchema();
     }

    
For casting insertion for relational operators only if it's necessary Currently this only does "shallow" casting

Parameters:
fromOp
toOp
Returns:
the inserted operator. null is no insertion
Throws:
org.apache.pig.impl.logicalLayer.FrontendException
 
             LogicalRelationalOperator fromOp,
             LogicalRelationalOperator toOp)
     throws FrontendException {
 
 
         // Make sure that they are adjacent and the direction
         // is from "fromOp" to "toOp"
         List<OperatorpreList = .getPredecessors(toOp) ;
         boolean found = false ;
         for(Operator tmpOppreList) {
             // compare by reference
             if (tmpOp == fromOp) {
                 found = true ;
                 break ;
             }
         }
 
         if (!found) {
             int errCode = 1077;
             String msg = "Two operators that require a cast in between are not adjacent.";
             throwTypeCheckerException(fromOpmsgerrCode.null);
         }
 
         // retrieve input schema to be casted
         // this will be used later
         LogicalSchema fromSchema = null ;
         LogicalSchema toSchema = null ;
         try {
             fromSchema = fromOp.getSchema() ;
             toSchema = toOp.getSchema();
         }
         catch(FrontendException fe) {
             int errCode = 1055;
             String msg = "Problem while reading schema from input of " 
                 + fromOp.getClass().getSimpleName();
             throwTypeCheckerException(fromOpmsgerrCode.fe);
         }
 
         // make sure the supplied targetSchema has the same number of members
         // as number of output fields from "fromOp"
         if (fromSchema.size() != toSchema.size()) {
             int errCode = 1078;
             String msg = "Schema size mismatch for casting. Input schema size: " 
                 + fromSchema.size() + ". Target schema size: " + toSchema.size();
             throwTypeCheckerException(toOpmsgerrCode.null);
         }
 
         // Plans inside Generate. Fields that do not need casting will only
         // have Project. Fields that need casting will have Project + Cast
         ArrayList<LogicalExpressionPlangeneratePlans = new ArrayList<LogicalExpressionPlan>() ;
         LogicalPlan innerPlan = new LogicalPlan();
 
         // create LOGenerate for foreach
         LOGenerate loGen = new LOGenerate(innerPlangeneratePlans
                 new boolean[toSchema.size()]);
         innerPlan.add(loGen);
 
         // Create ForEach to be inserted
         LOForEach foreach = new LOForEach();
         foreach.setInnerPlan(innerPlan);
 
 
         int castNeededCounter = 0 ;
         for(int i=0;i < fromSchema.size(); i++) {
 
             LOInnerLoad innerLoad = new LOInnerLoad(innerPlanforeachi);
             innerPlan.add(innerLoad);
             innerPlan.connect(innerLoadloGen);
 
             LogicalExpressionPlan genPlan = new LogicalExpressionPlan() ;
             ProjectExpression project = new ProjectExpression(genPlani, 0, loGen);
             genPlan.add(project);
 
             // add casting if necessary by comparing target types
             // to the input schema
             LogicalFieldSchema fs = null ;
             fs = fromSchema.getField(i) ;
 
             // This only does "shallow checking"
 
             LogicalFieldSchema outFieldSchema ;
 
             outFieldSchema = toSchema.getField(i) ;
 
             if (outFieldSchema.type != fs.type) {
                 castNeededCounter++ ;
                 new CastExpression(genPlanprojectoutFieldSchema);
             }
 
             generatePlans.add(genPlan) ;
         }
 
         // if we really need casting
         if (castNeededCounter > 0)  {
             // Flatten List
             // This is just cast insertion so we don't have any flatten
             ArrayList<BooleanflattenList = new ArrayList<Boolean>() ;
             for(int i=0;i < toSchema.size(); i++) {
                 flattenList.add(Boolean.valueOf(false)) ;
             }
 
             // Manipulate the plan structure
             .add(foreach);
             .insertBetween(fromOpforeachtoOp);
             return foreach;
 
         }
         else {
             .remove(foreach);
             return null ;
         }
     }
 
 
     @Override
     public void visit(LOSplitOutput opthrows FrontendException {
         op.resetSchema();
         OperatorPlan lp = op.getPlan();
         // LOSplitOutput can only have 1 input
         List<Operatorlist = lp.getPredecessors(op) ;
         if (list.size() != 1) {
             int errCode = 2008;
             String msg = "LOSplitOutput cannot have more than one input. Found: " + list.size() + " input(s).";
             throwTypeCheckerException(opmsgerrCode.null) ;
         }
 
         LogicalExpressionPlan condPlan = op.getFilterPlan() ;
 
         // Check that the inner plan has only 1 output port
         if (condPlan.getSources().size() != 1) {
             int errCode = 1057;
             String msg = "Split's inner plan can only have one output (leaf)" ;
             .collect(msg.) ;
             throwTypeCheckerException(opmsgerrCode.null) ;
         }
 
         visitExpressionPlan(condPlanop);
 
         byte innerCondType = ((LogicalExpression)condPlan.getSources().get(0)).getType() ;
         if (innerCondType != .) {
             int errCode = 1058;
             String msg = "Split's condition must evaluate to boolean. Found: " + DataType.findTypeName(innerCondType) ;
             .collect(msg.) ;
             throwTypeCheckerException(opmsgerrCode.null) ;
         }
 
         try {
             // Compute the schema
             op.getSchema() ;
         }
         catch (FrontendException fe) {
             int errCode = 1055;
             String msg = "Problem while reading"
                 + " schemas from inputs of SplitOutput" ;
             .collect(msg.) ;
             throwTypeCheckerException(opmsgerrCode.fe) ;
         }
     }

    
LODistinct, output schema should be the same as input

 
 
     @Override
     public void visit(LODistinct opthrows VisitorException {
         op.resetSchema();
 
         try {
             // Compute the schema
             op.getSchema() ;
         }
         catch (FrontendException fe) {
             int errCode = 1055;
             String msg = "Problem while reading"
                 + " schemas from inputs of Distinct" ;
             .collect(msg.) ;
             throwTypeCheckerException(opmsgerrCode.fe) ;
         }
     }
 
     @Override
     public void visit(LOLimit limitthrows FrontendException {
         limit.resetSchema();
         LogicalExpressionPlan expressionPlan = limit.getLimitPlan();
         if (expressionPlan != null) {
             // Check that the inner plan has only 1 output port
             if (expressionPlan.getSources().size() > 1) {
                 int errCode = 1057;
                 String msg = "Limit's expression plan can only have one output";
                 .collect(msg.);
                 throwTypeCheckerException(limitmsgerrCode.null);
             }
 
             // visit the limit expression
             visitExpressionPlan(expressionPlanlimit);
 
             // check limit expression type
             byte innerCondType = ((LogicalExpressionexpressionPlan.getSources().get(0))
                     .getType();
             // cast to long if it is a bytearray
             if (innerCondType == .)
                 insertAtomicCastForInnerPlan(expressionPlanlimit.);
             // else it must be an int or a long
             else if (innerCondType != . && innerCondType != .) {
                 int errCode = 1058;
                 String msg = "Limit's expression must evaluate to Long or Integer. Found: "
                         + DataType.findTypeName(innerCondType);
                 .collect(msg.);
                 throwTypeCheckerException(limitmsgerrCode.null);
             }
         }
         try {
             // Compute the schema
             limit.getSchema();
         } catch (FrontendException fe) {
             int errCode = 1055;
             String msg = "Problem while reading schemas from inputs of Limit";
             .collect(msg.) ;
             throwTypeCheckerException(limitmsgerrCode.fe);
         }
     }

    
Return concatenated of all fields from all input operators If one of the inputs have no schema then we cannot construct the output schema.

 
     public void visit(LOCross csthrows VisitorException {
         cs.resetSchema();
 
         try {
             // Compute the schema
             cs.getSchema() ;
         }
         catch (FrontendException fe) {
             int errCode = 1055;
             String msg = "Problem while reading"
                 + " schemas from inputs of Cross" ;
             .collect(msg.) ;
             throwTypeCheckerException(csmsgerrCode.fe) ;
         }
     }

    
The schema of sort output will be the same as sort input.

 
     public void visit(LOSort sortthrows FrontendException {
         sort.resetSchema();
         // Type checking internal plans.
         for(int i=0;i < sort.getSortColPlans().size(); i++) {
 
             LogicalExpressionPlan sortColPlan = sort.getSortColPlans().get(i) ;
 
             // Check that the inner plan has only 1 output port
             if (sortColPlan.getSources().size() != 1) {
                 int errCode = 1057;
                 String msg = "Sort's inner plan can only have one output (leaf)" ;
                 .collect(msg.) ;
                 throwTypeCheckerException(sortmsgerrCode.null) ;
             }
 
             visitExpressionPlan(sortColPlansort);
         }
 
         try {
             // Compute the schema
             sort.getSchema() ;
         }
         catch (FrontendException fee) {
             int errCode = 1059;
             String msg = "Problem while reconciling output schema of Sort" ;
             .collect(msg.);
             throwTypeCheckerException(sortmsgerrCode.fee) ;
         }
     }

    
The schema of rank output will be the same as input, plus a rank field.

 
     public void visit(LORank rankthrows FrontendException {
         rank.resetSchema();
 
         // Type checking internal plans.
         List<LogicalExpressionPlanrankColPlans = rank.getRankColPlans();
 
         for(int i=0;i < rankColPlans.size(); i++) {
             LogicalExpressionPlan rankColPlan = rankColPlans.get(i) ;
 
             // Check that the inner plan has only 1 output port
             if (rankColPlan.getSources().size() != 1) {
                 int errCode = 1057;
                 String msg = "Rank's inner plan can only have one output (leaf)" ;
                 .collect(msg.) ;
                 throwTypeCheckerException(rankmsgerrCode.null) ;
             }
 
             visitExpressionPlan(rankColPlanrank);
 
         }
 
         try {
             // Compute the schema
             rank.getSchema() ;
         }
         catch (FrontendException fee) {
             int errCode = 1059;
             String msg = "Problem while reconciling output schema of Rank" ;
             .collect(msg.);
             throwTypeCheckerException(rankmsgerrCode.fee) ;
         }
 
     }

    
The schema of split output will be the same as split input
 
 
     public void visit(LOSplit splitthrows VisitorException {
         OperatorPlan lp = split.getPlan();
         List<OperatorinputList = lp.getPredecessors(split);
 
         if (inputList.size() != 1) {            
             int errCode = 2008;
             String msg = "LOSplit cannot have more than one input. Found: " + inputList.size() + " input(s).";
             throwTypeCheckerException(splitmsgerrCode.null) ;
         }
 
         split.resetSchema();
         try {
             // Compute the schema
             split.getSchema();
         }
         catch (FrontendException fe) {
             int errCode = 1059;
             String msg = "Problem while reconciling output schema of Split" ;
             .collect(msg.);
             throwTypeCheckerException(splitmsgerrCode.fe) ;
         }
     }

    
 
     public void visit(LOJoin jointhrows FrontendException {
         try {
             join.resetSchema();
             join.getSchema();
         } catch (FrontendException fe) {
             int errCode = 1060;
             String msg = "Cannot resolve Join output schema" ;
             .collect(msg.) ;
             throwTypeCheckerException(joinmsgerrCode.fe) ;
         }
 
         MultiMap<IntegerLogicalExpressionPlanjoinColPlans
         = join.getExpressionPlans() ;
         List<Operatorinputs = join.getInputs((LogicalPlan) ;
 
         // Type checking internal plans.
         for(int i=0;i < inputs.size(); i++) {
             ArrayList<LogicalExpressionPlaninnerPlans
             = new ArrayList<LogicalExpressionPlan>(joinColPlans.get(i)) ;
 
             for(int j=0; j < innerPlans.size(); j++) {
 
                 LogicalExpressionPlan innerPlan = innerPlans.get(j) ;
 
                 // Check that the inner plan has only 1 output port
                 if (innerPlan.getSources().size() != 1) {
                     int errCode = 1057;
                     String msg = "Join's inner plans can only"
                         + " have one output (leaf)" ;
                     .collect(msg.) ;
                     throwTypeCheckerException(joinmsgerrCode.null) ;
                 }
                 visitExpressionPlan(innerPlanjoin);
             }
         }
 
         try {
 
             if (!isJoinOnMultiCols(join)) {
                 // merge all the inner plan outputs so we know what type
                 // our group column should be
                 byte groupType = getAtomicJoinColType(join);
 
                 // go through all inputs again to add cast if necessary
                 for(int i=0;i < inputs.size(); i++) {
                     Collection<LogicalExpressionPlanexprPlans = join.getJoinPlan(i);
 
                     //there should be one and only expression plan - that gets 
                     // checked in getAtomicJoinColType()
                     LogicalExpressionPlan exprPlan = exprPlans.iterator().next();
 
                     // Checking innerPlan size already done above
                     byte innerType =
                         ((LogicalExpression)exprPlan.getSources().get(0)).getType();
 
                     if (innerType != groupType) {
                         insertAtomicCastForInnerPlan(exprPlanjoingroupType);
                     }
                 }
             }
             else {
                 //schema of the group-by key
                 LogicalSchema groupBySchema = getSchemaFromInnerPlans(join.getExpressionPlans(), join) ;
 
                 // go through all inputs again to add cast if necessary
                 for(int i=0;i < inputs.size(); i++) {
                     List<LogicalExpressionPlaninnerPlans = 
                         new ArrayList<LogicalExpressionPlan>(join.getJoinPlan(i)) ;
                     for(int j=0;j < innerPlans.size(); j++) {
                         LogicalExpressionPlan innerPlan = innerPlans.get(j) ;
                         LogicalExpression outputExp = ((LogicalExpression)innerPlan.getSources().get(0));
                         byte innerType = outputExp.getType() ;
 
                         byte expectedType = groupBySchema.getField(j). ;
 
                         if (!DataType.isAtomic(innerType) && (. != innerType)) {
                             int errCode = 1057;
                             String msg = "Join's inner plans can only"
                                 + "have one output (leaf)" ;
                             .collect(msg.) ;
                             throwTypeCheckerException(joinmsgerrCode.null) ;
                         }
                         if (innerType != expectedType) {
                             insertAtomicCastForInnerPlan(
                                     innerPlan,joinexpectedType
                             ) ;
                         }
                     }
                 }
             }
         }
         catch (FrontendException fe) {
             int errCode = 1060;
             String msg = "Cannot resolve Join output schema" ;
             .collect(msg.) ;
             throwTypeCheckerException(joinmsgerrCode.fe) ;
         }
 
         try {
             join.resetSchema();
             join.getSchema(); 
         }
         catch (FrontendException fe) {
             int errCode = 1060;
             String msg = "Cannot resolve Join output schema" ;
             .collect(msg.) ;
             throwTypeCheckerException(joinmsgerrCode.fe) ;
         }
     }

    

Parameters:
join
Returns:
true if there is more than one join column for an input
 
     private boolean isJoinOnMultiCols(LOJoin join) {
         MultiMap<IntegerLogicalExpressionPlanexprPlans = join.getExpressionPlans();
         if(exprPlans == null || exprPlans.size() == 0){
             throw new AssertionError("LOJoin.isJoinOnMultiCols() can only be called "
                     + " after it has an join expression plans ") ;
         }
         return exprPlans.get(0).size() > 1;
     }

    
This can be used to get the merged type of output join col only when the join col is of atomic type

Returns:
The type of the join col
Throws:
org.apache.pig.impl.logicalLayer.FrontendException
 
     private byte getAtomicJoinColType(LOJoin jointhrows FrontendException {
         if (isJoinOnMultiCols(join)) {
             int errCode = 1010;
             String msg = "getAtomicJoinColType is used only when"
                 + " dealing with atomic group col";
             throw new FrontendException(msgerrCode.falsenull) ;
         }
 
         byte groupType = . ;
         // merge all the inner plan outputs so we know what type
         // our group column should be
         for(int i=0;i < .getPredecessors(join).size() ; i++) {
             List<LogicalExpressionPlaninnerPlans = 
                 new ArrayList<LogicalExpressionPlan>(join.getJoinPlan(i)) ;
             if (innerPlans.size() != 1) {
                 int errCode = 1012;
                 String msg = "Each COGroup input has to have "
                     + "the same number of inner plans";
                 throw new FrontendException(msgerrCode.falsenull) ;
             }
             byte innerType = ((LogicalExpression)innerPlans.get(0).getSources().get(0)).getType() ;
             groupType = DataType.mergeType(groupTypeinnerType) ;
             if (groupType == -1)
             {
                 int errCode = 1107;
                 String msg = "Cannot merge join keys, incompatible types";
                 throw new FrontendException(msgerrCode.) ;
             }
         }
 
         return groupType ; 
     }

    
This can be used to get the merged type of output join col only when the join/cogroup col is of atomic type

Returns:
The type of the join col
Throws:
org.apache.pig.impl.logicalLayer.FrontendException
 
     private byte getAtomicColType(MultiMap<IntegerLogicalExpressionPlanallExprPlansthrows FrontendException {
         if (isMultiExprPlanPerInput(allExprPlans)) {
             int errCode = 1010;
             String msg = "getAtomicJoinColType is used only when"
                 + " dealing with atomic group col";
             throw new FrontendException(msgerrCode.falsenull) ;
         }
 
         byte groupType = . ;
         // merge all the inner plan outputs so we know what type
         // our group column should be
         for(int i=0;i < allExprPlans.size() ; i++) {
             List<LogicalExpressionPlaninnerPlans = 
                 new ArrayList<LogicalExpressionPlan>(allExprPlans.get(i)) ;
             if (innerPlans.size() != 1) {
                 int errCode = 1012;
                 String msg = "Each COGroup input has to have "
                     + "the same number of inner plans";
                 throw new FrontendException(msgerrCode.falsenull) ;
             }
             byte innerType = ((LogicalExpression)innerPlans.get(0).getSources().get(0)).getType() ;
             groupType = DataType.mergeType(groupTypeinnerType) ;
             if (groupType == -1)
             {
                 int errCode = 1107;
                 String msg = "Cannot merge join keys, incompatible types";
                 throw new FrontendException(msgerrCode.) ;
             }
         }
 
         return groupType ; 
     }
 
 
 
     private boolean isMultiExprPlanPerInput(
             MultiMap<IntegerLogicalExpressionPlanexprPlans) {
         if(exprPlans == null || exprPlans.size() == 0){
             throw new AssertionError("LOJoin.isJoinOnMultiCols() can only be called "
                     + " after it has an join expression plans ") ;
         }
         return exprPlans.get(0).size() > 1;
     }

    
Cast the single output operator of innerPlan to toType

Parameters:
innerPlan
relOp - join or cogroup
toType
Throws:
org.apache.pig.impl.logicalLayer.FrontendException
 
     private void insertAtomicCastForInnerPlan(LogicalExpressionPlan innerPlan,
             LogicalRelationalOperator relOpbyte toTypethrows FrontendException {
         if (!DataType.isUsableType(toType)) {
             int errCode = 1051;
             String msg = "Cannot cast to "
                 + DataType.findTypeName(toType);
             throwTypeCheckerException(relOpmsgerrCode.null);
         }
 
         List<Operatoroutputs = innerPlan.getSources();
         if (outputs.size() > 1) {
             int errCode = 2060;
             String msg = "Expected one output. Found " + outputs.size() + "  outputs.";
             throwTypeCheckerException(relOpmsgerrCode.null);
         }
         LogicalExpression currentOutput = (LogicalExpressionoutputs.get(0);
         TypeCheckingExpVisitor.collectCastWarning(
                 relOpcurrentOutput.getType(),
                 toType
         );
         LogicalFieldSchema newFS = new LogicalFieldSchema(
                 currentOutput.getFieldSchema().nulltoType
         );
         //add cast
         new CastExpression(innerPlancurrentOutputnewFS);
 
         //visit modified inner plan
         visitExpressionPlan(innerPlanrelOp);
     }

    
Create combined group-by/join column schema based on join/cogroup expression plans for all inputs. This implementation is based on the assumption that all the inputs have the same join col tuple arity.

Parameters:
exprPlans
Returns:
Throws:
org.apache.pig.impl.logicalLayer.FrontendException
 
             MultiMap<IntegerLogicalExpressionPlanexprPlans,
             LogicalRelationalOperator op
     )
     throws FrontendException {
         // NOTE(review): several tokens in this listing look truncated (missing
         // separators and the qualifier in front of some '.' accesses); the code
         // is left exactly as found and only comments were added.

         // this fsList represents all the columns in group tuple
         List<LogicalFieldSchemafsList = new ArrayList<LogicalFieldSchema>() ;
 
         // The number of key columns is taken from the plans stored under key 0;
         // every other input is assumed to supply the same number of plans.
         int outputSchemaSize = exprPlans.get(0).size();
 
         // by default, they are all bytearray
         // for type checking, we don't care about aliases
         for(int i=0; i<outputSchemaSizei++) {
             fsList.add(new LogicalFieldSchema(nullnull.));
         }
 
         // merge all the inner plan outputs so we know what type
         // our group column should be
         // (presumably exprPlans is keyed by input index 0..size()-1 -- TODO confirm)
         for(int i=0;i < exprPlans.size(); i++) {
             // Work on a copy of the plans registered for input i.
             List<LogicalExpressionPlaninnerPlans =
                 new ArrayList<LogicalExpressionPlan>(exprPlans.get(i)) ;
 
             for(int j=0;j < innerPlans.size(); j++) {
                 // The first source of the plan is the expression whose type
                 // is used as the type of key column j for this input.
                 LogicalExpression eOp = (LogicalExpression)innerPlans.get(j).getSources().get(0);
                 byte innerType = eOp.getType();
 
                 if(eOp instanceof ProjectExpression) {
                     if(((ProjectExpression)eOp).isProjectStar()) {
                         //there is a project star and there is more than one 
                         // expression plan
                         // NOTE(review): only the project-star test is visible
                         // here; the "more than one plan" condition is presumably
                         // guaranteed by the caller -- confirm.
                         int errCode = 1013;
                         String msg = "Grouping attributes can either be star (*) " +
                         "or a list of expressions, but not both.";
                         .collect(msg.) ;
                         throw new FrontendException(
                                 msgerrCode.falsenull
                         );         
                     }
                 }
                 //merge the type
                 // Fold this input's key type into the running type of column j.
                 // fsList holds outputSchemaSize entries, so an input with more
                 // plans than input 0 would fail on get(j) with an
                 // IndexOutOfBoundsException.
                 LogicalFieldSchema groupFs = fsList.get(j);
                 groupFs.type = DataType.mergeType(groupFs.typeinnerType) ;
                 // The constant compared against is not visible in this listing;
                 // presumably it is the "no common type" result of mergeType -- confirm.
                 if(groupFs.type == .){
                     // Word the message for the operator kind: "group" for
                     // LOCogroup, "join" for anything else.
                     String colType = "join";
                     if(op instanceof LOCogroup){
                         colType = "group";
                     }
                     // Column and relation numbers are reported 1-based.
                     String msg =
                         colType + " column no. " +
                         (j+1) + " in relation no. " + (i+1) + " of  " + colType + 
                         " statement has datatype " + DataType.findTypeName(innerType) +
                         " which is incompatible with type of corresponding column" +
                         " in earlier relation(s) in the statement";
                     .collect(msg.) ;
                     TypeCheckerException ex =
                         new TypeCheckerException(opmsg, 1130, .);
                     ex.setMarkedAsShowToUser(true);
                     throw ex;
                 }
             }
 
         }
         //create schema from field schemas
         // Fields are added in column order, so field k of the result carries
         // the merged type of key column k.
         LogicalSchema tupleSchema = new LogicalSchema();
         for(LogicalFieldSchema fs : fsList){
             tupleSchema.addField(fs);
         }
         return tupleSchema;
     }

    
     /**
      * COGroup: all group-by columns from all inputs have to be of the same type.
      */

 
     @Override
     public void visit(LOCogroup cgthrows FrontendException {
         try {
             cg.resetSchema();
             cg.getSchema();
         } catch (FrontendException fe) {
             int errCode = 1060;
             String msg = "Cannot resolve COGroup output schema" ;
             .collect(msg.) ;
             throwTypeCheckerException(cgmsgerrCode.fe) ;
         }
 
         MultiMap<IntegerLogicalExpressionPlangroupByPlans = 
             cg.getExpressionPlans();
 
         List<Operatorinputs = cg.getInputs((LogicalPlan));
 
         // Type checking internal plans.
         for(int i=0;i < inputs.size(); i++) {
             List<LogicalExpressionPlaninnerPlans =
                 new ArrayList<LogicalExpressionPlan>(cg.getExpressionPlans().get(i)) ;
 
             for(int j=0; j < innerPlans.size(); j++) {
 
                 LogicalExpressionPlan innerPlan = innerPlans.get(j) ;
 
                 // Check that the inner plan has only 1 output port
                 if (innerPlan.getSources().size() != 1) {
                     int errCode = 1057;
                     String msg = "COGroup's inner plans can only"
                         + "have one output (leaf)" ;
                     .collect(msg.) ;
                     throwTypeCheckerException(cgmsgerrCode.null) ;
                 }
                 visitExpressionPlan(innerPlancg);
             }
 
         }
 
         try {
 
             if (!isCoGroupOnMultiCols(cg)) {
                 // merge all the inner plan outputs so we know what type
                 // our group column should be
                 byte groupType = getAtomicColType(cg.getExpressionPlans());
                // go through all inputs again to add cast if necessary
                for(int i=0;i < inputs.size(); i++) {
                    List<LogicalExpressionPlaninnerPlans =
                        new ArrayList<LogicalExpressionPlan>(cg.getExpressionPlans().get(i)) ;
                    // Checking innerPlan size already done above
                    byte innerType = ((LogicalExpression)innerPlans.get(0).getSources().get(0)).getType() ;
                    if (innerType != groupType) {
                        insertAtomicCastForInnerPlan(
                                innerPlans.get(0),cggroupType
                        ) ;
                    }
                }
            }
            else {
                LogicalSchema groupBySchema =