Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /* ************************************************************************
  #
  #  DivConq
  #
  #  http://divconq.com/
  #
  #  Copyright:
  #    Copyright 2014 eTimeline, LLC. All rights reserved.
  #
 #  License:
 #    See the license.txt file in the project's top-level directory for details.
 #
 #  Authors:
 #    * Andy White
 #
 ************************************************************************ */
 package divconq.api;
 
 
 
 
 // TODO test to be sure that the user associated with the session is not
 // mixed up with the user calling the session, a single task should be able
 // to run multiple local sessions, all impersonating different users
 public class LocalSession extends ApiSession {
 	protected Session session = null;
 	protected ISchedule sched = null;
 	
 	public void init(XElement config) {
 		this.init(..getSessions().create("hub:"config.getAttribute("Domain")), config);
 	}
 	
 	public void init(Session sessionXElement config) {
 		this. = session;
 		
 		//this.session.setKeep(true);
 		
 		this. = this..getUser();
 		
 		// don't think we need context here because .touch sets context anyway
 		Task touchtask = new Task()
 			.withTitle("Keep Local Session Alive: " + this..getId())
 			.withWork(new IWork() {				
 				public void run(TaskRun trun) {
 					trun.complete();
 				}
 			});
 		
 		// use the touch approach to keep session alive - for tethers in gateway
 		this.  = ..getScheduler().runEvery(touchtask, 55);
 	}
 	
 	public class LocalSessionAdatper implements ISessionAdapter {			
 		public void deliver(Message msg) {
 			String to = msg.getFieldAsString("Service");
 			
 			if ("Replies".equals(to)) 
 			else
 		}
 
 		public ListStruct popMessages() {				
 			return null;	// nothing to do, we deliver direct 
 		}
 
 		public void stop() {
 		}
 	}	
 	
	public void stopped() {
		if (this. != null) {
			this..cancel();
			this. = null;
		}
	}
	public void sendForgetMessage(Message msg) {
		this..touch();
		this..setContext("hub:");
	}
	public void sendMessage(final Message msgfinal ServiceResult callback) {		
		msg.removeField("RespondTo");		// always to Replies, no other supported
		this..registerForReply(msgcallback);
		this..setContext("hub:");
	}
	public void abortStream(String channelid) {
		DataStreamChannel chan = this..getChannel(channelid);
		if (chan != null)
			chan.abort();
	}
	/*
	 * Upload and close the stream
	 */
	public void sendStream(ScatteringByteChannel inlong sizelong offsetfinal String channelidfinal OperationCallback callback) {
		final DataStreamChannel chan = this..getChannel(channelid);
		if (chan == null) {
			callback.error(1, "Missing channel");
			callback.complete();
			return;
		}
		chan.setDriver(new IStreamDriver() {
			public void cancel() {
				callback.error(1, "Transfer canceled");				
				chan.complete();
				callback.complete();
			}
			public void message(StreamMessage msg) {
				if (msg.isFinal()) {
					..println("Final on channel: " + channelid);
					chan.complete();
					callback.complete();
				}
			}
			public void nextChunk() {
				// won't chunk so won't happen here
			}
		});		
		long sent = offset;
		int seq = 0;
		if (size > 0) {
			callback.getContext().setAmountCompleted((int)(sent * 100 / size));
			chan.getContext().setAmountCompleted((int)(sent * 100 / size));		// keep the channel active so it does not timeout
		}
		try {
			long toskip = offset;
			if (in instanceof SeekableByteChannel) {
			}
			else {
				while (toskip > 0) {
					int skip = (int) Math.min(bb.capacity(), toskip);
					toskip -= bb.writeBytes(inskip);
					bb.clear();
				}
			}
			chan.touch();
			// now start writing the upload
			int amt = bb.writeBytes(inbb.capacity());
			while (amt != -1) {
				bb.retain();		// this ups ref cnt to 2 - we plan to reuse the buffer
				StreamMessage b = new StreamMessage("Block"bb);  
				b.setField("Sequence"seq);
				OperationResult sr = chan.send(b);
				if (sr.hasErrors()) {
					chan.close();
					break;
				}
				seq++;
				sent += amt;
				if (size > 0) { 
					callback.getContext().setAmountCompleted((int)(sent * 100 / size));
					chan.getContext().setAmountCompleted((int)(sent * 100 / size));		// keep the channel active so it does not timeout
				}
				callback.touch();
				chan.touch();
				// by the time we get here, that buffer has been used up and we can use it for the next buffer
				if (bb.refCnt() != 1) 
					throw new IOException("Buffer reference count is not correct");
				// stop writing if canceled
				if (chan.isClosed())
					break;
				bb.clear();
				amt = bb.writeBytes(inbb.capacity());
			}
			// we are now done with it
			bb.release();
			// final only if not canceled
			if (!chan.isClosed())
				chan.send(MessageUtil.streamFinal());
		catch (IOException x) {
			callback.error(1, "Local read error: " + x);
			chan.send(MessageUtil.streamError(1, "Source read error: " + x));
			chan.close();
			callback.complete();
		finally {
			try {
				in.close();
			catch (IOException x) {
			}
		}
	}
	/*
	 * download and close the stream
	 */
	public void receiveStream(final WritableByteChannel outfinal long sizefinal long offsetfinal String channelidfinal OperationCallback callback) {
		final DataStreamChannel chan = this..getChannel(channelid);
		if (chan == null) {
			callback.error(1, "Missing channel");
			callback.complete();
			return;
		}
		chan.setDriver(new IStreamDriver() {
			protected long amt = offset;
			protected long seq = 0;
			public void cancel() {
				callback.error(1, "Error from source: ");
				chan.complete();
				this.flushClose();
			}
			public void message(StreamMessage msg) {
				int seqnum = (intmsg.getFieldAsInteger("Sequence", 0);
				if (seqnum != this.) {
					this.error(1, "Bad sequence number: " + seqnum);
					return;
				}
				try {
					if (msg.hasData()) {
						int camt = msg.getData().readableBytes();
						for (ByteBuffer bb : msg.getData().nioBuffers()) 
							out.write(bb);
						this. += camt;
					}
					++;
					if (size > 0) 
						callback.getContext().setAmountCompleted((int)(this. * 100 / size));
					if (msg.isFinal()) {
						chan.complete();
						this.flushClose();
					}
				catch (IOException x) {
					this.error(1, "Error writing stream: " + x);
				}
			}
			public void error(int codeString msg) {
				callback.error(1, msg);
				chan.send(MessageUtil.streamError(codemsg));
				this.flushClose();
			}
			public void flushClose() {
				try {
					out.close();
				catch (IOException x) {
				}
				callback.complete();
			}
			public void nextChunk() {
				// doesn't matter for dest
			}
		});		
		chan.touch();
		// get the data flowing
		OperationResult sr = chan.send(new StreamMessage("Start"));
		if (sr.hasErrors()) 
			chan.close();
	}
	public void clearToGuest() {
	}
	public void startSessionAsRoot() {
	}
New to GrepCode? Check out our FAQ X