Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
   /*
    * Licensed to the Apache Software Foundation (ASF) under one
    * or more contributor license agreements.  See the NOTICE file
    * distributed with this work for additional information
    * regarding copyright ownership.  The ASF licenses this file
    * to you under the Apache License, Version 2.0 (the
    * "License"); you may not use this file except in compliance
    * with the License.  You may obtain a copy of the License at
    *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  package org.apache.pig;
  
  import java.io.File;
  import java.util.Arrays;
  import java.util.Date;
  import java.util.HashSet;
  import java.util.List;
  import java.util.Map;
  
  import jline.History;
  
  import  org.antlr.runtime.RecognitionException;
  /**
   * Main class for Pig engine.
   */
  
  public class Main {
  
      // Class-wide logger obtained from LogFactory.
      private final static Log log = LogFactory.getLog(Main.class);
  
      // Short string constants; presumably the property keys for the matching
      // -4/-b/-d/-v options (their use sites are damaged in this listing -- TODO confirm).
      private static final String LOG4J_CONF = "log4jconf";
      private static final String BRIEF = "brief";
      private static final String DEBUG = "debug";
      private static final String VERBOSE = "verbose";
  
      // Build/version metadata. These are blank finals, so they must be assigned
      // exactly once in the static initializer that follows.
      private static final String version;
      private static final String majorVersion;
      private static final String minorVersion;
      private static final String patchVersion;
     private static final String svnRevision;
     private static final String buildTime;
 
     // How the script to run was supplied on the command line.
     private enum ExecMode {STRING, FILE, SHELL, UNKNOWN}
 
     // Property name for the filter logic expression simplifier optimization.
     private static final String PROP_FILT_SIMPL_OPT
         = "pig.exec.filterLogicExpressionSimplifier";
 
     // Property name naming the progress notification listener.
     protected static final String PROGRESS_NOTIFICATION_LISTENER_KEY = "pig.notification.listener";
 
     // Property name carrying the argument for the progress notification listener.
     protected static final String PROGRESS_NOTIFICATION_LISTENER_ARG_KEY = "pig.notification.listener.arg";
 
     static {
        Attributes attr=null;
        try {
             String findContainingJar = JarManager.findContainingJar(Main.class);
             if (findContainingJar != null) {
                 JarFile jar = new JarFile(findContainingJar);
                 final Manifest manifest = jar.getManifest();
                 final Map<String,Attributesattrs = manifest.getEntries();
                 attr = attrs.get("org/apache/pig");
             } else {
                 .info("Unable to read pigs manifest file as we are not running from a jar, version information unavailable");
             }
         } catch (Exception e) {
             .warn("Unable to read pigs manifest file, version information unavailable"e);
         }
         if (attr!=null) {
              = attr.getValue("Implementation-Version");
              = attr.getValue("Svn-Revision");
              = attr.getValue("Build-TimeStamp");
             String[] split = .split("\\.");
             =split[0];
             =split[1];
             =split[2];
         } else {
             =null;
             =null;
             =null;
             =null;
             =null;
             =null;
         }
     }

 /**
  * The Main-Class for the Pig Jar that will provide a shell and set up a
  * classpath appropriate for executing Jar files. Warning: this method calls
  * System.exit().
  *
  * @param args -jar can be used to add additional jar files (colon separated).
  *             - will start a shell. -e will execute the rest of the command
  *             line as if it was input to the shell.
  * @throws IOException
  */
 
 public static void main(String args[]) {
     System.exit(run(argsnull));
 }
 
 /**
  * Parses the command line, builds a PigContext and runs Pig in the mode the
  * arguments select. The visible flow is:
  * generic Hadoop-style options are consumed by GenericOptionsParser, default
  * properties are loaded, the remaining arguments go through CmdLineParser,
  * and the method then either runs a script file, runs the remaining
  * arguments as a command string, or starts an interactive grunt console.
  * Exceptions are caught and turned into a return code; temporary files are
  * always deleted in the finally block.
  *
  * NOTE(review): this listing has lost many tokens (for example the receiver
  * of ".info(...)", the operands of "return .;" and the commas between call
  * arguments). The code is kept exactly as found; comments describe only what
  * is still visible.
  *
  * @param args     the raw command-line arguments
  * @param listener progress listener to register; when null, one is built
  *                 from the loaded properties via makeListener
  * @return the process return code
  */
 static int run(String args[], PigProgressNotificationListener listener) {
     int rc = 1;
     boolean verbose = false;
     boolean gruntCalled = false;
     String logFileName = null;
 
     try {
         Configuration conf = new Configuration(false);
         GenericOptionsParser parser = new GenericOptionsParser(confargs);
         conf = parser.getConfiguration();
 
         // Defaults first, then whatever the generic options put in the configuration.
         Properties properties = new Properties();
         PropertiesUtil.loadDefaultProperties(properties);
         properties.putAll(ConfigurationUtil.toProperties(conf));
 
         if (listener == null) {
             listener = makeListener(properties);
         }
         // Arguments not consumed by GenericOptionsParser are Pig's own options.
         String[] pigArgs = parser.getRemainingArgs();
 
         boolean userSpecifiedLog = false;
         boolean checkScriptOnly = false;
 
         BufferedReader pin = null;
         boolean debug = false;
         boolean dryrun = false;
         boolean embedded = false;
         List<Stringparams = new ArrayList<String>();
         List<StringparamFiles = new ArrayList<String>();
         HashSet<StringdisabledOptimizerRules = new HashSet<String>();
 
         // Register every supported option: short character plus long name.
         // NOTE(review): the third argument of each call is missing in this listing.
         CmdLineParser opts = new CmdLineParser(pigArgs);
         opts.registerOpt('4'"log4jconf"..);
         opts.registerOpt('b'"brief"..);
         opts.registerOpt('c'"check"..);
         opts.registerOpt('d'"debug"..);
         opts.registerOpt('e'"execute"..);
         opts.registerOpt('f'"file"..);
         opts.registerOpt('g'"embedded"..);
         opts.registerOpt('h'"help"..);
         opts.registerOpt('i'"version"..);
         opts.registerOpt('l'"logfile"..);
         opts.registerOpt('m'"param_file"..);
         opts.registerOpt('p'"param"..);
         opts.registerOpt('r'"dryrun"..);
         opts.registerOpt('t'"optimizer_off"..);
         opts.registerOpt('v'"verbose"..);
         opts.registerOpt('w'"warning"..);
         opts.registerOpt('x'"exectype"..);
         opts.registerOpt('F'"stop_on_failure"..);
         opts.registerOpt('M'"no_multiquery"..);
         opts.registerOpt('P'"propertyFile"..);
 
         ExecMode mode = .;
         String file = null;
         String engine = null;
         ExecType execType = . ;
         // An "exectype" property overrides the initial exec type; -x below overrides both.
         String execTypeString = properties.getProperty("exectype");
         if(execTypeString!=null && execTypeString.length()>0){
             execType = ExecType.fromString(execTypeString);
         }
 
         // set up client side system properties in UDF context
         UDFContext.getUDFContext().setClientSystemProps(properties);
 
         // Consume options one at a time until the parser signals the end.
         char opt;
         while ((opt = opts.getNextOpt()) != .) {
             switch (opt) {
             case '4':
                 String log4jconf = opts.getValStr();
                 if(log4jconf != null){
                     properties.setProperty(log4jconf);
                 }
                 break;
 
             case 'b':
                 properties.setProperty("true");
                 break;
 
             case 'c':
                 checkScriptOnly = true;
                 break;
 
             case 'd':
                 String logLevel = opts.getValStr();
                 if (logLevel != null) {
                     properties.setProperty(logLevel);
                 }
                 debug = true;
                 break;
 
             case 'e':
                 mode = .;
                 break;
 
             case 'f':
                 mode = .;
                 file = opts.getValStr();
                 break;
 
             case 'g':
                 embedded = true;
                 engine = opts.getValStr();
                 break;
 
             case 'F':
                 properties.setProperty("stop.on.failure"""+true);
                 break;
 
             case 'h':
                 // -h optionally takes a topic; "properties" is the only one handled here.
                 String topic = opts.getValStr();
                 if (topic != null)
                     if (topic.equalsIgnoreCase("properties"))
                         printProperties();
                     else{
                         ..println("Invalide help topic - " + topic);
                         usage();
                     }
                 else
                     usage();
                 return .;
 
             case 'i':
                 ..println(getVersionString());
                 return .;
 
             case 'l':
                 //call to method that validates the path to the log file
                 //and sets up the file to store the client side log file
                 String logFileParameter = opts.getValStr();
                 if (logFileParameter != null && logFileParameter.length() > 0) {
                     logFileName = validateLogFile(logFileParameternull);
                 } else {
                     logFileName = validateLogFile(logFileNamenull);
                 }
                 userSpecifiedLog = true;
                 properties.setProperty("pig.logfile", (logFileName == null""logFileName));
                 break;
 
             case 'm':
                 paramFiles.add(opts.getValStr());
                 break;
 
             case 'M':
                 // turns off multiquery optimization
                 properties.setProperty("opt.multiquery",""+false);
                 break;
 
             case 'p':
                 params.add(opts.getValStr());
                 break;
 
             case 'r':
                 // currently only used for parameter substitution
                 // will be extended in the future
                 dryrun = true;
                 break;
 
             case 't':
                 disabledOptimizerRules.add(opts.getValStr());
                 break;
 
             case 'v':
                 properties.setProperty(""+true);
                 verbose = true;
                 break;
 
             case 'w':
                 properties.setProperty("aggregate.warning"""+false);
                 break;
 
             case 'x':
                 try {
                     execType = ExecType.fromString(opts.getValStr());
                     } catch (IOException e) {
                         throw new RuntimeException("ERROR: Unrecognized exectype."e);
                     }
                 break;
 
             case 'P':
             {
                 // Load extra properties from a (possibly remote) property file.
                 InputStream inputStream = null;
                 try {
                     FileLocalizer.FetchFileRet localFileRet = FileLocalizer.fetchFile(propertiesopts.getValStr());
                     inputStream = new BufferedInputStream(new FileInputStream(localFileRet.file));
                     properties.load(inputStream) ;
                 } catch (IOException e) {
                     // NOTE(review): the caught IOException is not passed on as the cause.
                     throw new RuntimeException("Unable to parse properties file '" + opts.getValStr() + "'");
                 } finally {
                     if (inputStream != null) {
                         try {
                             inputStream.close();
                         } catch (IOException e) {
                         }
                     }
                 }
             }
             break;
 
             default: {
                 Character cc = Character.valueOf(opt);
                 throw new AssertionError("Unhandled option " + cc.toString());
                      }
             }
         }
         // create the context with the parameter
         PigContext pigContext = new PigContext(execTypeproperties);
 
         // create the static script state object
         String commandLine = LoadFunc.join((AbstractList<String>)Arrays.asList(args), " ");
         ScriptState scriptState = ScriptState.start(commandLinepigContext);
         if (listener != null) {
             scriptState.registerListener(listener);
         }
 
         pigContext.getProperties().setProperty("pig.cmd.args"commandLine);
 
         // Fall back to the "pig.logfile" property when -l was not given.
         if(logFileName == null && !userSpecifiedLog) {
             logFileName = validateLogFile(properties.getProperty("pig.logfile"), null);
         }
 
         pigContext.getProperties().setProperty("pig.logfile", (logFileName == null""logFileName));
 
         // configure logging
         configureLog4J(propertiespigContext);
 
         .info(getVersionString().replace("\n"""));
 
         if(logFileName != null) {
             .info("Logging error messages to: " + logFileName);
         }
 
         if( ! Boolean.valueOf(properties.getProperty("false"))){
             //turn off if the user has not explicitly turned on this optimization
             disabledOptimizerRules.add("FilterLogicExpressionSimplifier");
         }
 
                 // NOTE(review): the beginning of this statement is missing from the listing;
                 // only the serialized set of disabled optimizer rules remains visible.
                 ObjectSerializer.serialize(disabledOptimizerRules));
 
         PigContext.setClassLoader(pigContext.createCl(null));
 
         // construct the parameter substitution preprocessor
         Grunt grunt = null;
         BufferedReader in;
         String substFile = null;
 
         // Replace each parameter file with the path of its locally fetched copy.
         paramFiles = fetchRemoteParamFiles(paramFilesproperties);
         pigContext.setParams(params);
         pigContext.setParamFiles(paramFiles);
 
         switch (mode) {
 
         // Script file given explicitly (the branch reads "file", which only -f sets).
         case : {
             String remainders[] = opts.getRemainingArgs();
             if (remainders != null) {
                 pigContext.getProperties().setProperty(.,
                         ObjectSerializer.serialize(remainders));
             }
             FileLocalizer.FetchFileRet localFileRet = FileLocalizer.fetchFile(propertiesfile);
             if (localFileRet.didFetch) {
                 properties.setProperty("pig.jars.relative.to.dfs""true");
             }
 
             scriptState.setFileName(file);
 
             // Embedded scripts (requested with -g, or detected from the file) bypass grunt.
             if (embedded) {
                 return runEmbeddedScript(pigContextlocalFileRet.file.getPath(), engine);
             } else {
                 SupportedScriptLang type = determineScriptType(localFileRet.file.getPath());
                 if (type != null) {
                     return runEmbeddedScript(pigContextlocalFileRet.file
                                 .getPath(), type.name().toLowerCase());
                 }
             }
             //Reader is created by first loading "pig.load.default.statements" or .pigbootup file if available
             in = new BufferedReader(new InputStreamReader(Utils.getCompositeStream(new FileInputStream(localFileRet.file), properties)));
 
             // run parameter substitution preprocessor first
             substFile = file + ".substituted";
 
                 pin = runParamPreprocessor(pigContextin,
                         substFiledebug || dryrun || checkScriptOnly);
             if (dryrun) {
                 if (dryrun(substFilepigContext)) {
                     .info("Dry run completed. Substituted pig script is at "
                             + substFile
                             + ". Expanded pig script is at "
                             + file + ".expanded");
                 } else {
                     .info("Dry run completed. Substituted pig script is at "
                                 + substFile);
                 }
                 return .;
             }
 
 
             logFileName = validateLogFile(logFileNamefile);
             pigContext.getProperties().setProperty("pig.logfile", (logFileName == null""logFileName));
 
             // Set job name based on name of the script
             pigContext.getProperties().setProperty(.,
                                                    "PigLatin:" +new File(file).getName()
             );
 
             // Keep the substituted script only when debugging.
             if (!debug) {
                 new File(substFile).deleteOnExit();
             }
 
             scriptState.setScript(new File(file));
 
             grunt = new Grunt(pinpigContext);
             gruntCalled = true;
 
             if(checkScriptOnly) {
                 grunt.checkScript(substFile);
                 ..println(file + " syntax OK");
                 rc = .;
             } else {
                 int results[] = grunt.exec();
                 rc = getReturnCodeForStats(results);
             }
 
             return rc;
         }
 
         // Commands given inline: the remaining arguments are joined into one script string.
         case : {
             if(checkScriptOnly) {
                 ..println("ERROR:" +
                         "-c (-check) option is only valid " +
                         "when executing pig with a pig script file)");
                 return .;
             }
             // Gather up all the remaining arguments into a string and pass them into
             // grunt.
             StringBuffer sb = new StringBuffer();
             String remainders[] = opts.getRemainingArgs();
             for (int i = 0; i < remainders.lengthi++) {
                 if (i != 0) sb.append(' ');
                 sb.append(remainders[i]);
             }
 
             sb.append('\n');
 
             scriptState.setScript(sb.toString());
 
             in = new BufferedReader(new StringReader(sb.toString()));
 
             grunt = new Grunt(inpigContext);
             gruntCalled = true;
             int results[] = grunt.exec();
             return getReturnCodeForStats(results);
         }
 
         default:
             break;
         }
 
         // If we're here, we don't know yet what they want.  They may have just
         // given us a jar to execute, they might have given us a pig script to
         // execute, or they might have given us a dash (or nothing) which means to
         // run grunt interactive.
         String remainders[] = opts.getRemainingArgs();
         if (remainders == null) {
             if(checkScriptOnly) {
                 ..println("ERROR:" +
                         "-c (-check) option is only valid " +
                         "when executing pig with a pig script file)");
                 return .;
             }
             // Interactive
             mode = .;
           //Reader is created by first loading "pig.load.default.statements" or .pigbootup file if available
             ConsoleReader reader = new ConsoleReader(Utils.getCompositeStream(.properties), new OutputStreamWriter(.));
             reader.setDefaultPrompt("grunt> ");
             // Command history is kept in .pig_history under the user's home directory.
             final String HISTORYFILE = ".pig_history";
             String historyFile = System.getProperty("user.home") + .  + HISTORYFILE;
             reader.setHistory(new History(new File(historyFile)));
             ConsoleReaderInputStream inputStream = new ConsoleReaderInputStream(reader);
             grunt = new Grunt(new BufferedReader(new InputStreamReader(inputStream)), pigContext);
             grunt.setConsoleReader(reader);
             gruntCalled = true;
             grunt.run();
             return .;
         } else {
             pigContext.getProperties().setProperty(., ObjectSerializer.serialize(remainders));
 
             // They have a pig script they want us to run.
             mode = .;
 
             // The first remaining argument is treated as the script path.
             FileLocalizer.FetchFileRet localFileRet = FileLocalizer.fetchFile(propertiesremainders[0]);
             if (localFileRet.didFetch) {
                 properties.setProperty("pig.jars.relative.to.dfs""true");
             }
 
             scriptState.setFileName(remainders[0]);
 
             if (embedded) {
                 return runEmbeddedScript(pigContextlocalFileRet.file.getPath(), engine);
             } else {
                 SupportedScriptLang type = determineScriptType(localFileRet.file.getPath());
                 if (type != null) {
                     return runEmbeddedScript(pigContextlocalFileRet.file
                                 .getPath(), type.name().toLowerCase());
                 }
             }
             //Reader is created by first loading "pig.load.default.statements" or .pigbootup file if available
             InputStream seqInputStream = Utils.getCompositeStream(new FileInputStream(localFileRet.file), properties);
             in = new BufferedReader(new InputStreamReader(seqInputStream));
 
             // run parameter substitution preprocessor first
             substFile = remainders[0] + ".substituted";
             pin = runParamPreprocessor(pigContextinsubstFiledebug || dryrun || checkScriptOnly);
             if (dryrun) {
                 if (dryrun(substFilepigContext)) {
                     .info("Dry run completed. Substituted pig script is at "
                             + substFile
                             + ". Expanded pig script is at "
                             + remainders[0] + ".expanded");
                 } else {
                     .info("Dry run completed. Substituted pig script is at "
                             + substFile);
                 }
                 return .;
             }
 
             logFileName = validateLogFile(logFileNameremainders[0]);
             pigContext.getProperties().setProperty("pig.logfile", (logFileName == null""logFileName));
 
             if (!debug) {
                 new File(substFile).deleteOnExit();
             }
 
             // Set job name based on name of the script
             pigContext.getProperties().setProperty(.,
                                                    "PigLatin:" +new File(remainders[0]).getName()
             );
 
             scriptState.setScript(localFileRet.file);
 
             grunt = new Grunt(pinpigContext);
             gruntCalled = true;
 
             if(checkScriptOnly) {
                 grunt.checkScript(substFile);
                 ..println(remainders[0] + " syntax OK");
                 rc = .;
             } else {
                 int results[] = grunt.exec();
                 rc = getReturnCodeForStats(results);
             }
             return rc;
         }
 
         // Per Utkarsh and Chris invocation of jar file via pig deprecated.
     } catch (ParseException e) {
         // Command-line parse failure: show usage and record the error.
         usage();
         rc = .;
         PigStatsUtil.setErrorMessage(e.getMessage());
         PigStatsUtil.setErrorThrowable(e);
     } catch (org.apache.pig.tools.parameters.ParseException e) {
        // usage();
         rc = .;
         PigStatsUtil.setErrorMessage(e.getMessage());
         PigStatsUtil.setErrorThrowable(e);
     } catch (IOException e) {
         // PigException carries an error code and a retriable flag that select the return code.
         if (e instanceof PigException) {
             PigException pe = (PigException)e;
             rc = (pe.retriable()) ? .
                     : .;
             PigStatsUtil.setErrorMessage(pe.getMessage());
             PigStatsUtil.setErrorCode(pe.getErrorCode());
         } else {
             rc = .;
             PigStatsUtil.setErrorMessage(e.getMessage());
         }
         PigStatsUtil.setErrorThrowable(e);
 
         // Written to the log here only when no Grunt instance had been created yet.
         if(!gruntCalled) {
             LogUtils.writeLog(elogFileNameverbose"Error before Pig is launched");
         }
     } catch (Throwable e) {
         rc = .;
         PigStatsUtil.setErrorMessage(e.getMessage());
         PigStatsUtil.setErrorThrowable(e);
 
         if(!gruntCalled) {
             LogUtils.writeLog(elogFileNameverbose"Error before Pig is launched");
         }
     } finally {
         // clear temp files
         FileLocalizer.deleteTempFiles();
         PerformanceTimerFactory.getPerfTimerFactory().dumpTimers();
     }
 
     return rc;
 }
 
 protected static PigProgressNotificationListener makeListener(Properties properties) {
 
     try {
         return PigContext.instantiateObjectFromParams(
                     ConfigurationUtil.toConfiguration(properties),
                     ,
                     ,
                     PigProgressNotificationListener.class);
     } catch (ExecException e) {
         throw new RuntimeException(e);
     }
 }
 
 // Maps the int[] that run() obtains from grunt.exec() to a process return code.
 // Per the inline comments, stats[0] counts succeeded jobs and stats[1] counts
 // failed jobs.
 // NOTE(review): the three return-code constants are missing from this listing
 // (only "." remains of each); confirm them against the original source.
 private static int getReturnCodeForStats(int[] stats) {
     return (stats[1] == 0) ? .         // no failed jobs
                 : (stats[0] == 0) ? .  // no succeeded jobs
                         : .;   // some jobs have failed
 }
 
 public static boolean dryrun(String scriptFilePigContext pigContext)
         throws RecognitionException, IOException {
     BufferedReader rd = new BufferedReader(new FileReader(scriptFile));
 
     DryRunGruntParser dryrun = new DryRunGruntParser(rdscriptFile,
             pigContext);
 
     boolean hasMacro = dryrun.parseStopOnError();
 
     if (hasMacro) {
         String expandedFile = scriptFile.replace(".substituted",
                 ".expanded");
         BufferedWriter fw = new BufferedWriter(new FileWriter(expandedFile));
         fw.append(dryrun.getResult());
         fw.close();
     }
     return hasMacro;
 }
 
 //TODO jz: log4j.properties should be used instead
 // Configures log4j for the client and records the resulting log level on the
 // PigContext. When a log4j configuration file is named and loads
 // successfully, its contents are used as-is. Otherwise a default
 // configuration is built: a console appender named PIGCONSOLE writing to
 // System.err and, when the "pig.logfile.level" system property is set, a
 // rolling file appender named F writing to the "pig.logfile" property.
 // NOTE(review): several tokens are missing in this listing (the keys passed
 // to properties.getProperty(), the default Level, the second argument of
 // Level.toLevel and the print-stream receiver); the code is left exactly as found.
 private static void configureLog4J(Properties propertiesPigContext pigContext) {
     // TODO Add a file appender for the logs
     // TODO Need to create a property in the properties file for it.
     // sgroschupf, 25Feb2008: this method will be obsolete with PIG-115.
 
     String log4jconf = properties.getProperty();
     String trueString = "true";
     boolean brief = trueString.equalsIgnoreCase(properties.getProperty());
     Level logLevel = .;
 
     String logLevelString = properties.getProperty();
     if (logLevelString != null){
         logLevel = Level.toLevel(logLevelString.);
     }
 
     Properties props = new Properties();
     FileReader propertyReader = null;
     if (log4jconf != null) {
         try {
             propertyReader = new FileReader(log4jconf);
             props.load(propertyReader);
         }
         catch (IOException e)
         {
             ..println("Warn: Cannot open log4j properties file, use default");
         }
         finally
         {
             if (propertyReader != nulltry {propertyReader.close();} catch(Exception e) {}
         }
     }
     // Nothing was loaded from a file: build the default configuration.
     if (props.size() == 0) {
         props.setProperty("log4j.logger.org.apache.pig"logLevel.toString());
         if((logLevelString = System.getProperty("pig.logfile.level")) == null){
             props.setProperty("log4j.rootLogger""INFO, PIGCONSOLE");
         }
         else{
             // A file log level was requested: it replaces the level chosen above
             // and the file appender F is added to the root logger.
             logLevel = Level.toLevel(logLevelString.);
             props.setProperty("log4j.logger.org.apache.pig"logLevel.toString());
             props.setProperty("log4j.rootLogger""INFO, PIGCONSOLE, F");
             props.setProperty("log4j.appender.F","org.apache.log4j.RollingFileAppender");
             props.setProperty("log4j.appender.F.File",properties.getProperty("pig.logfile"));
             props.setProperty("log4j.appender.F.layout","org.apache.log4j.PatternLayout");
             props.setProperty("log4j.appender.F.layout.ConversionPattern"brief ? "%m%n" : "%d [%t] %-5p %c - %m%n");
         }
 
         // Brief mode prints only the message; otherwise date, thread, level and logger are included.
         props.setProperty("log4j.appender.PIGCONSOLE","org.apache.log4j.ConsoleAppender");
         props.setProperty("log4j.appender.PIGCONSOLE.target""System.err");
         props.setProperty("log4j.appender.PIGCONSOLE.layout","org.apache.log4j.PatternLayout");
         props.setProperty("log4j.appender.PIGCONSOLE.layout.ConversionPattern"brief ? "%m%n" : "%d [%t] %-5p %c - %m%n");
     }
 
     PropertyConfigurator.configure(props);
     // Read the level back from log4j; fall back to the effective level when
     // none is set directly on the org.apache.pig logger.
     logLevel = Logger.getLogger("org.apache.pig").getLevel();
     if (logLevel==null) {
         logLevel = Logger.getLogger("org.apache.pig").getEffectiveLevel();
     }
     // Store the same level in the log4j properties held by the PigContext.
     Properties backendProps = pigContext.getLog4jProperties();
     backendProps.setProperty("log4j.logger.org.apache.pig"logLevel.toString());
     pigContext.setLog4jProperties(backendProps);
     pigContext.setDefaultLogLevel(logLevel);
 }
 
 
 private static List<StringfetchRemoteParamFiles(List<StringparamFilesProperties properties)
         throws IOException {
     List<StringparamFiles2 = new ArrayList<String>();
     for (String paramparamFiles) {
         FileLocalizer.FetchFileRet localFileRet = FileLocalizer.fetchFile(propertiesparam);
         paramFiles2.add(localFileRet.file.getAbsolutePath());
     }
     return paramFiles2;
 }
 // returns the stream of final pig script to be passed to Grunt
 private static BufferedReader runParamPreprocessor(PigContext contextBufferedReader origPigScript,
                                             String scriptFileboolean createFile)
                                 throws org.apache.pig.tools.parameters.ParseExceptionIOException{
 
     if (createFile) {
         return context.doParamSubstitutionOutputToFile(origPigScriptscriptFile);
     } else {
         String substituted = context.doParamSubstitution(origPigScript);
         return new BufferedReader(new StringReader(substituted));
     }
 }

 /** Returns the major version of the Pig build being run. */
 
 public static String getMajorVersion() {
     return ;
 }

 /** Returns the minor version of the Pig build being run. */
 
 public static String getMinorVersion() {
     return ;
 }

 /** Returns the patch version of the Pig build being run. */
 
 public static String getPatchVersion() {
     return ;
 }

 /** Returns the svn revision number of the Pig build being run. */
 
 public static String getSvnRevision() {
     return ;
 }

 /** Returns the build time of the Pig build being run. */
 
 public static String getBuildTime() {
     return ;
 }

 /**
  * Returns a version string formatted similarly to that of svn.
  * <pre>
  * Apache Pig version 0.11.0-SNAPSHOT (r1202387)
  * compiled Nov 15 2011, 15:22:09
  * </pre>
  */
 
 
 public static String getVersionString() {
     return "Apache Pig version " +  + " (r" +  + ") \ncompiled "+;
 }

 /** Prints the usage string. */
 
 // Prints the version banner followed by the command-line synopsis and a
 // one-line description of every supported option.
 // NOTE(review): the receiver of each println call is missing from this
 // listing (only ".." remains), so the target stream cannot be confirmed here.
 public static void usage()
 {
         ..println("\n"+getVersionString()+"\n");
         // The three invocation forms: interactive, inline commands, script file.
         ..println("USAGE: Pig [options] [-] : Run interactively in grunt shell.");
         ..println("       Pig [options] -e[xecute] cmd [cmd ...] : Run cmd(s).");
         ..println("       Pig [options] [-f[ile]] file : Run cmds found in file.");
         ..println("  options include:");
         ..println("    -4, -log4jconf - Log4j configuration file, overrides log conf");
         ..println("    -b, -brief - Brief logging (no timestamps)");
         ..println("    -c, -check - Syntax check");
         ..println("    -d, -debug - Debug level, INFO is default");
         ..println("    -e, -execute - Commands to execute (within quotes)");
         ..println("    -f, -file - Path to the script to execute");
         ..println("    -g, -embedded - ScriptEngine classname or keyword for the ScriptEngine");
         ..println("    -h, -help - Display this message. You can specify topic to get help for that topic.");
         ..println("        properties is the only topic currently supported: -h properties.");
         ..println("    -i, -version - Display version information");
         ..println("    -l, -logfile - Path to client side log file; default is current working directory.");
         ..println("    -m, -param_file - Path to the parameter file");
         ..println("    -p, -param - Key value pair of the form param=val");
         ..println("    -r, -dryrun - Produces script with substituted parameters. Script is not executed.");
         // Names of the optimizer rules that -t accepts.
         ..println("    -t, -optimizer_off - Turn optimizations off. The following values are supported:");
         ..println("            SplitFilter - Split filter conditions");
         ..println("            PushUpFilter - Filter as early as possible");
         ..println("            MergeFilter - Merge filter conditions");
         ..println("            PushDownForeachFlatten - Join or explode as late as possible");
         ..println("            LimitOptimizer - Limit as early as possible");
         ..println("            ColumnMapKeyPrune - Remove unused data");
         ..println("            AddForEach - Add ForEach to remove unneeded columns");
         ..println("            MergeForEach - Merge adjacent ForEach");
         ..println("            GroupByConstParallelSetter - Force parallel 1 for \"group all\" statement");
         ..println("            All - Disable all optimizations");
         ..println("        All optimizations listed here are enabled by default. Optimization values are case insensitive.");
         ..println("    -v, -verbose - Print all error messages to screen");
         ..println("    -w, -warning - Turn warning logging on; also turns warning aggregation off");
         ..println("    -x, -exectype - Set execution mode: local|mapreduce, default is mapreduce.");
         ..println("    -F, -stop_on_failure - Aborts execution on the first failed job; default is off");
         ..println("    -M, -no_multiquery - Turn multiquery optimization off; default is on");
         ..println("    -P, -propertyFile - Path to property file");
         ..println("    -printCmdDebug - Overrides anything else and prints the actual command used to run Pig, including");
         ..println("                     any environment variables that are set by the pig command.");
 }
 
 public static void printProperties(){
         ..println("The following properties are supported:");
         ..println("    Logging:");
         ..println("        verbose=true|false; default is false. This property is the same as -v switch");
         ..println("        brief=true|false; default is false. This property is the same as -b switch");
         ..println("        debug=OFF|ERROR|WARN|INFO|DEBUG; default is INFO. This property is the same as -d switch");
         ..println("        aggregate.warning=true|false; default is true. If true, prints count of warnings");
         ..println("            of each type rather than logging each warning.");
         ..println("    Performance tuning:");
         ..println("        pig.cachedbag.memusage=<mem fraction>; default is 0.2 (20% of all memory).");
         ..println("            Note that this memory is shared across all large bags used by the application.");
         ..println("        pig.skewedjoin.reduce.memusagea=<mem fraction>; default is 0.3 (30% of all memory).");
         ..println("            Specifies the fraction of heap available for the reducer to perform the join.");
         ..println("        pig.exec.nocombiner=true|false; default is false. ");
         ..println("            Only disable combiner as a temporary workaround for problems.");
         ..println("        opt.multiquery=true|false; multiquery is on by default.");
         ..println("            Only disable multiquery as a temporary workaround for problems.");
         ..println("        pig.tmpfilecompression=true|false; compression is off by default.");
         ..println("            Determines whether output of intermediate jobs is compressed.");
         ..println("        pig.tmpfilecompression.codec=lzo|gzip; default is gzip.");
         ..println("            Used in conjunction with pig.tmpfilecompression. Defines compression type.");
         ..println("        pig.noSplitCombination=true|false. Split combination is on by default.");
         ..println("            Determines if multiple small files are combined into a single map.");
         ..println("        pig.exec.mapPartAgg=true|false. Default is false.");
         ..println("            Determines if partial aggregation is done within map phase, ");
         ..println("            before records are sent to combiner.");
         ..println("        pig.exec.mapPartAgg.minReduction=<min aggregation factor>. Default is 10.");
         ..println("            If the in-map partial aggregation does not reduce the output num records");
         ..println("            by this factor, it gets disabled.");
         ..println("        " +  + "=true|false; Default is false.");
         ..println("            Enable optimizer rules to simplify filter expressions.");
         ..println("    Miscellaneous:");
         ..println("        exectype=mapreduce|local; default is mapreduce. This property is the same as -x switch");
         ..println("        pig.additional.jars=<colon seperated list of jars>. Used in place of register command.");
         ..println("        udf.import.list=<comma seperated list of imports>. Used to avoid package names in UDF.");
         ..println("        stop.on.failure=true|false; default is false. Set to true to terminate on the first error.");
         ..println("        pig.datetime.default.tz=<UTC time offset>. e.g. +08:00. Default is the default timezone of the host.");
         ..println("            Determines the timezone used to handle datetime datatype and UDFs. ");
         ..println("Additionally, any Hadoop property can be specified.");
 }
 
 private static String validateLogFile(String logFileNameString scriptName) {
     String strippedDownScriptName = null;
 
     if(scriptName != null) {
         File scriptFile = new File(scriptName);
         if(!scriptFile.isDirectory()) {
             String scriptFileAbsPath;
             try {
                 scriptFileAbsPath = scriptFile.getCanonicalPath();
                 strippedDownScriptName = getFileFromCanonicalPath(scriptFileAbsPath);
             } catch (IOException ioe) {
                 .warn("Could not compute canonical path to the script file " + ioe.getMessage());
                 strippedDownScriptName = null;
             }
         }
     }
 
     String defaultLogFileName = (strippedDownScriptName == null ? "pig_" : strippedDownScriptName) + new Date().getTime() + ".log";
     File logFile;
 
     if(logFileName != null) {
         logFile = new File(logFileName);
 
         //Check if the file name is a directory
         //append the default file name to the file
         if(logFile.isDirectory()) {
             if(logFile.canWrite()) {
                 try {
                     logFileName = logFile.getCanonicalPath() + . + defaultLogFileName;
                 } catch (IOException ioe) {
                     .warn("Could not compute canonical path to the log file " + ioe.getMessage());
                     return null;
                 }
                 return logFileName;
             } else {
                 .warn("Need write permission in the directory: " + logFileName + " to create log file.");
                 return null;
             }
         } else {
             //we have a relative path or an absolute path to the log file
             //check if we can write to the directory where this file is/will be stored
 
             if (logFile.exists()) {
                 if(logFile.canWrite()) {
                     try {
                         logFileName = new File(logFileName).getCanonicalPath();
                     } catch (IOException ioe) {
                         .warn("Could not compute canonical path to the log file " + ioe.getMessage());
                         return null;
                     }
                     return logFileName;
                 } else {
                     //do not have write permissions for the log file
                     //bail out with an error message
                     .warn("Cannot write to file: " + logFileName + ". Need write permission.");
                     return logFileName;
                 }
             } else {
                 logFile = logFile.getParentFile();
 
                 if(logFile != null) {
                     //if the directory is writable we are good to go
                     if(logFile.canWrite()) {
                         try {
                             logFileName = new File(logFileName).getCanonicalPath();
                         } catch (IOException ioe) {
                             .warn("Could not compute canonical path to the log file " + ioe.getMessage());
                             return null;
                         }
                         return logFileName;
                     } else {
                         .warn("Need write permission in the directory: " + logFile + " to create log file.");
                         return logFileName;
                     }
                 }//end if logFile != null else is the default in fall through
             }//end else part of logFile.exists()
         }//end else part of logFile.isDirectory()
     }//end if logFileName != null
 
     //file name is null or its in the current working directory
     //revert to the current working directory
     String currDir = System.getProperty("user.dir");
     logFile = new File(currDir);
     logFileName = currDir + . + (logFileName == nulldefaultLogFileName : logFileName);
     if(logFile.canWrite()) {
        return logFileName;
    }
    .warn("Cannot write to log file: " + logFileName);
    return null;
private static String getFileFromCanonicalPath(String canonicalPath) {
    return canonicalPath.substring(canonicalPath.lastIndexOf(.));
        throws IOException {
    return ScriptEngine.getSupportedScriptLang(file);
private static int runEmbeddedScript(PigContext pigContextString fileString engine)
throws IOException {
    .info("Run embedded script: " + engine);
    pigContext.connect();
    ScriptEngine scriptEngine = ScriptEngine.getInstance(engine);
    Map<StringList<PigStats>> statsMap = scriptEngine.run(pigContextfile);
    PigStatsUtil.setStatsMap(statsMap);
    int failCount = 0;
    int totalCount = 0;
    for (List<PigStatslst : statsMap.values()) {
        if (lst != null && !lst.isEmpty()) {
            for (PigStats stats : lst) {