Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package org.uberfire.io.impl.cluster.helix;
  
  import java.util.HashMap;
  import java.util.Map;
 
 import  org.uberfire.commons.cluster.ClusterService;
 import  org.uberfire.commons.data.Pair;
 import  org.uberfire.commons.message.AsyncCallback;
 import  org.uberfire.commons.message.MessageHandlerResolver;
 import  org.uberfire.commons.message.MessageType;
 
 import static java.util.Arrays.*;
 import static java.util.UUID.*;
 import static org.apache.helix.HelixManagerFactory.*;
 
 public class ClusterServiceHelix implements ClusterService {
 
     private static final Logger logger = LoggerFactory.getLoggerClusterServiceHelix.class );
 
     private final String clusterName;
     private final String instanceName;
     private final HelixManager participantManager;
     private final String resourceName;
     private final Map<String, MessageHandlerResolver> messageHandlerResolver = new ConcurrentHashMap<String, MessageHandlerResolver>();
     private final AtomicBoolean started = new AtomicBooleanfalse );
     private final Collection<RunnableonStart = new ArrayList<Runnable>();
 
     private final SimpleLock lock = new SimpleLock();
     private final AtomicInteger stackSize = new AtomicInteger( 0 );
 
     public ClusterServiceHelixfinal String clusterName,
                                 final String zkAddress,
                                 final String instanceName,
                                 final String resourceName,
                                 final MessageHandlerResolver messageHandlerResolver ) {
         this. = clusterName;
         this. = instanceName;
         this. = resourceName;
         this..put( messageHandlerResolver.getServiceId(), messageHandlerResolver );
 
         this. = getZKHelixManagerclusterNameinstanceName.zkAddress );
     }
 
     //TODO {porcelli} quick hack for now, the real solution would have a cluster per repo
     @Override
     public void addMessageHandlerResolverfinal MessageHandlerResolver resolver ) {
         this..put( resolver.getServiceId(), resolver );
     }
 
     @Override
     public void start() {
         if ( isStarted() ) {
             return;
         }
         try {
             this..connect();
             disablePartition();
             this..getStateMachineEngine().registerStateModelFactory"LeaderStandby"new LockTransitionalFactory ) );
             this..settrue );
             for ( final Runnable runnable :  ) {
                 runnable.run();
             }
         } catch ( final Exception ex ) {
             throw new RuntimeExceptionex );
         }
     }
 
     public boolean isStarted() {
         return .get();
     }
 
     @Override
     public void dispose() {
         if ( this. != null && this..isConnected() ) {
             this..disconnect();
         }
     }
 
     @Override
     public void onStartRunnable runnable ) {
         this..addrunnable );
     }
    @Override
    public boolean isInnerLocked() {
        return .get() > 1;
    }
    private void enablePartition() {
        if ( !isStarted() ) {
            return;
        }
    }
    private void disablePartition() {
        if ( !isStarted() ) {
            return;
        }
    }
    @Override
    public void lock() {
        if ( !isStarted() ) {
            return;
        }
        .incrementAndGet();
        if ( .isLocked() ) {
            return;
        }
        enablePartition();
        while ( !.isLocked() ) {
            try {
                Thread.sleep( 10 );
            } catch ( final InterruptedException ignored ) {
            }
        }
    }
    @Override
    public void unlock() {
        if ( !isStarted() ) {
            return;
        }
        .decrementAndGet();
        if ( !.isLocked() ) {
            .set( 0 );
            return;
        }
        if ( .get() == 0 ) {
            disablePartition();
            while ( .isLocked() ) {
                try {
                    Thread.sleep( 10 );
                } catch ( InterruptedException e ) {
                }
            }
        }
    }
    @Override
    public boolean isLocked() {
        if ( !isStarted() ) {
            return true;
        }
        return .isLocked();
    }
    @Override
    public void broadcastAndWaitfinal String serviceId,
                                  final MessageType type,
                                  final Map<StringStringcontent,
                                  int timeOut ) {
        if ( !isStarted() ) {
            return;
        }
        .getMessagingService().sendAndWaitbuildCriteria(), buildMessageserviceIdtypecontent ), new org.apache.helix.messaging.AsyncCallbacktimeOut ) {
            @Override
            public void onTimeOut() {
            }
            @Override
            public void onReplyMessagefinal Message message ) {
            }
        }, timeOut );
    }
    @Override
    public void broadcastAndWaitfinal String serviceId,
                                  final MessageType type,
                                  final Map<StringStringcontent,
                                  final int timeOut,
                                  final AsyncCallback callback ) {
        if ( !isStarted() ) {
            return;
        }
        int msg = .getMessagingService().sendAndWaitbuildCriteria(), buildMessageserviceIdtypecontent ), new org.apache.helix.messaging.AsyncCallback() {
            @Override
            public void onTimeOut() {
                callback.onTimeOut();
            }
            @Override
            public void onReplyMessagefinal Message message ) {
                final MessageType type = buildMessageTypeFromReplymessage );
                final Map<StringStringmap = getMessageContentFromReplymessage );
                callback.onReply( typemap );
            }
        }, timeOut );
        if ( msg == 0 ) {
            callback.onTimeOut();
        }
    }
    @Override
    public void broadcastfinal String serviceId,
                           final MessageType type,
                           final Map<StringStringcontent ) {
        if ( !isStarted() ) {
            return;
        }
        .getMessagingService().sendbuildCriteria(), buildMessageserviceIdtypecontent ) );
    }
    @Override
    public void broadcastfinal String serviceId,
                           final MessageType type,
                           final Map<StringStringcontent,
                           final int timeOut,
                           final AsyncCallback callback ) {
        if ( !isStarted() ) {
            return;
        }
        .getMessagingService().sendbuildCriteria(), buildMessageserviceIdtypecontent ), new org.apache.helix.messaging.AsyncCallback() {
            @Override
            public void onTimeOut() {
                callback.onTimeOut();
            }
            @Override
            public void onReplyMessagefinal Message message ) {
                final MessageType type = buildMessageTypeFromReplymessage );
                final Map<StringStringmap = getMessageContentmessage );
                callback.onReply( typemap );
            }
        }, timeOut );
    }
    @Override
    public void sendTofinal String serviceId,
                        final String resourceId,
                        final MessageType type,
                        final Map<StringStringcontent ) {
        if ( !isStarted() ) {
            return;
        }
        .getMessagingService().sendbuildCriteriaresourceId ), buildMessageserviceIdtypecontent ) );
    }
    private Criteria buildCriteriafinal String resourceId ) {
        return new Criteria() {{
            setInstanceNameresourceId );
            setResource );
            setSelfExcludedtrue );
            setSessionSpecifictrue );
        }};
    }
    private Criteria buildCriteria() {
        return buildCriteria"%" );
    }
    private Message buildMessagefinal String serviceId,
                                  final MessageType type,
                                  final Map<StringStringcontent ) {
        return new Message..randomUUID().toString() ) {{
            setMsgState.. );
            getRecord().setMapField"content"content );
            getRecord().setSimpleField"serviceId"serviceId );
            getRecord().setSimpleField"type"type.toString() );
            getRecord().setSimpleField"origin" );
        }};
    }
        MessageHandlerFactory convert() {
            return new MessageHandlerFactory() {
                @Override
                public MessageHandler createHandlerfinal Message message,
                                                     final NotificationContext context ) {
                    return new MessageHandlermessagecontext ) {
                        @Override
                        public HelixTaskResult handleMessage() throws InterruptedException {
                            try {
                                final String serviceId = .getRecord().getSimpleField"serviceId" );
                                final MessageType type = buildMessageType.getRecord().getSimpleField"type" ) );
                                final Map<StringStringmap = getMessageContent );
                                final Pair<MessageType, Map<StringString>> result = .get( serviceId ).resolveHandler( serviceIdtype ).handleMessage( typemap );
                                if ( result == null ) {
                                    return new HelixTaskResult() {{
                                        setSuccesstrue );
                                    }};
                                }
                                return new HelixTaskResult() {{
                                    setSuccesstrue );
                                    getTaskResultMap().put"serviceId"serviceId );
                                    getTaskResultMap().put"type"result.getK1().toString() );
                                    getTaskResultMap().put"origin" );
                                    for ( Map.Entry<StringStringentry : result.getK2().entrySet() ) {
                                        getTaskResultMap().putentry.getKey(), entry.getValue() );
                                    }
                                }};
                            } catch ( final Throwable e ) {
                                .error"Error while processing cluster message"e );
                                return new HelixTaskResult() {{
                                    setSuccessfalse );
                                    setMessagee.getMessage() );
                                    setExceptionnew RuntimeExceptione ) );
                                }};
                            }
                        }
                        @Override
                        public void onErrorfinal Exception e,
                                             final ErrorCode code,
                                             final ErrorType type ) {
                        }
                    };
                }
                @Override
                public String getMessageType() {
                    return ...toString();
                }
                @Override
                public void reset() {
                }
            };
        }
    }
    private MessageType buildMessageTypefinal String _type ) {
        if ( _type == null ) {
            return null;
        }
        MessageType type;
        try {
            type = ClusterMessageType.valueOf_type );
        } catch ( Exception ex ) {
            type = new MessageType() {
                @Override
                public String toString() {
                    return _type;
                }
                @Override
                public int hashCode() {
                    return _type.hashCode();
                }
            };
        }
        return type;
    }
    private MessageType buildMessageTypeFromReplyMessage message ) {
        final Map<StringStringresult = message.getRecord().getMapField...toString() );
        return buildMessageTyperesult.get"type" ) );
    }
    private Map<StringStringgetMessageContentfinal Message message ) {
        return message.getRecord().getMapField"content" );
    }
    private Map<StringStringgetMessageContentFromReplyfinal Message message ) {
        return new HashMap<StringString>() {{
            for ( final Map.Entry<StringStringfield : message.getRecord().getMapField...toString() ).entrySet() ) {
                if ( !field.getKey().equals"serviceId" ) && !field.getKey().equals"origin" ) && !field.getKey().equals"type" ) ) {
                    putfield.getKey(), field.getValue() );
                }
            }
        }};
    }
New to GrepCode? Check out our FAQ X