Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
 
 import java.util.List;
 
 
 public class POStream extends PhysicalOperator {
     private static final long serialVersionUID = 2L;
     
     private static final Result EOP_RESULT = new Result(.null);
 
     private String executableManagerStr;            // String representing ExecutableManager to use
     transient private ExecutableManager executableManager;    // ExecutableManager to use 
     private StreamingCommand command;               // Actual command to be run
     private Properties properties;
 
     private boolean initialized = false;
 
 
     protected BlockingQueue<ResultbinaryInputQueue = new ArrayBlockingQueue<Result>(1);
 
     protected boolean allInputFromPredecessorConsumed = false;
 
     protected boolean allOutputFromBinaryProcessed = false;
 
     public POStream(OperatorKey kExecutableManager executableManager
                       StreamingCommand commandProperties properties) {
         super(k);
         this. = executableManager.getClass().getName();
         this. = command;
         this. = properties;
 
         // Setup streaming-specific properties
         if (command.getShipFiles()) {
             parseShipCacheSpecs(command.getShipSpecs(), 
                                 properties"pig.streaming.ship.files");
         }
         parseShipCacheSpecs(command.getCacheSpecs(), 
                             properties"pig.streaming.cache.files");
     }
     
     private static void parseShipCacheSpecs(List<Stringspecs
             Properties propertiesString property) {
         
         String existingValue = properties.getProperty(property"");
         if (specs == null || specs.size() == 0) {
             return;
         }
         
         // Setup streaming-specific properties
         StringBuffer sb = new StringBuffer();
         Iterator<Stringi = specs.iterator();
         // first append any existing value
         if(!existingValue.equals("")) {
             sb.append(existingValue);
             if (i.hasNext()) {
                 sb.append(", ");
             }
         }
         while (i.hasNext()) {
            sb.append(i.next());
            if (i.hasNext()) {
                sb.append(", ");
            }
        }
        properties.setProperty(propertysb.toString());        
    }
        return ;
    }
    
    
Get the org.apache.pig.impl.streaming.StreamingCommand for this StreamSpec.

