Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package org.apache.helix.controller.stages;
  
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
  *   http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 
 import java.util.List;
 import java.util.Map;
 
 
 public class TaskAssignmentStage extends AbstractBaseStage {
   private static Logger logger = Logger.getLogger(TaskAssignmentStage.class);
 
   @Override
   public void process(ClusterEvent eventthrows Exception {
     long startTime = System.currentTimeMillis();
     .info("START TaskAssignmentStage.process()");
 
     HelixManager manager = event.getAttribute("helixmanager");
     Map<ResourceIdResourceConfigresourceMap =
         event.getAttribute(..toString());
     BestPossibleStateOutput bestPossibleStateOutput =
     Cluster cluster = event.getAttribute("ClusterDataCache");
     Map<ParticipantIdParticipantliveParticipantMap = cluster.getLiveParticipantMap();
 
     if (manager == null || resourceMap == null || messageOutput == null || cluster == null
         || liveParticipantMap == null) {
       throw new StageException(
           "Missing attributes in event:"
               + event
               + ". Requires HelixManager|RESOURCES|MESSAGES_THROTTLE|BEST_POSSIBLE_STATE|DataCache|liveInstanceMap");
     }
 
     HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
     List<MessagemessagesToSend = new ArrayList<Message>();
     for (ResourceId resourceId : resourceMap.keySet()) {
       for (PartitionId partitionId : bestPossibleStateOutput.getResourceAssignment(resourceId)
           .getMappedPartitionIds()) {
         List<Messagemessages = messageOutput.getMessages(resourceIdpartitionId);
         messagesToSend.addAll(messages);
       }
     }
 
     List<MessageoutputMessages =
         batchMessage(dataAccessor.keyBuilder(), messagesToSendresourceMapliveParticipantMap,
             manager.getProperties());
     sendMessages(dataAccessoroutputMessages);
 
     long endTime = System.currentTimeMillis();
     .info("END TaskAssignmentStage.process(). took: " + (endTime - startTime) + " ms");
 
   }
 
   List<MessagebatchMessage(Builder keyBuilderList<Messagemessages,
       Map<ResourceIdResourceConfigresourceMap,
       Map<ParticipantIdParticipantliveParticipantMapHelixManagerProperties properties) {
     // group messages by its CurrentState path + "/" + fromState + "/" + toState
     Map<StringMessagebatchMessages = new HashMap<StringMessage>();
     List<MessageoutputMessages = new ArrayList<Message>();
 
     Iterator<Messageiter = messages.iterator();
     while (iter.hasNext()) {
       Message message = iter.next();
       ResourceId resourceId = message.getResourceId();
      ResourceConfig resource = resourceMap.get(resourceId);
      ParticipantId participantId = ParticipantId.from(message.getTgtName());
      Participant liveParticipant = liveParticipantMap.get(participantId);
      String participantVersion = null;
      if (liveParticipant != null) {
        participantVersion = liveParticipant.getRunningInstance().getVersion().toString();
      }
      if (resource == null || !resource.getBatchMessageMode() || participantVersion == null
          || !properties.isFeatureSupported("batch_message"participantVersion)) {
        outputMessages.add(message);
        continue;
      }
      String key =
          keyBuilder.currentState(message.getTgtName(), message.getTypedTgtSessionId().stringify(),
              message.getResourceId().stringify()).getPath()
              + "/" + message.getTypedFromState() + "/" + message.getTypedToState();
      if (!batchMessages.containsKey(key)) {
        Message batchMessage = new Message(message.getRecord());
        batchMessage.setBatchMessageMode(true);
        outputMessages.add(batchMessage);
        batchMessages.put(keybatchMessage);
      }
      batchMessages.get(key).addPartitionName(message.getPartitionId().stringify());
    }
    return outputMessages;
  }
  protected void sendMessages(HelixDataAccessor dataAccessorList<Messagemessages) {
    if (messages == null || messages.isEmpty()) {
      return;
    }
    Builder keyBuilder = dataAccessor.keyBuilder();
    List<PropertyKeykeys = new ArrayList<PropertyKey>();
    for (Message message : messages) {
      .info("Sending Message " + message.getMessageId() + " to " + message.getTgtName()
          + " transit " + message.getPartitionId() + "|" + message.getPartitionIds() + " from:"
          + message.getTypedFromState() + " to:" + message.getTypedToState());
      // System.out.println("[dbg] Sending Message " + message.getMsgId() + " to "
      // + message.getTgtName() + " transit " + message.getPartitionId() + "|"
      // + message.getPartitionIds() + " from: " + message.getFromState() + " to: "
      // + message.getToState());
      keys.add(keyBuilder.message(message.getTgtName(), message.getId()));
    }
    dataAccessor.createChildren(keysnew ArrayList<Message>(messages));
  }
New to GrepCode? Check out our FAQ X