  package divconq.io.test;
  
  
  import java.nio.file.Paths;
  
  import io.netty.channel.ChannelHandlerContext;
  
  import divconq.hub.Hub;
What needs to happen is that CtpStreamSource gets a reference to StreamInboundHandler, along with methods to say "give me more". The underlying ByteToMessageDecoder needs a "resume" feature that keeps reading from the current buffer and processing the messages. Rather than StreamInboundHandler sitting in the pipeline, it should probably just be a class that is handed to CtpStreamDecoder as one option for message handling, so this class may become obsolete as its code begins to merge with CtpStreamSource. Switching to Shared is then merely a matter of replacing CtpStreamSource with the switching handler, and in turn replacing that with the shared handler.

Author(s):
andy
 
 public class StreamInboundHandler extends CtpMessageDecoder {
 	protected CtpStreamSource src = null;
 	protected TaskRun srun = null;
 	
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) {
     	CtpMessage m = (CtpMessage) msg;
     	
     	if (m instanceof ClientHelloMessage) {
     		System.out.println("HELLO from client!");
         	
         	m.release();
     	}
     	else if (m instanceof StreamMessage) {
     		System.out.println("STREAM from client! " + ((StreamMessage)m).getPath());
     		
     		// TODO read from context not channel?  so give context to CTP Stream??
     		if (this.src == null) {
     			this.src = new CtpStreamSource(ctx.channel(), (StreamMessage)msg);
     			
     			FileSystemFile dest = StreamUtil.localFile(Paths.get("c:/temp/testtar"));
     			
     			Task t = new Task()
     				.withTitle("Streaming In Test")
     				.withTimeout(0);
     			
     			this.srun = StreamUtil.composeStream(t,
     				this.src,
     				dest.allocDest(true));
     			
     			this.srun.addObserver(new TaskObserver() {
     				@Override
     				public void completed(TaskRun or) {
     					System.out.println("Transfer In is complete!!");
     	    			StreamInboundHandler.this. = true;
     					ctx.read();
     				}
     			});
     			
     			this. = false;
     			
     			Hub.instance.getWorkPool().submit(this.srun);
     		}
     		else {
     			System.out.println("Read resume !!!!!!!!!!!!!!");
     			
     			this.src.setNext((StreamMessage)msg);
     			this..resume();
     		}
     	}
     	else if (m instanceof TerminateStream) {
     		System.out.println("TERM STREAM from client! ");
     		
 			this.src = null;
 			this.srun = null;
 	    	
 	    	m.release();
     	}
     }
 
 	public void handleMessage() {
 		// TODO Auto-generated method stub
 	}
 }
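
The "give me more" / "resume" idea from the class notes can be illustrated without Netty. The following is only a minimal sketch under stated assumptions: `ResumableDecoder`, `onData`, `request` and `pending` are hypothetical names invented for this illustration, not part of divconq or Netty, and plain strings stand in for decoded CTP messages. Messages that have been decoded are held back until the consumer signals demand, and delivery resumes from the buffered messages before any new data is needed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical stand-in for a decoder with a "resume" feature.
// Strings play the role of already-decoded messages.
class ResumableDecoder {
    // Messages decoded but not yet handed to the consumer.
    private final Queue<String> decoded = new ArrayDeque<>();
    // Where delivered messages go (the role the stream source would play).
    private final Consumer<String> handler;
    // Number of messages the consumer has asked for and not yet received.
    private int demand = 0;

    ResumableDecoder(Consumer<String> handler) {
        this.handler = handler;
    }

    // Called when new data arrives; each element is treated as one message.
    void onData(List<String> frames) {
        decoded.addAll(frames);
        drain();
    }

    // The consumer's "give me more": raise demand and resume delivery
    // from whatever is already buffered.
    void request(int n) {
        demand += n;
        drain();
    }

    // Number of buffered messages still waiting for demand.
    int pending() {
        return decoded.size();
    }

    // Deliver buffered messages while there is outstanding demand.
    private void drain() {
        while (demand > 0 && !decoded.isEmpty()) {
            demand--;
            handler.accept(decoded.poll());
        }
    }

    public static void main(String[] args) {
        ResumableDecoder decoder =
            new ResumableDecoder(m -> System.out.println("delivered: " + m));

        decoder.onData(Arrays.asList("a", "b", "c")); // nothing delivered, no demand yet
        decoder.request(1);                           // delivers "a"
        decoder.request(5);                           // delivers "b" and "c", 3 demand left over
        decoder.onData(new ArrayList<>(Arrays.asList("d"))); // delivers "d" immediately
    }
}
```

In this sketch the handler is passed to the decoder instead of being a separate pipeline stage, which mirrors the note's suggestion that the handler be "handed to" the decoder as one option for message handling. A real implementation would also need to decide which thread calls `request`, since Netty handlers normally run on the channel's event loop.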