Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /* ************************************************************************
  #
  #  DivConq
  #
  #  http://divconq.com/
  #
  #  Copyright:
  #    Copyright 2014 eTimeline, LLC. All rights reserved.
  #
 #  License:
 #    See the license.txt file in the project's top-level directory for details.
 #
 #  Authors:
 #    * Andy White
 #
 ************************************************************************ */
 package divconq.io.stream;
 
 import java.util.List;
 
 
 
 
 public class UntarStream extends BaseStream implements IStreamSource {
 	protected enum UntarState {
         RECORD,
         XTRAS,
         PREP,
         CONTENT,
         SKIP
     }
 	
     protected byte[] header_buffer = new byte[.];
     protected int partialLength = 0;
     
     protected TarArchiveEntry currEntry = null;
     protected ZipEncoding encoding = null;
     protected long remainContent = 0;
     protected long remainSkip = 0;
 
     protected List<StreamMessageoutlist = new ArrayList<>();
     protected UntarState tstate = .;
     
     public UntarStream() {
         this.  = ZipEncodingHelper.getZipEncoding("UTF-8");
     }
 
 	public void init(StackEntry stackXElement el) {
 	}
 
     public void close() {
     	this. = null;
     	
     	// not truly thread safe, consider
     	for (StreamMessage msg : this.)
     		msg.release();
     	
     	this..clear();
     
     	super.close();
     }
     
 	// make sure we don't return without first releasing the file reference content
     @Override
     public HandleReturn handle(TaskRun cbStreamMessage msg) {
     	if (msg == .
     		return this..handle(cbmsg);
 
     	ByteBuf in = msg.getPayload();
 
     	if (in != null) {
     		while (in.isReadable()) {
     			switch (this.) {
     			case :
 		    		// starting a new record
 		    		if (in.readableBytes() < . - this.) {
 		    			int offset = this.;
 		    			
 		    			this. += in.readableBytes();
 		    			
 		    			in.readBytes(this.offsetin.readableBytes());
 		    			
 		    			continue;
 		    		}
 		    		
 		    		
		    		this. = 0;
		            
		    		//in.readBytes(this.header_buffer, 0, this.header_buffer.length);
		            boolean hasHitEOF = this.isEOFRecord(this.);
		            // if we hit this twice in a row we are at the end - however, source will send FINAL anyway so we don't really care
		            if (hasHitEOF) {
		                this. = null;
		                continue;
		            }
		    		
		            try {
		            	this. = new TarArchiveEntry(this.this.);
		            } 
		            catch (Exception x) {
		                cb.kill("Error detected parsing the header: " + x);
		                in.release();
		                return .;
		            }
		            
		            this. = .;
    			case :
		            if (!in.isReadable())
		            	continue;
		    		
		            // TODO support long names and such - see org.apache.commons.compress.archivers.tar.TarArchiveInputStream
			        if (this..isGNULongLinkEntry()) {
			    		/* 
			            byte[] longLinkData = getLongNameData();
			            if (longLinkData == null) {
			                // Bugzilla: 40334
			                // Malformed tar file - long link entry name not followed by
			                // entry
			                return null;
			            }
			            currEntry.setLinkName(encoding.decode(longLinkData));
			            */
			        	
			        	cb.kill("long link currently not supported");
		                in.release();
			        	return .;
			        }
			        if (this..isGNULongNameEntry()) {
			    		/* 
			            byte[] longNameData = getLongNameData();
			            if (longNameData == null) {
			                // Bugzilla: 40334
			                // Malformed tar file - long entry name not followed by
			                // entry
			                return null;
			            }
			            currEntry.setName(encoding.decode(longNameData));
			            */
			        	
			        	cb.kill("long name currently not supported");
		                in.release();
			        	return .;
			        }
			        if (this..isPaxHeader()) { 
			        	// Process Pax headers
			    		/* 
			            paxHeaders();
			            */
			        	
			        	cb.kill("pax currently not supported");
		                in.release();
			        	return .;
			        }
			        if (this..isGNUSparse()) { 
			        	// Process sparse files
			    		/* 
			            readGNUSparse();
			            */
			        	
			        	cb.kill("sparse currently not supported");
		                in.release();
			        	return .;
			        }
    				
		            this. = .;
    			case :
		            if (!in.isReadable())
		            	continue;
		            
			        // TODO remove
		            ..println("name: " + this..getName());
		            ..println("size: " + this..getSize());
		            ..println("modified: " + this..getModTime());
		            
		            // If the size of the next element in the archive has changed
		            // due to a new size being reported in the posix header
		            // information, we update entrySize here so that it contains
		            // the correct value.
		            long entrySize = this..getSize();
		            this. = entrySize;
		            
		            long numRecords = (entrySize / this..length) + 1;
		            this. = (numRecords * this..length) - entrySize;
		            // grab as much as we can from the current buffer
		            int readSize = (int) Math.min(this.in.readableBytes());            
		            this. -= readSize;
		            // handle empty files too
		            if ((readSize > 0) || (this. == 0)) {
			            ..println("reading content: " + readSize);
			            ByteBuf out = in.copy(in.readerIndex(), readSize);
			            
			            int skipSize = (int) Math.min(this.in.readableBytes() - readSize);            
			            this. -= skipSize;
			            
			            in.skipBytes(readSize + skipSize);
			            
			            this..add(this.nextMessage(out));
		            }
		            
		            this. = .;
    			case :
		            if (!in.isReadable())
		            	continue;
		            
	    			// check if there is still content left in the entry we were last reading from
	    			if (this. > 0) {
	    	            readSize = (int) Math.min(this.in.readableBytes());            
	    	            this. -= readSize;
	    	            
	    	            //System.out.println("reading content: " + readSize);
	    	            
	    	            //ByteBuf out = Hub.instance.getBufferAllocator().heapBuffer((int) readSize);
	    	            
	    	            ByteBuf out = in.copy(in.readerIndex(), readSize);
	    	            
	    	            int skipSize = (int) Math.min(this.in.readableBytes() - readSize);            
	    	            this. -= skipSize;
		                
		                //System.out.println("skipping content: " + skipSize);
	    	            
	    	            in.skipBytes(readSize + skipSize);
	    	            
			            this..add(this.nextMessage(out));
	    			}
	    			
	    			if (this. > 0) 
	    				continue;
	    			
		            this. = null;
		            
		            this. = .;
    			case :
    	            if (!in.isReadable()) 
    	            	continue;
    	            
	    			// check if there is still padding left in the entry we were last reading from
		    		if (this. > 0) {
		                int skipSize = (int) Math.min(this.in.readableBytes());                
		                this. -= skipSize;                
		                
		                //System.out.println("skipping content: " + skipSize);
		                
		                in.skipBytes((intskipSize);
		    		}
	    			
	    			if (this. > 0) 
	    				continue;
		    		
		            this. = .;
    			}
    		}
    		
    		in.release();
    	}
    	
		// write all messages in the queue
		while (this..size() > 0) {
			HandleReturn ret = this..handle(cbthis..remove(0));
			if (ret != .)
				return ret;
		}
    	
       	return .;
    }
    
    public StreamMessage nextMessage(ByteBuf out) {
		// create the output message
        blk.setPath("/" + this..getName());
        blk.setFileSize(this..getRealSize());
        blk.setPayload(out);
        blk.setModified(this..getModTime().getTime());
        blk.setEof(this. == 0);
        
        return blk;
    }
    
    protected boolean isEOFRecord(byte[] record) {
        return record == null || ArchiveUtils.isArrayZero(record.);
    }
    
    @Override
    public void request(TaskRun cb) {
		// write all messages in the queue
		while (this..size() > 0) {
			HandleReturn ret = this..handle(cbthis..remove(0));
			if (ret != .)
				return;
		}
    	this..request(cb);
    }
New to GrepCode? Check out our FAQ X