// NOTE(review): removed scraped page chrome ("Start line / End line / Snippet
// Preview / Snippet HTML Code / Stack Overflow Questions") that preceded the
// package declaration in the extracted copy of this file.
package com.fasterxml.clustermate.service.msg;

import java.io.*;
import java.util.*;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// NOTE(review): the extraction dropped this file's project-internal imports
// (StoreMate types such as TimeMaster, StorableStore, StoreOperationThrottler,
// Compression, BufferRecycler, StreamyBytesMemBuffer, and the com.ning.compress
// LZF/GZIP classes used below); restore them from the upstream source.
  
 
 
 
StreamingResponseContent implementation used for returning content for entries that have external File-backed payload. Difference to SimpleStreamingResponseContent is that due to file system access, more care has to be taken, including allowing possible throttling of reading of content to output.
 
     implements StreamingResponseContent
 {
     private final static Logger LOG = LoggerFactory.getLogger(FileBackedResponseContentImpl.class);

    
Since allocation failures can come in clusters, let's do basic throttling
 
     protected final static SimpleLogThrottler bufferAllocFailLogger = new SimpleLogThrottler(, 1000);
    
    
Let's use large enough read buffer to allow read-all-write-all cases.
 
     private final static int READ_BUFFER_LENGTH = 64000;
 
     /*
     /**********************************************************************
     /* Helper objects
     /**********************************************************************
      */
    
    
We can reuse read buffers as they are somewhat costly to allocate, reallocate all the time.
 
     final protected static BufferRecycler _bufferRecycler = new BufferRecycler();
 
     final protected OperationDiagnostics _diagnostics;
     
     final protected TimeMaster _timeMaster;
 
     final protected StorableStore _store;
     
     final protected StoreOperationThrottler _throttler;
     
     /*
     /**********************************************************************
     /* Data to stream out
     /**********************************************************************
      */
 
     private final StoredEntry<?> _entry;
 
     private final File _file;
 
     private FileInputStream _fileInput;
     
     private final long _dataOffset;
 	
     private final long _dataLength;
 
     /*
     /**********************************************************************
     /* Metadata
    /**********************************************************************
     */
    private final long _operationTime;
    
    private final Compression _compression;

    
When reading from a file, this indicates length of content before processing (if any).
    private final long _fileLength;
    /*
    /**********************************************************************
    /* Construction
    /**********************************************************************
     */
            StorableStore storelong operationTime,
            File fCompression compByteRange range,
            StoredEntry<?> entry)
        throws StoreException
    {
         = diag;
         = timeMaster;
         = store;
         = store.getThrottler();
         = operationTime;
         = entry;
         = entry.getStorageLength();
        long contentLen = (comp == null) ?  : entry.getActualUncompressedLength();
        if (range == null) {
             = -1L;
             = contentLen;
        } else {
            // Range can be stored in offset..
             = range.getStart();
             = range.calculateLength();
        }
         = f;
         = comp;
        try {
             = new FileInputStream();
        } catch (FileNotFoundException e) {
            throw new StoreException.NoSuchFile(.getKey().asStorableKey(),
                    "File '"+.getAbsolutePath()+"' not found for Entry "+);
        }
    }
    
    /*
    /**********************************************************************
    /* Metadata
    /**********************************************************************
     */
    @Override
    public boolean hasFile() { return true; }
    @Override
    public boolean inline() { return false; }
    
    @Override
    public long getLength() {
        return ;
    }
    /*
    /**********************************************************************
    /* Actual streaming
    /**********************************************************************
     */
    
    @Override
    public void writeContent(final OutputStream outthrows IOException
    {
        final BufferRecycler.Holder bufferHolder = .getHolder();        
        final byte[] copyBuffer = bufferHolder.borrowBuffer();
        try {
            _writeContent(outcopyBuffer);
        } finally {
            bufferHolder.returnBuffer(copyBuffer);
        }
    }
    protected final void _writeContent(final OutputStream outfinal byte[] copyBuffer)
        throws IOException
    {
        // 4 main combinations: compressed/not-compressed, range/no-range
        // and then 2 variations; fits in buffer or not
        // Start with uncompressed
        if (!Compression.needsUncompress()) {
            // and if all we need fits in the buffer, read all, write all:
            if ( <= ) {
                _readAllWriteAllUncompressed(outcopyBuffer, (int);
            } else {
                // if not, need longer lock...
                _readAllWriteStreamingUncompressed(outcopyBuffer);
            }
            return;
        }
        
        // And then compressed variants. First, maybe we can read all data in memory before uncomp?
        if ( <= ) {
            _readAllWriteAllCompressed(outcopyBuffer);
        } else {
            _readAllWriteStreamingCompressed(outcopyBuffer);
        }
    }
    /*
    /**********************************************************************
    /* Second level copy methods; uncompressed data
    /**********************************************************************
     */

    
Method called for the simple case where we can just read all data into single buffer (and do that in throttled block), then write it out at our leisure.
    protected void _readAllWriteAllUncompressed(OutputStream outfinal byte[] copyBuffer,
            final long offsetfinal int dataLength)
        throws IOException
    {
        _readAll(outcopyBufferoffsetdataLength);
        // and write out like so
        final long start = ( == null) ? 0L : .nanosForDiagnostics();
        out.write(copyBuffer, 0, dataLength);
        if ( != null) {
        }
    }

    
    /**
     * Method called in cases where content does not all fit in a copy buffer
     * and has to be streamed.
     */
    // NOTE(review): extraction lost code here — the parameter separators in the
    // signature, the statement that allocates an off-heap buffer and assigns
    // `e0` (an allocator call wrapping the anonymous with-buffer callback
    // below), and the receiver of `.logWarn(...)` (presumably the throttled
    // `bufferAllocFailLogger`). Restore from upstream before compiling.
    protected void _readAllWriteStreamingUncompressed(final OutputStream outfinal byte[] copyBuffer,
            final long offsetfinal long dataLength)
        throws IOException
    {
            @Override
            public IOException withBuffer(StreamyBytesMemBuffer buffer) {
                try {
                    // stream content, using the (possibly null) off-heap buffer
                    _readAllWriteStreamingUncompressed2(outcopyBufferoffsetdataLengthbuffer);
                } catch (IOException e) {
                    return e;
                }
                return null;
            }
            @Override
            public IOException withError(IllegalStateException e) {
                // allocation failures can come in clusters; logging is throttled
                .logWarn("Failed to allocate off-heap buffer for streaming output: {}"e);
                // fall back to streaming without an off-heap buffer
                return withBuffer(null);
            }
        });
        // any IOException produced inside the callback is rethrown here
        if (e0 != null) {
            throw e0;
        }
    }
    protected void _readAllWriteStreamingUncompressed2(final OutputStream outfinal byte[] copyBuffer,
            final long offsetfinal long dataLength,
            final StreamyBytesMemBuffer offHeap)
        throws IOException
    {
        final long fsWaitStart = ( == null) ? 0L : .nanosForDiagnostics();
                .getRaw(), ,
                new FileOperationCallback<StreamyBytesMemBuffer>() {
            @Override
            public StreamyBytesMemBuffer perform(long operationTimeStorableKey keyStorable valueFile externalFile)
                throws IOException
            {
                // gets tricky, so process wait separately
                if ( != null) {
                    .addFileReadWait.nanosForDiagnostics() - fsWaitStart);
                }
                final InputStream in = ;
                try {
                    if (offset > 0L) {
                        final long start = ( == null) ? 0L : .nanosForDiagnostics();
                        _skip(inoffset);
                        // assume skip is "free"?
                        if ( != null) {
                            .addFileReadAccess(startstart.nanosForDiagnostics(), 0L);
                        }
                    }
                    long left = dataLength;
                    // Can we make use of off-heap buffer?
                    if (offHeap != null) {
                        int leftovers = _readInBuffer(keyinleftcopyBufferoffHeap);
                        // easy case: all read?
                        if (leftovers == 0) {
                            return offHeap;
                        }
                        // if not, read+write; much longer. But starting with buffered, if any
                        byte[] stuff = Arrays.copyOf(copyBufferleftovers);
                        left -= _writeBuffered(offHeapoutcopyBufferstuff);
                    }                    
                    while (left > 0L) {
                        final long fsStart = ( == null) ? 0L : .nanosForDiagnostics();
                        int read = _read(incopyBufferleft);
                        if ( != null) {
                            .addFileReadAccess(fsStart,  , (longread);
                        }
                        final long writeStart = ( == null) ? 0L : .nanosForDiagnostics();
                        out.write(copyBuffer, 0, read);
                        left -= read;
                        if ( != null) {
                            .addResponseWriteTime(writeStart);
                        }
                    }
                } finally {
                    _close(in);
                }
                return null;
            }
        });
        // Optimized case: all in the buffer, written outside file-read lock (to reduce lock time)
        if (fullBuffer != null) {
            _writeBuffered(fullBufferoutcopyBuffernull);
        }
    }
    
    /*
    /**********************************************************************
    /* Second level copy methods; compressed data
    /**********************************************************************
     */

    
