Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package divconq.io.stream;
  
  
  
 
 public class FileSourceStream extends BaseStream implements IStreamSource {
 	protected IFileCollection source = null;	
 	protected IFileStoreFile current = null;
 	protected FileChannel in = null;
 	protected long insize = 0;
 	protected long inprog = 0;
 	
 		this. = src;
 	}
 
 	// for use with dcScript
 	public void init(StackEntry stackXElement el) {
 		// anything we need to gleam from the xml?
 	}
 
 	public HandleReturn handle(TaskRun cbStreamMessage msg) {
 		// we are at top of stream, nothing to do here
 	}
 	
 	public void close() {
 		//System.out.println("File SRC killed");	// TODO
 		
 		if (this. != null)
 			try {
 				this..close();
 			} 
 			catch (IOException x) {
 			}
 		
 		this. = null;
 		this. = null;
 		this. = null;
 		
 		super.close();
 	}

Someone downstream wants more data
 
 	public void request(TaskRun cb) {
 		if (this. == null) {
 			return;
 		}
 		
 		if (this. == null) {
 				public void callback() {
 					if (this.hasErrors()) {
 						cb.kill();
 						return;
 					}
 					
 					FileSourceStream.this.readFile(cbthis.getResult());
 				}
 			});
 		}
 		// folders are handled in 1 msg, so we wouldn't get here in second or later call to a file
 		else if (this. instanceof FileSystemFile)
 		else {
 		}
 	}
 	
 	public void readFile(TaskRun cbIFileStoreFile file) {
 		this. = file;
 		
 		// if we reached the end of the collection then finish
 		if (this. == null) {
 		}
 		else if (this..isFolder()) {
 	        StreamMessage fref = StreamMessage.fromFileStore(this.);
	        fref.setIsFolder(true);
	        fref.setPath(this..path().subpath(this..path()));
	        
			if (this..handle(cbfref) == .) {
				cb.resume();
			}
		}		
		else if (this. instanceof FileSystemFile)
		else {
		}
	}
	public void readOtherFile(TaskRun cb) {
		// TODO abstract out so this class is a FileCollectionSourceStream and we
		// use it pull out the source streams of files, which we then use as if upstream from us
	}
	// release data if error
	public void readLocalFile(TaskRun cb) {
		if (this. == null) {
			this. = fs.getSize();
			// As a source we are responsible for progress tracking
			try {
				this. = FileChannel.open(fs.localPath(), .);
			catch (IOException x) {
				cb.kill("Unable to read source file " + x);
				return;
			}
		}
		while (true) {
			// TODO sizing?
	        ByteBuf data = ..getBufferAllocator().heapBuffer(32768);
	        ByteBuffer buffer = ByteBuffer.wrap(data.array(), data.arrayOffset(), data.capacity());
	        
	        int pos = -1;
	        try {
				pos = (int)this..read(buffer);
			catch (IOException x1) {
				cb.kill("Problem reading source file: " + x1);
				data.release();
				return;
			}
	        StreamMessage fref = StreamMessage.fromFileStore(this.);
	        fref.setPayload(data);
	        fref.setPath(this..path().subpath(this..path()).toString());
	        
	        ..println("writing: " + fref.getPath() + " from: " + this.);
	        
	        if (pos == -1) {
	        	try {
					this..close();
	        	catch (IOException x) {
					cb.kill("Problem closing source file: " + x);
					data.release();
					return;
				}
	        	
	        	cb.setAmountCompleted(100);
	        	
		        fref.setEof(true);
	        	
	        	this. = null;
	        	this. = null;
	        	this. = 0;
	        	this.  = 0;
	        }
	        else {
		        this. += pos;
		        
		        data.writerIndex(pos);
		        cb.setAmountCompleted((int)(this. * 100 / this.));
	        }
	        
	    	if (this..handle(cbfref) != .)
	    		break;
	    	
	    	if (this. == null) {
	    		// we need the next file
	    		cb.resume();
	    		
	    		// wait on the implied request
	    		break;
	    	}
		}
	}
New to GrepCode? Check out our FAQ X