  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.pig.impl.builtin;
 
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.EvalFunc;
import org.apache.pig.ExecType;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.Launcher;
import org.apache.pig.builtin.PigStreamingUDF;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.streaming.InputHandler;
import org.apache.pig.impl.streaming.OutputHandler;
import org.apache.pig.impl.streaming.StreamingCommand;
import org.apache.pig.impl.streaming.StreamingUDFException;
import org.apache.pig.impl.streaming.StreamingUDFInputHandler;
import org.apache.pig.impl.streaming.StreamingUDFOutputHandler;
import org.apache.pig.impl.streaming.StreamingUDFOutputSchemaException;
import org.apache.pig.impl.streaming.StreamingUtil;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.parser.ParserException;
import org.apache.pig.scripting.ScriptingOutputCapturer;
 
 
 
 public class StreamingUDF extends EvalFunc<Object> {
     private static final Log log = LogFactory.getLog(StreamingUDF.class);
 
     private static final String PYTHON_CONTROLLER_JAR_PATH = "/python/streaming/controller.py"//Relative to root of pig jar.
     private static final String PYTHON_PIG_UTIL_PATH = "/python/streaming/pig_util.py"//Relative to root of pig jar.
 
     //Indexes for arguments being passed to external process
     private static final int UDF_LANGUAGE = 0;
     private static final int PATH_TO_CONTROLLER_FILE = 1;
     private static final int UDF_FILE_NAME = 2; //Name of file where UDF function is defined
     private static final int UDF_FILE_PATH = 3; //Path to directory containing file where UDF function is defined
     private static final int UDF_NAME = 4; //Name of UDF function being called.
     private static final int PATH_TO_FILE_CACHE = 5; //Directory where required files (like pig_util) are cached on cluster nodes.
     private static final int STD_OUT_OUTPUT_PATH = 6; //File for output from when user writes to standard output.
     private static final int STD_ERR_OUTPUT_PATH = 7; //File for output from when user writes to standard error.
     private static final int CONTROLLER_LOG_FILE_PATH = 8; //Controller log file logs progress through the controller script not user code.
     private static final int IS_ILLUSTRATE = 9; //Controller captures output differently in illustrate vs running.
     
     private String language;
     private String filePath;
     private String funcName;
     private Schema schema;
     private ExecType execType;
     private String isIllustrate;
     
     private boolean initialized = false;
     private ScriptingOutputCapturer soc;
 
     private Process process// Handle to the external process
     private ProcessErrorThread stderrThread// thread to get process stderr
     private ProcessInputThread stdinThread// thread to send input to process
     private ProcessOutputThread stdoutThread//thread to read output from process
     
     private InputHandler inputHandler;
     private OutputHandler outputHandler;
 
     private BlockingQueue<TupleinputQueue;
    private BlockingQueue<ObjectoutputQueue;
    private DataOutputStream stdin// stdin of the process
    private InputStream stdout// stdout of the process
    private InputStream stderr// stderr of the process
    private static final Object ERROR_OUTPUT = new Object();
    private static final Object NULL_OBJECT = new Object(); //BlockingQueue can't have null.  Use place holder object instead.
    
    private volatile StreamingUDFException outerrThreadsError;
    
    public static final String TURN_ON_OUTPUT_CAPTURING = "TURN_ON_OUTPUT_CAPTURING";
    public StreamingUDF(String language
                        String filePathString funcName
                        String outputSchemaStringString schemaLineNumber,
                        String execTypeString isIllustrate)
                                throws StreamingUDFOutputSchemaExceptionExecException {
        this. = language;
        this. = filePath;
        this. = funcName;
        try {
            this. = Utils.getSchemaFromString(outputSchemaString);
            //ExecTypeProvider.fromString doesn't seem to load the ExecTypes in 
            //mapreduce mode so we'll try to figure out the exec type ourselves.
            if (execType.equals("local")) {
                this. = .;
            } else if (execType.equals("mapreduce")) {
                this. = .;
            } else {
                //Not sure what exec type - try to get it from the string.
                this. = ExecType.fromString(execType);
            }
        } catch (ParserException pe) {
            throw new StreamingUDFOutputSchemaException(pe.getMessage(), Integer.valueOf(schemaLineNumber));
        } catch (IOException ioe) {
            String errorMessage = "Invalid exectype passed to StreamingUDF. Should be local or mapreduce";
            .error(errorMessageioe);
            throw new ExecException(errorMessageioe);
        }
        this. = isIllustrate;
    }
    
    @Override
    public Object exec(Tuple inputthrows IOException {
        if (!) {
            initialize();
             = true;
        }
        return getOutput(input);
    }
    private void initialize() throws ExecExceptionIOException {
         = new ArrayBlockingQueue<Tuple>(1);
         = new ArrayBlockingQueue<Object>(2);
         = new ScriptingOutputCapturer();
        startUdfController();
        createInputHandlers();
        setStreams();
        startThreads();
    }
    private StreamingCommand startUdfController() throws IOException {
        StreamingCommand sc = new StreamingCommand(nullconstructCommand());
        ProcessBuilder processBuilder = StreamingUtil.createProcess(sc);
         = processBuilder.start();
        Runtime.getRuntime().addShutdownHook(new Thread(new ProcessKiller() ) );
        return sc;
    }
    private String[] constructCommand() throws IOException {
        String[] command = new String[10];
        Configuration conf = UDFContext.getUDFContext().getJobConf();
        String jarPath = conf.get("mapred.jar");
        String jobDir;
        if (jarPath != null) {
            jobDir = new File(jarPath).getParent();
        } else {
            jobDir = "";
        }
        
        String standardOutputRootWriteLocation = .getStandardOutputRootWriteLocation();
        String controllerLogFileNameoutFileNameerrOutFileName;
        if ( == .) {
            controllerLogFileName = standardOutputRootWriteLocation +  + "_python.log";
            outFileName = standardOutputRootWriteLocation + "cpython_" +  + "_" + ScriptingOutputCapturer.getRunId() + ".out";
            errOutFileName = standardOutputRootWriteLocation + "cpython_" +  + "_" + ScriptingOutputCapturer.getRunId() + ".err";
        } else {
            controllerLogFileName = standardOutputRootWriteLocation +  + "_python.log";
            outFileName = standardOutputRootWriteLocation +  + ".out";
            errOutFileName = standardOutputRootWriteLocation +  + ".err";
        }
        .registerOutputLocation(outFileName);
        command[] = ;
        command[] = getControllerPath(jobDir);
        int lastSeparator = .lastIndexOf(.) + 1;
        command[] = .substring(lastSeparator);
        command[] = lastSeparator <= 0 ? 
                "." : 
                .substring(0, lastSeparator - 1);
        command[] = ;
        command[] = "\"" + jobDir + .substring(0, lastSeparator) + "\"";
        command[] = outFileName;
        command[] = errOutFileName;
        command[] = controllerLogFileName;
        command[] = ;
        return command;
    }
    private void createInputHandlers() throws ExecExceptionFrontendException {
        PigStreamingUDF serializer = new PigStreamingUDF();
        this. = new StreamingUDFInputHandler(serializer);
        PigStreamingUDF deserializer = new PigStreamingUDF(.getField(0));
        this. = new StreamingUDFOutputHandler(deserializer);
    }
    private void setStreams() throws IOException { 
                .getInputStream()));
                0, .);
        
                .getOutputStream()));
        .bindTo(); 
        
                .getErrorStream()));
    }
    private void startThreads() {
         = new ProcessInputThread();
        .start();
        
         = new ProcessOutputThread();
        .start();
        
         = new ProcessErrorThread();
        .start();
    }

    
