Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.pig.impl.streaming;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.concurrent.BlockingQueue;

// NOTE(review): the import section was lost when this file was extracted.
// Only the JDK imports needed by the visible code are restored above. The
// class also names non-JDK types (Log, LogFactory, ExecException, POStatus,
// Result, POStream, Tuple, BufferedPositionedInputStream, UDFContext) whose
// imports must be restored from the upstream Apache Pig source.

ExecutableManager manages an external executable which processes data in a Pig query. The ExecutableManager is responsible for startup/teardown of the external process and also for managing it. It feeds input records to the executable via it's stdin, collects the output records from the stdout and also diagnostic information from the stdout.
 
 public class ExecutableManager {
     private static final Log LOG = LogFactory.getLog(ExecutableManager.class);
     private static final int SUCCESS = 0;
     private static final Result EOS_RESULT = new Result(.null);
 
     protected StreamingCommand command// Streaming command to be run
 
     Process process// Handle to the process
     protected int exitCode = -127; // Exit code of the process
 
     protected DataOutputStream stdin// stdin of the process
     ProcessInputThread stdinThread// thread to send input to process
 
     ProcessOutputThread stdoutThread// thread to get process stdout
     InputStream stdout// stdout of the process
 
     ProcessErrorThread stderrThread// thread to get process stderr
     InputStream stderr// stderr of the process
 
     // Input/Output handlers
 
     // Statistics
     protected long inputRecords = 0;
     protected long inputBytes = 0;
     protected long outputRecords = 0;
     protected long outputBytes = 0;
 
     protected volatile Throwable outerrThreadsError;
     private POStream poStream;
     private ProcessInputThread fileInputThread;

    
Create a new ExecutableManager.
 
     public ExecutableManager() {
     }

    
Configure and initialize the ExecutableManager.

 
     public void configure(POStream streamthrows IOExceptionExecException {
         this. = stream;
        this. = stream.getCommand();
        // Create the input/output handlers
        this. = HandlerFactory.createInputHandler();
        this. = HandlerFactory.createOutputHandler();
    }

    
Close and cleanup the ExecutableManager.

    public void close() throws IOException {
        // Close the InputHandler, which in some cases lets the process
        // terminate
        .close();
        // Check if we need to start the process now ...
        if (.getInputType() == .) {
            exec();
        }
        // Wait for the process to exit
        try {
             = .waitFor();
        } catch (InterruptedException ie) {
            .error("Unexpected exception while waiting for streaming binary to complete"ie);
            killProcess();
        }
        // Wait for stdout thread to complete
        try {
            if ( != null) {
                .join(0);
            }
             = null;
        } catch (InterruptedException ie) {
            .error("Unexpected exception while waiting for output thread for streaming binary to complete"ie);
            killProcess();
        }
        // Wait for stderr thread to complete
        try {
            if ( != null) {
                .join(0);
            }
             = null;
        } catch (InterruptedException ie) {
            .error("Unexpected exception while waiting for input thread for streaming binary to complete"ie);
            killProcess();
        }
        .debug("Process exited with: " + );
        if ( != ) {
            .error( + " failed with exit status: "
                    + );
        }
        if (.getOutputType() == .) {
            // Trigger the outputHandler
            .bindTo(""null, 0, -1);
            // start thread to process output from executable's stdout
             = new ProcessOutputThread();
            .start();
        }
        // Check if there was a problem with the managed process
        if ( != null) {
            .error("Output/Error thread failed with: "
                    + );
        }
    }

    
Helper function to close input and output streams to the process and kill it

Parameters:
process the process to be killed
Throws:
java.io.IOException
    private void killProcess(Process processthrows IOException {
        if (process != null) {
            .close(process);
            .close();
            process.destroy();
        }
    }

    
Start execution of the external process. This takes care of setting up the environment of the process and also starts ProcessErrorThread to process the stderr of the managed process.

    protected void exec() throws IOException {
        ProcessBuilder processBuilder = StreamingUtil.createProcess(this.);
         = processBuilder.start();
        .debug("Started the process for command: " + );
        // Pick up the process' stderr stream and start the thread to
        // process the stderr stream
                .getErrorStream()));
         = new ProcessErrorThread();
        .start();
        // Check if we need to handle the process' stdout directly
        if (.getOutputType() == .) {
            // Get hold of the stdout of the process
             = new DataInputStream(new BufferedInputStream(
                    .getInputStream()));
            // Bind the stdout to the OutputHandler
            .bindTo(""new BufferedPositionedInputStream(),
                    0, .);
            // start thread to process output from executable's stdout
             = new ProcessOutputThread();
            .start();
        }
    }

    
Start execution of the ExecutableManager.

    public void run() throws IOException {
        // Check if we need to exec the process NOW ...
        if (.getInputType() == .) {
            // start the thread to handle input. we pass the UDFContext to the
            // fileInputThread because when input type is asynchronous, the
            // exec() is called by fileInputThread, and it needs to access to
            // the UDFContext.
             = new ProcessInputThread(
                    , UDFContext.getUDFContext());
            .start();
            // If Input type is ASYNCHRONOUS that means input to the
            // streaming binary is from a file - that means we cannot exec
            // the process till the input file is completely written. This
            // will be done in close() - so now we return
            return;
        }
        // Start the executable ...
        exec();
        // set up input to the executable
                .getOutputStream()));
        .bindTo();
        // Start the thread to send input to the executable's stdin
         = new ProcessInputThread(null);
        .start();
    }

    
