Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.pig.impl.io;
 
 
A record reader used to read data written using InterRecordWriter It uses the default InterSedes object for deserialization.
 
 public class InterRecordReader extends RecordReader<TextTuple> {
 
   private long start;
   private long pos;
   private long end;
   private Tuple value = null;
   public static final int RECORD_1 = 0x01;
   public static final int RECORD_2 = 0x02;
   public static final int RECORD_3 = 0x03;
   private DataInputStream inData = null;
   private static InterSedes sedes = InterSedesFactory.getInterSedesInstance();
 
   public void initialize(InputSplit genericSplit,
                          TaskAttemptContext contextthrows IOException {
     FileSplit split = (FileSplitgenericSplit;
     Configuration job = context.getConfiguration();
      = split.getStart();
      =  + split.getLength();
     final Path file = split.getPath();
 
     // open the file and seek to the start of the split
     FileSystem fs = file.getFileSystem(job);
     FSDataInputStream fileIn = fs.open(split.getPath());
     if ( != 0) {
         fileIn.seek();
     }
      = new BufferedPositionedInputStream(fileIn);
      = new DataInputStream();
   }
   
   public boolean nextKeyValue() throws IOException {
       int b = 0;
       //    skip to next record
       while (true) {
           if ( == null || .getPosition() >=) {
               return false;
           }
           // check if we saw RECORD_1 in our last attempt
           // this can happen if we have the following 
           // sequence RECORD_1-RECORD_1-RECORD_2-RECORD_3
           // After reading the second RECORD_1 in the above
           // sequence, we should not look for RECORD_1 again
           if(b != ) {
               b = .read();
               if(b !=  && b != -1) {
                   continue;
               }
               if(b == -1) return false;
           }
           b = .read();
           if(b !=  && b != -1) {
               continue;
           }
           if(b == -1) return false;
           b = .read();
           if(b !=  && b != -1) {
               continue;
          }
          if(b == -1) return false;
          b = .read();
          if(!BinInterSedes.isTupleByte((byteb) &&
                  b != -1) {
              continue;
          }
          if(b == -1) return false;
          break;
      }
      try {
          // if we got here, we have seen RECORD_1-RECORD_2-RECORD_3-TUPLE_MARKER
          // sequence - lets now read the contents of the tuple 
           =  (Tuple).readDatum(, (byte)b);
          =.getPosition();
          return true;
      } catch (ExecException ee) {
          throw ee;
      }
  }
  public Text getCurrentKey() {
      // the key is always null since we don't really have a key for each
      // input record
      return null;
  }
  public Tuple getCurrentValue() {
    return ;
  }

  
Get the progress within the split
  public float getProgress() {
    if ( == ) {
      return 0.0f;
    } else {
      return Math.min(1.0f, ( - ) / (float)( - ));
    }
  }
  
  public synchronized void close() throws IOException {
    if ( != null) {
      .close(); 
    }
  }
New to GrepCode? Check out our FAQ X