Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.pig.impl.streaming;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 
OutputHandler is responsible for handling the output of the Pig-Streaming external command. The output of the managed executable could be fetched in a OutputHandler.OutputType.SYNCHRONOUS manner via its stdout or in an OutputHandler.OutputType.ASYNCHRONOUS manner via an external file to which the process wrote its output.
 
 public abstract class OutputHandler {
     public static final Object END_OF_OUTPUT = new Object();
     private static final byte[] DEFAULT_RECORD_DELIM = new byte[] {'\n'};
 
     public enum OutputType {SYNCHRONOUS, ASYNCHRONOUS}
 
     /*
      * The deserializer to be used to send data to the managed process.
      *
      * It is the responsibility of the concrete sub-classes to setup and
      * manage the deserializer.
      */
     protected StreamToPig deserializer;
 
     private PigStreamingBase newDeserializer;
 
     protected LineReader in = null;
 
     private Text currValue = new Text();
 
     
     //Both of these ignore the trailing \n.  So if the
     //default delimiter is "\n" recordDelimStr is "".
     private String recordDelimStr = null;
     private int recordDelimLength = 0;

    
Get the handled OutputType.

Returns:
the handled OutputType
 
     public abstract OutputType getOutputType();
 
     // flag to mark if close() has already been called
     protected boolean alreadyClosed = false;

    
Bind the OutputHandler to the InputStream from which to read the output data of the managed process.

Parameters:
is InputStream from which to read the output data of the managed process
Throws:
java.io.IOException
 
     public void bindTo(String fileNameBufferedPositionedInputStream is,
                        long offsetlong endthrows IOException {
         this.  = is;
         this. = new LineReader();
         if (this. instanceof PigStreamingBase) {
             this. = (PigStreamingBase;
         }
     }

    
Get the next output Tuple of the managed process.

Returns:
the next output Tuple of the managed process
Throws:
java.io.IOException
 
     public Tuple getNext() throws IOException {
        if ( == null) {
            return null;
        }
        .clear();
        if (!readValue()) {
            return null;
        }
        if ( != null) {
            return .deserialize(.getBytes(), 0, .getLength());
        } else {
            byte[] newBytes = new byte[.getLength()];
            System.arraycopy(.getBytes(), 0, newBytes, 0, .getLength());
            return .deserialize(newBytes);
        }
    }
    private boolean readValue() throws IOException {
        int num = .readLine();
        if (num <= 0) {
            return false;
        }
        while(!isEndOfRow()) {
            //Need to add back the newline character we ate.
            .append(new byte[] {'\n'}, 0, 1);
            byte[] lineBytes = readNextLine();
            if (lineBytes == null) {
                //We have no more input, so just break;
                break;
            }
            .append(lineBytes, 0, lineBytes.length);
        }
        
        return true;
    }
    
    private byte[] readNextLine() throws IOException {
        Text line = new Text();
        int num = .readLine(line);
        byte[] lineBytes = line.getBytes();
        if (num <= 0) {
            return null;
        }
        
        return lineBytes;
    }
    private boolean isEndOfRow() {
        if ( == null) {
            byte[] recordDelimBa = getRecordDelimiter();
             = recordDelimBa.length - 1; //Ignore trailing \n
             = new String(recordDelimBa, 0, ,  .);
        }
        if ( == 0 || .getLength() < ) {
            return true;
        }
        return .find(.getLength() - ) >= 0;
    }
    
    protected byte[] getRecordDelimiter() {
        return ;
    }

    
Close the OutputHandler.

    public synchronized void close() throws IOException {
        if(!) {
            .close();
             = null;
             = true;
        }
    }
New to GrepCode? Check out our FAQ X