The thread which consumes input from POStream's binaryInput queue and feeds it to the the Process
    class ProcessInputThread extends Thread {
        InputHandler inputHandler;
        private POStream poStream;
        private UDFContext udfContext;
        private BlockingQueue<ResultbinaryInputQueue;
        ProcessInputThread(InputHandler inputHandlerPOStream poStreamUDFContext udfContext) {
            setDaemon(true);
            this. = inputHandler;
            this. = poStream;
            // a copy of UDFContext passed from the ExecutableManager thread
            this. = udfContext;
            // the input queue from where this thread will read
            // input tuples
            this. = poStream.getBinaryInputQueue();
        }
        @Override
        public void run() {
            // If input type is asynchronous, set the udfContext of the current
            // thread to the copy of ExecutableManager thread's udfContext. This
            // is necessary because the exec() method is called by the current
            // thread (fileInputThread) instead of the ExecutableManager thread.
            if (.getInputType() == . &&  != null) {
                UDFContext.setUdfContext();
            }
            try {
                // Read tuples from the previous operator in the pipeline
                // and pass it to the executable
                while (true) {
                    Result inp = null;
                    inp = .take();
                    synchronized () {
                        // notify waiting producer
                        // the if check is to keep "findbugs"
                        // happy
                        if(inp != null)
                            .notifyAll();
                    }
                    // We should receive an EOP only when *ALL* input
                    // for this process has already been sent and no
                    // more input is expected
                    if (inp != null && inp.returnStatus == .) {
                        // signal cleanup in ExecutableManager
                        close();
                        return;
                    }
                    if (inp != null && inp.returnStatus == .) {
                        // Check if there was a problem with the managed process
                        if ( != null) {
                            throw new IOException(
                                    "Output/Error thread failed with: "
                                            + );
                        }
                        // Pass the serialized tuple to the executable via the
                        // InputHandler
                        Tuple t = null;
                        try {
                            t = (Tupleinp.result;
                            .putNext(t);
                        } catch (IOException e) {
                            // if input type is synchronous then it could
                            // be related to the process terminating
                            if(.getInputType() == .) {
                                .warn("Exception while trying to write to stream binary's input"e);
                                // could be because the process
                                // died OR closed the input stream
                                // we will only call close() here and not
                                // worry about deducing whether the process died
                                // normally or abnormally - if there was any real
                                // issue the ProcessOutputThread should see
                                // a non zero exit code from the process and send
                                // a POStatus.STATUS_ERR back - what if we got
                                // an IOException because there was only an issue with
                                // writing to input of the binary - hmm..hope that means
                                // the process died abnormally!!
                                close();
                                return;
                            } else {
                                // asynchronous case - then this is a real exception
                                .error("Exception while trying to write to stream binary's input"e);
                                // send POStatus.STATUS_ERR to POStream to signal the error
                                // Generally the ProcessOutputThread would do this but now
                                // we should do it here since neither the process nor the
                                // ProcessOutputThread will ever be spawned
                                Result res = new Result(.,
                                        "Exception while trying to write to stream binary's input" + e.getMessage());
                                sendOutput(.getBinaryOutputQueue(), res);
                                throw e;
                            }
                        }
                         += t.getMemorySize();
                        ++;
                    }
                }
            } catch (Throwable t) {
                // Note that an error occurred
                 = t;
                .error"Error while reading from POStream and " +
                           "passing it to the streaming process"t);
                try {
                    killProcess();
                } catch (IOException ioe) {
                    .warn(ioe);
                }
            }
        }
    }
    private void sendOutput(BlockingQueue<ResultbinaryOutputQueueResult res) {
        try {
            binaryOutputQueue.put(res);
        } catch (InterruptedException e) {
            .error("Error while sending binary output to POStream"e);
        }
        synchronized () {
            // notify waiting consumer
            // the if is to satisfy "findbugs"
            if(res != null) {
                .notifyAll();
            }
        }
    }

    
