Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package brooklyn.policy.loadbalancing;
  
  import java.util.Map;
  import java.util.Set;
 
 
 
Standard implementation of BalanceablePoolModel, providing essential arithmetic for item and container workrates and thresholds. See subclasses for specific requirements for migrating items.
 
 public class DefaultBalanceablePoolModel<ContainerType, ItemType> implements BalanceablePoolModel<ContainerType, ItemType> {
     
     private static final Logger LOG = LoggerFactory.getLogger(DefaultBalanceablePoolModel.class);
     
     /*
      * Performance comments.
      *  - Used hprof with LoadBalancingPolicySoakTest.testLoadBalancingManyManyItemsTest (1000 items)
      *  - Prior to adding containerToItems, it created a new set by iterating over all items.
      *    This was the biggest percentage of any brooklyn code.
      *    Hence it's worth duplicating the values, keyed by item and keyed by container.
      *  - Unfortunately changing threading model (so have a "rebalancer" thread, and a thread that 
      *    processes events to update the model), get ConcurrentModificationException if don't take
      *    copy of containerToItems.get(node)...
      */
     
     // Concurrent maps cannot have null value; use this to represent when no container is supplied for an item 
     private static final String NULL_CONTAINER = "null-container";
     
     private final String name;
     private final Set<ContainerType> containers = Collections.newSetFromMap(new ConcurrentHashMap<ContainerType,Boolean>());
     private final Map<ContainerType, DoublecontainerToLowThreshold = new ConcurrentHashMap<ContainerType, Double>();
     private final Map<ContainerType, DoublecontainerToHighThreshold = new ConcurrentHashMap<ContainerType, Double>();
     private final Map<ItemType, ContainerType> itemToContainer = new ConcurrentHashMap<ItemType, ContainerType>();
     private final SetMultimap<ContainerType, ItemType> containerToItems =  Multimaps.synchronizedSetMultimap(HashMultimap.<ContainerType, ItemType>create());
     private final Map<ItemType, DoubleitemToWorkrate = new ConcurrentHashMap<ItemType, Double>();
     private final Set<ItemType> immovableItems = Collections.newSetFromMap(new ConcurrentHashMap<ItemType, Boolean>());
     
     private volatile double poolLowThreshold = 0;
     private volatile double poolHighThreshold = 0;
     private volatile double currentPoolWorkrate = 0;
     
     public DefaultBalanceablePoolModel(String name) {
         this. = name;
     }
     
     public ContainerType getParentContainer(ItemType item) {
         ContainerType result = .get(item);
         return (result != ) ? result : null;
     }
     
     public Set<ItemType> getItemsForContainer(ContainerType node) {
         Set<ItemType> result = .get(node);
         synchronized () {
             return (result != null) ? ImmutableSet.copyOf(result) : Collections.<ItemType>emptySet();
         }
     }
     
     public Double getItemWorkrate(ItemType item) {
         return .get(item);
     }
     
     @Override public double getPoolLowThreshold() { return ; }
     @Override public double getPoolHighThreshold() { return ; }
     @Override public double getCurrentPoolWorkrate() { return ; }
     @Override public boolean isHot() { return !.isEmpty() &&  > ; }
     @Override public boolean isCold() { return !.isEmpty() &&  < ; }
     
     
     // Provider methods.
     
     @Override public String getName() { return ; }
     @Override public int getPoolSize() { return .size(); }
     @Override public Set<ContainerType> getPoolContents() { return ; }
     @Override public String getName(ContainerType container) { return container.toString(); } // TODO: delete?
     @Override public Location getLocation(ContainerType container) { return null; } // TODO?
     
     @Override public double getLowThreshold(ContainerType container) {
         Double result = .get(container);
         return (result != null) ? result : -1;
     }
     
     @Override public double getHighThreshold(ContainerType container) {
         Double result = .get(container);
         return (result != null) ? result : -1;
    }
    
    @Override public double getTotalWorkrate(ContainerType container) {
        double totalWorkrate = 0;
        for (ItemType item : getItemsForContainer(container)) {
            Double workrate = .get(item);
            if (workrate != null)
                totalWorkrate += Math.abs(workrate);
        }
        return totalWorkrate;
    }
    
    @Override public Map<ContainerType, DoublegetContainerWorkrates() {
        Map<ContainerType, Doubleresult = new LinkedHashMap<ContainerType, Double>();
        for (ContainerType node : )
            result.put(nodegetTotalWorkrate(node));
        return result;
    }
    
    @Override public Map<ItemType, DoublegetItemWorkrates(ContainerType node) {
        Map<ItemType, Doubleresult = new LinkedHashMap<ItemType, Double>();
        for (ItemType item : getItemsForContainer(node))
            result.put(item.get(item));
        return result;
    }
    
    @Override public boolean isItemMoveable(ItemType item) {
        // If don't know about item, then assume not movable; otherwise has this item been explicitly flagged as immovable?
        return .containsKey(item) && !.contains(item);
    }
    
    @Override public boolean isItemAllowedIn(ItemType itemLocation location) {
        return true// TODO?
    }
    
    
    // Mutators.
    
    @Override
    public void onItemMoved(ItemType item, ContainerType newNode) {
        if (!.containsKey(item)) {
            // Item may have been deleted; order of events received from different sources 
            // (i.e. item itself and for itemGroup membership) is non-deterministic.
            .info("Balanceable pool model ignoring onItemMoved for unknown item {} to container {}; " +
            		"if onItemAdded subsequently received will get new container then"itemnewNode);
            return;
        }
        ContainerType newNodeNonNull = toNonNullContainer(newNode);
        ContainerType oldNode = .put(itemnewNodeNonNull);
        if (oldNode != null && oldNode != .remove(oldNodeitem);
        if (newNode != null.put(newNodeitem);
    }
    
    @Override
    public void onContainerAdded(ContainerType newContainerdouble lowThresholddouble highThreshold) {
        boolean added = .add(newContainer);
        if (!added) {
            // See LoadBalancingPolicy.onContainerAdded for possible explanation of why can get duplicate calls
            .debug("Duplicate container-added event for {}; ignoring"newContainer);
            return;
        }
        .put(newContainerlowThreshold);
        .put(newContainerhighThreshold);
         += lowThreshold;
         += highThreshold;
    }
    
    @Override
    public void onContainerRemoved(ContainerType oldContainer) {
        .remove(oldContainer);
        Double containerLowThreshold = .remove(oldContainer);
        Double containerHighThresold = .remove(oldContainer);
         -= (containerLowThreshold != null ? containerLowThreshold : 0);
         -= (containerHighThresold != null ? containerHighThresold : 0);
        
        // TODO: assert no orphaned items
    }
    
    @Override
    public void onItemAdded(ItemType item, ContainerType parentContainer) {
        onItemAdded(itemparentContainerfalse);
    }
    
    @Override
    public void onItemAdded(ItemType item, ContainerType parentContainerboolean immovable) {
        // Duplicate calls to onItemAdded do no harm, as long as most recent is most accurate!
        // Important that it stays that way for now - See LoadBalancingPolicy.onContainerAdded for explanation.
        if (immovable)
            .add(item);
        
        ContainerType parentContainerNonNull = toNonNullContainer(parentContainer);
        ContainerType oldNode = .put(itemparentContainerNonNull);
        if (oldNode != null && oldNode != .remove(oldNodeitem);
        if (parentContainer != null.put(parentContaineritem);
    }
    
    @Override
    public void onItemRemoved(ItemType item) {
        ContainerType oldNode = .remove(item);
        if (oldNode != null && oldNode != .remove(oldNodeitem);
        Double workrate = .remove(item);
        if (workrate != null)
             -= workrate;
        .remove(item);
    }
    
    @Override
    public void onItemWorkrateUpdated(ItemType itemdouble newValue) {
        if (hasItem(item)) {
            Double oldValue = .put(itemnewValue);
            double delta = ( newValue - (oldValue != null ? oldValue : 0) );
             += delta;
        } else {
            // Can happen when item removed - get notification of removal and workrate from group and item
            // respectively, so can overtake each other
            if (.isDebugEnabled()) .debug("Ignoring setting of workrate for unknown item {}, to {}"itemnewValue);
        }
    }
    
    private boolean hasItem(ItemType item) {
        return .containsKey(item);
    }
    
    
    // Additional methods for tests.

    
Warning: this can be an expensive (time and memory) operation if there are a lot of items/containers.
    public String itemDistributionToString() {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        dumpItemDistribution(new PrintStream(baos));
        return new String(baos.toByteArray());
    }
    public void dumpItemDistribution() {
    }
    
    public void dumpItemDistribution(PrintStream out) {
        for (ContainerType container : getPoolContents()) {
            out.println("Container '"+container+"': ");
            for (ItemType item : getItemsForContainer(container)) {
                Double workrate = getItemWorkrate(item);
                out.println("\t"+"Item '"+item+"' ("+workrate+")");
            }
        }
        out.flush();
    }
    
    @SuppressWarnings("unchecked")
    private ContainerType nullContainer() {
        return (ContainerType) // relies on erasure
    }
    
    private ContainerType toNonNullContainer(ContainerType container) {
        return (container != null) ? container : nullContainer();
    }
    
New to GrepCode? Check out our FAQ X