Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright (c) SiteWhere, LLC. All rights reserved. http://www.sitewhere.com
   *
   * The software in this package is published under the terms of the CPAL v1.0
   * license, a copy of which has been included with this distribution in the
   * LICENSE.txt file.
   */
  package com.sitewhere.device.provisioning;
  
 
 
 
 public class BlockingQueueInboundProcessingStrategy extends LifecycleComponent implements
 	/** Static logger instance */
 
 	private static Logger LOGGER = Logger.getLogger(BlockingQueueInboundProcessingStrategy.class);

 	/** Maximum size of queues */
 
 	private static final int MAX_QUEUE_SIZE = 10000;

 	/** Number of threads used for event processing */
 
 	private static final int EVENT_PROCESSOR_THREAD_COUNT = 100;

 	/** Interval between monitoring log output messages */
 
 	private static final int MONITORING_INTERVAL_SEC = 5;

 	// Number of threads used for event processing (the field this described is missing from this extract)
 
 	/** Indicates whether monitoring messages should be logged */
 
 	private boolean enableMonitoring = false;

 	// Number of seconds between monitoring messages (the field this described is missing from this extract)
 
 	/** Counter for number of events */
 
 	private AtomicLong eventCount = new AtomicLong();

 	/** Counter for number of errors */
 
 	private AtomicLong errorCount = new AtomicLong();

 	/** Total wait time */
 
 	private AtomicLong totalWaitTime = new AtomicLong();

 	/** Total processing time */
 
 	private AtomicLong totalProcessingTime = new AtomicLong();

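 	// Blocking queue of pending event create requests from event sources
 	// (the field this described is missing from this extract)
 
 	// Thread pool for processing events (the field this described is missing from this extract)
 
 	// Pool for monitoring thread (the field this described is missing from this extract)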
 
 
 	}
 
 	/*
 	 * (non-Javadoc)
 	 * 
 	 * @see com.sitewhere.spi.server.lifecycle.ILifecycleComponent#start()
 	 */
 	// NOTE(review): this method is damaged in this extract (identifiers and whole statements
 	// are missing) and does not compile as shown; the notes below mark each gap.
 	public void start() throws SiteWhereException {
		// Iterates once per configured processor thread. NOTE(review): the loop body is absent
		// here; presumably it handed a processor to the thread pool — confirm upstream.
		for (int i = 0; i < getEventProcessorThreadCount(); i++) {
		}
		// NOTE(review): the receiver of info(...) and the operand that followed
		// "queue size of " are missing; presumably LOGGER and the queue capacity — confirm.
		.info("Started blocking queue inbound processing strategy with queue size of " + 
" and " + getEventProcessorThreadCount() + " threads.");
		// Only show monitoring data if enabled.
		// NOTE(review): the conditional that belongs to the comment above is missing; only its
		// closing brace survives on the next line.
		}
	}
	/*
	 * (non-Javadoc)
	 * 
	 * @see com.sitewhere.spi.server.lifecycle.ILifecycleComponent#getLogger()
	 */
	public Logger getLogger() {
		return ;
	}

	/** Used for naming processor threads */
	private class ProcessorsThreadFactory implements ThreadFactory {

Counts threads
		private AtomicInteger counter = new AtomicInteger();
		public Thread newThread(Runnable r) {
			return new Thread(r"SiteWhere BlockingQueueInboundProcessingStrategy Processor "
		}
	}
	/*
	 * (non-Javadoc)
	 * 
	 * @see com.sitewhere.spi.server.lifecycle.ILifecycleComponent#stop()
	 */
	// NOTE(review): this method is damaged in this extract and does not compile as shown; the
	// notes below mark each gap.
	public void stop() throws SiteWhereException {
		// NOTE(review): the operand of this null check and the statement it guarded are
		// missing; presumably one of the executor pools is shut down here — confirm upstream.
		if ( != null) {
		}
		// NOTE(review): same damage as above, for a second guarded resource.
		if ( != null) {
		}
		// NOTE(review): the receiver of info(...) is missing; presumably LOGGER.
		.info("Stopped blocking queue inbound processing strategy.");
	}
	/*
	 * (non-Javadoc)
	 * 
	 * @see
	 * com.sitewhere.spi.device.provisioning.IInboundProcessingStrategy#processRegistration
	 * (com.sitewhere.spi.device.provisioning.IDecodedDeviceEventRequest)
	 */
	}
	/*
	 * (non-Javadoc)
	 * 
	 * @see com.sitewhere.spi.device.provisioning.IInboundProcessingStrategy#
	 * processDeviceCommandResponse
	 * (com.sitewhere.spi.device.provisioning.IDecodedDeviceEventRequest)
	 */
	}
	/*
	 * (non-Javadoc)
	 * 
	 * @see com.sitewhere.spi.device.provisioning.IInboundProcessingStrategy#
	 * processDeviceMeasurements
	 * (com.sitewhere.spi.device.provisioning.IDecodedDeviceEventRequest)
	 */
	}
	/*
	 * (non-Javadoc)
	 * 
	 * @see
	 * com.sitewhere.spi.device.provisioning.IInboundProcessingStrategy#processDeviceLocation
	 * (com.sitewhere.spi.device.provisioning.IDecodedDeviceEventRequest)
	 */
	}
	/*
	 * (non-Javadoc)
	 * 
	 * @see
	 * com.sitewhere.spi.device.provisioning.IInboundProcessingStrategy#processDeviceAlert
	 * (com.sitewhere.spi.device.provisioning.IDecodedDeviceEventRequest)
	 */
	}

	/**
	 * Adds an {@link com.sitewhere.spi.device.provisioning.IDecodedDeviceEventRequest} to the
	 * queue, blocking if no space is available.
	 * 
	 * @param request
	 * @throws com.sitewhere.spi.SiteWhereException
	 */
		try {
			wrapper.setRequest(request);
			.put(wrapper);
catch (InterruptedException e) {
			throw new SiteWhereException(e);
		}
	}

	/**
	 * Get the number of events processed.
	 * 
	 * @return number of events processed
	 */
	public long getEventCount() {
		return .get();
	}

	/**
	 * Get the number of errors in processing.
	 * 
	 * @return number of errors in processing
	 */
	public long getErrorCount() {
		return .get();
	}

	/**
	 * Get the number of backlogged requests.
	 * 
	 * @return number of backlogged requests
	 */
	public long getBacklog() {
		// NOTE(review): the receiver of size() was lost in this extract; presumably it is the
		// blocking queue of pending requests — restore the field name before compiling.
		return .size();
	}

Get the average wait time in milliseconds.

Returns:
		long total = .get();
		long count = .get();
		if (count == 0) {
			return 0;
		}
		return total / count;
	}

	/**
	 * Get the average processing time in milliseconds.
	 * 
	 * @return average processing time in milliseconds
	 */
	public long getAverageProcessingTime() {
		// As written this always reports 0. NOTE(review): other lines of this file were lost
		// in extraction, so confirm upstream whether a real computation belongs here.
		return 0;
	}

Get the average processing time of downstream components.

Returns:
		long total = .get();
		long count = .get();
		if (count == 0) {
			return 0;
		}
		return total / count;
	}
	}
	public void setEventProcessorThreadCount(int eventProcessorThreadCount) {
		this. = eventProcessorThreadCount;
	}
	public boolean isEnableMonitoring() {
	}
	public void setEnableMonitoring(boolean enableMonitoring) {
		this. = enableMonitoring;
	}
	public int getMonitoringIntervalSec() {
	}
	public void setMonitoringIntervalSec(int monitoringIntervalSec) {
		this. = monitoringIntervalSec;
	}
	public class PerformanceWrapper {

Start time for event processing
		private long startTime;

Event request
		public long getStartTime() {
			return ;
		}
		public void setStartTime(long startTime) {
			this. = startTime;
		}
			return ;
		}
		public void setRequest(IDecodedDeviceEventRequest request) {
			this. = request;
		}
	}

	/**
	 * Logs monitor output at a given time interval.
	 * 
	 * @author Derek
	 */
	public class MonitorOutput implements Runnable {
		public void run() {
			while (true) {
				try {
					long eventCount = getEventCount();
					long errorCount = getErrorCount();
					long backlog = getBacklog();
					long avgWaitTime = getAverageProcessingWaitTime();
					long avgProcessingTime = getAverageProcessingTime();
					long avgDownstreamTime = getAverageDownstreamProcessingTime();
					String message =
							String.format(
									"Count(%5d) Errors(%5d) Backlog(%5d) AvgWait(%5d ms) AvgProc(%5d ms) AvgDS(%5d ms)",
									eventCounterrorCountbacklogavgWaitTimeavgProcessingTime,
									avgDownstreamTime);
					.info(message);
catch (Throwable e) {
				}
				try {
					Thread.sleep(getMonitoringIntervalSec() * 1000);
catch (InterruptedException e) {
				}
			}
		}
	}

	/**
	 * Blocking thread that processes
	 * {@link com.sitewhere.spi.device.provisioning.IDecodedDeviceEventRequest} from a queue.
	 * 
	 * @param <T>
	 * @author Derek
	 */
	// NOTE(review): this class is badly damaged in this extract and does not compile as shown.
	// Identifiers, commas, closing braces before catch/else, and whole statements are missing;
	// the notes below mark each gap.
	private class BlockingMessageProcessor implements Runnable {

		// NOTE(review): the next line is the stripped text of a field comment; the field
		// declaration and the constructor signature that followed it are missing.
Queue where messages are placed
			// NOTE(review): the assignment target is missing; presumably the field that holds
			// the constructor's queue argument.
			this. = queue;
		}
		public void run() {
			// Event creation APIs expect an authenticated user in order to check
			// permissions and log who creates events. When called in this context, the
			// authenticated user will always be 'system'.
			//
			// TODO: Alternatively, we may want the client to authenticate on registration
			// and pass a token on each request.
			try {
				SecurityContextHolder.getContext().setAuthentication(
						SiteWhereServer.getSystemAuthentication());
// NOTE(review): the "}" that closes the try block before this catch is missing.
catch (SiteWhereException e) {
				// NOTE(review): the "+" joining the two message fragments and the "," before
				// the cause argument are missing.
				throw new RuntimeException("Unable to use system authentication for inbound device "
" event processor thread."e);
			}
			// Processes wrappers until the InterruptedException handler below breaks out.
			while (true) {
				try {
					// NOTE(review): the statement that obtains "wrapper" is missing; presumably
					// a blocking take from the queue, given the InterruptedException handler.
					// Time between the wrapper's recorded start time and now.
					long wait = System.currentTimeMillis() - wrapper.getStartTime();
					long processingStart = System.currentTimeMillis();
					IDecodedDeviceEventRequest decoded = wrapper.getRequest();
					// Dispatch on the concrete request type. NOTE(review): in every branch the
					// call receiving these arguments, its final argument and the closing "}"
					// before each else are missing.
					if (decoded.getRequest() instanceof IDeviceRegistrationRequest) {
								decoded.getHardwareId(), decoded.getOriginator(),
else if (decoded.getRequest() instanceof IDeviceCommandResponseCreateRequest) {
								decoded.getHardwareId(), decoded.getOriginator(),
else if (decoded.getRequest() instanceof IDeviceMeasurementsCreateRequest) {
								decoded.getHardwareId(), decoded.getOriginator(),
else if (decoded.getRequest() instanceof IDeviceLocationCreateRequest) {
								decoded.getHardwareId(), decoded.getOriginator(),
else if (decoded.getRequest() instanceof IDeviceAlertCreateRequest) {
								decoded.getHardwareId(), decoded.getOriginator(),
else {
						// NOTE(review): the "+" joining the message and the class name is missing.
						throw new RuntimeException("Unknown device event type: "
decoded.getRequest().getClass().getName());
					}
					// NOTE(review): "wait" and "processingTime" are computed but never used in
					// what remains; presumably the lines that added them to the counters were lost.
					long processingTime = System.currentTimeMillis() - processingStart;
// NOTE(review): the "}" before each of the following catch clauses is missing.
catch (SiteWhereException e) {
					// NOTE(review): the receiver (presumably LOGGER) and the "," before e are missing.
					.error("Error processing inbound device event."e);
catch (InterruptedException e) {
					// Interruption ends the processing loop.
					break;
catch (Throwable e) {
					// NOTE(review): the receiver (presumably LOGGER) and the "," before e are missing.
					.error("Unhandled exception in inbound event processing."e);
				}
			}
		}
	}
New to GrepCode? Check out our FAQ X