Method called in cases where the compressed file can be fully read in a single buffer, to be uncompressed and written.
    protected void _readAllWriteAllCompressed(final OutputStream outfinal byte[] copyBuffer,
            final long offsetfinal long dataLength)
        throws IOException
    {
        // important: specify no offset, file length; data offset/length is for _uncompressed_
        int inputLength = (int;
        _readAll(outcopyBuffer, 0L, inputLength);
        // Compress-Ning package allows "push" style uncompression (yay!)
        Uncompressor uncomp;
        DataHandler h = new RangedDataHandler(outoffsetdataLength);
        if ( == .) {
            uncomp = new LZFUncompressor(h);
        } else if ( == .) {
            uncomp = new GZIPUncompressor(h);
        } else { // otherwise, must use bulk operations?
            // TODO: currently we do not have other codecs
            throw new UnsupportedOperationException("No Uncompressor for compression type: "+);
        }
        final long start = ( == null) ? 0L : .nanosForDiagnostics();
        uncomp.feedCompressedData(copyBuffer, 0, inputLength);
        uncomp.complete();
        if ( != null) {
            .addResponseWriteTime(start);
        }
    }

    
    /**
     * Method called in the complex case of having to read a large piece of
     * content, where source does not fit in the input buffer.
     */
    // NOTE(review): extraction lost code here — the parameter separator in the
    // signature, the statement that allocates an off-heap buffer and assigns
    // `e0` (wrapping the anonymous with-buffer callback below), the
    // `_fileInput` initializer for `in`, the `_compression == Compression.LZF`
    // and `_dataOffset < 0L` operands of the condition, and the receiver of
    // `.logWarn(...)` (presumably bufferAllocFailLogger). Restore from upstream.
    protected void _readAllWriteStreamingCompressed(final OutputStream outfinal byte[] copyBuffer)
        throws IOException
    {
            @Override
            public IOException withBuffer(StreamyBytesMemBuffer buffer) {
                InputStream in = ;
                try {
                    // First: LZF has special optimization to use, if we are to copy the whole thing:
                    if (( == .) && ( < 0L)) {
                        _readAllWriteStreamingCompressedLZF(inoutcopyBufferbuffer);
                    } else {
                        _readAllWriteStreamingCompressed2(inoutcopyBufferbuffer);
                    }
                } catch (IOException e) {
                    return e;
                } finally {
                    _close(in);
                }
                return null;
            }
            @Override
            public IOException withError(IllegalStateException e) {
                // allocation failures can come in clusters; logging is throttled
                .logWarn("Failed to allocate off-heap buffer for streaming output: {}"e);
                // fall back to streaming without an off-heap buffer
                return withBuffer(null);
            }
        });
        // any IOException produced inside the callback is rethrown here
        if (e0 != null) {
            throw e0;
        }
    }
    // specialized version for read-all-no-offset, lzf-compressed case
    protected void _readAllWriteStreamingCompressedLZF(final InputStream infinal OutputStream out,
            final byte[] copyBufferfinal StreamyBytesMemBuffer offHeap)
        throws IOException
    {
        final long waitStart = ( == null) ? 0L : .nanosForDiagnostics();
                .getRaw(), ,
                new FileOperationCallback<StreamyBytesMemBuffer>() {
            @SuppressWarnings("resource")
            @Override
            public StreamyBytesMemBuffer perform(long operationTimeStorableKey keyStorable valueFile externalFile)
                    throws IOExceptionStoreException
            {
                long left = .getStorageLength();
                if ( != null) {
                    .addFileReadWait(.nanosForDiagnostics() - waitStart);
                }
                InputStream combined;
                
                if (offHeap != null) {
                    .getStorageLength();
                    int leftovers = _readInBuffer(keyinleftcopyBufferoffHeap);
                    // easy case: all read?
                    if (leftovers == 0) {
                        return offHeap;
                    }
                    // if not, read+write; much longer. But starting with buffered, if any
                    Vector<InputStreamcoll = new Vector<InputStream>(3);
                    coll.add(new BufferBackedInputStream(offHeap));
                    coll.add(new ByteArrayInputStream(copyBuffer, 0, leftovers));
                    coll.add(in);
                    combined = new SequenceInputStream(coll.elements());
                } else {
                    // otherwise, just read from input
                    combined = in;
                }
                final long start = ( == null) ? 0L : .nanosForDiagnostics();
                CountingInputStream counter = new CountingInputStream(combined);
                LZFInputStream lzfIn = new LZFInputStream(counter);
                try {
                     lzfIn.readAndWrite(out);
                } finally {
                    _close(lzfIn);
                    if ( != null) {
                        final long totalSpent = .nanosForDiagnostics() - start;
                        // Not good, but need to try avoiding double-booking so assume 1/4 for response write
                        long respTime = (totalSpent >> 2);
                        .addResponseWriteTime(respTime);
                        .addFileReadAccess(startstartstart + totalSpent - respTime,
                                counter.readCount());
                    }
                }
                return null;
            }
        });
        // Fast case is out of file-access lock so must be handled here
        if (fullBuffer != null) {
            final long start = ( == null) ? 0L : .nanosForDiagnostics();
            LZFInputStream lzfIn = new LZFInputStream(new BufferBackedInputStream(fullBuffer));
            try {
                lzfIn.readAndWrite(out);
            } finally {
                _close(lzfIn);
                 if ( != null) {
                     .addResponseWriteTime(start);
                 }
            }
        }
    }
    protected void _readAllWriteStreamingCompressed2(final InputStream infinal OutputStream out,
            final byte[] copyBufferfinal StreamyBytesMemBuffer offHeap)
        throws IOException
    {
                .getRaw(), ,
                new FileOperationCallback<Void>() {
            @Override
            public Void perform(long operationTimeStorableKey key,
                    Storable valueFile externalFilethrows IOException,
                    StoreException {
                _readAllWriteStreamingCompressed3(inoutcopyBufferoffHeap);
                return null;
            }
        });
    }
        
    protected void _readAllWriteStreamingCompressed3(InputStream in0OutputStream out,
            byte[] copyBufferStreamyBytesMemBuffer offHeap)
        throws IOException
    {
        final CountingInputStream counter = new CountingInputStream(in0);
        
        InputStream in = Compressors.uncompressingStream(counter);
        // First: anything to skip (only the case for range requests)?
        if ( > 0L) {
            long skipped = 0L;
            long toSkip = ;
            final long start = ( == null) ? 0L : .nanosForDiagnostics();
            
            while (toSkip > 0) {
                long count = in.skip(toSkip);
                if (count <= 0L) { // should not occur really...
                    throw new IOException("Failed to skip more than "+skipped+" bytes (needed to skip "++")");
                }
                skipped += count;
                toSkip -= count;
            }
            if ( != null) {
                // assume here skipping is "free" (i.e. no bytes read)
                .addFileReadAccess(start, 0L);
            }
        }
        // Second: output the whole thing, or just subset?
        // TODO: buffer
        if ( < 0) { // all of it
            long prevCount = 0L;
            while (true) {
                final long start = .nanosForDiagnostics();
                int count = in.read(copyBuffer);
                if ( != null) {
                    long newCount = counter.readCount();
                    .addFileReadAccess(startnewCount-prevCount);
                    prevCount = newCount;
                }
                
                if (count <= 0) {
                    break;
                }
                final long outputStart = .nanosForDiagnostics();
                out.write(copyBuffer, 0, count);
                if ( != null) {
                    .addResponseWriteTime(outputStart);
                }
            }
            return;
        }
        // Just some of it
        long left = ;
        // TODO: buffer
        long prevCount = 0L;
        while (left > 0) {
            final long start = ( == null) ? 0L : .nanosForDiagnostics();
            int count = in.read(copyBuffer, 0, (int) Math.min(copyBuffer.lengthleft));
            if ( != null) {
                long newCount = counter.readCount();
                .addFileReadAccess(startnewCount-prevCount);
                prevCount = newCount;
            }
            if (count <= 0) {
                break;
            }
            final long outputStart = ( == null) ? 0L : .nanosForDiagnostics();
            out.write(copyBuffer, 0, count);
            if ( != null) {
                .addResponseWriteTime(outputStart);
            }
            left -= count;
        }
        // Sanity check; can't fix or add headers as output has been written...
        if (left > 0) {
            .error("Failed to write request Range %d-%d (from File {}): only wrote {} bytes",
	                new Object[] { ++1, .getAbsolutePath(),
	                -left });
        }
    }
    /*
    /**********************************************************************
    /* Shared file access, buffering methods
    /**********************************************************************
     */
    protected void _readAll(OutputStream outfinal byte[] copyBuffer,
            final long offsetfinal int dataLength)
        throws IOException
    {
        final long start = ( == null) ? 0L : .nanosForDiagnostics();
                .getRaw(), ,
                new FileOperationCallback<Void>() {
            @Override
            public Void perform(long operationTime,  StorableKey keyStorable valueFile externalFile)
                throws IOException
            {
                final long fsStart = ( == null) ? 0L : .nanosForDiagnostics();
                int count = _readFromFile(externalFilecopyBufferoffsetdataLength);
                if ( != null) {
                    .addFileReadAccess(startfsStart.nanosForDiagnostics(),
                            count);
                }
                if (count < dataLength) {
                    throw new IOException("Failed to read all "+dataLength+" bytes from '"
                            +externalFile.getAbsolutePath()+"'; only got: "+count);
                }
                return null;
            }
        });
    }
    protected int _readInBuffer(StorableKey key,
            InputStream inputlong toReadbyte[] readBufferStreamyBytesMemBuffer offHeap)
        throws IOException
    {
        final long nanoStart = ( == null) ? 0L : .nanosForDiagnostics();
        long total = 0;
        try {
            while (toRead > 0L) {
                int count;
                try {
                    int maxToRead = (int) Math.min(toReadreadBuffer.length);
                    count = input.read(readBuffer, 0, maxToRead);
                } catch (IOException e) {
                    long offLength = (offHeap == null) ? 0 : offHeap.getTotalPayloadLength();
                    throw new StoreException.IO(key"Failed to read content to store (after "+offLength
                            +" bytes)"e);
                }
                if (count <= 0) { // got it all babe
                    throw new IOException("Failed to read all content: missing last "+toRead+" bytes");
                }
                // can we append it in buffer?
                if ((offHeap == null) || !offHeap.tryAppend(readBuffer, 0, count)) {
                    _verifyBufferLength(offHeaptotal);
                    total += count// since total is reported via diagnostics
                    return count;
                }
                total += count;
                toRead -= count;
            }
            _verifyBufferLength(offHeaptotal);
            return 0;
        } finally {
            if ( != null) {
                .addFileReadAccess(nanoStarttotal);
            }
        }
    }
    private long _writeBuffered(StreamyBytesMemBuffer offHeapOutputStream out,
            byte[] copyBufferbyte[] leftovers)
        throws IOException
    {
        final long nanoStart = ( == null) ? 0L : .nanosForDiagnostics();
        
        long total = 0L;
        int count;
        
        if (offHeap != null) {
            while ((count = offHeap.readIfAvailable(copyBuffer)) > 0) {
                out.write(copyBuffer, 0, count);
                total += count;
            }
        }
        if (leftovers != null) {
            out.write(leftovers);
            total += leftovers.length;
        }
        if ( != null) {
            .addResponseWriteTime(nanoStart);
        }
        return total;
    }
    
    private void _verifyBufferLength(StreamyBytesMemBuffer offHeaplong totalthrows IOException
    {
        // sanity check to verify that buffer has everything it is supposed to...
        long bufd = (offHeap == null) ? 0L : offHeap.getTotalPayloadLength();
        if (bufd != total) {
            throw new IOException("Internal buffering problem: read "+total
                    +" bytes, buffer contains "+bufd);
        }
    }
    
    /*
    /**********************************************************************
    /* Simple helper methods
    /**********************************************************************
     */

    
Helper method for reading all the content from specified file into given buffer. Caller must ensure that amount to read fits in the buffer.
    protected int _readFromFile(File fbyte[] bufferlong toSkipint dataLengththrows IOException
    {
        InputStream in = ;
        int offset = 0;
        try {
            // Content to skip?
            if (toSkip > 0L) {
                _skip(intoSkip);
            }
            int left = dataLength;
            while (left > 0) {
                int count = in.read(bufferoffsetleft);
                if (count <= 0) {
                    if (count == 0) {
                        throw new IOException("Weird stream ("+in+"): read for "+left+" bytes returned "+count);
                    }
                    break;
                }
                offset += count;
                left -= count;
            }
        } finally {
            _close(in);
        }
        return offset;
    }
    protected final int _read(InputStream inbyte[] bufferlong maxReadthrows IOException
    {
        int toRead = (int) Math.min(buffer.lengthmaxRead);
        int offset = 0;
        while (offset < toRead) {
            int countin.read(bufferoffsettoRead-offset);
            if (count <= 0) {
                throw new IOException("Failed to read next "+toRead+" bytes (of total needed: "+maxRead+"): only got "
                        +offset);
            }
            offset += count;
        }
        return toRead;
    }
    protected final void _skip(InputStream inlong toSkipthrows IOException
    {
        long skipped = 0L;
        while (skipped < toSkip) {
            long count = in.skip(toSkip - skipped);
            if (count <= 0L) { // should not occur really...
                throw new IOException("Failed to skip more than "+skipped+" bytes (needed to skip "++")");
            }
            skipped += count;
        }
    }
    private final void _close(InputStream in)
    {
        if (in != null) {
            try {
                in.close();
            } catch (IOException e) {
                .warn("Failed to close file '{}': {}"e.getMessage());
            }
        }
    }
    /*
    /**********************************************************************
    /* Helper classes
    /**********************************************************************
     */
    
    
com.ning.compress.DataHandler implementation we use to extract out optional ranges, writing out content as it becomes available
    static class RangedDataHandler implements DataHandler
    {
        protected final OutputStream _out;
        protected final long _fullDataLength;
        protected long _leftToSkip;
        protected long _leftToWrite;
        public RangedDataHandler(OutputStream outlong offsetlong dataLength)
        {
             = out;
             = dataLength;
             = offset;
             = dataLength;
        }
        
        @Override
        public boolean handleData(byte[] bufferint offsetint lenthrows IOException
        {
            if ( > 0L) {
                if (len <= ) {
                     -= len;
                    return true;
                }
                offset += (int;
                len -= (int;
                 = 0L;
            }
            if ( > 0L) {
                if (len > ) {
                    len = (int;
                }
                .write(bufferoffsetlen);
                 -= len;
                // all done?
                return ( > 0);
            }
            // No need to call again, all done!
            return false;
        }
        @Override
        public void allDataHandled() throws IOException {
            if ( > 0L) {
                throw new IOException("Could not uncompress all data ("++" bytes): missing last "+);
            }
        }
    }
}