Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package brooklyn.policy.followthesun;
  
  import java.util.List;
  import java.util.Map;
  import java.util.Set;
  
 
 
 
 // TODO: extract interface
 public class FollowTheSunStrategy<ContainerType extends Entity, ItemType extends Movable> {
     
     // This is a modified version of the InterGeographyLatencyPolicy (aka Follow-The-Sun) policy from Monterey v3.
     
     // TODO location constraints
     
     private static final Logger LOG = LoggerFactory.getLogger(FollowTheSunStrategy.class);
     
     private final FollowTheSunParameters parameters;
     private final FollowTheSunModel<ContainerType,ItemType> model;
     private final String name;
     
     public FollowTheSunStrategy(FollowTheSunModel<ContainerType,ItemType> modelFollowTheSunParameters parameters) {
         this. = model;
         this. = parameters;
         this. = model.getName();
     }
     
     public void rebalance() {
         try {
             Set<ItemType> items = .getItems();
             Map<ItemType, Map<LocationDouble>> directSendsToItemByLocation = .getDirectSendsToItemByLocation();
             
             for (ItemType item : items) {
                 String itemName = .getName(item);
                 Location activeLocation = .getItemLocation(item);
                 ContainerType activeContainer = .getItemContainer(item);
                 Map<LocationDoublesendsByLocation = directSendsToItemByLocation.get(item);
                 if (sendsByLocation == nullsendsByLocation = Collections.emptyMap();
                 
                 if (..contains(activeLocation)) {
                     if (.isTraceEnabled()) .trace("Ignoring segment {} as it is in {}"itemNameactiveLocation);
                     continue;
                 }
                 if (!.isItemMoveable(item)) {
                     if (.isDebugEnabled()) .debug("POLICY {} skipping any migration of {}, it is not moveable"itemName);
                     continue;
                 }
                 if (.hasActiveMigration(item)) {
                     .info("POLICY {} skipping any migration of {}, it is involved in an active migration already"itemName);
                     continue;
                 }
                 
                 double total = DefaultFollowTheSunModel.sum(sendsByLocation.values());
 
                 if (.isTraceEnabled()) .trace("POLICY {} detected {} msgs/sec in {}, split up as: {}"new Object[] {totalitemNamesendsByLocation});
                 
                 Double current = sendsByLocation.get(activeLocation);
                 if (current == nullcurrent=0d;
                 List<WeightedObject<Location>> locationsWtd = new ArrayList<WeightedObject<Location>>();
                 if (total > 0) {
                     for (Map.Entry<LocationDoubleentry : sendsByLocation.entrySet()) {
                         Location l = entry.getKey();
                         Double d = entry.getValue();
                         if (d > currentlocationsWtd.add(new WeightedObject<Location>(ld));
                     }
                 }
                 Collections.sort(locationsWtd);
                 Collections.reverse(locationsWtd);
                 
                 double highestMsgRate = -1;
                 Location highestLocation = null;
                 ContainerType optimalContainerInHighest = null;
                 while (!locationsWtd.isEmpty()) {
                     WeightedObject<LocationweightedObject = locationsWtd.remove(0);
                     highestMsgRate = weightedObject.getWeight();
                     highestLocation = weightedObject.getObject();
                     optimalContainerInHighest = findOptimal(.getAvailableContainersFor(itemhighestLocation));
                     if (optimalContainerInHighest != null) {
                         break;
                     }
                 }
                 if (optimalContainerInHighest == null) {
                     if (.isDebugEnabled()) .debug("POLICY {} detected {} is already in optimal permitted location ({} of {} msgs/sec)"new Object[] {itemNamehighestMsgRatetotal});
                     continue;                   
                 }
                 
                 double nextHighestMsgRate = -1;
                 ContainerType optimalContainerInNextHighest = null;
                 while (!locationsWtd.isEmpty()) {
                    WeightedObject<LocationweightedObject = locationsWtd.remove(0);
                    nextHighestMsgRate = weightedObject.getWeight();
                    Location nextHighestLocation = weightedObject.getObject();
                    optimalContainerInNextHighest = findOptimal(.getAvailableContainersFor(itemnextHighestLocation));
                    if (optimalContainerInNextHighest != null) {
                        break;
                    }
                }
                if (optimalContainerInNextHighest == null) {
                    nextHighestMsgRate = current;
                }
                
                if (.isTriggered(highestMsgRatetotalnextHighestMsgRatecurrent)) {
                    .info("POLICY "++" detected "+itemName+" should be in location "+highestLocation+" on "+optimalContainerInHighest+" ("+highestMsgRate+" of "+total+" msgs/sec), migrating");
                    try {
                        if (activeContainer.equals(optimalContainerInHighest)) {
                            //shouldn't happen
                            .warn("POLICY "++" detected "+itemName+" should move to "+optimalContainerInHighest+" ("+highestMsgRate+" of "+total+" msgs/sec) but it is already there with "+current+" msgs/sec");
                        } else {
                            item.move(optimalContainerInHighest);
                            .onItemMoved(itemoptimalContainerInHighest);
                        }
                    } catch (Exception e) {
                        .warn("POLICY "++" detected "+itemName+" should be on "+optimalContainerInHighest+", but can't move it: "+ee);
                    }
                } else {
                    if (.isTraceEnabled()) .trace("POLICY "++" detected "+itemName+" need not move to "+optimalContainerInHighest+" ("+highestMsgRate+" of "+total+" msgs/sec not much better than "+current+" at "+activeContainer+")");
                }
            }
        } catch (Exception e) {
            .warn("Error in policy "++" (ignoring): "+ee);
        }
    }
    private ContainerType findOptimal(Collection<ContainerType> contenders) {
        /*
         * TODO should choose the least loaded mediator. Currently chooses first available, and relies 
         * on a load-balancer to move it again; would be good if these could share decision code so move 
         * it to the right place immediately. e.g.
         *   policyUtil.findLeastLoadedMediator(nodesInLocation);
         */
        return (contenders.isEmpty() ? null : Iterables.get(contenders, 0));
    }
New to GrepCode? Check out our FAQ X