Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package brooklyn.policy.loadbalancing;
  
  import static brooklyn.util.GroovyJavaMethods.elvis;
  import static brooklyn.util.GroovyJavaMethods.truth;
  
  import java.util.Map;
  import java.util.concurrent.Executors;
 
 
 

Policy that is attached to a pool of "containers", each of which can host one or more migratable "items". The policy monitors the workrates of the items and effects migrations in an attempt to ensure that the containers are all sufficiently utilized without any of them being overloaded.

The particular sensor that defines the items' workrates is specified when the policy is constructed. High- and low-thresholds are defined as configuration keys on each of the container entities in the pool: for an item sensor named foo.bar.sensorName, the corresponding container config keys would be named foo.bar.sensorName.threshold.low and foo.bar.sensorName.threshold.high.

In addition to balancing items among the available containers, this policy causes the pool Entity to emit POOL_COLD and POOL_HOT events when it determines that there is, respectively, a surplus or a shortfall of container resources in the pool. These events may be consumed by a separate policy that is capable of resizing the container pool.

 
 public class LoadBalancingPolicy<NodeType extends Entity, ItemType extends Movableextends AbstractPolicy {
     
     private static final Logger LOG = LoggerFactory.getLogger(LoadBalancingPolicy.class);
     
     @SetFromFlag(defaultVal="100")
     private long minPeriodBetweenExecs;
     
     private final AttributeSensor<? extends Numbermetric;
     private final String lowThresholdConfigKeyName;
     private final String highThresholdConfigKeyName;
     private final BalanceablePoolModel<NodeType, ItemType> model;
     private final BalancingStrategy<NodeType, ItemType> strategy;
     private BalanceableWorkerPool poolEntity;
     
     private volatile ScheduledExecutorService executor;
     private final AtomicBoolean executorQueued = new AtomicBoolean(false);
     private volatile long executorTime = 0;
 
     private int lastEmittedDesiredPoolSize = 0;
     private String lastEmittedPoolTemperature = null// "cold" or "hot"
     
     /**
      * Listener that inspects the sensor of each received event and forwards the event to the
      * matching {@code on...} handler of the enclosing policy, always passing {@code true} for
      * {@code rebalanceNow}.
      *
      * NOTE(review): tokens appear to have been lost from this block during extraction -- the
      * logger reference, the argument of every {@code sensor.equals(...)} check, several commas,
      * and the closing {@code >} / {@code )} after some type names are missing, so it does not
      * compile as shown. The compared sensors are declared outside this view; restore them from
      * the authoritative source rather than guessing.
      */
     private final SensorEventListener<ObjecteventHandler = new SensorEventListener<Object>() {
         public void onEvent(SensorEvent<Objectevent) {
             if (.isTraceEnabled()) .trace("{} received event {}"LoadBalancingPolicy.thisevent);
             Entity source = event.getSource();
             Object value = event.getValue();
             Sensor sensor = event.getSensor();
             
             // First branch: the event source is the item and the value is its numeric workrate.
             if (sensor.equals()) {
                 onItemMetricUpdate((ItemType)source, ((Numbervalue).doubleValue(), true);
             // Next two branches: the event value is the container that was added / removed.
             } else if (sensor.equals(.)) {
                 onContainerAdded((NodeType) valuetrue);
             } else if (sensor.equals(.)) {
                 onContainerRemoved((NodeType) valuetrue);
             // Remaining branches: the event value is a ContainerItemPair (item plus its container).
             } else if (sensor.equals(.)) {
                 ContainerItemPair pair = (ContainerItemPairvalue;
                 onItemAdded((ItemType)pair.item, (NodeType)pair.containertrue);
             } else if (sensor.equals(.)) {
                 ContainerItemPair pair = (ContainerItemPairvalue;
                 onItemRemoved((ItemType)pair.item, (NodeType)pair.containertrue);
             } else if (sensor.equals(.)) {
                 ContainerItemPair pair = (ContainerItemPairvalue;
                 onItemMoved((ItemType)pair.item, (NodeType)pair.containertrue);
             }
         }
     };
 
     public LoadBalancingPolicy(AttributeSensor<? extends Numbermetric,
             BalanceablePoolModel<NodeType, ItemType> model) {
        this(MutableMap.of(), metricmodel);
    }
    public LoadBalancingPolicy(Map propsAttributeSensor<? extends Numbermetric,
            BalanceablePoolModel<NodeType, ItemType> model) {
        
        super(props);
        this. = metric;
        this. = metric.getName()+".threshold.low";
        this. = metric.getName()+".threshold.high";
        this. = model;
        this. = new BalancingStrategy(getName(), model); // TODO: extract interface, inject impl
        
        // TODO Should re-use the execution manager's thread pool, somehow
    }
    
    /**
     * Attaches this policy to the given entity, feeds the members of its container group and
     * item group to {@code onContainerAdded} / {@code onItemAdded} (with {@code rebalanceNow}
     * false), and then schedules one rebalance.
     *
     * NOTE(review): tokens appear to have been lost from this block during extraction -- the
     * assignment target, the receiver of {@code getContainerGroup()} / {@code getItemGroup()},
     * the attribute passed to {@code item.getAttribute(...)}, and some commas are missing. The
     * statements that should follow the "Detect when containers are added..." comment
     * (presumably event subscriptions) are absent as well. Restore from the authoritative source.
     *
     * @param entity the entity to attach to; must be a {@code BalanceableWorkerPool}
     */
    @Override
    public void setEntity(EntityLocal entity) {
        Preconditions.checkArgument(entity instanceof BalanceableWorkerPool"Provided entity must be a BalanceableWorkerPool");
        super.setEntity(entity);
        this. = (BalanceableWorkerPoolentity;
        
        // Detect when containers are added to or removed from the pool.
        
        // Take heed of any extant containers.
        for (Entity container : .getContainerGroup().getMembers()) {
            onContainerAdded((NodeType)containerfalse);
        }
        for (Entity item : .getItemGroup().getMembers()) {
            onItemAdded((ItemType)item, (NodeType)item.getAttribute(.), false);
        }
        scheduleRebalance();
    }
    
    @Override
    public void suspend() {
        // TODO unsubscribe from everything? And resubscribe on resume?
        super.suspend();
        if ( != null.shutdownNow();;
        .set(false);
    }
    
    @Override
    public void resume() {
        super.resume();
         = 0;
        .set(false);
    }
    
    private ThreadFactory newThreadFactory() {
        return new ThreadFactoryBuilder()
                .setNameFormat("brooklyn-followthesunpolicy-%d")
                .build();
    }
    /**
     * Schedules a single rebalance task if the policy is running and a compare-and-set from
     * false to true succeeds (so at most one task is queued at a time). The task is submitted
     * with a non-negative delay computed relative to the current time.
     *
     * <p>When it runs, the task records the current time, clears the flag, rebalances, and then
     * emits an event carrying a four-entry map of pool statistics if the pool is cold or hot.
     * Exceptions are caught and logged: at error level while the policy is running, at debug
     * level otherwise.
     *
     * NOTE(review): tokens appear to have been lost from this block during extraction -- field
     * receivers (logger, flag, timestamp, executor, model/strategy), the map keys, the emit
     * target and sensor, the time unit, and several commas are missing, so it does not compile
     * as shown. Restore from the authoritative source.
     */
    private void scheduleRebalance() {
        if (isRunning() && .compareAndSet(falsetrue)) {
            long now = System.currentTimeMillis();
            // Never negative: schedule immediately if the computed time has already passed.
            long delay = Math.max(0, ( + ) - now);
            
            .schedule(new Runnable() {
                public void run() {
                    try {
                         = System.currentTimeMillis();
                        // Cleared before rebalancing, so events arriving meanwhile can queue another run.
                        .set(false);
                        .rebalance();
                        
                        // NOTE(review): "this" here is the anonymous Runnable, not the policy -- probably
                        // meant LoadBalancingPolicy.this (same for the two info logs below). Confirm.
                        if (.isDebugEnabled()) .debug("{} post-rebalance: poolSize={}; workrate={}; lowThreshold={}; " + 
                                "highThreshold={}"new Object[] {this.getPoolSize(), .getCurrentPoolWorkrate(), 
                                .getPoolLowThreshold(), .getPoolHighThreshold()});
                        
                        if (.isCold()) {
                            Map eventVal = ImmutableMap.of(
                                    ..getPoolSize(),
                                    ..getCurrentPoolWorkrate(),
                                    ..getPoolLowThreshold(),
                                    ..getPoolHighThreshold());
            
                            .emit(.eventVal);
                            
                            // The suggested size is only computed and remembered when info logging is on.
                            if (.isInfoEnabled()) {
                                int desiredPoolSize = (int) Math.ceil(.getCurrentPoolWorkrate() / (.getPoolLowThreshold()/.getPoolSize()));
                                // NOTE(review): != on a String compares references, not contents; this works
                                // only while both sides are the same interned literal. Consider equals().
                                if (desiredPoolSize !=  ||  != "cold") {
                                    .info("{} emitted COLD (suggesting {}): {}"new Object[] {thisdesiredPoolSizeeventVal});
                                     = desiredPoolSize;
                                     = "cold";
                                }
                            }
                        
                        } else if (.isHot()) {
                            Map eventVal = ImmutableMap.of(
                                    ..getPoolSize(),
                                    ..getCurrentPoolWorkrate(),
                                    ..getPoolLowThreshold(),
                                    ..getPoolHighThreshold());
                            
                            .emit(.eventVal);
                            
                            if (.isInfoEnabled()) {
                                int desiredPoolSize = (int) Math.ceil(.getCurrentPoolWorkrate() / (.getPoolHighThreshold()/.getPoolSize()));
                                // NOTE(review): same reference comparison on a String as in the cold branch.
                                if (desiredPoolSize !=  ||  != "hot") {
                                    .info("{} emitted HOT (suggesting {}): {}"new Object[] {thisdesiredPoolSizeeventVal});
                                     = desiredPoolSize;
                                     = "hot";
                                }
                            }
                        }
                    } catch (Exception e) {
                        // Failures after the policy has stopped are expected noise, hence debug level.
                        if (isRunning()) {
                            .error("Error rebalancing"e);
                        } else {
                            .debug("Error rebalancing, but no longer running"e);
                        }
                    }
                }},
                delay,
                .);
        }
    }
    
    // TODO Can get duplicate onContainerAdded events.
    //      I presume it's because we subscribe and then iterate over the extant containers.
    //      Solution would be for subscription to give you events for existing / current value(s).
    //      Also current impl messes up single-threaded updates model: the setEntity is a different thread than for subscription events.
    private void onContainerAdded(NodeType newContainerboolean rebalanceNow) {
        Preconditions.checkArgument(newContainer instanceof BalanceableContainer"Added container must be a BalanceableContainer");
        if (.isTraceEnabled()) .trace("{} recording addition of container {}"thisnewContainer);
        // Low and high thresholds for the metric we're interested in are assumed to be present
        // in the container's configuration.
        Number lowThreshold = (NumberfindConfigValue(newContainer);
        Number highThreshold = (NumberfindConfigValue(newContainer);
        if (lowThreshold == null || highThreshold == null) {
            .warn(
                "Balanceable container '"+newContainer+"' does not define low- and high- threshold configuration keys: '"+
                +"' and '"++"', skipping");
            return;
        }
        
        .onContainerAdded(newContainerlowThreshold.doubleValue(), highThreshold.doubleValue());
        
        // Note: no need to scan the container for items; they will appear via the ITEM_ADDED events.
        // Also, must abide by any item-filters etc defined in the pool.
        
        if (rebalanceNowscheduleRebalance();
    }
    
    private static Object findConfigValue(Entity entityString configKeyName) {
        Map<ConfigKey<?>, Objectconfig = ((EntityInternal)entity).getAllConfig();
        for (Entry<ConfigKey<?>, Objectentry : config.entrySet()) {
            if (configKeyName.equals(entry.getKey().getName()))
                return entry.getValue();
        }
        return null;
    }
    
    // TODO Receiving duplicates of onContainerRemoved (e.g. when running LoadBalancingInmemorySoakTest)
    private void onContainerRemoved(NodeType oldContainerboolean rebalanceNow) {
        if (.isTraceEnabled()) .trace("{} recording removal of container {}"thisoldContainer);
        .onContainerRemoved(oldContainer);
        if (rebalanceNowscheduleRebalance();
    }
    
    /**
     * Handles a newly added item: subscribes to it, registers it (with its container and an
     * "immovable" flag that defaults to false) and, if the item already has a metric value,
     * records that value as its workrate.
     *
     * NOTE(review): tokens appear to have been lost from this block during extraction -- the
     * logger and update receivers, the config key passed to {@code getConfig}, the argument of
     * {@code getAttribute}, possibly further arguments of {@code subscribe}, and several commas
     * are missing. Restore from the authoritative source.
     *
     * @param item            the added item; must implement {@code Movable}
     * @param parentContainer the container the item was added to
     * @param rebalanceNow    whether to schedule a rebalance afterwards
     */
    private void onItemAdded(ItemType item, NodeType parentContainerboolean rebalanceNow) {
        Preconditions.checkArgument(item instanceof Movable"Added item "+item+" must implement Movable");
        if (.isTraceEnabled()) .trace("{} recording addition of item {} in container {}"new Object[] {thisitemparentContainer});
        
        subscribe(item);
        
        // Update the model, including the current metric value (if any).
        boolean immovable = elvis(item.getConfig(.), false);
        Number currentValue = item.getAttribute();
        .onItemAdded(itemparentContainerimmovable);
        if (currentValue != null)
            .onItemWorkrateUpdated(itemcurrentValue.doubleValue());
        
        if (rebalanceNowscheduleRebalance();
    }
    
    private void onItemRemoved(ItemType item, NodeType parentContainerboolean rebalanceNow) {
        if (.isTraceEnabled()) .trace("{} recording removal of item {}"thisitem);
        unsubscribe(item);
        .onItemRemoved(item);
        if (rebalanceNowscheduleRebalance();
    }
    
    private void onItemMoved(ItemType item, NodeType parentContainerboolean rebalanceNow) {
        if (.isTraceEnabled()) .trace("{} recording moving of item {} to {}"new Object[] {thisitemparentContainer});
        .onItemMoved(itemparentContainer);
        if (rebalanceNowscheduleRebalance();
    }
    
    private void onItemMetricUpdate(ItemType itemdouble newValueboolean rebalanceNow) {
        if (.isTraceEnabled()) .trace("{} recording metric update for item {}, new value {}"new Object[] {thisitemnewValue});
        .onItemWorkrateUpdated(itemnewValue);
        if (rebalanceNowscheduleRebalance();
    }
    
    /**
     * Returns the simple class name, followed by a parenthesised value when that value is
     * "truthy" and by nothing otherwise.
     *
     * NOTE(review): the operand of {@code truth(...)} and of the string concatenation appears to
     * have been lost during extraction (presumably the policy's name -- confirm whether it was a
     * field or an accessor before restoring, as the two may behave differently).
     */
    @Override
    public String toString() {
        return getClass().getSimpleName() + (truth() ? "("++")" : "");
    }
New to GrepCode? Check out our FAQ X