Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
public class MRUtil {
    // simpleConnectMapToReduce is a utility to end a map phase and start a reduce phase in
    //     a mapreduce operator:
    // 1. mro only contains map plan
    // 2. need to add POLocalRearrange to end map plan, and add 
    //    POPackage to start a reduce plan
    // 3. POLocalRearrange/POPackage are trivial
    static public void simpleConnectMapToReduce(MapReduceOper mroString scopeNodeIdGenerator nigthrows PlanException
    {
        PhysicalPlan ep = new PhysicalPlan();
        POProject prjStar = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
        prjStar.setResultType(.);
        prjStar.setStar(true);
        ep.add(prjStar);
        
        List<PhysicalPlaneps = new ArrayList<PhysicalPlan>();
        eps.add(ep);
        
        POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
        try {
            lr.setIndex(0);
        } catch (ExecException e) {
            int errCode = 2058;
            String msg = "Unable to set index on the newly created POLocalRearrange.";
            throw new PlanException(msgerrCode.e);
        }
        lr.setKeyType(.);
        lr.setPlans(eps);
        lr.setResultType(.);
        
        mro.mapPlan.addAsLeaf(lr);
        
        POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
        pkg.setKeyType(.);
        pkg.setNumInps(1);
        boolean[] inner = {false};
        pkg.setInner(inner);
        mro.reducePlan.add(pkg);
        
        mro.reducePlan.addAsLeaf(getPlainForEachOP(scopenig));
    }
    
    // Get a simple POForEach: ForEach X generate flatten($1)
    static public POForEach getPlainForEachOP(String scopeNodeIdGenerator nig)
    {
        List<PhysicalPlaneps1 = new ArrayList<PhysicalPlan>();
        List<Booleanflat1 = new ArrayList<Boolean>();
        PhysicalPlan ep1 = new PhysicalPlan();
        POProject prj1 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
        prj1.setResultType(.);
        prj1.setStar(false);
        prj1.setColumn(1);
        prj1.setOverloaded(true);
        ep1.add(prj1);
        eps1.add(ep1);
        flat1.add(true);
        POForEach fe = new POForEach(new OperatorKey(scopenig
                .getNextNodeId(scope)), -1, eps1flat1);
        fe.setResultType(.);
        return fe;
    }
New to GrepCode? Check out our FAQ X