Find the path to the controller file for the streaming language. First check path to job jar and if the file is not found (like in the case of running hadoop in standalone mode) write the necessary files to temporary files and return that path.

Parameters:
language
jarPath
Returns:
Throws:
java.io.IOException
    private String getControllerPath(String jarPaththrows IOException {
        if (.toLowerCase().equals("python")) {
            String controllerPath = jarPath + ;
            File controller = new File(controllerPath);
            if (!controller.exists()) {
                File controllerFile = File.createTempFile("controller"".py");
                InputStream pythonControllerStream = Launcher.class.getResourceAsStream();
                try {
                    FileUtils.copyInputStreamToFile(pythonControllerStreamcontrollerFile);
                } finally {
                    pythonControllerStream.close();
                }
                controllerFile.deleteOnExit();
                File pigUtilFile = new File(controllerFile.getParent() + "/pig_util.py");
                pigUtilFile.deleteOnExit();
                InputStream pythonUtilStream = Launcher.class.getResourceAsStream();
                try {
                    FileUtils.copyInputStreamToFile(pythonUtilStreampigUtilFile);
                } finally {
                    pythonUtilStream.close();
                }
                controllerPath = controllerFile.getAbsolutePath();
            }
            return controllerPath;
        } else {
            throw new ExecException("Invalid language: " + );
        }
    }

    
Returns a list of file names (relative to root of pig jar) of files that need to be included in the jar shipped to the cluster. Will need to be smarter as more languages are added and the controller files are large.

Returns:
    public static List<StringgetResourcesForJar() {
        List<Stringfiles = new ArrayList<String>();
        files.add();
        files.add();
        return files;
    }
    private Object getOutput(Tuple inputthrows ExecException {
        if ( == null) {
            throw new ExecException("Process has already been shut down.  No way to retrieve output for input: " + input);
        }
        if (ScriptingOutputCapturer.isClassCapturingOutput() && 
                !.isInstanceCapturingOutput()) {
            Tuple t = TupleFactory.getInstance().newTuple();
            try {
                .put(t);
            } catch (InterruptedException e) {
                throw new ExecException("Failed adding capture input flag to inputQueue");
            }
            .setInstanceCapturingOutput(true);
        }
        try {
            if (this.getInputSchema() == null || this.getInputSchema().size() == 0) {
                //When nothing is passed into the UDF the tuple 
                //being sent is the full tuple for the relation.
                //We want it to be nothing (since that's what the user wrote).
                input = TupleFactory.getInstance().newTuple(0);
            }
            .put(input);
        } catch (Exception e) {
            throw new ExecException("Failed adding input to inputQueue"e);
        }
        Object o = null;
        try {
            if ( != null) {
                o = .take();
                if (o == ) {
                    o = null;
                }
            }
        } catch (Exception e) {
            throw new ExecException("Problem getting output"e);
        }
        if (o == ) {
             = null;
            if ( == null) {
                 = new StreamingUDFException(this."Problem with streaming udf.  Can't recreate exception.");
            }
            throw ;
        }
        return o;
    }
    @Override
    public Schema outputSchema(Schema input) {
        return this.;
    }

    
The thread which consumes input and feeds it to the the Process
    class ProcessInputThread extends Thread {
        ProcessInputThread() {
            setDaemon(true);
        }
        public void run() {
            try {
                .debug("Starting PIT");
                while (true) {
                    Tuple inputTuple = .take();
                    .putNext(inputTuple);
                    try {
                        .flush();
                    } catch(Exception e) {
                        return;
                    }
                }
            } catch (Exception e) {
                .error(e);
            }
        }
    }
    
    private static final int WAIT_FOR_ERROR_LENGTH = 500;
    private static final int MAX_WAIT_FOR_ERROR_ATTEMPTS = 5;
    
    
The thread which consumes output from process
    class ProcessOutputThread extends Thread {
        ProcessOutputThread() {
            setDaemon(true);
        }
        public void run() {
            Object o = null;
            try{
                .debug("Starting POT");
                //StreamUDFToPig wraps object in single element tuple
                o = .getNext().get(0);
                while (o != .) {
                    if (o != null)
                        .put(o);
                    else
                        .put();
                    o = .getNext().get(0);
                }
            } catch(Exception e) {
                if ( != null) {
                    try {
                        //Give error thread a chance to check the standard error output
                        //for an exception message.
                        int attempt = 0;
                        while (.isAlive() && attempt < ) {
                            Thread.sleep();
                            attempt++;
                        }
                        //Only write this if no other error.  Don't want to overwrite
                        //an error from the error thread.
                        if ( == null) {
                             = new StreamingUDFException("Error deserializing output.  Please check that the declared outputSchema for function " +
                                                                         + " matches the data type being returned."e);
                        }
                        .put(); //Need to wake main thread.
                    } catch(InterruptedException ie) {
                        .error(ie);
                    }
                }
            }
        }
    }
    class ProcessErrorThread extends Thread {
        public ProcessErrorThread() {
            setDaemon(true);
        }
        public void run() {
            try {
                .debug("Starting PET");
                Integer lineNumber = null;
                StringBuffer error = new StringBuffer();
                String errInput;
                BufferedReader reader = new BufferedReader(
                        new InputStreamReader(.));
                while ((errInput = reader.readLine()) != null) {
                    //First line of error stream is usually the line number of error.
                    //If its not a number just treat it as first line of error message.
                    if (lineNumber == null) {
                        try {
                            lineNumber = Integer.valueOf(errInput);
                        } catch (NumberFormatException nfe) {
                            error.append(errInput + "\n");
                        }
                    } else {
                        error.append(errInput + "\n");
                    }
                }
                 = new StreamingUDFException(error.toString(), lineNumber);
                if ( != null) {
                    .put(); //Need to wake main thread.
                }
                if ( != null) {
                    .close();
                     = null;
                }
            } catch (IOException e) {
                .debug("Process Ended");
            } catch (Exception e) {
                .error("standard error problem"e);
            }
        }
    }
    
    public class ProcessKiller implements Runnable {
        public void run() {
            .destroy();
        }
    }