Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 
 package org.apache.pig.backend.hadoop.executionengine;
 
 import java.net.URL;
 import java.util.Map;
 
 
 public class HExecutionEngine {
     
     public static final String JOB_TRACKER_LOCATION = "mapred.job.tracker";
     private static final String FILE_SYSTEM_LOCATION = "fs.default.name";
     private static final String ALTERNATIVE_FILE_SYSTEM_LOCATION = "fs.defaultFS";
     
     private static final String HADOOP_SITE = "hadoop-site.xml";
     private static final String CORE_SITE = "core-site.xml";
     private static final String YARN_SITE = "yarn-site.xml";
     private final Log log = LogFactory.getLog(getClass());
     public static final String LOCAL = "local";
     
     protected PigContext pigContext;
     
     protected DataStorage ds;
     
     @SuppressWarnings("deprecation")
     protected JobConf jobConf;
 
     // key: the operator key from the logical plan that originated the physical plan
     // val: the operator key for the root of the phyisical plan
     
     // map from LOGICAL key to into about the execution
     
     protected Map<OperatorPhysicalOperatornewLogToPhyMap;
     private LogicalPlan newPreoptimizedPlan;
     
     public HExecutionEngine(PigContext pigContext) {
         this. = pigContext;
         this. = new HashMap<OperatorKeyOperatorKey>();      
        this. = new HashMap<OperatorKeyMapRedResult>();
        
        this. = null;
        
        // to be set in the init method
        this. = null;
    }
    
    @SuppressWarnings("deprecation")
    public JobConf getJobConf() {
        return this.;
    }
    
        return this.;
    }
        
    public DataStorage getDataStorage() {
        return this.;
    }
    public void init() throws ExecException {
        init(this..getProperties());
    }
    
    @SuppressWarnings("deprecation")
    private void init(Properties propertiesthrows ExecException {
        //First set the ssh socket factory
        setSSHFactory();
        
        String cluster = null;
        String nameNode = null;
    
        // We need to build a configuration object first in the manner described below
        // and then get back a properties object to inspect the JOB_TRACKER_LOCATION
        // and FILE_SYSTEM_LOCATION. The reason to do this is if we looked only at
        // the existing properties object, we may not get the right settings. So we want
        // to read the configurations in the order specified below and only then look
        // for JOB_TRACKER_LOCATION and FILE_SYSTEM_LOCATION.
            
        // Hadoop by default specifies two resources, loaded in-order from the classpath:
        // 1. hadoop-default.xml : Read-only defaults for hadoop.
        // 2. hadoop-site.xml: Site-specific configuration for a given hadoop installation.
        // Now add the settings from "properties" object to override any existing properties
        // All of the above is accomplished in the method call below
           
        JobConf jc = null;
        if ( this..getExecType() == . ) {
            // Check existence of user provided configs
            String isHadoopConfigsOverriden = properties.getProperty("pig.use.overriden.hadoop.configs");
            if (isHadoopConfigsOverriden != null && isHadoopConfigsOverriden.equals("true")) {
                jc = new JobConf(ConfigurationUtil.toConfiguration(properties));
            }
            else {
                // Check existence of hadoop-site.xml or core-site.xml in classpath
                // if user provided confs are not being used
                Configuration testConf = new Configuration();
                ClassLoader cl = testConf.getClassLoader();
                URL hadoop_site = cl.getResource );
                URL core_site = cl.getResource );
                ifhadoop_site == null && core_site == null ) {
                        throw new ExecException("Cannot find hadoop configurations in classpath (neither hadoop-site.xml nor core-site.xml was found in the classpath)." +
                                " If you plan to use local mode, please put -x local option in command line"
                                4010);
                }
                jc = new JobConf();
            }
            jc.addResource("pig-cluster-hadoop-site.xml");
            jc.addResource();
            
            // Trick to invoke static initializer of DistributedFileSystem to add hdfs-default.xml 
            // into configuration
            new DistributedFileSystem();
            
            //the method below alters the properties object by overriding the
            //hadoop properties with the values from properties and recomputing
            //the properties
            recomputeProperties(jcproperties);
        } else {
            // If we are running in local mode we dont read the hadoop conf file
            properties.setProperty("mapreduce.framework.name""local");
            properties.setProperty( );
            properties.setProperty("file:///");
            properties.setProperty("file:///");
            jc = new JobConf(false);
            jc.addResource("core-default.xml");
            jc.addResource("mapred-default.xml");
            jc.addResource("yarn-default.xml");
            recomputeProperties(jcproperties);
        }
        cluster = jc.get();
        nameNode = jc.get();
        if (nameNode==null)
            nameNode = (String).getProperties().get();
        if (cluster != null && cluster.length() > 0) {
            if(!cluster.contains(":") && !cluster.equalsIgnoreCase()) {
                cluster = cluster + ":50020";
            }
            properties.setProperty(cluster);
        }
        if (nameNode!=null && nameNode.length() > 0) {
            if(!nameNode.contains(":")  && !nameNode.equalsIgnoreCase()) {
                nameNode = nameNode + ":8020";
            }
            properties.setProperty(nameNode);
        }
     
        .info("Connecting to hadoop file system at: "  + (nameNode==nullnameNode) )  ;
        // constructor sets DEFAULT_REPLICATION_FACTOR_KEY
         = new HDataStorage(properties);
                
        if(cluster != null && !cluster.equalsIgnoreCase()){
            .info("Connecting to map-reduce job tracker at: " + jc.get());
        }
        // Set job-specific configuration knobs
         = jc;
    }
    public void updateConfiguration(Properties newConfiguration
            throws ExecException {
        init(newConfiguration);
    }
    @SuppressWarnings("unchecked")
    public PhysicalPlan compile(LogicalPlan plan,
                                Properties propertiesthrows FrontendException {
        if (plan == null) {
            int errCode = 2041;
            String msg = "No Plan to compile";
            throw new FrontendException(msgerrCode.);
        }
         = new LogicalPlanplan );
        
        if (.) {
            // disable all PO-specific optimizations
            POOptimizeDisabler pod = new POOptimizeDisablerplan );
            pod.visit();
        }
        
        UidResetter uidResetter = new UidResetterplan );
        uidResetter.visit();
        
        SchemaResetter schemaResetter = new SchemaResetterplantrue /*skip duplicate uid check*/ );
        schemaResetter.visit();
        
        HashSet<StringdisabledOptimizerRules;
        try {
            disabledOptimizerRules = (HashSet<String>) ObjectSerializer
                    .deserialize(.getProperties().getProperty(
                            .));
        } catch (IOException ioe) {
            int errCode = 2110;
            String msg = "Unable to deserialize optimizer rules.";
            throw new FrontendException(msgerrCode.ioe);
        }
        if (disabledOptimizerRules == null) {
            disabledOptimizerRules = new HashSet<String>();
        }
        String pigOptimizerRulesDisabled = this..getProperties().getProperty(
                .);
        if (pigOptimizerRulesDisabled != null) {
            disabledOptimizerRules.addAll(Lists.newArrayList((Splitter.on(",").split(
                    pigOptimizerRulesDisabled))));
        }
        if( ! Boolean.valueOf(this..getProperties().getProperty(
            ."false"))){
            // Turn off the old partition filter optimizer
            disabledOptimizerRules.add("PartitionFilterOptimizer");
        } else {
            disabledOptimizerRules.add("NewPartitionFilterOptimizer");
        }
        if (.) {
            disabledOptimizerRules.add("MergeForEach");
            disabledOptimizerRules.add("PartitionFilterOptimizer");
            disabledOptimizerRules.add("LimitOptimizer");
            disabledOptimizerRules.add("SplitFilter");
            disabledOptimizerRules.add("PushUpFilter");
            disabledOptimizerRules.add("MergeFilter");
            disabledOptimizerRules.add("PushDownForEachFlatten");
            disabledOptimizerRules.add("ColumnMapKeyPrune");
            disabledOptimizerRules.add("AddForEach");
            disabledOptimizerRules.add("GroupByConstParallelSetter");
        }
        StoreAliasSetter storeAliasSetter = new StoreAliasSetterplan );
        storeAliasSetter.visit();
        
        // run optimizer
        LogicalPlanOptimizer optimizer = new LogicalPlanOptimizer(plan, 100, disabledOptimizerRules);
        optimizer.optimize();
        
        // compute whether output data is sorted or not
        SortInfoSetter sortInfoSetter = new SortInfoSetterplan );
        sortInfoSetter.visit();
        
        if (!.) {
            // Validate input/output file. Currently no validation framework in
            // new logical plan, put this validator here first.
            // We might decide to move it out to a validator framework in future
            InputOutputFileValidator validator = new InputOutputFileValidatorplan );
            validator.validate();
        }
        
        // translate new logical plan to physical plan
        LogToPhyTranslationVisitor translator = new LogToPhyTranslationVisitorplan );
        
        translator.setPigContext();
        translator.visit();
         = translator.getLogToPhyMap();
        return translator.getPhysicalPlan();
    }
    
        return ;
    }
    
            new HashMap<LOForEachMap<LogicalRelationalOperatorPhysicalOperator>>();
        Iterator<OperatorouterIter = plan.getOperators();
        while (outerIter.hasNext()) {
            Operator oper = outerIter.next();
            if (oper instanceof LOForEach) {
                LogicalPlan innerPlan = ((LOForEachoper).getInnerPlan();
                Map<LogicalRelationalOperatorPhysicalOperatorinnerOpMap = new HashMap<LogicalRelationalOperatorPhysicalOperator>();
                Iterator<OperatorinnerIter = innerPlan.getOperators();
                while (innerIter.hasNext()) {
                    Operator innerOper = innerIter.next();
                    innerOpMap.put(((LogicalRelationalOperator)innerOper), .get(innerOper));
                }
                result.put((LOForEachoperinnerOpMap);
            }
        }
        return result;
    }
    
    public LogicalPlan getNewPlan() {
        return ;
    }
      
    @SuppressWarnings({ "unchecked""rawtypes" })
    private void setSSHFactory(){
        Properties properties = this..getProperties();
        String g = properties.getProperty("ssh.gateway");
        if (g == null || g.length() == 0) return;
        try {
            Class clazz = Class.forName("org.apache.pig.shock.SSHSocketImplFactory");
            SocketImplFactory f = (SocketImplFactory)clazz.getMethod("getFactory"new Class[0]).invoke(0, new Object[0]);
            Socket.setSocketImplFactory(f);
        } 
        catch (SocketException e) {}
        catch (Exception e){
            throw new RuntimeException(e);
        }
    }

    
Method to apply pig properties to JobConf (replaces properties with resulting jobConf values)

Parameters:
conf JobConf with appropriate hadoop resource files
properties Pig properties that will override hadoop properties; properties might be modified
    @SuppressWarnings("deprecation")
    private void recomputeProperties(JobConf jobConfProperties properties) {
        // We need to load the properties from the hadoop configuration
        // We want to override these with any existing properties we have.
        if (jobConf != null && properties != null) {
            // set user properties on the jobConf to ensure that defaults
            // and deprecation is applied correctly
            Enumeration<ObjectpropertiesIter = properties.keys();
            while (propertiesIter.hasMoreElements()) {
                String key = (StringpropertiesIter.nextElement();
                String val = properties.getProperty(key);
                // We do not put user.name, See PIG-1419
                if (!key.equals("user.name"))
                	jobConf.set(keyval);
            }
            //clear user defined properties and re-populate
            properties.clear();
            Iterator<Map.Entry<StringString>> iter = jobConf.iterator();
            while (iter.hasNext()) {
                Map.Entry<StringStringentry = iter.next();
                properties.put(entry.getKey(), entry.getValue());
            } 
        }
    } 
    
New to GrepCode? Check out our FAQ X