Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
 
 package org.apache.avro.io;
 
 
A Encoder that writes large arrays and maps as a sequence of blocks. So long as individual primitive values fit in memory, arbitrarily long arrays and maps may be written and subsequently read without exhausting memory. Values are buffered until the specified block size would be exceeded, minimizing block overhead.

See also:
Encoder
 
 public class BlockingBinaryEncoder extends BinaryEncoder {
 
  /* Implementation note:
   *
   * Blocking is complicated because of nesting.  If a large, nested
   * value overflows your buffer, you've got to do a lot of dancing
   * around to output the blocks correctly.
   *
   * To handle this complexity, this class keeps a stack of blocked
   * values: each time a new block is started (e.g., by a call to
   * {@link #writeArrayStart}), an entry is pushed onto this stack.
   *
   * In this stack, we keep track of the state of a block.  Blocks can
   * be in two states.  "Regular" blocks have a non-zero byte count.
   * "Overflow" blocks help us deal with the case where a block
   * contains a value that's too big to buffer.  In this case, the
   * block contains only one item, and we give it an unknown
   * byte-count.  Because these values (1,unknown) are fixed, we're
   * able to write the header for these overflow blocks to the
   * underlying stream without seeing the entire block.  After writing
   * this header, we've freed our buffer space to be fully devoted to
   * blocking the large, inner value.
   */
 
   private static class BlockedValue {
     public enum State {
      
The bottom element of our stack represents being _outside_ of a blocked value.
 
       ROOT,

      
Represents the "regular" case, i.e., a blocked-value whose current block is fully contained in the buffer. In this case, BlockingBinaryEncoder.BlockedValue.start points to the start of the blocks _data_ -- but no room has been left for a header! When this block is terminated, it's data will have to be moved over a bit to make room for the header.
 
       REGULAR,

      
Represents a blocked-value whose current block is in the overflow state. In this case, BlockingBinaryEncoder.BlockedValue.start is zero. The header for such a block has _already been written_ (we've written out a header indicating that the block has a single item, and we put a "zero" down for the byte-count to indicate that we don't know the physical length of the buffer. Any blocks _containing_ this block must be in the OVERFLOW state.
 
      OVERFLOW
     };

    
The type of this blocked value (ARRAY or MAP).
 
     public Schema.Type type;

    
The state of this BlockedValue
 
     public State state;
    
    
The location in the buffer where this blocked value starts
 
     public int start;

    
The index one past the last byte for the previous item. If this is the first item, this is same as start.
    public int lastFullItem;
    
    
Number of items in this blocked value that are stored in the buffer.
    public int items;

    
Number of items left to writ
    public long itemsLeftToWrite;

    
Create a ROOT instance.
    public BlockedValue() {
      this. = null;
      this. = ..;
      this. = this. = 0;
      this. = 1; // Makes various assertions work out
    }
    
    
Check invariants of this and also the BlockedValue containing this.
    public boolean check(BlockedValue prevint pos) {
      assert  != . ||  == null;
      assert ( == . ||
               == .. ||  == ..);
      assert 0 <= ;
      assert 0 !=  ||  == pos;         // 0==itms ==> start==pos
      assert 1 <  ||  == // 1<=itms ==> start==lFI
      assert  <= 1 ||  <= // 1<itms ==> start<=lFI
      assert  <= pos;
      switch () {
      case :
          assert  == 0;
          assert prev == null;
          break;
      case :
          assert  >= 0;
          assert prev.lastFullItem <= ;
          assert 1 <= prev.items;
          break;
      case :
          assert  == 0;
          assert  == 1;
          assert prev.state == . || prev.state == .;
          break;
      }
      return false;
    }
  }

  
The buffer to hold the bytes before being written into the underlying stream.
  private byte[] buf;
  
  
Index into the location in buf, where next byte can be written.
  private int pos;
  
  
The state stack.
  private BlockedValue[] blockStack;
  private int stackTop = -1;
  private static final int STACK_STEP = 10;
  private static final class EncoderBuffer extends ByteArrayOutputStream {
    public byte[] buffer() {
      return ;
    }
    
    public int length() {
      return ;
    }
  }
  
  private EncoderBuffer encoderBuffer = new EncoderBuffer();
  private boolean check() {
    assert  != null;
    assert  != null;
    assert  <= .;
    assert 0 <= ;
    assert  <= . :  + " " + .;
    assert  != null;
    BlockedValue prev = null;
    for (int i = 0; i <= i++) {
      BlockedValue v = [i];
      v.check(prev);
      prev = v;
    }
    return true;
  }
  private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
  private static final int MIN_BUFFER_SIZE = 64;
    this(out);
  }
  public BlockingBinaryEncoder(OutputStream outint bufferSize) {
    super(out);
    if (bufferSize < ) {
      throw new IllegalArgumentException("Buffer size too smll.");
    }
    this. = new byte[bufferSize];
    this. = 0;
     = new BlockedValue[0];
    expandStack();
    BlockedValue bv = [++];
    bv.type = null;
    bv.state = ..;
    bv.start = bv.lastFullItem = 0;
    bv.items = 1;
    assert check();
  }
  private void expandStack() {
    int oldLength = .;
     = Arrays.copyOf(,
        . + );
    for (int i = oldLengthi < .i++) {
      [i] = new BlockedValue();
    }
  }

  
Redirect output (and reset the parser state if we're checking).
  public void init(OutputStream outthrows IOException {
    super.init(out);
    this. = 0;
    this. = 0;
    assert check();
  }
  public void flush() throws IOException {
    if ( != null) {
      BlockedValue bv = [];
      if (bv.state == ..) {
        .write(, 0, );
         = 0;
      } else {
        while (bv.state != ..) {
          compact();
        }
      }
      .flush();
    }
    assert check();
  }
  public void writeBoolean(boolean bthrows IOException {
    if (. < ( + 1)) ensure(1);
    [++] = (byte)(b ? 1 : 0);
    assert check();
  }
  public void writeInt(int nthrows IOException {
    if ( + 5 > .) {
      ensure(5);
    }
     = encodeLong(n);
    assert check();
  }
  public void writeLong(long nthrows IOException {
    if ( + 10 > .) {
      ensure(10);
    }
     = encodeLong(n);
    assert check();
  }
    
  public void writeFloat(float fthrows IOException {
    if ( + 4 > .) {
      ensure(4);
    }
     = encodeFloat(f);
    assert check();
  }
  public void writeDouble(double dthrows IOException {
    if ( + 8 > .) {
      ensure(8);
    }
     = encodeDouble(d);
    assert check();
  }
  public void writeString(Utf8 utf8throws IOException {
    writeBytes(utf8.getBytes(), 0, utf8.getLength());
    assert check();
  }
  public void writeBytes(ByteBuffer bytesthrows IOException {
    writeBytes(bytes.array(), bytes.position(), bytes.remaining());
    assert check();
  }
  
  public void writeFixed(byte[] bytesint startint lenthrows IOException {
    doWriteBytes(bytesstartlen);
    assert check();
  }
  
  public void writeEnum(int ethrows IOException {
    writeInt(e);
  }
  public void writeBytes(byte[] bytesint startint lenthrows IOException {
    if ( + 5 > .) {
      ensure(5);
    }
     = encodeLong(len);
    doWriteBytes(bytesstartlen);
    assert check();
  }
  public void writeArrayStart() throws IOException {
    if ( + 1 == .) {
      expandStack();
    }
    BlockedValue bv = [++];
    bv.type = ..;
    bv.state = ..;
    bv.start = bv.lastFullItem = ;
    bv.items = 0;
    assert check();
  }
  public void setItemCount(long itemCountthrows IOException {
    assert v.type == .. || v.type == ..;
    assert v.itemsLeftToWrite == 0;
    v.itemsLeftToWrite = itemCount;
    assert check();
  }
  
  public void startItem() throws IOException {
      finishOverflow();
    }
    t.items++;
    t.lastFullItem = ;
    t.itemsLeftToWrite--;
    assert check();
  }
  public void writeArrayEnd() throws IOException {
    if (top.type != ..) {
      throw new AvroTypeException("Called writeArrayEnd outside of an array.");
    }
    if (top.itemsLeftToWrite != 0) {
      throw new AvroTypeException("Failed to write expected number of array elements.");
    }
    assert check();
  }
  public void writeMapStart() throws IOException {
    if ( + 1 == .) {
      expandStack();
    }
    BlockedValue bv = [++];
    bv.type = ..;
    bv.state = ..;
    bv.start = bv.lastFullItem = ;
    bv.items = 0;
    assert check();
  }
  public void writeMapEnd() throws IOException {
    if (top.type != ..) {
      throw new AvroTypeException("Called writeMapEnd outside of a map.");
    }
    if (top.itemsLeftToWrite != 0) {
      throw new AvroTypeException("Failed to read write expected number of array elements.");
    }
    
    assert check();
  }
  public void writeIndex(int unionIndexthrows IOException {
    if ( + 5 > .) {
      ensure(5);
    }
     = encodeLong(unionIndex);
    assert check();
  }
  private void endBlockedValue() throws IOException {
    for (; ;) {
      assert check();
      BlockedValue t = [];
      assert t.state != ..;
      if (t.state == ..) {
        finishOverflow();
      }
      assert t.state == ..;
      if (0 < t.items) {
        int byteCount =  - t.start;
        if (t.start == 0 &&
          [ - 1].
            != ..) { // Lucky us -- don't have to move
          encodeLong(-t.items);
          encodeLong(byteCount);
        } else {
          encodeLong(-t.items);
          encodeLong(byteCount);
          final int headerSize = .length();
          if (. >=  + headerSize) {
             += headerSize;
            final int m = t.start;
            System.arraycopy(mm + headerSizebyteCount);
            System.arraycopy(.buffer(), 0, mheaderSize);
            .reset();
          } else {
            .reset();
            compact();
            continue;
          }
        }
      }
      --;
      if (. < ( + 1)) ensure(1);
      [++] = 0;   // Sentinel for last block in a blocked value
      assert check();
      if ([]. == ..) {
        flush();
      }
      return;
    }
  }

  
Called when we've finished writing the last item in an overflow buffer. When this is finished, the top of the stack will be an empty block in the "regular" state.

  private void finishOverflow() throws IOException {
    if (s.state != ..) {
      throw new IllegalStateException("Not an overflow block");
    }
    assert check();
    // Flush any remaining data for this block
    .write(, 0, );
     = 0;
    // Reset top of stack to be in REGULAR mode
    s.state = ..;
    s.start = s.lastFullItem = 0;
    s.items = 0;
    assert check();
  }
  private void ensure(int lthrows IOException {
    if (. < l) {
      throw new IllegalArgumentException("Too big: " + l);
    }
    while (. < ( + l)) {
        compact();
      } else {
        .write(, 0, );
         = 0;
      }
    }
  }
  private void doWriteBytes(byte[] bytesint startint len)
    throws IOException {
    if (len < .) {
      ensure(len);
      System.arraycopy(bytesstartlen);
       += len;
    } else {
      ensure(.);
      assert []. == .. ||
      write(bytesstartlen);
    }
    assert check();
  }
  private void write(byte[] bint offint lenthrows IOException {
      .write(bofflen);
    } else {
      assert check();
      while (. < ( + len)) {
        if ([]. == ..) {
          compact();
        } else {
          .write(, 0, );
           = 0;
          if (. <= len) {
            .write(bofflen);
            len = 0;
          }
        }
      }
      System.arraycopy(bofflen);
       += len;
      assert check();
    }
  }

  
Only call if you're there are REGULAR-state values on the stack.
  private void compact() throws IOException {
    assert check();
    // Find first REGULAR-state value
    BlockedValue s = null;
    int i;
    for (i = 1; i <= i++) {
      s = [i];
      if (s.state == ..break;
    }
    assert s != null;
    // We're going to transition "s" into the overflow state.  To do
    // this, We're going to flush any bytes prior to "s", then write
    // any full items of "s" into a block, start an overflow
    // block, write any remaining bytes of "s" up to the start of the
    // next more deeply-nested blocked-value, and finally move over
    // any remaining bytes (which will be from more deeply-nested
    // blocked values).
    // Flush any bytes prios to "s"
    .write(, 0, s.start);
    // Write any full items of "s"
    if (1 < s.items) {
      encodeLong(-(s.items - 1), );
      encodeLong(s.lastFullItem - s.start);
      .write(s.starts.lastFullItem - s.start);
      s.start = s.lastFullItem;
      s.items = 1;
    }
    // Start an overflow block for s
    encodeLong(1, );
    // Write any remaining bytes for "s", up to the next-most
    // deeply-nested value
    BlockedValue n = ((i + 1) <=  ?
        [i + 1] : null);
    int end = (n == null ?  : n.start);
    .write(s.lastFullItemend - s.lastFullItem);
    // Move over any bytes that remain (and adjust indices)
    System.arraycopy(end, 0,  - end);
    for (int j = i + 1; j <= j++) {
        n = [j];
        n.start -= end;
        n.lastFullItem -= end;
    }
     -= end;
    assert s.items == 1;
    s.start = s.lastFullItem = 0;
    s.state = ..;
    assert check();
  }
New to GrepCode? Check out our FAQ X