Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /* ************************************************************************
  #
  #  DivConq
  #
  #  http://divconq.com/
  #
  #  Copyright:
  #    Copyright 2014 eTimeline, LLC. All rights reserved.
  #
 #  License:
 #    See the license.txt file in the project's top-level directory for details.
 #
 #  Authors:
 #    * Andy White
 #
 ************************************************************************ */
 package divconq.bus;
 
 
 import java.util.List;
 
 
 /*
  * Service, Attachments, Extension are all reserved headers - Feature and Op semi-reserved.  All other headers are fine as long as the don't start with _.
  */
 
 public class Bus {
 	// info for this Hub
 	protected HubRouter localhub = null;
     
 	protected boolean proxymode = false;
 	
 	// includes Hubs exposed through PP2 connectors
 	protected final ConcurrentHashMap<StringHubRouterhubrouters = new ConcurrentHashMap<>();
 
 	protected AclFilter acl = new AclFilter();		// TODO support
 
 	protected Lock connectLock = new ReentrantLock();
 	protected Lock hubLock = new ReentrantLock();
 	
 	// desired listeners
 	protected List<SocketInfolisteners = new CopyOnWriteArrayList<>();
 	
 	// desired connectors
 	
 	// bus event group is separate from rest
 	protected EventLoopGroup eventLoopGroup = null;
 	
     /*
      * set localHub before calling this
     */
    public void init(OperationResult orXElement config) {
    	// TODO this should be separate from keep alive.  Sync Services should happen on demand as services change on Hub 
    	// - update based on timestamp, so keep alive may have the timestamp but not the services payload - other side then needs to poll services if out of date.
    	// keep alive can be more like 1 minute or so, except when pushed to indicate status change (Hub Status or Service Status changes)
    	int syncperiodsec = 15;		
    	int conninterval = 5;	
		this. = new HubRouter();
    	if (config != null) {
    		this. = "True".equals(config.getAttribute("Proxy"));
    		
    		SslContextFactory.init(config);
    		
        	for(XElement node : config.selectAll("Acl")) 
        		this..loadConfig(node);
        	
        	for(XElement node : config.selectAll("Listener")) {
        		SocketInfo si = new SocketInfo();
        		si.loadConfig(node);
        		this.addListener(si);
        	}
        	
        	for(XElement node : config.selectAll("Connector")) {
        		SocketInfo si = new SocketInfo();
        		si.loadConfig(node);
        		this.addConnector(si);
        	}
        	
        	/*
        	// TODO how many threads for eventGroup?
    		// as of yet, nothing to do - get settings as/when required
        	
        	for(XElement node : config.selectAll("Pool")) 
        		this.sessionpool.init(node);        	
        	*/
        	
        	if (config.hasAttribute("SyncServiceList")) 
        		syncperiodsec = (int) StringUtil.parseInt(config.getAttribute("SyncServiceList"), syncperiodsec);
        	
        	if (config.hasAttribute("ConnectingInterval")) 
        		conninterval = (int) StringUtil.parseInt(config.getAttribute("ConnectingInterval"), conninterval);
    	}
    	final int connint = conninterval;
    	
    	ISystemWork busconnector = new ISystemWork() {			
			public void run(SysReporter reporter) {
				reporter.setStatus("dcBus Connect");
					Bus.this.connect();
				reporter.setStatus("after dcBus Connect");
			}
			public int period() {
				return connint;
			}
		};
    	
	    ..getClock().addSlowSystemWorker(busconnector);
    	
	    final int syncperiodsec2 = syncperiodsec;
	    
	    // keep the connected hubs up to date with our services list
			public void run(SysReporter reporter) {
				reporter.setStatus("dcBus keep alive");
				for (HubRouter hub : Bus.this..values()) {
					hub.keepAlive();
					hub.cleanup();
				}
				reporter.setStatus("after dcBus keep alive");
			}
			public int period() {
				return syncperiodsec2;
			}
		});
    }
		if (this. == null)
		return this.;
	}
    public boolean isProxyMode() {
    	return this.;
    }
    
	public HubRouter getLocalHub() {
		return this.;
	}
    
    public Collection<HubRoutergetHubs() {
    	return this..values();
    }
		return this.sendMessage(msg, (ServiceResult)null);
    }
		return this.sendMessage(msgnew ServiceResult() {			
			public void callback() {
				r.result(this);
			}
		});
	}
	// designed to be used either with callback or OpResult
	public OperationResult sendMessage(Message msgServiceResult callback) {
		OperationResult or = callback;
		if (or == null)
			or = new OperationResult();
		OperationContext tc = OperationContext.get();
		if (tc == null) {
			or.errorTr(219, msg);		
			if (callback != null
				callback.abandon();
			return or;
		}
    	String srv = msg.getFieldAsString("Service");
		if (srv == null) {
			or.errorTr(220, msg);
			if (callback != null
				callback.abandon();
			return or;
		}
    	
		ServiceRouter router = this..get(srv);
		if (router == null) {
			or.errorTr(221, msg);
			if (callback != null
				callback.abandon();
			return or;
		}
		// if no FromHub then add myself
		if (msg.isFieldEmpty("FromHub"))
			msg.setField("FromHub", OperationContext.getHubId());
		if (callback != null
			callback.setReplyTag(this..registerForReply(msgcallback));
		tc.freeze(msg);
		OperationResult routeres = router.sendMessage(msg);
		if (routeres.hasErrors()) {	
			if (callback != null
				// put the routing errors into the callback
				callback.abandon();
			return or;
		}
		// message was sent, record it here
		..getCountManager().countObjects("dcBusMessageSent"msg);
		return or;
    }
	public boolean isServiceAvailable(String service) {
		ServiceRouter router = this..get(service);
		if (router == null
			return false;
		return router.isAvailable();
	}
	// used on gateways to lookup the hub id of the one hub they are connected to
	// return the hub if of the single, active, hub that provides backend services for us
	public String getTetherId() {
		for (HubRouter hub : this..values()) {
			// is direct detects if the hub is active and not local - if so we use that hub
			if (hub.isDirect())
				return hub.getHubId();
		}
		return null;
	}
	/* TODO restore but no waits
	public void sendMessages(ServiceResult callback, TimeoutPlan timeout, Message... msgs) {
		final Semaphore flag = new Semaphore(0);		
		final CountDownLatch latch = new CountDownLatch(msgs.length);
		final RecordStruct responses = new RecordStruct();
		for (Message msg : msgs) {
			this.sendMessage(msg, new ServiceResult(timeout) {
				@Override
				public void callback() {
					Message rmsg = this.getResult();
					// flag is just for fair play with responses - if it fails still need to count down
					try {
						flag.acquire();
						String org = rmsg.getFieldAsString("Tag");
						if (org != null)
							responses.setField(org, rmsg);
						flag.release();
					catch (InterruptedException e) {
					}
					latch.countDown();					
				}
			});
		}		
		flag.release();
		try {
			latch.await();
			callback.setReply(MessageUtil.success(new RecordStruct(new FieldStruct("Responses", responses))));
		catch (InterruptedException e) {
			callback.error(1, "Latch failed");		// TODO code 
		}
		callback.complete();
    }
    */
    public void sendReply(Message msgMessage original) {
    	if (msg == null
    		return;
    	
    	// sender didn't want a reply
    	if (!MessageUtil.addressReply(msgoriginal))
    		return;
    	
    	// TODO consider a setting to disable response checks
		if (mr.hasErrors()) {
			..println("Bad Message Content: " + msg);
			msg = mr.toLogMessage();		// explain problem to original requester
			// TODO temp
			..println("Bad Message Error: " + mr);
	    	MessageUtil.addressReply(msgoriginal);
		}
		this.sendMessage(msg);
    }
    // for when original Service/Feature/Op don't match your validation needs
    public void sendReply(Message msgMessage originalString servString featString op) {
    	if (msg == null)
    		return;
    	
    	// sender didn't want a reply
    	if (!MessageUtil.addressReply(msgoriginal))
    		return;
    	
    	// TODO consider a setting to disable response checks
		OperationResult mr = ..getSchema().validateResponse(msgservfeatop);
		if (mr.hasErrors()) {
			..println("Bad Message Content: " + msg);
			msg = mr.toLogMessage();		// explain problem to original requester
			// TODO temp
			..println("Bad Message Error: " + mr);
	    	
	    	MessageUtil.addressReply(msgoriginal);
		}
		this.sendMessage(msg);
    }
    
    /*
    // TODO replace with dcEvents
    public void whenAvailable(String service, OperationCallback callback) {
    	// TODO support fabric/peers and delayed discovery
    	
		ServiceRouter router = (service != null) 
				? this.servicerouters.get(service) 
				: null;
		if (router == null) 
			callback.error(1, "Unable to find service: " + service);		// TODO code
		callback.completed();
    }
    */
    
    public HubRouter allocateOrGetHub(String idboolean gateway) {
    	this..lock();
   
    	try {
    		HubRouter hr = this..get(id);
			if (hr == null) {
				hr = new HubRouter(idgateway);
				this..put(idhr);
			}
			return hr;
		}
    	finally {
    		this..unlock();
    	}
    }
    
	public void indexServices(HubRouter hub) {
		//System.out.println(" ****************************************** ");
		//System.out.println("       SERVICES INDEXED!!! ");
		//System.out.println(" ****************************************** ");
		for (String srv : hub.getServices())
			if (!this..containsKey(srv))
				this..put(srvnew ServiceRouter(srv));
		for (ServiceRouter rt : this..values())
			rt.index(hub);
	}
	public void removeServices(HubRouter hub) {
		for (ServiceRouter rt : this..values())
			rt.remove(hub);
	}
	public void addConnector(SocketInfo info) {
		if (info == null)
			return;
		this..putIfAbsent(info.getHubId(), info);
	}

Unlike removeListener this does not remove any "binding" just won't try connecting again anymore

Parameters:
info descriptor of connector to remove
	public void removeConnector(SocketInfo info) {
		HubRouter router = this..get(info.getHubId());
		if (router.isLocal())
			return;
		router.remove(info);
	}
	public SocketInfo getHubConnector(String hubid) {
		return this..get(hubid);
	}
	public void addListener(SocketInfo info) {
		if (info == null)
			return;
		this..add(info);
	}
	public void removeListener(SocketInfo info) {
		this..remove(info);
	}
    
	public void connect() {
		// never try to connect until init has run
			return;
		// if connect method is already running then skip - it will try again later 
		if (!this..tryLock())
			return;
		try {
			// ==========================================================================
			//   Add client connections when not enough
			// ==========================================================================
			for (final SocketInfo info : this..values()) {
				HubRouter router = this.allocateOrGetHub(info.getHubId(), info.isGateway());
				if (router.isLocal())
					continue;
				// -------------------------------------------------
				// message port
				// -------------------------------------------------
				int conncount = router.getCountSessions(info);
				// add a coonection only once per call to connect (should be between 2 - 15 seconds between calls)
				if (conncount < info.getCount()) {
			        Bootstrap b = new Bootstrap();
			        
			        b.group(this.getEventLoopGroup())
			         .channel(NioSocketChannel.class)
			         .option(., 250)		
			         .handler(new ChannelInitializer<SocketChannel>() {
			             @Override
			             public void initChannel(SocketChannel chthrows Exception {
			                 ChannelPipeline pipeline = ch.pipeline();
			                 
			                 if (info.isUseSsl())
			                	 pipeline.addLast("ssl"new SslHandler(SslContextFactory.getClientEngine()));
			                 
			                 pipeline.addLast("http-codec"new HttpClientCodec());
			                 pipeline.addLast("aggregator"new HttpObjectAggregator(8192));		// TODO is this too small?
			                 
			                 pipeline.addLast("readTimeoutHandler"new ReadTimeoutHandler(60));  // TODO config
			                 pipeline.addLast("writeTimeoutHandler"new WriteTimeoutHandler(30));   // TODO config
			                 
			                 pipeline.addLast("ws-handler"new ClientHandler(info));
			             }
			         });
			        Logger.debug("dcBus Client connecting");
			        
			        try {
			        	// must wait here to make sure we don't release connectLock too soon
			        	// we want channel init (above) to complete before we try connect again
			        	b.connect(info.getAddress()).sync();
			        }
			        catch (InterruptedException x) {
			        	Logger.warn("dcBus Client interrupted while connecting: " + x);
			        }
			        catch (Exception x) {
			        	Logger.debug("dcBus Client unable to connect: " + x);
			        }
				}
				// -------------------------------------------------
				// stream port
				// -------------------------------------------------
				conncount = router.getCountStreamSessions(info);
				// add a coonection only once per call to connect (should be between 2 - 15 seconds between calls)
				if (conncount < info.getStreamCount()) {
			        Bootstrap b = new Bootstrap();
			        
			        b.group(this.getEventLoopGroup())
			         .channel(NioSocketChannel.class)
			         .option(., 250)			
			         .handler(new ChannelInitializer<SocketChannel>() {
			             @Override
			             public void initChannel(SocketChannel chthrows Exception {
							ChannelPipeline pipeline = ch.pipeline();
							 
							if (info.isUseSsl())
								pipeline.addLast("ssl"new SslHandler(SslContextFactory.getClientEngine()));
							// TODO consider compression
							 
							pipeline.addLast("decoder"new StreamDecoder());
							pipeline.addLast("encoder"new StreamEncoder());
			                 
							pipeline.addLast("readTimeoutHandler"new ReadTimeoutHandler(60));  // TODO config
							pipeline.addLast("writeTimeoutHandler"new WriteTimeoutHandler(30));   // TODO config
			                 							
							pipeline.addLast("handler"new StreamHandler(infofalse));
			             }
			         });
			        Logger.debug("dcBus Client stream connecting");
			        
			        try {
			        	// must wait here to make sure we don't release connectLock too soon
			        	// we want chanel init (above) to complete before we try connect again
							public void operationComplete(ChannelFuture cfthrows Exception {
								if (!cf.isSuccess()) {
									Logger.debug("dcBus Stream unable to connect: " + cf.cause());
									return;
								}
						    	
								// client starts the HELLO thing once connected!
					            cf.channel().writeAndFlush(icmd);
							}                	
						}).sync();
			        }
			        catch (InterruptedException x) {
			        	Logger.warn("dcBus Client stream interrupted while connecting: " + x);
			        }
			        catch (Exception x) {
			        	Logger.debug("dcBus Client stream unable to connect: " + x);
			        }
				}
			}
			// ==========================================================================
			//   Add server binding when missing
			// ==========================================================================
			for (final SocketInfo info : this.) {
				// only if not currently bound
					continue;
				// -------------------------------------------------
				// message port
				// -------------------------------------------------
		        ServerBootstrap b = new ServerBootstrap();
		        
		        b.group(this.getEventLoopGroup())
		         .channel(NioServerSocketChannel.class)
		         //.option(ChannelOption.SO_BACKLOG, 125)			// this is probably not needed but serves as note to research
		         .childHandler(new ChannelInitializer<SocketChannel>() {
					protected void initChannel(SocketChannel chthrows Exception {
		    	        ChannelPipeline pipeline = ch.pipeline();
		    	        
		    	        if (info.isUseSsl())
		    	        	pipeline.addLast("ssl"new SslHandler(SslContextFactory.getServerEngine()));
		    	        
		    	        pipeline.addLast("codec-http"new HttpServerCodec());
		    	        pipeline.addLast("aggregator"new HttpObjectAggregator(65536));
		                 
		    	        pipeline.addLast("readTimeoutHandler"new ReadTimeoutHandler(60));  // TODO config
		    	        pipeline.addLast("writeTimeoutHandler"new WriteTimeoutHandler(30));   // TODO config
		                 
		    	        pipeline.addLast("handler"new ServerHandler(info));
					}        	 
		         });
		        try {
		        	// must wait here, both to keep the activelisteners listeners up to date
		        	// but also to make sure we don't release connectLock too soon
			        ChannelFuture bfuture = b.bind(info.getAddress()).sync();
			        
			        if (bfuture.isSuccess()) {
			        	Logger.info("dcBus Message Server listening");
				        this..put(infobfuture.channel());
			        }
			        else
			        	Logger.error("dcBus Server unable to bind: " + bfuture.cause());
		        }
		        catch (InterruptedException x) {
		        	Logger.warn("dcBus Server interrupted while binding: " + x);
		        }
		        catch (Exception x) {
		        	Logger.error("dcBus Server unable to bind: " + x);
		        }
				// -------------------------------------------------
				// stream port
				// -------------------------------------------------
		        b = new ServerBootstrap();
		        
		        b.group(this.getEventLoopGroup())
		         .channel(NioServerSocketChannel.class)
		         //.option(ChannelOption.SO_BACKLOG, 125)			// this is probably not needed but serves as note to research
		         .childHandler(new ChannelInitializer<SocketChannel>() {
					protected void initChannel(SocketChannel chthrows Exception {
		    	        ChannelPipeline pipeline = ch.pipeline();
		    	        
		    	        if (info.isUseSsl())
		    	        	pipeline.addLast("ssl"new SslHandler(SslContextFactory.getServerEngine()));
						// TODO consider compression
		    	        
						pipeline.addLast("decoder"new StreamDecoder());
						pipeline.addLast("encoder"new StreamEncoder());		    	        
		                 
		    	        pipeline.addLast("readTimeoutHandler"new ReadTimeoutHandler(60));  // TODO config
		    	        pipeline.addLast("writeTimeoutHandler"new WriteTimeoutHandler(30));   // TODO config
		    	        
		    	        pipeline.addLast("handler"new StreamHandler(infotrue));
					}        	 
		         });
		        try {
		        	// must wait here, both to keep the activelisteners listeners up to date
		        	// but also to make sure we don't release connectLock too soon
			        ChannelFuture bfuture = b.bind(info.getStreamAddress()).sync();
			        
			        if (bfuture.isSuccess()) {
			        	Logger.info("dcBus Stream Server listening");
				        this..put(infobfuture.channel());
			        }
			        else
			        	Logger.error("dcBus Stream Server unable to bind: " + bfuture.cause());
		        }
		        catch (InterruptedException x) {
		        	Logger.warn("dcBus Stream Server interrupted while binding: " + x);
		        }
		        catch (Exception x) {
		        	Logger.error("dcBus Stream Server unable to bind: " + x);
		        }
			}
			// ==========================================================================
			//   Remove server binding as needed
			// ==========================================================================
			for (final SocketInfo info : this..keySet()) {
				// all is well if in the listeners list
				if (this..contains(info))
					continue;
				// otherwise we don't want to bind anymore
			}
		}
		finally {
		}
	}
		if (msg == null) {
			res.error(1, "Message is missing");		// TODO log codes
			return res;
		}
		if (msg.isFieldEmpty("ToHub") || msg.isFieldEmpty("ToSession") || msg.isFieldEmpty("ToChannel")) {
			res.error(1, "Message is missing addressing");		// TODO log codes
			msg.release();
			return res;
		}
		String hub = msg.getFieldAsString("ToHub");
		HubRouter router = this..get(hub);
		if (router == null) {
			res.error(1, "No network path to hub");		// TODO log codes
			msg.release();
			return res;
		}
		OperationResult routeres = router.deliverMessage(msg);
		if (routeres.hasErrors()) 	
			return res;
		// message was sent, record it here
		..getCountManager().countObjects("dcBusStreamMessageSent"msg);
		return res;
    }
    public OperationResult sendReply(StreamMessage msgRecordStruct original) {
    	//if (msg == null) { 
    	//	return ;
    	
    	MessageUtil.streamAddressReply(msgoriginal);
		return this.sendMessage(msg);
    }
    
    protected void stopSocketListener(SocketInfo info) {
		// tear down message port
		Channel ch = this..remove(info);
        try {
        	// must wait here, both to keep the activelisteners listeners up to date
        	// but also to make sure we don't release connectLock too soon
        	ChannelFuture bfuture = ch.close().sync();
	        
	        if (bfuture.isSuccess()) 
		        ..println("dcBus Server unbound");
	        else
	        	..println("dcBus Server unable to unbind: " + bfuture.cause());
        }
        catch (InterruptedException x) {
        	..println("dcBus Server unable to unbind: " + x);
        }
		// tear down stream port
		ch = this..remove(info);
        try {
        	if (ch != null) {
	        	ChannelFuture bfuture = ch.close().sync();
		        
		        if (bfuture.isSuccess()) 
			        ..println("dcBus Stream Server unbound");
		        else
		        	..println("dcBus Stream Server unable to unbind: " + bfuture.cause());
        	}
        	else
        		..println("dcBus Stream Server missing channel");
        }
        catch (InterruptedException x) {
        	..println("dcBus Stream Server unable to unbind: " + x);
        }
    }
    
    public void stopMatrix(OperationResult or) {
		try {
			// we don't want to listen anymore
			for (final SocketInfo info : this..keySet()) 
	    	
			for (HubRouter router : this..values())
				router.close();
		}
		finally {
		}
    }
	public boolean isConnected() {
		for (HubRouter hub : this..values()) 
			if (hub.isDirect())
				return true;
		return false;
	}
    
    public void stopFinal(OperationResult or) {
    	// TODO sync these guys
		try {
			if (this. != null)
		catch (InterruptedException x) {
		}
    }
	public void dumpInfo() {
		..println("End-points and connections: ");
		..println("Hubs: ");
		for (HubRouter hub : this..values()) {
			String connectkind = hub.isLocal() 
"Self"
hub.isDirect()
"Direct"
hub.isTunneled()
"Tunneled"
"???";
			..println("- " + hub.getHubId() + " - " + (hub.isActive() ? "Active" : "Inactive") + " - " + connectkind);
			..println("  >>> " + StringUtil.join(hub.services","));
			if (hub.isDirect()) {
				..println("  $$$ cmd: " + hub.sessions.size() + " - data: " + hub.streamsessions.size());
				for (Session sess : hub.sessions) {
					..println("      +++ msg sess " + sess.getChannel()
							 + " - open: " + sess.getChannel().isOpen()
							 + " - active: " + sess.getChannel().isActive()
							 + " - write: " + sess.getChannel().isWritable()
							 + " - regist: " + sess.getChannel().isRegistered());					
				}				
				for (StreamSession sess : hub.streamsessions) {
					..println("      +++ data sess " + sess.getChannel() + " written: " + sess.getWritten() + " - read: " + sess.getRead()
							 + " - open: " + sess.getChannel().isOpen()
							 + " - active: " + sess.getChannel().isActive()
							 + " - write: " + sess.getChannel().isWritable()
							 + " - regist: " + sess.getChannel().isRegistered());					
				}				
			}
			String proxied = StringUtil.join(hub.proxied.keySet(), ",");
			if (StringUtil.isNotEmpty(proxied)) 
				..println("  ||| " + proxied);
		}
		..println("Services: ");
		for (ServiceRouter router : this..values()) {
			..println("- " + router.getName());
			List<Stringal = new ArrayList<String>();
			for (HubRouter a : router.hubList())
				al.add(a.getHubId());
			..println("  >>> " + StringUtil.join(al","));
		}
	}
	public AclFilter getAcl() {
		return this. ;
	}
		return this..get(srv);
	}
New to GrepCode? Check out our FAQ X