The thread which gets output from the streaming binary and puts it onto the binary output Queue of POStream
    class ProcessOutputThread extends Thread {
        private BlockingQueue<ResultbinaryOutputQueue;
        ProcessOutputThread(OutputHandler outputHandlerPOStream poStream) {
            setDaemon(true);
            this. = outputHandler;
            // the output queue where this thread will put
            // output tuples for POStream
            this. = poStream.getBinaryOutputQueue();
        }
        @Override
        public void run() {
            try {
                // Read tuples from the executable and send it to
                // Queue of POStream
                Tuple tuple = null;
                while ((tuple = .getNext()) != null) {
                    processOutput(tuple);
                     += tuple.getMemorySize();
                }
                // output from binary is done
                processOutput(null);
                .close();
            } catch (Throwable t) {
                // Note that an error occurred
                 = t;
                .error("Caught Exception in OutputHandler of Streaming binary, " +
                        "sending error signal to pipeline"t);
                // send ERROR to POStream
                try {
                    Result res = new Result();
                    res.result = "Error reading output from Streaming binary:" +
                            "'" + .toString() + "':" + t.getMessage();
                    res.returnStatus = .;
                    sendOutput(res);
                    killProcess();
                } catch (Exception e) {
                    .error("Error while trying to signal Error status to pipeline"e);
                }
            }
        }
        void processOutput(Tuple t) {
            Result res = new Result();
            if (t != null) {
                // we have a valid tuple to pass back
                res.result = t;
                res.returnStatus = .;
                ++;
            } else {
                // t == null means end of output from
                // binary - wait for the process to exit
                // and harvest exit code
                try {
                     = .waitFor();
                } catch (InterruptedException ie) {
                    try {
                        killProcess();
                    } catch (IOException e) {
                        .warn("Exception trying to kill process while processing null output " +
                                "from binary"e);
                    }
                    // signal error
                    String errMsg = "Failure while waiting for process (" + .toString() + ")" +
                            ie.getMessage();
                    .error(errMsgie);
                    res.result = errMsg;
                    res.returnStatus = .;
                    sendOutput(res);
                    return;
                }
                if( == 0) {
                    // signal EOS (End Of Stream output)
                    res = ;
                } else {
                    // signal Error
                    String errMsg = "'" + .toString() + "'" + " failed with exit status: "
                            + ;
                    .error(errMsg);
                    res.result = errMsg;
                    res.returnStatus = .;
                }
            }
            sendOutput(res);
        }
    }



    
Workhorse to process the stderr stream of the managed process. By default ExecuatbleManager just sends out the received error message to the stderr of itself.

Parameters:
error error message from the managed process.
    protected void processError(String error) {
        // Just send it out to our stderr
        ..print(error);
    }
    class ProcessErrorThread extends Thread {
        public ProcessErrorThread() {
            setDaemon(true);
        }
        @Override
        public void run() {
            try {
                String error;
                BufferedReader reader = new BufferedReader(
                        new InputStreamReader());
                while ((error = reader.readLine()) != null) {
                    processError(error + "\n");
                }
                if ( != null) {
                    .close();
                    .debug("ProcessErrorThread done");
                }
            } catch (Throwable t) {
                // Note that an error occurred
                 = t;
                .error(t);
                try {
                    if ( != null) {
                        .close();
                    }
                } catch (IOException ioe) {
                    .warn(ioe);
                }
                throw new RuntimeException(t);
            }
        }
    }
New to GrepCode? Check out our FAQ X