Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package org.apache.helix.messaging;
  
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
  *   http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
 
 public class DefaultMessagingService implements ClusterMessagingService {
   private final HelixManager _manager;
   private final CriteriaEvaluator _evaluator;
   private final HelixTaskExecutor _taskExecutor;
   // TODO:rename to factory, this is not a service
   private static Logger _logger = Logger.getLogger(DefaultMessagingService.class);
 
   public DefaultMessagingService(HelixManager manager) {
      = manager;
      = new CriteriaEvaluator();
      = new HelixTaskExecutor();
         );
   }
 
   @Override
   public int send(Criteria recipientCriteriafinal Message messageTemplate) {
     return send(recipientCriteriamessageTemplatenull, -1);
   }
 
   @Override
   public int send(final Criteria recipientCriteriafinal Message message,
       AsyncCallback callbackOnReplyint timeOut) {
     return send(recipientCriteriamessagecallbackOnReplytimeOut, 0);
   }
 
   @Override
   public int send(final Criteria recipientCriteriafinal Message message,
       AsyncCallback callbackOnReplyint timeOutint retryCount) {
     Map<InstanceTypeList<Message>> generateMessage = generateMessage(recipientCriteriamessage);
     int totalMessageCount = 0;
     for (List<Messagemessages : generateMessage.values()) {
       totalMessageCount += messages.size();
     }
     .info("Send " + totalMessageCount + " messages with criteria " + recipientCriteria);
     if (totalMessageCount == 0) {
       return 0;
     }
     String correlationId = null;
     if (callbackOnReply != null) {
       int totalTimeout = timeOut * (retryCount + 1);
       if (totalTimeout < 0) {
         totalTimeout = -1;
       }
       callbackOnReply.setTimeout(totalTimeout);
       correlationId = UUID.randomUUID().toString();
      for (List<Messagemessages : generateMessage.values()) {
        callbackOnReply.setMessagesSent(messages);
      }
      .registerAsyncCallback(correlationIdcallbackOnReply);
    }
    for (InstanceType receiverType : generateMessage.keySet()) {
      List<Messagelist = generateMessage.get(receiverType);
      for (Message tempMessage : list) {
        tempMessage.setRetryCount(retryCount);
        tempMessage.setExecutionTimeout(timeOut);
        tempMessage.setSrcInstanceType(.getInstanceType());
        if (correlationId != null) {
          tempMessage.setCorrelationId(correlationId);
        }
        HelixDataAccessor accessor = .getHelixDataAccessor();
        Builder keyBuilder = accessor.keyBuilder();
        if (receiverType == .) {
          accessor.setProperty(keyBuilder.controllerMessage(tempMessage.getId()), tempMessage);
        }
        if (receiverType == .) {
          accessor.setProperty(keyBuilder.message(tempMessage.getTgtName(), tempMessage.getId()),
              tempMessage);
        }
      }
    }
    if (callbackOnReply != null) {
      // start timer if timeout is set
      callbackOnReply.startTimer();
    }
    return totalMessageCount;
  }
  public Map<InstanceTypeList<Message>> generateMessage(final Criteria recipientCriteria,
      final Message message) {
    Map<InstanceTypeList<Message>> messagesToSendMap = new HashMap<InstanceTypeList<Message>>();
    InstanceType instanceType = recipientCriteria.getRecipientInstanceType();
    if (instanceType == .) {
      List<Messagemessages = generateMessagesForController(message);
      messagesToSendMap.put(.messages);
      // _dataAccessor.setControllerProperty(PropertyType.MESSAGES,
      // newMessage.getRecord(), CreateMode.PERSISTENT);
    } else if (instanceType == .) {
      List<Messagemessages = new ArrayList<Message>();
      List<Map<StringString>> matchedList =
          .evaluateCriteria(recipientCriteria);
      if (!matchedList.isEmpty()) {
        Map<StringStringsessionIdMap = new HashMap<StringString>();
        if (recipientCriteria.isSessionSpecific()) {
          HelixDataAccessor accessor = .getHelixDataAccessor();
          Builder keyBuilder = accessor.keyBuilder();
          List<LiveInstanceliveInstances = accessor.getChildValues(keyBuilder.liveInstances());
          for (LiveInstance liveInstance : liveInstances) {
            sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getTypedSessionId()
                .stringify());
          }
        }
        for (Map<StringStringmap : matchedList) {
          MessageId id = MessageId.from(UUID.randomUUID().toString());
          Message newMessage = new Message(message.getRecord(), id);
          String srcInstanceName = .getInstanceName();
          String tgtInstanceName = map.get("instanceName");
          // Don't send message to self
          if (recipientCriteria.isSelfExcluded()
              && srcInstanceName.equalsIgnoreCase(tgtInstanceName)) {
            continue;
          }
          newMessage.setSrcName(srcInstanceName);
          newMessage.setTgtName(tgtInstanceName);
          newMessage.setResourceId(ResourceId.from(map.get("resourceName")));
          newMessage.setPartitionId(PartitionId.from(map.get("partitionName")));
          if (recipientCriteria.isSessionSpecific()) {
            newMessage.setTgtSessionId(SessionId.from(sessionIdMap.get(tgtInstanceName)));
          }
          messages.add(newMessage);
        }
        messagesToSendMap.put(.messages);
      }
    }
    return messagesToSendMap;
  }
    List<Messagemessages = new ArrayList<Message>();
    MessageId id = MessageId.from(UUID.randomUUID().toString());
    Message newMessage = new Message(message.getRecord(), id);
    newMessage.setMessageId(id);
    newMessage.setSrcName(.getInstanceName());
    newMessage.setTgtName("Controller");
    messages.add(newMessage);
    return messages;
  }
  public synchronized void registerMessageHandlerFactory(String typeMessageHandlerFactory factory) {
    if (.isConnected()) {
      registerMessageHandlerFactoryInternal(typefactory);
    } else {
      .put(typefactory);
    }
  }
  public synchronized void onConnected() {
    for (String type : .keySet()) {
    }
  }
    .info("registering msg factory for type " + type);
    int threadpoolSize = .;
    String threadpoolSizeStr = null;
    String key = type + "." + .;
    ConfigAccessor configAccessor = .getConfigAccessor();
    if (configAccessor != null) {
      ConfigScope scope = null;
      // Read the participant config and cluster config for the per-message type thread pool size.
      // participant config will override the cluster config.
        scope =
            new ConfigScopeBuilder().forCluster(.getClusterName())
                .forParticipant(.getInstanceName()).build();
        threadpoolSizeStr = configAccessor.get(scopekey);
      }
      if (threadpoolSizeStr == null) {
        scope = new ConfigScopeBuilder().forCluster(.getClusterName()).build();
        threadpoolSizeStr = configAccessor.get(scopekey);
      }
    }
    if (threadpoolSizeStr != null) {
      try {
        threadpoolSize = Integer.parseInt(threadpoolSizeStr);
        if (threadpoolSize <= 0) {
          threadpoolSize = 1;
        }
      } catch (Exception e) {
        .error(""e);
      }
    }
    .registerMessageHandlerFactory(typefactorythreadpoolSize);
    // Self-send a no-op message, so that the onMessage() call will be invoked
    // again, and
    // we have a chance to process the message that we received with the new
    // added MessageHandlerFactory
    // before the factory is added.
  }
  public void sendNopMessage() {
    if (.isConnected()) {
      try {
        Message nopMsg =
            new Message(., MessageId.from(UUID.randomUUID().toString()));
        nopMsg.setSrcName(.getInstanceName());
        HelixDataAccessor accessor = .getHelixDataAccessor();
        Builder keyBuilder = accessor.keyBuilder();
            || .getInstanceType() == .) {
          nopMsg.setTgtName("Controller");
          accessor.setProperty(keyBuilder.controllerMessage(nopMsg.getId()), nopMsg);
        }
            || .getInstanceType() == .) {
          nopMsg.setTgtName(.getInstanceName());
          accessor.setProperty(keyBuilder.message(nopMsg.getTgtName(), nopMsg.getId()), nopMsg);
        }
      } catch (Exception e) {
        .error(e);
      }
    }
  }
    return ;
  }
  public int sendAndWait(Criteria receipientCriteriaMessage messageAsyncCallback asyncCallback,
      int timeOutint retryCount) {
    int messagesSent = send(receipientCriteriamessageasyncCallbacktimeOutretryCount);
    if (messagesSent > 0) {
      while (!asyncCallback.isDone() && !asyncCallback.isTimedOut()) {
        synchronized (asyncCallback) {
          try {
            asyncCallback.wait();
          } catch (InterruptedException e) {
            .error(e);
            asyncCallback.setInterrupted(true);
            break;
          }
        }
      }
    } else {
      .warn("No messages sent. For Criteria:" + receipientCriteria);
    }
    return messagesSent;
  }
  public int sendAndWait(Criteria recipientCriteriaMessage messageAsyncCallback asyncCallback,
      int timeOut) {
    return sendAndWait(recipientCriteriamessageasyncCallbacktimeOut, 0);
  }
New to GrepCode? Check out our FAQ X