Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package divconq.io.stream;
  
  
  public class FunnelStream extends BaseStream implements IStreamSource {
  	protected int aperture = 1;
 	protected StreamMessage current = null;
 	protected boolean relayed = false;
 
 	// TODO currently only small Aperture is supported well because we may not get large buffers from 
 	// source.  we should accumulate small buffers into a large buffer so we always pass the correct size
 	// down, except at EOF of course.  see remnant in UngzipStream for an example of this sort of buffer gathering
 	
 	public void init(StackEntry stackXElement el) {
 		this. = (intstack.intFromElement(el"Aperture"this.);
 	}
 	
 	public boolean hasMore() {
 		StreamMessage curr = this.;
 
 		if (curr == null)
 			return false;
 		
 		if (!this.)
 			return true;
 		
 		ByteBuf payload = curr.getPayload();
 		
 		return (payload != null) && payload.isReadable();
 	}
 	
 	public StreamMessage nextMessage() {
 		StreamMessage curr = this.;
 		
 		if (curr == null)
 			return null;
 
 		StreamMessage blk = new StreamMessage();
 		
 		blk.setMsgType(curr.getMsgType());
 		blk.setPath(curr.getPath());
 		blk.setIsFolder(curr.isFolder());
 		blk.setFileSize(curr.getFileSize());
 		blk.setModified(curr.getModified());
 		blk.setPermission(curr.getPermission());  
 		
 		ByteBuf payload = curr.getPayload();
 		
 		if ((payload != null) && payload.isReadable()) {
 			int ramt = Math.min(this.payload.readableBytes());
 			
 			blk.setPayload(payload.copy(payload.readerIndex(), ramt));
 			
 			payload.skipBytes(ramt);
 			
 			// TODO blk.payloadoffset = 0;			
 			
 			blk.setEof(!payload.isReadable() && curr.isEof());
 			
 			if (blk.isEof()) {
 				payload.release();
 				
 				this. = null;
 			}
 		}
 		else {
 			blk.setEof(curr.isEof());
 			
 			if (payload != null)
 				payload.release();
 			
 			this. = null;
 		}
 		
 		// current has been sent at least once
 		this. = true;
 		
 		return blk;
 	}
 	
 	public HandleReturn handle(TaskRun cbStreamMessage msg) {
     	if (msg == .
     		return this..handle(cbmsg);
     	
 		this. = msg;
 		this. = false;
 		
 		while (this.hasMore()) {
 			HandleReturn ret = this..handle(cbthis.nextMessage());
 			
 			if (ret != .)
 				return ret;
 		}
     	
       	return .;
	}
	public void request(TaskRun cb) {
		while (this.hasMore()) {
			HandleReturn ret = this..handle(cbthis.nextMessage());
			if (ret != .)
				return;
		}
    	this..request(cb);
	}
	public void close() {
		StreamMessage curr = this.;
		if (curr != null) {
			curr.release();
			this. = null;
		}
		// TODO Auto-generated method stub
		super.close();
	}
New to GrepCode? Check out our FAQ X