Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package brooklyn.policy.followthesun;
  
  import static brooklyn.util.GroovyJavaMethods.elvis;
  import static brooklyn.util.GroovyJavaMethods.truth;
  import static com.google.common.base.Preconditions.checkArgument;
  
  import java.util.Map;
 
 
 
 
 public class FollowTheSunPolicy extends AbstractPolicy {
 
     private static final Logger LOG = LoggerFactory.getLogger(FollowTheSunPolicy.class);
 
     public static final String NAME = "Follow the Sun (Inter-Geography Latency Optimization)";
 
     @SetFromFlag(defaultVal="100")
     private long minPeriodBetweenExecs;
     
     @SetFromFlag
     private Function<EntityLocationlocationFinder;
     
     private final AttributeSensor<Map<? extends MovableDouble>> itemUsageMetric;
     private final FollowTheSunModel<EntityMovablemodel;
     private final FollowTheSunStrategy<EntityMovablestrategy;
     private final FollowTheSunParameters parameters;
     
     private FollowTheSunPool poolEntity;
     
     private volatile ScheduledExecutorService executor;
     private final AtomicBoolean executorQueued = new AtomicBoolean(false);
     private volatile long executorTime = 0;
     private boolean loggedConstraintsIgnored = false;
     
     private final Function<EntityLocationdefaultLocationFinder = new Function<EntityLocation>() {
         public Location apply(Entity e) {
             Collection<Locationlocs = e.getLocations();
             if (locs.isEmpty()) return null;
             Location contender = Iterables.get(locs, 0);
             while (contender.getParent() != null && !(contender instanceof MachineProvisioningLocation)) {
                 contender = contender.getParent();
             }
             return contender;
         }
     };
     
     private final SensorEventListener<ObjecteventHandler = new SensorEventListener<Object>() {
         @Override
         public void onEvent(SensorEvent<Objectevent) {
             if (.isTraceEnabled()) .trace("{} received event {}"FollowTheSunPolicy.thisevent);
             Entity source = event.getSource();
             Object value = event.getValue();
             Sensor<?> sensor = event.getSensor();
             
             if (sensor.equals()) {
                 onItemMetricUpdated((Movable)source, (Map<? extends MovableDouble>) valuetrue);
             } else if (sensor.equals(.)) {
                 onContainerLocationUpdated(sourcetrue);
             } else if (sensor.equals(.)) {
                 onContainerAdded((Entityvaluetrue);
             } else if (sensor.equals(.)) {
                 onContainerRemoved((Entityvaluetrue);
             } else if (sensor.equals(.)) {
                 onItemAdded((Movablevaluetrue);
             } else if (sensor.equals(.)) {
                 onItemRemoved((Movablevaluetrue);
             } else if (sensor.equals(.)) {
                 ContainerItemPair pair = (ContainerItemPairvalue;
                 onItemMoved((Movable)pair.itempair.containertrue);
             }
         }
    };
    
    // FIXME parameters: use a more groovy way of doing it, that's consistent with other policies/entities?
    public FollowTheSunPolicy(AttributeSensor itemUsageMetric
            FollowTheSunModel<EntityMovablemodelFollowTheSunParameters parameters) {
        this(MutableMap.of(), itemUsageMetricmodelparameters);
    }
    
    public FollowTheSunPolicy(Map propsAttributeSensor itemUsageMetric
            FollowTheSunModel<EntityMovablemodelFollowTheSunParameters parameters) {
        super(props);
        this. = itemUsageMetric;
        this. = model;
        this. = parameters;
        this. = new FollowTheSunStrategy<EntityMovable>(modelparameters); // TODO: extract interface, inject impl
        
        // TODO Should re-use the execution manager's thread pool, somehow
    }
    
    @Override
    public void setEntity(EntityLocal entity) {
        checkArgument(entity instanceof FollowTheSunPool"Provided entity must be a FollowTheSunPool");
        super.setEntity(entity);
        this. = (FollowTheSunPoolentity;
        
        // Detect when containers are added to or removed from the pool.
        
        // Take heed of any extant containers.
        for (Entity container : .getContainerGroup().getMembers()) {
            onContainerAdded(containerfalse);
        }
        for (Entity item : .getItemGroup().getMembers()) {
            onItemAdded((Movable)itemfalse);
        }
    }
    
    @Override
    public void suspend() {
        // TODO unsubscribe from everything? And resubscribe on resume?
        super.suspend();
        if ( != null.shutdownNow();
        .set(false);
    }
    
    @Override
    public void resume() {
        super.resume();
         = 0;
        .set(false);
    }
    
    private ThreadFactory newThreadFactory() {
        return new ThreadFactoryBuilder()
                .setNameFormat("brooklyn-followthesunpolicy-%d")
                .build();
    }
    private void scheduleLatencyReductionJig() {
        if (isRunning() && .compareAndSet(falsetrue)) {
            long now = System.currentTimeMillis();
            long delay = Math.max(0, ( + ) - now);
            
            .schedule(new Runnable() {
                public void run() {
                    try {
                         = System.currentTimeMillis();
                        .set(false);
                        
                        if (.isTraceEnabled()) .trace("{} executing follow-the-sun migration-strategy"this);
                        .rebalance();
                        
                    } catch (RuntimeException e) {
                        if (isRunning()) {
                            .error("Error during latency-reduction-jig"e);
                        } else {
                            .debug("Error during latency-reduction-jig, but no longer running"e);
                        }
                    }
                }},
                delay,
                .);
        }
    }
    
    private void onContainerAdded(Entity containerboolean rebalanceNow) {
        subscribe(container.);
        Location location = .apply(container);
        
        if (.isTraceEnabled()) .trace("{} recording addition of container {} in location {}"new Object[] {thiscontainerlocation});
        .onContainerAdded(containerlocation);
        
        if (rebalanceNowscheduleLatencyReductionJig();
    }
    
    private void onContainerRemoved(Entity containerboolean rebalanceNow) {
        if (.isTraceEnabled()) .trace("{} recording removal of container {}"thiscontainer);
        .onContainerRemoved(container);
        if (rebalanceNowscheduleLatencyReductionJig();
    }
    
    private void onItemAdded(Movable itemboolean rebalanceNow) {
        Entity parentContainer = (Entityitem.getAttribute(.);
        
        if (.isTraceEnabled()) .trace("{} recording addition of item {} in container {}"new Object[] {thisitemparentContainer});
        
        subscribe(item);
        
        // Update the model, including the current metric value (if any).
        Map<? extends MovableDoublecurrentValue = item.getAttribute();
        boolean immovable = (Boolean)elvis(item.getConfig(.), false);
        .onItemAdded(itemparentContainerimmovable);
        if (currentValue != null) {
            .onItemUsageUpdated(itemcurrentValue);
        }
        
        if (rebalanceNowscheduleLatencyReductionJig();
    }
    
    private void onItemRemoved(Movable itemboolean rebalanceNow) {
        if (.isTraceEnabled()) .trace("{} recording removal of item {}"thisitem);
        unsubscribe(item);
        .onItemRemoved(item);
        if (rebalanceNowscheduleLatencyReductionJig();
    }
    
    private void onItemMoved(Movable itemEntity parentContainerboolean rebalanceNow) {
        if (.isTraceEnabled()) .trace("{} recording moving of item {} to {}"new Object[] {thisitemparentContainer});
        .onItemMoved(itemparentContainer);
        if (rebalanceNowscheduleLatencyReductionJig();
    }
    
    private void onContainerLocationUpdated(Entity containerboolean rebalanceNow) {
        Location location = .apply(container);
        if (.isTraceEnabled()) .trace("{} recording location for container {}, new value {}"new Object[] {thiscontainerlocation});
        .onContainerLocationUpdated(containerlocation);
        if (rebalanceNowscheduleLatencyReductionJig();
    }
    
    private void onItemMetricUpdated(Movable itemMap<? extends MovableDoublenewValuesboolean rebalanceNow) {
        if (.isTraceEnabled()) .trace("{} recording usage update for item {}, new value {}"new Object[] {thisitemnewValues});
        .onItemUsageUpdated(itemnewValues);
        if (rebalanceNowscheduleLatencyReductionJig();
    }
    
    @Override
    public String toString() {
        return getClass().getSimpleName() + (truth() ? "("++")" : "");
    }
New to GrepCode? Check out our FAQ X