Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package com.fasterxml.util.membuf.impl;
  
com.fasterxml.util.membuf.StreamyBytesMemBuffer implementation used for storing a sequence of byte values in a single byte sequence which does not retain boundaries implied by appends (as per com.fasterxml.util.membuf.StreamyMemBuffer). This means that the ordering of appended bytes is retained on a byte-by-byte basis, but there is no discrete grouping of sub-sequences (entries); all content is seen as a sort of stream.

Access to queue is fully synchronized -- meaning that all methods are synchronized by implementations as necessary, and the caller should not need to use external synchronization -- since some parts have to be synchronized anyway (updating of stats, pointers), and since all real-world use cases will need some level of synchronization anyway, even with just a single producer and consumer. If it turns out that there are bottlenecks that could be avoided with more granular (or external) locking, this design can be revisited.

Note that if instances are discarded, they MUST be closed: finalize() method is not implemented since it is both somewhat unreliable (i.e. should not be counted on) and can add overhead for GC processing.

 
 {
     /*
     /**********************************************************************
     /* Life-cycle
     /**********************************************************************
      */

    

Parameters:
allocator Allocator used for allocating underlying segments
minSegmentsToAllocate Maximum number of segments to hold on to (for reuse) after being released. Increases minimum memory usage but can improve performance by avoiding unnecessary re-allocation; and also guarantees that buffer always has at least this much storage capacity.
maxSegmentsToAllocate Maximum number of segments that can be allocated for this buffer: limits maximum capacity and memory usage
initialSegments Chain of pre-allocated segments, containing _maxSegmentsForReuse segments that are allocated to ensure that there is always specified minimum capacity available
 
             // NOTE(review): the constructor's declaration line (modifiers, name and first
             // parameter -- presumably "allocator", which is passed to super below) is missing
             // from this copy, and the comma between the two int parameters has been stripped.
             // Restore both from the original source before compiling.
             int minSegmentsToAllocateint maxSegmentsToAllocate,
             BytesSegment initialSegments) {
         // Pure delegation: all arguments are handed to the superclass constructor.
         // NOTE(review): argument separators were stripped from this call as well.
         super(allocatorminSegmentsToAllocatemaxSegmentsToAllocateinitialSegments);
     }
 
         // NOTE(review): the signature of this constructor is missing from this copy of the
         // source; only its body survives. It delegates to the superclass constructor that
         // accepts "src" -- the type and meaning of "src" cannot be determined from here.
         super(src);
     }
 
     /*
     /**********************************************************************
     /* Public API, simple statistics (not data) accessors
     /**********************************************************************
      */
 
     /**
      * Returns true when the compared value equals zero.
      *<p>
      * NOTE(review): the left-hand operand of the comparison is missing from this copy of
      * the source (apparently stripped during extraction). It is presumably an inherited
      * counter of buffered content -- confirm against the base class and restore it.
      *
      * @return True if the (missing) operand equals <code>0L</code>
      */
     @Override
     public synchronized boolean isEmpty() {
         return  == 0L;
     }
 
     /**
      * Returns a <code>long</code> describing how much content is available.
      *<p>
      * NOTE(review): the returned expression is missing from this copy of the source
      * (apparently stripped during extraction). It is presumably the same inherited counter
      * that {@link #isEmpty} compares against zero -- confirm and restore it.
      *
      * @return Value of the (missing) expression
      */
     @Override
     public synchronized long available() {
         return ;
     }
     
     /*
     /**********************************************************************
     /* Public API, appending
     /**********************************************************************
      */
 
     // from base class:
     //public void append(byte[] data);
     //public void append(byte[] data, int offset, int length);
     //public boolean tryAppend(byte[] data);
     //public void append(byte value);
 
     /**
      * Attempts to append a single byte, allocating or reusing one more segment if the
      * segment currently being written to has no room left.
      *<p>
      * NOTE(review): most field references in this method (the receiver of
      * <code>tryAppend(value)</code>, the operands of <code>++</code>, the counters in the
      * limit checks, the assignment targets) are missing from this copy of the source,
      * apparently stripped during extraction. Comments below describe only what the
      * remaining tokens show; restore the identifiers from the original source.
      *<p>
      * NOTE(review): unlike <code>tryAppend(byte[], int, int)</code> in this file, this method
      * never calls <code>notifyAll()</code>. If readers block in <code>_waitForData()</code>
      * until content arrives, a single-byte append into an empty buffer may not wake them --
      * confirm whether that is intentional.
      *
      * @param value Byte to append
      *
      * @return True if the byte was appended; false if the segment limit check fails or the
      *   allocator returns null
      *
      * @throws IllegalStateException if the byte cannot be appended even to the freshly
      *   initialized segment
      */
     @Override
     public synchronized boolean tryAppend(byte value)
     {
         // A null reference here is reported as "closed"; _reportClosed() presumably throws
         // (otherwise the next statement would dereference null) -- TODO confirm
         if ( == null) {
             _reportClosed();
         }
         // Fast path: the byte fits in the current segment
         if (.tryAppend(value)) {
             ++;
             return true;
         }
         // need to allocate a new segment, possible?
         if ( <= 0) { // no local buffers available yet
             if ( >= ) { // except we are maxed out
                 return false;
            }
            // if we are, let's try allocate: will be added to "free" segments first, then used
            BytesSegment newFree = .allocateSegments(1, );
            if (newFree == null) {
                return false;
            }
             += 1;
             = newFree;
        }
        // should be set now so:
        final BytesSegment seg = ;
        // Close the full segment for writing and chain a fresh one after it
        seg.finishWriting();
        // and allocate, init-for-writing new one:
        BytesSegment newSeg = _reuseFree().initForWriting();
        seg.relink(newSeg);
         = newSeg;
        // A just-initialized segment is expected to accept at least one byte
        if (!.tryAppend(value)) {
            throw new IllegalStateException("Should have room for a byte after allocation");
        }
        ++;
        return true;
    }
    /**
     * Attempts to append <code>dataLength</code> bytes from <code>data</code>, starting at
     * <code>dataOffset</code>. The content is either written into the current segment, or,
     * if it does not fit, spread over additional segments via {@link #_doAppendChunked}.
     * Waiting threads are notified if the buffer was empty before the append.
     *<p>
     * NOTE(review): the commas between parameters and between call arguments, and most
     * field references, are missing from this copy of the source (apparently stripped
     * during extraction). Restore them from the original source before compiling.
     *<p>
     * NOTE(review): no validation of <code>dataOffset</code> / <code>dataLength</code> is
     * visible here; a zero or negative <code>dataLength</code> takes the "fits in current
     * segment" branch -- confirm that the segment's append handles it.
     *
     * @param data Array that contains the bytes to append
     * @param dataOffset Offset of the first byte to append
     * @param dataLength Number of bytes to append
     *
     * @return True if all bytes were appended; false if the segment limit check fails or
     *   the allocator returns null (in which case nothing has been written)
     */
    @Override
    public synchronized boolean tryAppend(byte[] dataint dataOffsetint dataLength)
    {
        // A null reference here is reported as "closed"; _reportClosed() presumably throws
        // -- TODO confirm
        if ( == null) {
            _reportClosed();
        }
        int freeInCurrent = .availableForAppend();
        // First, simple case: can fit it in the current buffer?
        if (freeInCurrent >= dataLength) {
            .append(datadataOffsetdataLength);
        } else {
            // if not, must check whether we could allocate enough segments to fit in
            // NOTE(review): looks like a ceiling division of the overflow by the segment size
            // ("(x + (size-1)) / size") with the size operand stripped -- confirm
            int neededSegments = ((dataLength - freeInCurrent) + (-1)) / ;
    
            // Which may need reusing local segments, or allocating new ones via allocates
            int segmentsToAlloc = neededSegments - ;
            if (segmentsToAlloc > 0) { // nope: need more
                // ok, but are allowed to grow that big?
                if (( +  + segmentsToAlloc) > ) {
                    return false;
                }
                // if we are, let's try allocate: will be added to "free" segments first, then used
                // NOTE(review): the single-byte tryAppend passes a second argument to
                // allocateSegments; one appears to be missing here
                BytesSegment newFree = .allocateSegments(segmentsToAlloc);
                if (newFree == null) {
                    return false;
                }
                 += segmentsToAlloc;
                 = newFree;
            }
    
            // and if we got this far, it's just simple matter of writing pieces into segments
            _doAppendChunked(datadataOffsetdataLength);
        }
        // Remember the empty state before updating the count, so that waiters are only
        // notified on the empty -> non-empty transition
        boolean wasEmpty = ( == 0);
         += dataLength;        
        if (wasEmpty) {
            this.notifyAll();
        }
        return true;
    }
    /**
     * Writes <code>length</code> bytes from <code>buffer</code> across as many segments as
     * needed: fills the current segment, finishes it, links a reused free segment after it
     * and continues until everything has been written. Does nothing if <code>length</code>
     * is less than 1.
     *<p>
     * No capacity check is done here; free segments are obtained via
     * <code>_reuseFree()</code>, so the caller presumably has to make sure enough of them
     * exist beforehand (as <code>tryAppend(byte[], int, int)</code> in this file does).
     *<p>
     * NOTE(review): parameter and argument separators, and the field references, are missing
     * from this copy of the source (apparently stripped during extraction).
     *
     * @param buffer Array that contains the bytes to write
     * @param offset Offset of the first byte to write
     * @param length Number of bytes to write
     */
    protected void _doAppendChunked(byte[] bufferint offsetint length)
    {
        if (length < 1) {
            return;
        }
        BytesSegment seg = ;
        while (true) {
            // tryAppend reports how much was actually written; advance past that part
            int actual = seg.tryAppend(bufferoffsetlength);
            offset += actual;
            length -= actual;
            if (length == 0) { // complete, can leave
                return;
            }
            // otherwise, need another segment, so complete current write
            seg.finishWriting();
            // and allocate, init-for-writing new one:
            BytesSegment newSeg = _reuseFree().initForWriting();
            seg.relink(newSeg);
            // the new segment becomes both the local cursor and the (stripped) field value
             = seg = newSeg;
        }
    }
    /*
    /**********************************************************************
    /* Public API, reading
    /**********************************************************************
     */
    
    /**
     * Reads a single value, waiting via <code>_waitForData()</code> for as long as the
     * checked counter is zero.
     *<p>
     * NOTE(review): the field references in this method are missing from this copy of the
     * source (apparently stripped during extraction); restore them before compiling.
     *
     * @return Value returned by the current read segment's <code>read()</code>
     *
     * @throws InterruptedException declared by this method; presumably propagated from
     *   <code>_waitForData()</code> -- confirm
     * @throws IllegalStateException if <code>_freeReadSegment</code> reports an error
     */
    @Override
    public synchronized int read() throws InterruptedException
    {
        // A null reference here is reported as "closed"; _reportClosed() presumably throws
        // -- TODO confirm
        if ( == null) {
            _reportClosed();
        }
        // first: must have something to return
        while ( == 0L) {
            _waitForData();
        }
        // Current read segment exhausted: release it; a non-null result is an error message
        if (.availableForReading() == 0) {
            String error = _freeReadSegment(null);
            if (error != null) {
                throw new IllegalStateException(error);
            }
        }
        int i = .read();
        --;
        return i;
    }
    /**
     * Reads up to <code>length</code> bytes into <code>buffer</code>, waiting via
     * <code>_waitForData()</code> for as long as the checked counter is zero. Returns 0
     * immediately, without waiting, if <code>length</code> is less than 1.
     *<p>
     * NOTE(review): the parameter separators, the closing parenthesis of the parameter list
     * and the field references are missing from this copy of the source (apparently
     * stripped during extraction); restore them before compiling.
     *
     * @param buffer Array to read into
     * @param offset Offset in <code>buffer</code> of the first byte to store
     * @param length Maximum number of bytes to read
     *
     * @return Number of bytes read, as reported by <code>_doRead</code>
     *
     * @throws InterruptedException declared by this method; presumably propagated from
     *   <code>_waitForData()</code> -- confirm
     */
    @Override
    public synchronized int read(byte[] bufferint offsetint lengththrows InterruptedException
    {
        // A null reference here is reported as "closed"; _reportClosed() presumably throws
        // -- TODO confirm
        if ( == null) {
            _reportClosed();
        }
        if (length < 1) {
            return 0;
        }
        // first: must have something to return
        while ( == 0L) {
            _waitForData();
        }
        return _doRead(bufferoffsetlength);
    }
    /**
     * Non-waiting read: returns 0 right away when the checked counter is zero, otherwise
     * delegates to <code>_doRead</code>.
     *<p>
     * NOTE(review): the parameter separators and the field references are missing from this
     * copy of the source (apparently stripped during extraction); restore them before
     * compiling.
     *
     * @param buffer Array to read into
     * @param offset Offset in <code>buffer</code> of the first byte to store
     * @param length Maximum number of bytes to read
     *
     * @return Number of bytes read; 0 if nothing was available
     */
    @Override
    public synchronized int readIfAvailable(byte[] bufferint offsetint length) {
        // A null reference here is reported as "closed"; _reportClosed() presumably throws
        // -- TODO confirm
        if ( == null) {
            _reportClosed();
        }
        if ( == 0L) {
            return 0;
        }
        return _doRead(bufferoffsetlength);
    }
    /**
     * Timed read: reads right away if the checked counter is positive, otherwise waits via
     * <code>_waitForData(long)</code> until content shows up or <code>timeoutMsecs</code>
     * milliseconds have passed.
     *<p>
     * If <code>timeoutMsecs</code> is zero or negative the wait loop is never entered and 0
     * is returned when nothing is available.
     *<p>
     * NOTE(review): the parameter separators and the field references are missing from this
     * copy of the source (apparently stripped during extraction); restore them before
     * compiling.
     *<p>
     * NOTE(review): <code>now + timeoutMsecs</code> can overflow for very large timeouts
     * (making <code>end</code> negative, so that no waiting happens), and the deadline is
     * based on <code>System.currentTimeMillis()</code>, which is affected by wall-clock
     * adjustments -- confirm whether either matters for callers.
     *
     * @param timeoutMsecs Maximum time to wait for content, in milliseconds
     * @param buffer Array to read into
     * @param offset Offset in <code>buffer</code> of the first byte to store
     * @param length Maximum number of bytes to read
     *
     * @return Number of bytes read; 0 if nothing became available before the deadline
     *
     * @throws InterruptedException declared by this method; presumably propagated from
     *   <code>_waitForData(long)</code> -- confirm
     */
    @Override
    public synchronized int read(long timeoutMsecsbyte[] bufferint offsetint length)
            throws InterruptedException
    {
        // A null reference here is reported as "closed"; _reportClosed() presumably throws
        // -- TODO confirm
        if ( == null) {
            _reportClosed();
        }
        if ( > 0L) {
            return _doRead(bufferoffsetlength);
        }
        long now = System.currentTimeMillis();
        long end = now + timeoutMsecs;
        // Re-check after every wake-up, waiting only for the time that remains
        while (now < end) {
            _waitForData(end - now);
            if ( > 0L) {
                return _doRead(bufferoffsetlength);
            }
            now = System.currentTimeMillis();
        }
        return 0;
    }
    /**
     * Shared read implementation: validates the target range, limits <code>length</code>
     * to what is available, and copies content either from the current read segment alone
     * or piece by piece across segments, releasing exhausted segments along the way.
     *<p>
     * Does not wait; the public read methods in this file only call it after checking that
     * the counter they test is non-zero.
     *<p>
     * NOTE(review): the parameter and argument separators, the field references and part
     * of the cast expression (<code>length = (int;</code>) are missing from this copy of
     * the source (apparently stripped during extraction); restore them before compiling.
     *<p>
     * NOTE(review): <code>offset + length</code> is an int addition and can overflow for
     * very large <code>length</code>, which would let an invalid range pass the check --
     * confirm whether callers can pass such values.
     *
     * @param buffer Array to read into
     * @param offset Offset in <code>buffer</code> of the first byte to store
     * @param length Maximum number of bytes to read
     *
     * @return Number of bytes read (<code>length</code> after limiting); 0 if
     *   <code>length</code> is less than 1
     *
     * @throws IllegalArgumentException if <code>offset</code> is outside the buffer, or
     *   <code>offset + length</code> extends past its end
     * @throws IllegalStateException if releasing a read segment reported an error
     */
    private final int _doRead(byte[] bufferint offsetint length)
    {
        if (length < 1) {
            return 0;
        }
        final int end = buffer.length;
        if (offset >= end || offset < 0) {
            throw new IllegalArgumentException("Illegal offset ("+offset+"): allowed values [0, "+end+"[");
        }
        if ((offset + length) > end) {
            throw new IllegalArgumentException("Illegal length ("+length+"): offset ("+offset
                    +") + length end past end of buffer ("+end+")");
        }
        // also, can't read more than what is available
        if (length > ) {
            length = (int;
        }
        // ok: simple case; all data available from within current segment
        int avail = .availableForReading();
        if (avail >= length) {
             -= length;
            .read(bufferoffsetlength);
            return length;
        }
        // otherwise need to do segmented read...
        // Errors from releasing segments are collected rather than thrown immediately, so
        // that the copy loop runs to completion first
        String error = null;
        int remaining = length;
        while (true) {
            int actual = .tryRead(bufferoffsetremaining);
             -= actual;
            offset += actual;
            remaining -= actual;
            if (remaining == 0) { // complete, can leave
                break;
            }
            // current segment is exhausted: release it; previous error (if any) is passed along
            error = _freeReadSegment(error);
        }
        if (error != null) {
            throw new IllegalStateException(error);
        }
        return length;
    }
    /*
    /**********************************************************************
    /* Abstract method impls
    /**********************************************************************
     */
    // // // No peeked data, so these are simple
    
    @Override
    protected void _clearPeeked() { }
    @Override
    protected int _peekedLength() {
        return 0;
    }    
New to GrepCode? Check out our FAQ X