Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
/*
 * Copyright 2005-2012 The Kuali Foundation
 *
 * Licensed under the Educational Community License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.opensource.org/licenses/ecl2.php
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
 
 package org.kuali.rice.ksb.impl.bus;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 
 
 public class ServiceBusImpl extends BaseLifecycle implements ServiceBusInitializingBeanDisposableBean {
 	
 	// Logger shared by all instances of this class.
 	private static final Logger LOG = Logger.getLogger(ServiceBusImpl.class);
 	
 	// Monitor objects. NOTE(review): the synchronized statements in this listing have lost their
 	// lock expressions, so which lock guards which state cannot be confirmed from here.
 	private final Object serviceLock = new Object();
 	private final Object synchronizeLock = new Object();
 	// Random source; presumably used to pick among remote services in getEndpoint -- confirm.
 	private final Random randomNumber = new Random();
 	
 	// injected values
 	// Identifier of this bus instance; presumably assigned by setInstanceId(String) -- confirm.
 	private String instanceId;
 	
 	/** Contains endpoints for services which were published by this client application. */
 
 	private final Map<QNameLocalServicelocalServices;

 	// Contains endpoints for services which exist remotely. This list may not be entirely complete, because entries are lazily loaded into it as services are requested.
 
 		
 	public ServiceBusImpl() {
 		this. = new HashMap<QNameLocalService>();
 	}
 	
 	public void afterPropertiesSet() throws Exception {
 		if (StringUtils.isBlank()) {
 			throw new IllegalStateException("a valid instanceId was not injected");
 		}
 		if ( == null) {
 			throw new IllegalStateException("serviceRegistry was not injected");
 		}
 		if ( == null) {
 			throw new IllegalStateException("diffCalculator was not injected");
 		}
 		if ( == null) {
 			throw new IllegalStateException("scheduledPool was not injected");
		}
	}
	/**
	 * Starts this lifecycle by delegating to the superclass implementation.
	 *
	 * NOTE(review): startSynchronizationThread() is not invoked here or anywhere else in this
	 * listing -- confirm whether a call was lost from this method.
	 */
	public void start() throws Exception {
		super.start();
	}
	protected boolean isDevMode() {
		return ConfigContext.getCurrentContextConfig().getDevMode();
	}
	/**
	 * Logs the start of the background synchronization task and, outside dev mode, reads the
	 * configured refresh rate and builds the Runnable for the background job.
	 *
	 * NOTE(review): this listing is incomplete. The lock expression, the logger receiver, the body
	 * of the try block and whatever schedules the Runnable are all missing, so nothing visible here
	 * actually starts a task -- restore from the original source before relying on this method.
	 */
	protected void startSynchronizationThread() {
		synchronized () {
			.info("Starting Service Bus synchronization thread...");
			// dev mode skips background synchronization entirely
			if (!isDevMode()) {
				int refreshRate = ConfigContext.getCurrentContextConfig().getRefreshRate();
				Runnable runnable = new Runnable() {
					public void run() {
						try {
// any Throwable from the background job is caught and logged here rather than propagated
catch (Throwable t) {
							.error("Failed to execute background service bus synchronization."t);
						}
					}
				};
			}
			.info("...Service Bus synchronization thread successfully started.");
		}
	}
	public void destroy() throws Exception {
		.info("Stopping the Service Bus...");
		.info("...Service Bus successfully stopped.");
	}
	/**
	 * Cancels a previously stored task handle, if one is set, without interrupting a run in
	 * progress (cancel(false)), logs a warning when cancel returns false, and then clears the handle.
	 *
	 * NOTE(review): the lock expression, the name of the field holding the handle and the logger
	 * receiver are missing from this listing.
	 */
	protected void stopSynchronizationThread() {
		synchronized () {
			// remove services from the bus
			if (this. != null) {
				if (!this..cancel(false)) {
					.warn("Failed to cancel registry sychronization.");
				}
				this. = null;
			}
		}
	}
	public String getInstanceId() {
		return this.;
	}
	public void setInstanceId(String instanceId) {
		this. = instanceId;
	}
	/**
	 * Returns every known endpoint for the given service, with the local endpoint (if any) first.
	 *
	 * Starts from the remote endpoints; when a local endpoint exists, the first remote entry whose
	 * service configuration equals the local one is dropped so the service is not listed twice.
	 *
	 * NOTE(review): the lock expression and the closing brackets of several generic types are
	 * missing from this listing.
	 *
	 * @param serviceName the name of the service, must not be null
	 * @return an unmodifiable list of endpoints, possibly empty
	 * @throws IllegalArgumentException if serviceName is null
	 */
	public List<EndpointgetEndpoints(QName serviceName) {
		if (serviceName == null) {
			throw new IllegalArgumentException("serviceName cannot be null");
		}
		List<Endpointendpoints = new ArrayList<Endpoint>();
		synchronized () {
			endpoints.addAll(getRemoteEndpoints(serviceName));
			Endpoint localEndpoint = getLocalEndpoint(serviceName);
			if (localEndpoint != null) {
				// drop the remote copy of the local service; only the first match is removed
				for (Iterator<Endpointiterator = endpoints.iterator(); iterator.hasNext();) {
					Endpoint endpoint = (Endpointiterator.next();
					if (localEndpoint.getServiceConfiguration().equals(endpoint.getServiceConfiguration())) {
						iterator.remove();
						break;
					}
				}
				// add at first position, just because we like the local endpoint the best, it's our friend ;)
				endpoints.add(0, localEndpoint);
			}
		}
		return Collections.unmodifiableList(endpoints);
	}
	/**
	 * Returns the endpoints of the remote services recorded for the given service name.
	 *
	 * NOTE(review): the lock expression and the name of the map the remote services are read from
	 * are missing from this listing.
	 *
	 * @param serviceName the name of the service, must not be null
	 * @return an unmodifiable list of endpoints; empty when no entry exists for the name
	 * @throws IllegalArgumentException if serviceName is null
	 */
	public List<EndpointgetRemoteEndpoints(QName serviceName) {
		if (serviceName == null) {
			throw new IllegalArgumentException("serviceName cannot be null");
		}
		List<Endpointendpoints = new ArrayList<Endpoint>();
		synchronized () {
			Set<RemoteServiceremoteServices = .get(serviceName);
			if (remoteServices != null) {
				for (RemoteService remoteService : remoteServices) {
					endpoints.add(remoteService.getEndpoint());
				}
			}
		}
		return Collections.unmodifiableList(endpoints);
	}
	/**
	 * Returns the endpoint of the local service stored under the given name.
	 *
	 * NOTE(review): the lock expression and the receiver of get(...) are missing from this listing;
	 * the receiver is presumably the localServices map -- confirm.
	 *
	 * @param serviceName the name of the service, must not be null
	 * @return the local endpoint, or null when no local service is stored under that name
	 * @throws IllegalArgumentException if serviceName is null
	 */
	public Endpoint getLocalEndpoint(QName serviceName) {
		if (serviceName == null) {
			throw new IllegalArgumentException("serviceName cannot be null");
		}
		synchronized () {
			LocalService localService = .get(serviceName);
			if (localService != null) {
				return localService.getEndpoint();
			}
			return null;
		}
	}
		// NOTE(review): the method signature that opens this body is missing from this listing.
		// The body builds a map from each local service name to that service's endpoint and
		// returns it as an unmodifiable map. The lock expression and the map receivers are missing too.
		Map<QNameEndpointlocalEndpoints = new HashMap<QNameEndpoint>();
		synchronized () {
			for (QName localServiceName : .keySet()) {
				LocalService localService = .get(localServiceName);
				localEndpoints.put(localServiceNamelocalService.getEndpoint());
			}
		}
		return Collections.unmodifiableMap(localEndpoints);
	}
		// NOTE(review): the method signature that opens this body is missing from this listing.
		// The body collects endpoints in two passes (the first calls getEndpoint() on each map value,
		// the second iterates sets of RemoteService) and returns them as one unmodifiable list.
		// The lock expression and the names of both maps are missing too.
		List<EndpointallEndpoints = new ArrayList<Endpoint>();
		synchronized () {
			for (QName serviceName : this..keySet()) {
				allEndpoints.add(this..get(serviceName).getEndpoint());
			}
			for (QName serviceName : this..keySet()) {
				Set<RemoteServiceremoteServices = .get(serviceName);
				for (RemoteService remoteService : remoteServices) {
					allEndpoints.add(remoteService.getEndpoint());
				}
			}
		}
		return Collections.unmodifiableList(allEndpoints);
	}
	public Endpoint getEndpoint(QName serviceName) {
		return getEndpoint(serviceNamenull);
	}
    /**
     * Chooses one endpoint for the given service, preferring the local endpoint.
     *
     * The local endpoint is used when it exists and either applicationId is blank or the endpoint's
     * application id equals it. Otherwise the remote services for the name are filtered by
     * applicationId and one of them is picked at random.
     *
     * NOTE(review): when the local endpoint fails the applicationId check and no remote service
     * matches, the mismatching local endpoint is still returned -- confirm that this is intended.
     * NOTE(review): the lock expression, the remote-service map, the Random field name and several
     * commas and generic brackets are missing from this listing.
     *
     * @param serviceName the name of the service, must not be null
     * @param applicationId application id to restrict to; blank or null means no restriction
     * @return the chosen endpoint, or null when neither a local nor a remote one is found
     * @throws IllegalArgumentException if serviceName is null
     */
    public Endpoint getEndpoint(QName serviceNameString applicationId) {
        if (serviceName == null) {
            throw new IllegalArgumentException("serviceName cannot be null");
        }
        Endpoint availableEndpoint = null;
        synchronized () {
            // look at local services first
            availableEndpoint = getLocalEndpoint(serviceName);
            if (availableEndpoint == null || (!StringUtils.isBlank(applicationId) && !availableEndpoint.getServiceConfiguration().getApplicationId().equals(applicationId))) {
                 // TODO - would be better to return an Endpoint that contained an internal proxy to all the services so fail-over would be easier to implement!
                Set<RemoteServiceremoteServices = .get(serviceName);
                remoteServices = filterByApplicationId(applicationIdremoteServices);
                if (remoteServices != null && !remoteServices.isEmpty()) {
                    // TODO - this should also probably check the current status of the service?
                    // copy to an array so a random index can select one entry of the set
                    RemoteService[] remoteServiceArray = remoteServices.toArray(new RemoteService[0]);
                    RemoteService availableRemoteService = remoteServiceArray[this..nextInt(remoteServiceArray.length)];
                    availableEndpoint = availableRemoteService.getEndpoint();
                }
            }
        }
        return availableEndpoint;
    }
	protected Set<RemoteServicefilterByApplicationId(String applicationIdSet<RemoteServiceremoteServices) {
	    if (StringUtils.isBlank(applicationId) || remoteServices == null || remoteServices.isEmpty()) {
	        return remoteServices;
	    }
	    Set<RemoteServicefiltered = new HashSet<RemoteService>();
	    for (RemoteService remoteService : remoteServices) {
	        if (remoteService.getServiceInfo().getApplicationId().equals(applicationId)) {
	            filtered.add(remoteService);
	        }
	    }
	    return filtered;
	}
	/**
	 * Finds the endpoint whose service configuration equals the given one.
	 *
	 * The local endpoint for the configuration's service name is checked first, then each remote
	 * endpoint for that name.
	 *
	 * NOTE(review): the lock expression and a generic closing bracket are missing from this listing.
	 *
	 * @param serviceConfiguration the configuration to match, must not be null
	 * @return the matching endpoint, or null when none matches
	 * @throws IllegalArgumentException if serviceConfiguration is null
	 */
	public Endpoint getConfiguredEndpoint(ServiceConfiguration serviceConfiguration) {
		if (serviceConfiguration == null) {
			throw new IllegalArgumentException("serviceConfiguration cannot be null");
		}
		synchronized () {
			Endpoint localEndpoint = getLocalEndpoint(serviceConfiguration.getServiceName());
			if (localEndpoint != null && localEndpoint.getServiceConfiguration().equals(serviceConfiguration)) {
				return localEndpoint;
			}
			List<EndpointremoteEndpoints = getRemoteEndpoints(serviceConfiguration.getServiceName());
			for (Endpoint remoteEndpoint : remoteEndpoints) {
				if (remoteEndpoint.getServiceConfiguration().equals(serviceConfiguration)) {
					return remoteEndpoint;
				}
			}
		}
		return null;
	}
    public Object getService(QName serviceName) {
        return getService(serviceNamenull);
    }
	public Object getService(QName serviceNameString applicationId) {
		Endpoint availableEndpoint = getEndpoint(serviceNameapplicationId);
		if (availableEndpoint == null) {
			return null;
		}
		return availableEndpoint.getService();
	}
	/**
	 * Wraps the definition in a LocalService for this instance and stores it under the
	 * definition's service name.
	 *
	 * NOTE(review): the lock expression, the map receiver (presumably localServices), a parameter
	 * comma and the body of the "if (synchronize)" branch are missing from this listing, so the
	 * synchronize flag has no visible effect here.
	 *
	 * @param serviceDefinition the service to publish, must not be null
	 * @param synchronize requests a synchronization after publishing (effect not visible here)
	 * @return the service configuration of the new local endpoint
	 * @throws IllegalArgumentException if serviceDefinition is null
	 */
	public ServiceConfiguration publishService(ServiceDefinition serviceDefinitionboolean synchronize) {
		if (serviceDefinition == null) {
			throw new IllegalArgumentException("serviceDefinition cannot be null");
		}
		LocalService localService = new LocalService(getInstanceId(), serviceDefinition);
		synchronized () {
			.put(serviceDefinition.getServiceName(), localService);
		}
		if (synchronize) {
		}
		return localService.getEndpoint().getServiceConfiguration();
	}
	/**
	 * Publishes each definition through publishService(definition, false) and collects the
	 * resulting configurations.
	 *
	 * NOTE(review): the lock expression, several commas and generic brackets, and the body of the
	 * "if (synchronize)" branch are missing from this listing.
	 *
	 * @param serviceDefinitions the services to publish, must not be null
	 * @param synchronize requests a single synchronization after all services are published
	 *        (effect not visible here)
	 * @return an unmodifiable list of configurations, in the order of the input list
	 * @throws IllegalArgumentException if serviceDefinitions is null
	 */
	public List<ServiceConfigurationpublishServices(List<ServiceDefinitionserviceDefinitionsboolean synchronize) {
		if (serviceDefinitions == null) {
			throw new IllegalArgumentException("serviceDefinitions list cannot be null");
		}
		List<ServiceConfigurationserviceConfigurations = new ArrayList<ServiceConfiguration>();
		synchronized () {
			for (ServiceDefinition serviceDefinition : serviceDefinitions) {
				// per-service synchronization is suppressed by passing false
				ServiceConfiguration serviceConfiguration = publishService(serviceDefinitionfalse);
				serviceConfigurations.add(serviceConfiguration);
			}
		}
		if (synchronize) {
		}
		return Collections.unmodifiableList(serviceConfigurations);
	}
	/**
	 * Removes the local service stored under the given name.
	 *
	 * NOTE(review): the lock expression, the map receiver (presumably localServices), a parameter
	 * comma and the body of the "if (serviceRemoved && synchronize)" branch are missing from this
	 * listing.
	 *
	 * @param serviceName the name of the service to remove, must not be null
	 * @param synchronize requests a synchronization when something was removed (effect not visible here)
	 * @return true when a local service was stored under that name and has been removed
	 * @throws IllegalArgumentException if serviceName is null
	 */
	public boolean removeService(QName serviceNameboolean synchronize) {
		if (serviceName == null) {
			throw new IllegalArgumentException("serviceName cannot be null");
		}
		boolean serviceRemoved = false;
		synchronized () {
			LocalService localService = .remove(serviceName);
			serviceRemoved = localService != null;
		}
		if (serviceRemoved && synchronize) {
		}
		return serviceRemoved;
	}
	/**
	 * Removes the local services stored under the given names and records one result per name.
	 *
	 * NOTE(review): the lock expression, the map receiver, the values passed to
	 * servicesRemoved.add(...) (presumably Boolean.TRUE / Boolean.FALSE), the closing brace before
	 * "else", and the body of the final "if" are missing from this listing.
	 * NOTE(review): unlike the other list-returning methods in this class, the returned list is not
	 * wrapped as unmodifiable.
	 *
	 * @param serviceNames the names of the services to remove, must not be null
	 * @param synchronize requests a synchronization when anything was removed (effect not visible here)
	 * @return one entry per requested name, in input order
	 * @throws IllegalArgumentException if serviceNames is null
	 */
	public List<BooleanremoveServices(List<QNameserviceNamesboolean synchronize) {
		if (serviceNames == null) {
			throw new IllegalArgumentException("serviceNames cannot be null");
		}
		// becomes true as soon as at least one service was actually removed
		boolean serviceRemoved = false;
		List<BooleanservicesRemoved = new ArrayList<Boolean>();
		synchronized () {
			for (QName serviceName : serviceNames) {
				LocalService localService = .remove(serviceName);
				if (localService != null) {
					servicesRemoved.add(.);
					serviceRemoved = true;
else {
					servicesRemoved.add(.);
				}
			}
		}
		if (serviceRemoved && synchronize) {
		}
		return servicesRemoved;
	}
    /**
     * Computes a diff over snapshots of the local and remote service collections, logs it, and
     * hands it to the given processor. Does nothing in dev mode.
     *
     * The snapshots are taken inside the inner synchronized block; the diff, the logging and the
     * processor callback run after that inner block has been left, but still inside the outer one.
     *
     * NOTE(review): both lock expressions, the names of the two maps, the receiver of
     * diffServices(...) and several commas and generic brackets are missing from this listing.
     *
     * @param processor callback that receives the computed diff
     */
    protected void synchronizeAndProcess(SyncProcessor processor) {
        if (!isDevMode()) {
			synchronized () {
				List<LocalServicelocalServicesList;
				List<RemoteServiceclientRegistryCacheList;
				synchronized () {
					// first, flatten the lists
					localServicesList = new ArrayList<LocalService>(this..values());
					clientRegistryCacheList = new ArrayList<RemoteService>();
					for (Set<RemoteServiceremoteServices : this..values()) {
						clientRegistryCacheList.addAll(remoteServices);
					}
				}
				CompleteServiceDiff serviceDiff = .diffServices(getInstanceId(), localServicesListclientRegistryCacheList);
                logCompleteServiceDiff(serviceDiff);
                processor.sync(serviceDiff);
            }
        }
    }
	public void synchronize() {
        synchronizeAndProcess(new SyncProcessor() {
            @Override
            public void sync(CompleteServiceDiff diff) {
                RemoteServicesDiff remoteServicesDiff = diff.getRemoteServicesDiff();
				processRemoteServiceDiff(remoteServicesDiff);
				LocalServicesDiff localServicesDiff = diff.getLocalServicesDiff();
				processLocalServiceDiff(localServicesDiff);
            }
        });
	}
    @Override
	public void synchronizeRemoteServices() {
        synchronizeAndProcess(new SyncProcessor() {
            @Override
            public void sync(CompleteServiceDiff diff) {
                RemoteServicesDiff remoteServicesDiff = diff.getRemoteServicesDiff();
				processRemoteServiceDiff(remoteServicesDiff);
            }
        });
	}
    @Override
    public void synchronizeLocalServices() {
        synchronizeAndProcess(new SyncProcessor() {
            @Override
            public void sync(CompleteServiceDiff diff) {
                LocalServicesDiff localServicesDiff = diff.getLocalServicesDiff();
                processLocalServiceDiff(localServicesDiff);
            }
        });
    }
    protected void logCompleteServiceDiff(CompleteServiceDiff serviceDiff) {
        RemoteServicesDiff remoteServicesDiff = serviceDiff.getRemoteServicesDiff();
        int newServices = remoteServicesDiff.getNewServices().size();
        int removedServices = remoteServicesDiff.getRemovedServices().size();
        LocalServicesDiff localServicesDiff = serviceDiff.getLocalServicesDiff();
        int servicesToPublish = localServicesDiff.getLocalServicesToPublish().size();
        int servicesToUpdate = localServicesDiff.getLocalServicesToUpdate().size();
        int servicesToRemove = localServicesDiff.getServicesToRemoveFromRegistry().size();
        if (newServices + removedServices + servicesToPublish + servicesToUpdate + servicesToRemove > 0) {
            .info("Found service changes during synchronization: remoteNewServices=" + newServices +
                    ", remoteRemovedServices=" + removedServices +
                    ", localServicesToPublish=" + servicesToPublish +
                    ", localServicesToUpdate=" + servicesToUpdate +
                    ", localServicesToRemove=" + servicesToRemove);
        }
    }
	/**
	 * Applies the remote part of a diff to the cached remote services: removed services are taken
	 * out of their per-name set, and each new service is wrapped in a RemoteService and added to
	 * the set for its name, creating that set on first use.
	 *
	 * NOTE(review): the lock expression, the name of the remote-service map, the logger receiver,
	 * the second constructor argument of RemoteService and several generic brackets are missing
	 * from this listing.
	 *
	 * @param remoteServicesDiff the removed and new remote services to apply
	 */
	protected void processRemoteServiceDiff(RemoteServicesDiff remoteServicesDiff) {
		// note that since there is a gap between when the original services are acquired, the diff, and this subsequent critical section
		// the list of local and client registry services could have changed, so that needs to be considered in the remaining code
		synchronized () {
			// first, let's update what we know about the remote services
			List<RemoteServiceremovedServices = remoteServicesDiff.getRemovedServices();
			for (RemoteService removedRemoteService : removedServices) {
				Set<RemoteServiceremoteServiceSet = this..get(removedRemoteService.getServiceName());
				if (remoteServiceSet != null) {
					boolean wasRemoved = remoteServiceSet.remove(removedRemoteService);
					if (!wasRemoved) {
						.warn("Failed to remove remoteService during synchronization: " + removedRemoteService);
					}
				}
			}
			List<ServiceInfonewServices = remoteServicesDiff.getNewServices();
			for (ServiceInfo newService : newServices) {
				Set<RemoteServiceremoteServiceSet = .get(newService.getServiceName());
				// create the per-name set the first time a service with this name is seen
				if (remoteServiceSet == null) {
					remoteServiceSet = new HashSet<RemoteService>();
					.put(newService.getServiceName(), remoteServiceSet);
				}
				remoteServiceSet.add(new RemoteService(newServicethis.));
			}
		}
	}
	/**
	 * Applies the local part of a diff: gathers the ids of services to remove and the endpoints to
	 * publish (new ones as they are, updated ones rebuilt via rebuildServiceEndpointForUpdate), and,
	 * unless the batch-mode property is true or there is nothing to do, sends both lists in one
	 * removeAndPublish call.
	 *
	 * NOTE(review): the batch-mode property key, the receiver of removeAndPublish(...), the lock
	 * expression, the body of the loop over published services (presumably a call to
	 * rebuildLocalServiceEndpointAfterPublishing -- confirm) and several commas and generic brackets
	 * are missing from this listing.
	 *
	 * @param localServicesDiff the local services to remove, publish and update
	 */
	protected void processLocalServiceDiff(LocalServicesDiff localServicesDiff) {
		List<StringremoveServiceEndpointIds = new ArrayList<String>();
		List<ServiceEndpointpublishServiceEndpoints = new ArrayList<ServiceEndpoint>();
		for (ServiceInfo serviceToRemove : localServicesDiff.getServicesToRemoveFromRegistry()) {
			removeServiceEndpointIds.add(serviceToRemove.getServiceId());
		}
		for (LocalService localService : localServicesDiff.getLocalServicesToPublish()) {
			publishServiceEndpoints.add(localService.getServiceEndpoint());
		}
		// updates are published too, after copying the registry's ids onto the local endpoint
		for (LocalService localService : localServicesDiff.getLocalServicesToUpdate().keySet()) {
			ServiceInfo registryServiceInfo = localServicesDiff.getLocalServicesToUpdate().get(localService);
			publishServiceEndpoints.add(rebuildServiceEndpointForUpdate(localService.getServiceEndpoint(), registryServiceInfo));
		}
		boolean batchMode = ConfigContext.getCurrentContextConfig().getBooleanProperty(.false);
		if (!batchMode && (!removeServiceEndpointIds.isEmpty() || !publishServiceEndpoints.isEmpty())) {
			RemoveAndPublishResult result = this..removeAndPublish(removeServiceEndpointIdspublishServiceEndpoints);
			// now update the ServiceEndpoints for our local services so we can get the proper id for them
			if (!result.getServicesPublished().isEmpty()) {
				synchronized () {
					for (ServiceEndpoint publishedService : result.getServicesPublished()) {
					}
				}
			}
		}
	}
	protected ServiceEndpoint rebuildServiceEndpointForUpdate(ServiceEndpoint originalEndpointServiceInfo registryServiceInfo) {
		ServiceEndpoint.Builder builder = ServiceEndpoint.Builder.create(originalEndpoint);
		builder.getInfo().setServiceId(registryServiceInfo.getServiceId());
		builder.getInfo().setServiceDescriptorId(registryServiceInfo.getServiceDescriptorId());
		builder.getInfo().setVersionNumber(registryServiceInfo.getVersionNumber());
		builder.getDescriptor().setId(registryServiceInfo.getServiceDescriptorId());
		return builder.build();
	}
	protected void rebuildLocalServiceEndpointAfterPublishing(ServiceEndpoint publishedService) {
		// verify the service is still published
		QName serviceName = publishedService.getInfo().getServiceName();
		if (.containsKey(serviceName)) {
			LocalService newLocalService = new LocalService(.get(serviceName), publishedService);
			.put(serviceNamenewLocalService);
		}
	}
	public void setServiceRegistry(ServiceRegistry serviceRegistry) {
		this. = serviceRegistry;
	}
	public void setDiffCalculator(ServiceRegistryDiffCalculator diffCalculator) {
		this. = diffCalculator;
	}
	public void setServiceExportManager(ServiceExportManager serviceExportManager) {
		this. = serviceExportManager;
	}
	public void setScheduledPool(KSBScheduledPool scheduledPool) {
		this. = scheduledPool;
	}
    private static interface SyncProcessor {
        void sync(CompleteServiceDiff diff);
    }
New to GrepCode? Check out our FAQ X