Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /* ************************************************************************
  #
  #  DivConq
  #
  #  http://divconq.com/
  #
  #  Copyright:
  #    Copyright 2014 eTimeline, LLC. All rights reserved.
  #
 #  License:
 #    See the license.txt file in the project's top-level directory for details.
 #
 #  Authors:
 #    * Andy White
 #
 ************************************************************************ */
 package divconq.ctp.stream;
 
 
 public class FunnelStream extends BaseStream implements IStreamSource {
 	protected int aperture = 1;
 	protected FileDescriptor current = null;
 	protected ByteBuf currbuf = null;
 	protected boolean relayed = false;
 
 	// TODO currently only small Aperture is supported well because we may not get large buffers from 
 	// source.  we should accumulate small buffers into a large buffer so we always pass the correct size
 	// down, except at EOF of course.  see remnant in UngzipStream for an example of this sort of buffer gathering
 	
 	public void init(StackEntry stackXElement el) {
 		this. = (intstack.intFromElement(el"Aperture"this.);
 	}
 	
 	public boolean hasMore() {
 		FileDescriptor curr = this.;
 
 		if (curr == null)
 			return false;
 		
 		if (!this.)		// TODO what about EOF, we need to send that along, so even first is not enough?
 			return true;
 		
 		ByteBuf payload = this.;
 		
 		return (payload != null) && payload.isReadable();
 	}
 	
 	public ReturnOption nextMessage() {
 		FileDescriptor curr = this.;
 		
 		if (curr == null)
 
 		FileDescriptor blk = new FileDescriptor();
 		blk.copyAttributes(curr);
 		
 		ByteBuf payload = this.;
 		
 		if ((payload != null) && payload.isReadable()) {
 			int ramt = Math.min(this.payload.readableBytes());
 			
 			ByteBuf pslice = payload.copy(payload.readerIndex(), ramt);
 			
 			payload.skipBytes(ramt);
 			
 			// TODO blk.payloadoffset = 0;			
 			
 			blk.setEof(!payload.isReadable() && curr.isEof());
 			
 			if (blk.isEof()) {
 				payload.release();
 				
 				this. = null;
 				this. = null;
 			}
 			
 			payload = pslice;
 		}
 		else {
 			blk.setEof(curr.isEof());
 			
 			if (payload != null)
 				payload.release();
 			
 			payload = null;
 			this. = null;
 			this. = null;
 		}
 		
 		// current has been sent at least once
 		this. = true;
 		
 		return this..handle(blkpayload);
 	}
	public ReturnOption handle(FileDescriptor fileByteBuf data) {
    	if (file == .
    		return this..handle(filedata);
    	
		this. = file;
		this. = data;
		this. = false;
		while (this.hasMore()) {
			ReturnOption ret = this.nextMessage();
			if (ret != .)
				return ret;
		}
    	
       	return .;
	}
	public void read() {
		while (this.hasMore()) {
			ReturnOption ret = this.nextMessage();
			if (ret != .)
				return;
		}
    	this..read();
	}
	public void close() {
		ByteBuf curr = this.;
		if (curr != null
			curr.release();
		this. = null;
		this. = null;
		super.close();
	}
New to GrepCode? Check out our FAQ X