Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
   /* ************************************************************************
   #
   #  DivConq
   #
   #  http://divconq.com/
   #
   #  Copyright:
   #    Copyright 2014 eTimeline, LLC. All rights reserved.
   #
  #  License:
  #    See the license.txt file in the project's top-level directory for details.
  #
  #  Authors:
  #    * Andy White
  #
  ************************************************************************ */
  package divconq.bus;
  
  import java.util.HashMap;
  import java.util.List;
  
  import divconq.hub.Hub;
  import divconq.work.Task;
   
  public class HubRouter {
  	protected String hubid = null;
  	protected String squad = null;
  	
  	// used with local router
  	protected boolean local = false;
  	protected boolean gateway = false;
  	protected ReplyService localReplies = null;
  	
  	protected boolean usekeepalive = true;
  	
  	// used with direct connections
  	protected List<Sessionsessions = new CopyOnWriteArrayList<>();				// direct connection
  	protected List<HubRoutertunnels = new CopyOnWriteArrayList<>();			// tunnel (proxy) connection
  	protected ReentrantLock sessionlock = new ReentrantLock();
  	protected int next = 0;	
  	
  	protected List<StreamSessionstreamsessions = new CopyOnWriteArrayList<>();				// direct connection
  	protected int streamnext = 0;	
  	
  	// used with tunnel connections
  	protected HashMap<StringHubRouterproxied = new HashMap<>();
  	
  	// use with direct and tunnel
  	protected HashMap<StringStreamPathstreampaths = new HashMap<>();
  	
  	public String getHubId() {
  		return this.;
  	}
  	
  	public String getSquadId() {
  		return this.;
  	}
  	
  	public Collection<StringgetServices() {
  		return this.;
  	}
  	
  	
  		return this..values();
  	}
  	
  	public boolean isLocal() {
  		return this.;
  	}
  	
  	public boolean isDirect() {
  		return (this..size() > 0) && (this..size() > 0);
 	}
 	
 	public boolean isTunneled() {
 		return (this..size() > 0);
 	}
 	
 	public boolean isActive() {
 		return this. || (this..size() > 0) || (this..size() > 0);
 	}
 
 	public void setUseKeepAlive(boolean v) {
 		this. = v;
 	}
 	
 	// use for remote router
 	public HubRouter(String idboolean gateway) {
 		this. = id;
 		this. = gateway;
 	}
 	
 	// use for local router
 	public HubRouter() {
 		this. = true;
 		this. = OperationContext.getHubId();
 		this. = new ReplyService();
 	}
 	
 	// for use with local only
 	public void registerService(IService callback) {
 		this..put(callback.serviceName(), callback);
 		this..add(callback.serviceName());
 	}
 
 	// for use with local only
 	public void removeService(String name) {
 		this..remove(name);
 		this..remove(name);
 	}
     
 	// for use with local only
     public Message buildHello(String to) {
 		Message cmd = new Message();
 		cmd.setField("Kind""HELLO");
 		cmd.setField("Id", OperationContext.getHubId());
 		cmd.setField("Squad"this.);
 		
 		// if we are idle then don't list our non-core services with other servers anymore
 		// TODO we should still support the Reply service and maybe a few others...just not app level services?
 		if (!..isIdled()) {
 			cmd.setField("Services"new ListStruct(this.));
 			
 				// make sure we list each hub only once in ProxiedServices
 				HashMap<StringHubRouterproxied = new HashMap<>();
 				
 				for (HubRouter hub : ..getBus().getHubs()) {
 					if (!hub.isLocal() && hub.isActive() && !hub.getHubId().equals(to)) {
 						proxied.put(hub.getHubId(), hub);
 						
 						for (HubRouter phub : hub.getProxiedHubs())
 							if (phub.isActive() && !phub.getHubId().equals(to))
 								proxied.put(phub.getHubId(), phub);
 					}
 				}
 				
 				ListStruct plist = new ListStruct();
 				
 				for (HubRouter hub : proxied.values()) {
 					plist.addItem(new RecordStruct(
 							new FieldStruct("Id"hub.getHubId()),
 							new FieldStruct("Squad"hub.getSquadId()),
 							new FieldStruct("Services"new ListStruct(hub.getServices()))
 					));
 				}
 				
 				cmd.setField("ProxiedServices"plist);
 			}
 		}
 		
 		return cmd;
     }
     
 	// for use with local only
     public StreamMessage buildStreamHello(String to) {
 		StreamMessage cmd = new StreamMessage("HELLO");
 		
 		cmd.setField("Id", OperationContext.getHubId());
 		cmd.setField("Squad"this.);
 		
 		return cmd;
     }
 
 	// for use with local only
 	public String registerForReply(Message msgServiceResult callback) {
 		return this..registerForReply(msgcallback);
 	}
 
 	// for use with local only
 		return this.;
 	}
 	
 		
 		if (msg == null) {
 			or.error(1, "Message missing.");  // TODO code
 			return or;
 		}
         
         // route to local
         if (this.) {    	
             msg.removeField("Kind");  // not used locally
     		
 	        //System.out.println("dcBus received message: " + msg);
 	        
 			String srv = msg.getFieldAsString("Service");
 		
 			if (srv == null) {
 				or.error(1, "Message missing service.");  // TODO code
 				return or;
 			}
 			
 			IService cb = this..get(srv);
 			
 			if (cb == null) {
 				or.error(1, "Service not on this hub.");  // TODO code
 				return or;
 			}
 			
 			// TODO if cb is ReplyService then get the task context from there instead of the message ... update the signin/verify user to work with message body for new user context
 	
 			// TODO now that TaskContext is immutable we could optimize local Bus calls by not freezing and thawing except remotely - future optimization
 			OperationContext tc = OperationContext.allocate(msg);
 			
 			if ((tc == null) || (tc.getUserContext() == null)) {
 				or.errorTr(442);
 				return or;
 			}
 			
 			Task tb = new Task()
 				.withTitle("Hub Router: " + srv)
 				.withParams(msg)
 				.withBucket("Bus")		// typically this will fall back into Default
 				.withWork(task -> {			
 						public void callback() {
 							UserContext uc = this.getResult();
 							
 							// update the operation context
 							if (uc != null
 								OperationContext.use(uctc.toBuilder());
 							
 							if (task.hasErrors()) {
 							    task.complete();
 							    return;
 							}
 
 							// validate the structure of the message
 							
 							// if invalid structure then do not continue
 							if (vres.hasErrors()) {
 							    task.complete();
 							    return;
 							}
 							
 							// when making a valid call to any service, you are elevated to system access for the duration of the request
 							// RPC users get a new context with each call though, and reply will not violate any security
 							// so it is up to each service to call only other services as appropriate - that is each service 
 							// is ultimately responsible for security
 							if (!OperationContext.get().isElevated())
 								OperationContext.use(OperationContext.get().toBuilder().withElevated(true));
 							
 							cb.handle(task);
 							
 							//System.out.println("d3: " + msg);
 							//System.out.println("d4: " + rmsg);
 							
 							// reply may be async so this could be null - when async the service handler calls sendReply directly
 							// which is also a reason not to add a lot of logic here, as it won't get called in async cases
 							//if (rmsg != null)			
 							//    Hub.instance.getBus().sendReply(rmsg, msg);
 							
 							//task.complete();
 						}
 					};
 					
 					// don't verify a Verify request or it'll be stuck forever making new verify checks
 					if (msg.isVerifyRequest()) {
 						fcb.complete();
 					}
 					else 
 						tc.verify(fcb);
 				});
 					
 				public void completed(OperationContext or) {
 					// set to this so that we can use the proper - elevated - context during
 					// the reply routing
 					OperationContext.set(or);
 					
 					// send a response, be it just error messages or a full body
 				}
 			});
 			
 			//System.out.println("Call to " + srv + " -- " + tb.getId());
 		
 			return or;
         }
 
         if (this.) {
         	// TODO consider stripping AuthToken and SessionId and Credentials from message
         	// when sending to gateway... review how this would work
         }
         
         // route to remote
 		Session sess = this.nextDirectRoute();
 		
 		if (sess == null) {	
 			HubRouter tunnel = this.nextTunnelRoute();
 			
 			if (tunnel != null)
 				return tunnel.deliverMessage(msg);
 			
 			if (!"HELLO".equals(msg.getFieldAsString("Kind")))
 				or.error(1, "Unable to route message to proxied hub: " + msg); // TODO log, better code
 			
 			return or;
 		}
 		
 		if (!sess.write(msg))			
 			or.error(1, "Unable to route message to remote hub: " + msg); // TODO log, better code
 		
 		return or;
 	}
 	
 	public void receiveMessage(Session sessionMessage msg) {
 		// update our service list if SERVICES message
     	if ("HELLO".equals(msg.getFieldAsString("Kind"))) {
     		this. = msg.getFieldAsString("Squad");
     		
     		// copy the existing list
     		Collection<HubRouteroldplist = new ArrayList<>(this..values()); 
     		
     		this..clear();
     		this..clear();
     		
     		ListStruct plist = msg.getFieldAsList("ProxiedServices");
     		
     		if (plist != null) {
     			for (Struct pitem : plist.getItems()) {
     				RecordStruct prec = (RecordStructpitem;
     				HubRouter phub = ..getBus().allocateOrGetHub(prec.getFieldAsString("Id"), session.getSocketInfo().isGateway());
     				
     				/*
     				if (phub == null) {
     					System.out.println("Could not allocate hub: " + prec.getFieldAsString("Id") + " >> me -- " + TaskContext.getHubId());
     					continue;
     				}
     				*/
 
    					phub.addTunnel(precthis);
     				
     				// we don't need to clear our tunnel from this proxied hub
     				oldplist.remove(phub);
     				
     				this..put(phub.getHubId(), phub);
     			}
     		}
     		
     		this.clearMyTunnels(oldplist);
     		
     		ListStruct slist = msg.getFieldAsList("Services");
     		
     		if (slist != null) {
     			this..addAll(slist.toStringList());
     			int sessionsize = this..size();
     			..getCountManager().allocateSetNumberCounter("dcBus_" + this.getHubId() + "_Sessions"sessionsize);
     		}
     		
 			
 			return;
     	}            
         
     	String service = msg.getFieldAsString("Service");
     	String feature = msg.getFieldAsString("Feature");
     	
     	boolean looksLikeReply = ("Replies".equals(service) || ("Session".equals(service) && "Reply".equals(feature)));
     	
     	// =====================================================================
     	// when coming from a gateway be very picky about what we allow through
 		// however, all calls to Replies service are allowed for now, we can get more specific later
     	// and of course verify requests are allowed since we are the verifier :)
     	// =====================================================================
     	if (this. && !msg.isVerifyRequest() && !looksLikeReply) {
 	        boolean isguest = true;
 	        
 	        RecordStruct context = msg.getFieldAsRecord("Context");
 			
 			// session must be present if not Guest 
 	        if (context == null) {
 	        	..println("dcBus " + this.getHubId() + " tried to call without context, got: " + msg);
 	        	return;
 	        }
 	        
 			String uid = context.getFieldAsString("UserId");
 			
 			// session must be present if not Guest 
 	        if (StringUtil.isEmpty(uid)) {
 	        	..println("dcBus " + this.getHubId() + " tried to call without userid, got: " + msg);
 	        	return;
 	        }
 			
 			if (!"00000_000000000000002".equals(uid))
 				isguest = false;
 			else if (!context.isFieldEmpty("AuthToken") || !context.isFieldEmpty("Credentials"))
 				isguest = false;
 			else {
 				ListStruct tags = context.getFieldAsList("AuthTags");
 				
 				if ((tags == null) || (tags.getSize() != 1))
 					isguest = false;
 				else if (!"Guest".equals(tags.getItemAsString(0)))
 					isguest = false;
 			}
 
 			// if not guest then we are even more picky
 			if (!isguest) {
 				Op op = ..getSchema().getService().getOp(servicefeaturemsg.getFieldAsString("Op"));
 				
 				// operations tagged as Gateway can be called by gateway no matter what...even when gateway is hacked
 				// normal user tag check applies, Gateway only means it gets past here, not pass message validation
 				// though if gateway is hacked then a Gateway tag pretty much = callable as hacker can send Root context
 				if (!op.isTagged("Gateway")) {
 					String sid = context.getFieldAsString("SessionId");
 					
 					// session must be present if not Guest 
 			        if (StringUtil.isEmpty(sid)) {
 			        	..println("dcBus " + this.getHubId() + " tried to call as user without session, got: " + msg);
 			        	return;
 			        }
 			        
 					String atoken = context.getFieldAsString("AuthToken");
 					
 					// session must be present if not Guest 
 			        if (StringUtil.isEmpty(atoken)) {
 			        	..println("dcBus " + this.getHubId() + " tried to call as user without authtoken, got: " + msg);
 			        	return;
 			        }
 	
 			        String expectedhubid = session.getSocketInfo().getHubId();
 					
 					// session must be present if not Guest - session must come from gateway
 			        if (StringUtil.isNotEmpty(expectedhubid) && !sid.startsWith(expectedhubid)) {
 			        	..println("dcBus " + this.getHubId() + " tried to call with session " + sid + ", got: " + msg);
 			        	return;
 			        }
 			        
 			        divconq.session.Session us = ..getSessions().lookup(sid);
 			        
 			        if (us == null) {
 			        	..println("dcBus " + this.getHubId() + " tried to call with missing session " + sid + ", got: " + msg);
 			        	return;
 			        }
 			        
 			        if (!atoken.equals(us.getUser().getAuthToken())) {
 			        	..println("dcBus " + this.getHubId() + " tried to call with bad token " + atoken + ", got: " + msg);
 			        	return;
 			        }
 			        
 			        if (!uid.equals(us.getUser().getUserId())) {
 			        	..println("dcBus " + this.getHubId() + " tried to call with user id for session " + uid + ", got: " + msg);
 			        	return;
 			        }
 					
 					//System.out.println("Gateway request passed checks, before context: " + context);
 			        
 			        // OK, we got this far, go forward but only with the context we had at login
 			        // copy the user context into the message
 			        us.getUser().freeze(context);
 			        
 					//System.out.println("Gateway request passed checks, after context: " + context);
 				}
 			}
 			
 			//System.out.println("Gateway request passed checks z: " + msg);
     	}
     	// TODO temp - show me messages coming into gateway from server 
     	else if (!looksLikeReply) {
 			..println("&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&");
 			..println("Server request passed checks z: " + msg);
 			..println("&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&");
     	}
     	
     	// =====================================================================
     	
     	// if message is not HELLO then it needs to be routed to the correct hub
     	String srv = msg.getFieldAsString("Service");
     	
 		
 		if (router == null) {
 			// TODO log warning
 			return;
 		}
 
 		// this will end up in the proper work pool after routing
 		OperationResult routeres = router.sendMessage(msg);
 		
 		if (routeres.hasErrors()) {	
 			// TODO log error
 			return;
 		}
 	}
 	
 	// TODO from this point on release StreamMessages
 	public void receiveMessage(StreamSession sessionStreamMessage msg) {
 		// update our service list if SERVICES message
     	if ("HELLO".equals(msg.getFieldAsString("Op"))) {
         	msg.release();
     		this. = msg.getFieldAsString("Squad");
 			return;
     	}            
 
 		String hub = msg.getFieldAsString("ToHub");
 		
 		
 		OperationResult routeres = router.deliverMessage(msg);
 		
 		if (routeres.hasErrors()) { 	
 			StreamMessage rmsg = MessageUtil.streamMessages(routeres);
 			..getBus().sendReply(rmsgmsg);
 		}
 	}
 
 	public void addSession(Session session) {
 		this..lock();
 		
 		int sessionsize = this..size();
 		
 		try {
 			this..add(session);
 			
 			sessionsize = this..size();
 			
 			..getCountManager().allocateSetNumberCounter("dcBus_" + this.getHubId() + "_Sessions"sessionsize);
 			
 			if ((sessionsize == 1) && this.isDirect()) 	    	    	
     	    	// let hub know we are connected, in another thread
     	    	..getWorkPool().submit(trun ->  { 
     	    		..fireEvent(.null);
     	    		trun.complete();
     	    	});
 		}
 		finally {
 		}
 
 		Logger.info("Connect on dcBus, " + this.getHubId() + " sessions available: " + sessionsize);
 	}
 
 	public void remove(SocketInfo info) {
 		for (Session sess : this.) { 
 			if (sess.getSocketInfo() == info) {
 				this.removeSession(sess);
 				break;
 			}
 		}
 		
 		for (StreamSession sess : this.) { 
 			if (sess.getSocketInfo() == info) {
 				this.removeSession(sess);
 				break;
 			}
 		}
 	}
 
 	public void removeSession(Session session) {
 		this..lock();
 		
 		boolean direct = this.isDirect();
 		
 		int sessionsize = this..size();
 		int priorsize = sessionsize;
 		
 		try {
 			this..remove(session);
 			
 			sessionsize = this..size();
 			
 			..getCountManager().allocateSetNumberCounter("dcBus_" + this.getHubId() + "_Sessions"sessionsize);
 			
 			session.close();
 		}
 		finally {
 		}
 
 		if (sessionsize != priorsize)
 			Logger.info("Disconnect on dcBus, " + this.getHubId() + " sessions available: " + sessionsize);
 		
 		// if no longer connected then get the agent out of the manager's hair
 		if (sessionsize == 0) {
 	    	// let hub know we are disconnected, in another thread
 			if (direct)
 					trun.complete();
 				});
 			
 		}
 	}
 	
 	public void addSession(StreamSession session) {
 		this..lock();
 		
 		int sessionsize = this..size();
 		
 		try {
 			this..add(session);
 			
 			sessionsize = this..size();
 			
 			// TODO make debug
 			Logger.info("Connect on dcBus, " + this.getHubId() + " stream added: " + sessionsize);
 			
 			..getCountManager().allocateSetNumberCounter("dcBus_" + this.getHubId() + "_SteramSessions"sessionsize);
 			
 			if ((sessionsize == 1) && this.isDirect()) 	    	    	
     	    	// let hub know we are connected, in another thread
     	    	..getWorkPool().submit(trun -> {
     	    		..fireEvent(.null);
     	    		trun.complete();
     	    	});
 		}
 		finally {
 		}
 
 		Logger.info("Connect on dcBus, " + this.getHubId() + " stream sessions available: " + sessionsize);
 	}
 
 	public void removeSession(StreamSession session) {
 		this..lock();
 		
 		boolean direct = this.isDirect();
 		int sessionsize = this..size();
 		int priorsize = sessionsize;
 		
 		try {
 			this..remove(session);
 			
 			sessionsize = this..size();
 			
 			..getCountManager().allocateSetNumberCounter("dcBus_" + this.getHubId() + "_StreamSessions"sessionsize);
 			
 			session.close();
 		}
 		finally {
 		}
 
 		if (sessionsize != priorsize
 			Logger.info("Disconnect on dcBus, " + this.getHubId() + " stream sessions available: " + sessionsize);
 		
 		// if no longer connected then get the agent out of the manager's hair
 		if (sessionsize == 0) {
 			if (direct)
 		    	// let hub know we are disconnected, in another thread
 		    	..getWorkPool().submit(trun -> { 
 		    		trun.complete();
 		    	});
 		}
 	}
 	
 	public void addTunnel(RecordStruct precHubRouter tunnel) {
 		this. = prec.getFieldAsString("Squad");
 		
 		ListStruct slist = prec.getFieldAsList("Services");
 
 		this..clear();
 		this..addAll(slist.toStringList());
 		
 		this..lock();
 		
 		int proxysize = this..size();
 		int priorsize = proxysize;
 		
 		try {
 			if (!this..contains(tunnel)) {
 				this..add(tunnel);			
 				proxysize = this..size();
 			}
 			
 			..getCountManager().allocateSetNumberCounter("dcBus_" + this.getHubId() + "_Proxies"proxysize);
 		}
 		finally {
 		}
 
 		if (proxysize != priorsize)
 			Logger.info("Connect on dcBus, " + this.getHubId() + " proxies available: " + proxysize);
 		
 		// if no longer connected then get the agent out of the manager's hair
 		if (proxysize == 1) 
 	}
 	
 	public void removeTunnel(HubRouter tunnel) {
 		this..lock();
 		
 		int proxysize = this..size();
 		
 		try {
 			this..remove(tunnel);
 			
 			proxysize = this..size();
 			
 			..getCountManager().allocateSetNumberCounter("dcBus_" + this.getHubId() + "_Proxies"proxysize);
 		}
 		finally {
 		}
 
 		Logger.info("Disconnect on dcBus, " + this.getHubId() + " proxies available: " + proxysize);
 		
 		// if no longer connected then get the agent out of the manager's hair
 		if (proxysize == 0) 
 	}
 
 	public void close() {
 		// calling close will trigger a call to remove (above) via the Manager
 		for (Session s : this.)
 			s.close();
 	}
 	
 	public void clearMyTunnels(Collection<HubRouterlist) {
 		for (HubRouter hub : list
 			hub.removeTunnel(this);
 	}
 
 	// round robin approach to finding routes
 	public Session nextDirectRoute() {
 		this..lock();
 		
 		try {
 			int subcount = this..size();
 			
 			if (subcount == 0)
 				return null;
 			
 			if (this. >= subcount)
 				this. = 0;
 
 			Session np = this..get(this.);
 			this.++;
 			return np;
 		}
 		finally {
 		}
 	}
 
 	// round robin approach to finding routes
 	public HubRouter nextTunnelRoute() {
 		this..lock();
 		
 		try {
 			int subcount = this..size();
 			
 			if (subcount == 0)
 				return null;
 			
 			if (this. >= subcount)
 				this. = 0;
 
 			HubRouter np = this..get(this.);
 			this.++;
 			return np;
 		}
 		finally {
 		}
 	}
 	
 		return this.;
 	}
 
 	public int getCountSessions(SocketInfo info) {
 		int cnt = 0;
 		
 		for (Session sess : this.)
 			if (sess.getSocketInfo() == info)
 				cnt++;
 		
 		return cnt;
 	}
 
 	public int getCountStreamSessions(SocketInfo info) {
 		int cnt = 0;
 		
 		for (StreamSession sess : this.)
 			if (sess.getSocketInfo() == info)
 				cnt++;
 		
 		return cnt;
 	}
 
 	// from this point on release StreamMessage
 	public OperationResult deliverMessage(final StreamMessage msg) {
 		
 		if (msg == null) {
 			or.error(1, "Message missing.");  // TODO code
 			return or;
 		}
         
 		String sessid = msg.getFieldAsString("ToSession");
 		String chanid = msg.getFieldAsString("ToChannel");
 		
         // route to local
         if (this.) {    	
 	        //System.out.println("dcBus received stream message: " + msg);
     		
     		final divconq.session.Session sess = ..getSessions().lookup(sessid);
     		
     		if (sess == null) {
     			or.error(1, "Unable to find session: " + sessid);			// TODO make sure if this came off network that a response is sent
             	msg.release();
 	            return or;
     		}
     		
     		final DataStreamChannel chan = sess.getChannel(chanid);
     		
     		if (chan == null) {
 	            or.error(1, "Unable to find channel: " + chanid);
             	msg.release();
 	            return or;
     		}
     		
     		// stream in foreground to keep data in order
     		// (over wire this is accomplished by always using the same network path)
     		chan.deliverMessage(msg);		// TODO return errors if any
 		
 			return or;
         }
         
         String pathid = (StringUtil.isEmpty(sessid) || StringUtil.isEmpty(chanid)) ? null : sessid + "_" + chanid;
         
         if (StringUtil.isNotEmpty(pathid)) {
 	        StreamPath path = null;
 	        
 	        // get the path while locked because other wise the cleanup might steal the path after we get it but before we
 	        // touch it - thus removing the path that is just about to get used - thus allowing for possible out of order
 	        // data packets
 			this..lock();
 			
 			try {
 		        path = this..get(pathid);
 		        
 		        if (path != null)
 		        	path.touched = System.currentTimeMillis();
 			}
 			finally {
 			}
 	        
 	        if (path != null) {
 	        	if (path.direct != null) {
 	        		if (!path.direct.write(msg))
 	    				or.error(1, "Unable to route message to remote hub: " + msg); // TODO log, better code
 	    			
 	        		return or;
 	        	}
 	        	
 	        	if (path.tunneled != null) {
 	        		// TODO check that it is still connected/active
 	        		path.tunneled.deliverMessage(msg);
 	        		return or;
 	        	}
 	        }
         }
 
         // route to remote
 		
 		if (sess != null) {
 			if (sess.write(msg)) {
 		        if (StringUtil.isNotEmpty(pathid)) {
 					StreamPath sp = new StreamPath();
 					sp.direct = sess;
 
 					// lock so we are not interfering with cleanup (see below)
 					// if we get to here though our path is not a candidate for cleanup yet so
 					// our operation is not a concern for cleanup, just don't change hash while cleanup is using it
 					this..lock();
 					
 					try {
 						this..put(pathidsp);
 					}
 					finally {
 					}
 		        }
 			}
 			else 
 				or.error(1, "Unable to route message to remote hub: " + msg); // TODO log, better code
 				
 			return or;
 		}
 		
 		HubRouter tunnel = this.nextTunnelRoute();
 		
 		if (tunnel != null) {
 	        if (StringUtil.isNotEmpty(pathid)) {
 				StreamPath sp = new StreamPath();
 				sp.tunneled = tunnel;
 				
 				// lock so we are not interfering with cleanup (see below)
 				// if we get to here though our path is not a candidate for cleanup yet so
 				// our operation is not a concern for cleanup, just don't change hash while cleanup is using it
 				this..lock();
 				
 				try {
 					this..put(pathidsp);
 				}
 				finally {
 				}
 	        }
 	        
 			return tunnel.deliverMessage(msg);
 		}
 		
 		if (!"HELLO".equals(msg.getFieldAsString("Kind")))
 			or.error(1, "Unable to route message to proxied hub: " + msg); // TODO log, better code
 		
 		return or;
 	}
 
 	// round robin approach to finding routes
 		this..lock();
 		
 		try {
 			int subcount = this..size();
 			
 			if (subcount == 0) {
 				..println("Missing stream routes to " + this.);
 				return null;
 			}
 			
 			if (this. >= subcount)
 				this. = 0;
 
 			this.++;
 			return np;
 		}
 		finally {
 		}
 	}
 	
 	public class StreamPath {
 		protected StreamSession direct = null;
 		protected HubRouter tunneled = null;
 		protected long touched = System.currentTimeMillis();
 	}
 
 	public void keepAlive() {
 		if (!this.)
 			return;
 		
 		// loop direct connections
 		for (Session sess : this.)
 			sess.keepAlive();
 		
 		for (StreamSession sess : this.
 			sess.keepAlive();
 	}
 	
 	public void cleanup() {
		// clearing the stream paths has no real damaging impact - it's purpose is to corral data sent by one session/channel
		// into using the same StreamSession for a burst of activity - such that data will not get out of sequence by
		// going across different StreamSessions and packet 2 arriving before packet 1
		// however, if data has been silent for 1 minute then packets are not out of order and as such not an issue if 
		// clear this and assign an alternative StreamSession later - should the data sender still be active but very slow 
		long timeout = System.currentTimeMillis() - 60000;		// 1 min of no activity
		List<StreamPathcleanlist = new ArrayList<>();
		try {
			for (StreamPath path : this..values()) {
				// has path been quiet for too long?  
				if (path.touched < timeout)
					cleanlist.add(path);
			for (StreamPath path : cleanlist)
				this..remove(path);
		finally {
New to GrepCode? Check out our FAQ X