  package com.splout.db.hazelcast;

/*
 * #%L
 * Splout SQL Server
 * %%
 * Copyright (C) 2012 Datasalt Systems S.L.
 * %%
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <>.
 * #L%
 */
import static com.splout.db.hazelcast.HazelcastUtils.getHZAddress;

import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.core.LifecycleEvent;
import com.hazelcast.core.LifecycleEvent.LifecycleState;
import com.hazelcast.core.LifecycleListener;
import com.hazelcast.core.MembershipEvent;
import com.hazelcast.core.MembershipListener;
Creates a registry of members using a com.hazelcast.core.IMap. The user can register a com.hazelcast.core.EntryListener to receive events from the IMap which report joining or leaving members. Spurious update events may also be generated, either because a member's information changed or because of the defensive re-registrations this class performs.
The main difficulty for the registry is the case of network partitions (split-brain): all registry replicas for a member could be lost. That is improbable, but possible. To cover this case, a thread periodically checks that the node is still registered, ensuring eventual consistency. Another design principle of this registry is to minimize network communication between nodes, since that traffic would hurt the system's scalability.
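The core pattern, registering into a shared map and periodically re-asserting the registration, can be sketched with plain JDK types. This is a minimal sketch, not Splout code: a ConcurrentHashMap stands in for the replicated com.hazelcast.core.IMap, and the class and method names are illustrative.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// JDK-only sketch of the registry's eventual-consistency idea. A ConcurrentHashMap
// stands in for the replicated IMap; names here are illustrative, not from Splout.
public class SelfCheckSketch {
	final Map<String, Object> registry = new ConcurrentHashMap<String, Object>();
	final String member;   // this node's address, e.g. "host:port"
	final Object nodeInfo; // whatever metadata the node publishes

	SelfCheckSketch(String member, Object nodeInfo) {
		this.member = member;
		this.nodeInfo = nodeInfo;
	}

	void register() {
		registry.put(member, nodeInfo);
	}

	// One round of the periodic check: if our entry vanished (e.g. its replicas were
	// lost in a partition), silently re-register. Repeating this check over time is
	// what makes the registry eventually consistent.
	void checkRegistered() {
		if(registry.get(member) == null) {
			register();
		}
	}

	public static void main(String[] args) {
		SelfCheckSketch node = new SelfCheckSketch("10.0.0.1:5701", "meta");
		node.register();
		node.registry.remove(node.member); // simulate losing all replicas of our entry
		node.checkRegistered();            // the periodic check restores it
		System.out.println(node.registry.containsKey(node.member)); // prints "true"
	}
}
```

Note that the check only ever talks to the map when the entry is missing, which matches the design goal of minimizing network traffic.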
public class DistributedRegistry {

	private final static Log log = LogFactory.getLog(DistributedRegistry.class);

	private final String registryName;
	private Object nodeInfo;
	private final HazelcastInstance hzInstance;
	private final int minutesToCheckRegister;
	private final int oldestMembersLeading;

	private final AtomicBoolean amIRegistered = new AtomicBoolean(false);
	private final AtomicBoolean disableChecking = new AtomicBoolean(false);
	private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
	// Holds the currently pending registration check, if any (see scheduleCheck())
	private final AtomicReference<ScheduledFuture<?>> scheduledCheck = new AtomicReference<ScheduledFuture<?>>();
	private final Random random = new Random();
	public class MyListener implements MembershipListener {

		@Override
		public void memberAdded(MembershipEvent membershipEvent) {
			// Nothing to do. I should be registered. Just in case...
			scheduleCheck();
		}

		@Override
		public void memberRemoved(MembershipEvent membershipEvent) {
			synchronized(DistributedRegistry.this) {
				/*
				 * When a node leaves, somebody in the cluster must remove its info from the distributed map. We
				 * restrict this removal to some of the oldest members, in order to reduce coordination traffic
				 * while keeping replication.
				 */
				if(HazelcastUtils.isOneOfOldestMembers(hzInstance, hzInstance.getCluster().getLocalMember(),
				    oldestMembersLeading)) {
					String member = getHZAddress(membershipEvent.getMember());
					log.info("Member " + member + " leaves. Unregistering it from registry [" + registryName + "]");
					hzInstance.getMap(registryName).remove(member);
				}
			}
			// Just in case...
			scheduleCheck();
		}

In the case of a network partition where some data is lost (improbable, as replication should mitigate it), some members could believe they are registered when they are not. We set up a periodic check to detect that. A check is scheduled each time a member joins or leaves the cluster, but to avoid checking too often, at most one check can be pending in a given period. Also, to reduce herd behavior, the check is scheduled at a random point within that period.
		private void scheduleCheck() {
			ScheduledFuture<?> checkerFuture = scheduledCheck.get();
			if(checkerFuture == null || checkerFuture.isDone()) {
				// Random delay within the period, to spread checks across nodes
				int seconds = random.nextInt(Math.max(1, minutesToCheckRegister * 60));
				scheduledCheck.set(scheduler.schedule(new Runnable() {
					@Override
					public void run() {
						synchronized(DistributedRegistry.this) {
							if(amIRegistered.get() && !disableChecking.get()) {
								String member = localMember();
								log.info("Checking if registered [" + member + "] ...");
								IMap<String, Object> members = hzInstance.getMap(registryName);
								if(members.get(member) == null) {
									log.warn("Detected wrongly unregistered ["
									    + member
									    + "]. Could be due to a network partition problem or to a software bug. Reregistering.");
									register();
								}
							}
						}
					}
				}, seconds, TimeUnit.SECONDS));
			}
		}
	}
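The throttling and jitter logic above can be isolated into a small JDK-only sketch. This is an illustration, not Splout code: the names and the one-minute period are placeholders, and the Runnable stands in for the registration check.

```java
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of throttled, jittered scheduling: at most one check pending at a time,
// scheduled at a random point inside the period. The jitter de-synchronizes nodes,
// avoiding a thundering herd when a membership event reaches the whole cluster.
public class JitteredCheckSketch {
	static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
	static final AtomicReference<ScheduledFuture<?>> pending = new AtomicReference<ScheduledFuture<?>>();
	static final Random random = new Random();
	static final int periodMinutes = 1; // stands in for minutesToCheckRegister

	static boolean scheduleCheck(Runnable check) {
		ScheduledFuture<?> current = pending.get();
		if(current == null || current.isDone()) {
			// Random delay in [0, period) seconds
			int seconds = random.nextInt(Math.max(1, periodMinutes * 60));
			pending.set(scheduler.schedule(check, seconds, TimeUnit.SECONDS));
			return true;  // a new check was scheduled
		}
		return false;     // one is already pending: skip, avoiding redundant checks
	}

	public static void main(String[] args) {
		boolean first = scheduleCheck(new Runnable() {
			public void run() {
				System.out.println("checking registration");
			}
		});
		System.out.println(first); // prints "true": nothing was pending yet
		scheduler.shutdownNow();   // cancel the demo check before it runs
	}
}
```

While a check is pending, further calls to scheduleCheck() return without scheduling anything, which is what bounds the check rate to one per period.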

When a split-brain heals and the two clusters merge, the members of the smaller cluster are restarted. When we detect that, we re-register those members in order to ensure that their info is present in the registry distributed map.
This alone does not ensure complete coherence after a network partition merge, as members of the bigger cluster could have an incomplete registry of themselves as well.
	public class RestartListener implements LifecycleListener {

		@Override
		public void stateChanged(LifecycleEvent event) {
			if(event.getState() == LifecycleState.RESTARTED) {
				synchronized(DistributedRegistry.this) {
					log.info("Hazelcast RESTARTED event received. Reregistering myself to assure I'm properly registered");
					register();
				}
			}
		}
	}
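The re-register-on-restart idea can be shown with a JDK-only sketch. This is illustrative only: the enum below is a hypothetical stand-in for Hazelcast's LifecycleEvent.LifecycleState, and the map stands in for the replicated registry.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a subset of Hazelcast's LifecycleEvent.LifecycleState.
enum LifecycleState { STARTED, RESTARTED, SHUTDOWN }

// Sketch of the RestartListener idea: when a node is restarted (e.g. it was on the
// smaller side of a healed split-brain), unconditionally re-register its info.
public class RestartReregisterSketch {
	final Map<String, Object> registry = new ConcurrentHashMap<String, Object>();
	final String myself;
	final Object nodeInfo;

	RestartReregisterSketch(String myself, Object nodeInfo) {
		this.myself = myself;
		this.nodeInfo = nodeInfo;
		registry.put(myself, nodeInfo); // initial registration
	}

	// Mirrors RestartListener.stateChanged(): re-register on RESTARTED
	void stateChanged(LifecycleState state) {
		if(state == LifecycleState.RESTARTED) {
			registry.put(myself, nodeInfo);
		}
	}

	public static void main(String[] args) {
		RestartReregisterSketch node = new RestartReregisterSketch("10.0.0.2:5701", "meta");
		node.registry.remove(node.myself);           // simulate the merge wiping our entry
		node.stateChanged(LifecycleState.RESTARTED); // restart event triggers re-registration
		System.out.println(node.registry.containsKey(node.myself)); // prints "true"
	}
}
```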
	public DistributedRegistry(String registryName, Object nodeInfo, HazelcastInstance hzInstance,
	    int minutesToCheckRegister, int oldestMembersLeading) {
		this.registryName = registryName;
		this.nodeInfo = nodeInfo;
		this.hzInstance = hzInstance;
		this.minutesToCheckRegister = minutesToCheckRegister;
		this.oldestMembersLeading = oldestMembersLeading;
	}
	public synchronized void register() {
		String myself = localMember();
		log.info("Registering myself [" + myself + "] on registry [" + registryName + "]");
		hzInstance.getMap(registryName).put(myself, nodeInfo);
		amIRegistered.set(true);
	}
	public synchronized void unregister() {
		String myself = localMember();
		log.info("Unregistering myself [" + myself + " -> " + nodeInfo + "] on registry [" + registryName
		    + "]");
		hzInstance.getMap(registryName).remove(myself);
		amIRegistered.set(false);
	}
	public synchronized void changeInfo(Object nodeInfo) {
		// Changing in-memory information. Needed for future re-registration
		this.nodeInfo = nodeInfo;
		String myself = localMember();
		log.info("Changing my info [" + myself + "] on registry [" + registryName + "]");
		hzInstance.getMap(registryName).put(myself, nodeInfo);
	}

Enables or disables the preventive registration checking.
	protected void disableChecking(boolean disable) {
		disableChecking.set(disable);
	}

	private String localMember() {
		return getHZAddress(hzInstance.getCluster().getLocalMember());
	}

	public void dumpRegistry() {
		IMap<String, Object> members = hzInstance.getMap(registryName);
		System.out.println("Registry [" + registryName + "] {");
		for(Entry<String, Object> entry : members.entrySet()) {
			System.out.println("\t" + entry.getKey() + " -> " + entry.getValue());
		}
		System.out.println("}");
	}
}