Returns:
the org.apache.pig.impl.streaming.StreamingCommand for this StreamSpec
    public StreamingCommand getCommand() {
        return ;
    }
    
    
    /* (non-Javadoc)
     * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator#getNext(org.apache.pig.data.Tuple)
     */
    @Override
    public Result getNextTuple() throws ExecException {
        // The POStream Operator works with ExecutableManager to
        // send input to the streaming binary and to get output
        // from it. To achieve a tuple oriented behavior, two queues
        // are used - one for output from the binary and one for 
        // input to the binary. In each getNext() call:
        // 1) If there is no more output expected from the binary, an EOP is
        // sent to successor
        // 2) If there is any output from the binary in the queue, it is passed
        // down to the successor
        // 3) if neither of these two are true and if it is possible to
        // send input to the binary, then the next tuple from the
        // predecessor is got and passed to the binary
        try {
            // if we are being called AFTER all output from the streaming 
            // binary has already been sent to us then just return EOP
            // The "allOutputFromBinaryProcessed" flag is set when we see
            // an EOS (End of Stream output) from streaming binary
            if() {
                return new Result(.null);
            }
            
            // if we are here AFTER all map() calls have been completed
            // AND AFTER we process all possible input to be sent to the
            // streaming binary, then all we want to do is read output from
            // the streaming binary
            if() {
                Result r = .take();
                if(r.returnStatus == .) {
                    // If we received EOS, it means all output
                    // from the streaming binary has been sent to us
                    // So we can send an EOP to the successor in
                    // the pipeline. Also since we are being called
                    // after all input from predecessor has been processed
                    // it means we got here from a call from close() in
                    // map or reduce. So once we send this EOP down, 
                    // getNext() in POStream should never be called. So
                    // we don't need to set any flag noting we saw all output
                    // from binary
                    r = ;
                } else if (r.returnStatus == .)
                    illustratorMarkup(r.resultr.result, 0);
                return(r);
            }
            
            // if we are here, we haven't consumed all input to be sent
            // to the streaming binary - check if we are being called
            // from close() on the map or reduce
            if(this..) {
                Result r = getNextHelper((Tuple)null);
                if(r.returnStatus == .) {
                    // we have now seen *ALL* possible input
                    // check if we ever had any real input
                    // in the course of the map/reduce - if we did
                    // then "initialized" will be true. If not, just
                    // send EOP down.
                    if(getInitialized()) {
                        // signal End of ALL input to the Executable Manager's 
                        // Input handler thread
                        .put(r);
                        // note this state for future calls
                          = true;
                        // look for output from binary
                        r = .take();
                        if(r.returnStatus == .) {
                            // If we received EOS, it means all output
                            // from the streaming binary has been sent to us
                            // So we can send an EOP to the successor in
                            // the pipeline. Also since we are being called
                            // after all input from predecessor has been processed
                            // it means we got here from a call from close() in
                            // map or reduce. So once we send this EOP down, 
                            // getNext() in POStream should never be called. So
                            // we don't need to set any flag noting we saw all output
                            // from binary
                            r = ;
                        }
                    }
                    
                } else if(r.returnStatus == .) {
                    // If we received EOS, it means all output
                    // from the streaming binary has been sent to us
                    // So we can send an EOP to the successor in
                    // the pipeline. Also we are being called
                    // from close() in map or reduce (this is so because
                    // only then this.parentPlan.endOfAllInput is true).
                    // So once we send this EOP down, getNext() in POStream
                    // should never be called. So we don't need to set any 
                    // flag noting we saw all output from binary
                    r = ;
                } else if (r.returnStatus == .)
                  illustratorMarkup(r.resultr.result, 0);
                return r;
            } else {
                // we are not being called from close() - so
                // we must be called from either map() or reduce()
                // get the next Result from helper
                Result r = getNextHelper((Tuple)null);
                if(r.returnStatus == .) {
                    // If we received EOS, it means all output
                    // from the streaming binary has been sent to us
                    // So we can send an EOP to the successor in
                    // the pipeline and also note this condition
                    // for future calls
                    r = ;
                      = true;
                } else if (r.returnStatus == .)
                    illustratorMarkup(r.resultr.result, 0);
                return r;
            }
            
        } catch(Exception e) {
            int errCode = 2083;
            String msg = "Error while trying to get next result in POStream.";
            throw new ExecException(msgerrCode.e);
        }
            
        
    }
    public synchronized boolean getInitialized() {
        return ;
    }
    public synchronized void setInitialized(boolean initialized) {
        this. = initialized;
    }
    public Result getNextHelper(Tuple tthrows ExecException {
        try {
            synchronized(this) {
                while(true) {
                    // if there is something in binary output Queue
                    // return it
                    if(!.isEmpty()) {
                        Result res = .take();
                        return res;
                    }
                    
                    // check if we can write tuples to 
                    // input of the process
                    if(.remainingCapacity() > 0) {
                        
                        Result input = processInput();
                        if(input.returnStatus == . || 
                                input.returnStatus == .) {
                            return input;
                        } else {
                            // we have a tuple to send as input
                            // Only when we see the first tuple which can
                            // be sent as input to the binary we want
                            // to initialize the ExecutableManager and set
                            // up the streaming binary - this is required in 
                            // Unions due to a JOIN where there may never be
                            // any input to send to the binary in one of the map
                            // tasks - so we initialize only if we have to.
                            // initialize the ExecutableManager once
                            if(!) {
                                // set up the executableManager
                                 = 
                                    (ExecutableManager)PigContext.instantiateFuncFromSpec();
                                
                                try {
                                    .configure(this);
                                    .run();
                                } catch (IOException ioe) {
                                    int errCode = 2084;
                                    String msg = "Error while running streaming binary.";
                                    throw new ExecException(msgerrCode.ioe);
                                }            
                                 = true;
                            }
                            
                            // send this input to the streaming
                            // process
                            .put(input);
                        }
                        
                    } else {
                        
                        // wait for either input to be available
                        // or output to be consumed
                        while(.isEmpty() && !.isEmpty())
                            wait();
                        
                    }
                }
            }
        } catch (Exception e) {
            int errCode = 2083;
            String msg = "Error while trying to get next result in POStream.";
            throw new ExecException(msgerrCode.e);
        }
    }
    
    public String toString() {
        return getAliasString() + "POStream" + "[" + .toString() + "]"
                + " - " + .toString();
    }
 
    @Override
    public void visit(PhyPlanVisitor vthrows VisitorException {
        v.visitStream(this);
        
    }
    @Override
    public String name() {
       return toString(); 
    }
    @Override
    public boolean supportsMultipleInputs() {
        return false;
    }
    @Override
    public boolean supportsMultipleOutputs() {
        return false;
    }

    
    public void finish() throws IOException {
        .close();
    }

    

Returns:
the Queue which has input to binary
        return ;
    }

    

Returns:
the Queue which has output from binary
        return ;
    }
    
    @Override
    public Tuple illustratorMarkup(Object inObject outint eqClassIndex) {
      if( != null) {
          ExampleTuple tIn = (ExampleTuplein;
          .getEquivalenceClasses().get(eqClassIndex).add(tIn);
            .addData((Tupleout);
      }
      return (Tupleout;
    }
New to GrepCode? Check out our FAQ X