Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package com.fasterxml.clustermate.service.cluster;
  
  import java.util.*;
  
  import org.slf4j.Logger;
  
 
 
 public class ClusterViewByServerImpl<K extends EntryKey, E extends StoredEntry<K>>
 {
     private final Logger LOG = LoggerFactory.getLogger(getClass());
 
     protected final SharedServiceStuff _stuff;
    
    
Key space used by this cluster.
 
     protected final KeySpace _keyspace;
   
     protected final Stores<K,E> _stores;
    
    
Information about this node; not included in the list of peer nodes.
 
     protected NodeState _localState;

    
States of all nodes found during bootstrapping, including the local node.
 
     protected final Map<IpAndPortClusterPeerImpl<K,E>> _peers;
 
     protected final TimeMaster _timeMaster;
 
     protected final ClusterStatusAccessor _clusterAccessor;
    
    
Timestamp of the last update to aggregated state; used for letting clients know whether to try to access updated cluster information
 
     protected final AtomicLong _lastUpdated;
 
     protected final boolean _isTesting;
     
     /*
     /**********************************************************************
     /* Life-cycle
     /**********************************************************************
      */
     
     public ClusterViewByServerImpl(SharedServiceStuff stuffStores<K,E> stores,
             KeySpace keyspace,
             ActiveNodeState localMap<IpAndPort,ActiveNodeStateremoteNodes,
             long updateTime)
     {
          = stuff;
          = local;
          = keyspace;
          = stores;
          = stuff.getTimeMaster();
          = stuff.isRunningTests();
         ServiceConfig config = stuff.getServiceConfig();
                 stuff.jsonMapper()),
                 config.servicePathRootconfig.getServicePathStrategy());
 
          = new LinkedHashMap<IpAndPort,ClusterPeerImpl<K,E>>(remoteNodes.size());
         for (Map.Entry<IpAndPort,ActiveNodeStateentry : remoteNodes.entrySet()) {
             .put(entry.getKey(), _createPeer(entry.getValue()));
         }
          = new AtomicLong(updateTime);
     }
 
     private ClusterPeerImpl<K,E> _createPeer(ActiveNodeState nodeState) {
         return new ClusterPeerImpl<K,E>(this,
                 .getNodeStore(), .getEntryStore(), nodeState,
                );
    }
    
    @Override
    public synchronized void start()
    {
        .info("Starting sync threads to peers...");
        int count = 0;
        // no need to sync yet
        for (final ClusterPeerImpl<?,?> peer : .values()) {
            /* 20-Nov-2012, tatu: Let's create a thread for every peer from now
             *   on; even if there is currently no sync range, things may change.
             *   Plus, for fast cluster view updates, may want to share info
             *   between non-neighbors too.
             */
            ++count;
            peer.startSyncing();
        }
        .info("Completed creation of sync threads ({}/{}) to peers"count.size());
    }
    @Override
    public synchronized void stop()
    {
        .info("Shutting down sync threads to peers...");
        
        for (ClusterPeerImpl<?,?> peer : .values()) {
            peer.stop();
        }
        .info("Completed shutting down sync threads to peers");
    }
    /*
    /**********************************************************************
    /* Simple accessors
    /**********************************************************************
     */
    
    
Returns size of cluster, which should be one greater than number of peer nodes (to add local node)
    @Override
    public int size() { return 1 + .size(); }
    @Override
    public KeySpace getKeySpace() { return ; }
    
    @Override
    public NodeState getLocalState() { return ; }
    @Override
    public NodeState getRemoteState(IpAndPort key) {
        ClusterPeerImpl<?,?> peer;
        synchronized () {
            peer = .get(key);
        }
        return (peer == null) ? null : peer.getSyncState();
    }
    @Override
    public List<ClusterPeergetPeers() {
        synchronized () {
            return new ArrayList<ClusterPeer>(.values());
        }
    }
    @Override
    public Collection<NodeStategetRemoteStates() {
        ArrayList<NodeStateresult = new ArrayList<NodeState>(.size());
        for (ClusterPeerImpl<?,?> peer : .values()) {
            result.add(peer.getSyncState());
        }
        return result;
    }
    @Override
    public long getLastUpdated() {
        return .get();
    }
    @Override
    public boolean containsLocally(EntryKey key)
    {
        int hash = .getKeyConverter().routingHashFor(key);
        return .totalRange().contains(hash);
    }
    /*
    /**********************************************************************
    /* NodeStatusUpdater impl
    /**********************************************************************
     */
    @Override
    public void nodeActivated(IpAndPort endpointlong timestampKeyRange totalRange)
    {
        // for now, no real difference so:
        checkMembership(endpointtimestamptotalRange);
    }
    @Override
    public void nodeDeactivated(IpAndPort endpointlong timestamp)
    {
        // First, a sanity check:
        if (.getAddress().equals(endpoint)) { // just sanity check
            .warn("checkMembership() called with local node address; ignoring");
            return;
        }
        ClusterPeerImpl<K,E> peer;
        synchronized () {
            peer = .get(endpoint);
            if (peer != null) {
                peer.markDisabled(timestamptrue);
            }
        }
        if (peer == null) {
            .warn("Unknown node {} reported being deactivated; ignoring"endpoint);
        } else {
            .warn("Node {} reported being deactivated: marked as such"endpoint);
        }
    }
    
    @Override
    public void checkMembership(IpAndPort endpointlong timestampKeyRange totalRange)
    {
        // First, a sanity check:
        if (.getAddress().equals(endpoint)) {
            .warn("checkMembership() called with local node address; ignoring");
            return;
        }
        try {
            synchronized () {
                ClusterPeerImpl<K,E> peer = .get(endpoint);
                if (peer != null) { // already known...
                    if (peer.isDisabled()) { // but do we enable it?
                        peer.markDisabled(timestampfalse);
                        .info("Node {} activated due to received request"endpoint);
                    }
                    return;
                }
                /* How interesting! Someone who we don't even know seems to be joining...
                 * Two possible cases, then; (a) We are seeing something for which we do
                 * have data in local DB, just not in config file, or (b) New entry for
                 * which no data exists.
                 */
                ActiveNodeState oldState = .getNodeStore().findEntry(endpoint);
                // If such data found, assume it's accurate; we'll be updated soon if not
                if (oldState != null) { 
                    peer = _createPeer(oldState);
                    .warn("Request from node {} for which we have info in Local DB, restoring"endpoint);
                } else {
                    // But if not found need minimal bootstrapping
                    ActiveNodeState initialStatus = new ActiveNodeState(,
                            new NodeDefinition(endpoint.,
                                    totalRangetotalRange),
                            .currentTimeMillis());
                    peer = _createPeer(initialStatus);
                    .warn("Request from node {} for which we have no information, bootstrap with range of {}",
                            endpointtotalRange);
                }
                .put(endpointpeer);
                peer.startSyncing();
                /* No need to update local DB, since we really have little new information;
                 * should be getting it via sync-list by this node, or from other peers,
                 * depending on whether we are neighbors or not.
                 */
            }
        } catch (IOException e) {
            .warn("Failed to update Node status for "+endpoint+": "+e.getMessage(), e);
        }
    }
    
    
Method called with a response to node status request; will update information we have if more recent information is available.
    @Override
    public void updateWith(ClusterStatusMessage msg)
    {
        final long updateTime = .currentTimeMillis();
        int mods = 0;
        NodeState local = msg.local;
        if (local == null) { // should never occur
            .info("msg.local is null, should never happen");
        } else {
            if (updateStatus(localtrue)) {
                ++mods;
            }
        }
        if (msg.remote != null) {
            for (NodeState state : msg.remote) {
                if (updateStatus(statefalse)) {
                    ++mods;
                }
            }
        }
        // If any data was changed, update our local state
        if (mods > 0) {
            boolean wasChanged;
            synchronized () {
                long old = .get();
                wasChanged = (updateTime > old);
                if (wasChanged) {
                    .set(updateTime);
                }
            }
            if (wasChanged) {
                .info("updateStatus() with {} changes: updated lastUpdated to: {}"modsupdateTime);
            } else { // may occur with concurrent updates?
                .warn("updateStatus() with {} changes: but lastUpdated remains at {}"mods,
                        .get());
            }
        }
    }

    
Method called for each node status entry in cluster status information message.

Parameters:
nodeStatus Status to use for updating node state.
byNodeItself True if the status is from the node itself; true for 'local' entry in status message.
Returns:
True if there was a status update
    protected boolean updateStatus(NodeState nodeStatusboolean byNodeItself)
    {
        // First: do we have info for the node?
        final IpAndPort endpoint = nodeStatus.getAddress();
        if (endpoint == null) {
            .warn("Missing endpoint info (sender? "+byNodeItself+"); need to skip update");
            return false;
        }
        // also, local state must be produced locally; ignore what others think
        // (in future could try pro-active fixing but...)
        if (endpoint.equals(.getAddress())) {
            return false;
        }
        ClusterPeerImpl<K,E> peer;
        try {
            synchronized () {
                peer = .get(endpoint);
                if (peer == null) { // Interesting: need to add a new entry
                    .warn("Status for new node {} received: must create a peer"endpoint);
                    // node: should not ever occur for node itself... but...
                    _updateMissingPeer(nodeStatusbyNodeItself);
                } else { // more common, just update...
                    // But first, see that information is more up-to-date
                    ActiveNodeState currentState = peer.getSyncState();
                    if (currentState.getLastUpdated() >= nodeStatus.getLastUpdated()) {
                        return false;
                    }
                    return _updateExistingPeer(nodeStatusbyNodeItselfpeer);
                }
            }
        } catch (IOException e) {
            .warn("Failed to update Node status for "+endpoint+": "+e.getMessage(), e);
        }
        return false;
    }

    
Method called when we get information on a peer for which we have peer object and status information; so generally just need to merge it.
    protected boolean _updateExistingPeer(NodeState nodeStatusboolean forSender,
            ClusterPeerImpl<K,E> peerthrows IOException
    {
//        final IpAndPort endpoint = nodeStatus.getAddress();
        // !!! TODO: actual updating...
        
        return false;
    }
    
    
Method called when we get information on a peer for which we do not have peer thread.
    protected void _updateMissingPeer(NodeState nodeStatusboolean forSenderthrows IOException
    {
        final IpAndPort endpoint = nodeStatus.getAddress();
        /* Ok: let's also see if we have old state information in the
         * local DB. If we do, we may be able to avoid syncing from
         * the beginning of time; and/or obtain actual key range.
         */
        NodeStateStore stateStore = .getNodeStore();
        ActiveNodeState initialStatus = new ActiveNodeState(nodeStatus,
                .currentTimeMillis());
        // TODO: should perhaps also find by index + range?
        ActiveNodeState oldState = stateStore.findEntry(endpoint);
        ClusterPeerImpl<K,E> peer = null;
        // First common case: info was persisted earlier; we just "unthaw it"
        if (oldState != null) {
            if (oldState.equals(initialStatus)) {
                peer = _createPeer(oldState);
                .put(endpointpeer);
                .info("Restoring node {} from persisted data: no change"endpoint);
            } else {
                // Some changes; but is the sync range unaffected?
            }
        } else { // No info, just create with info we received
            
        }
        if (peer != null) {
            .put(endpointpeer);
            peer.startSyncing();
            .info("Started a new Peer thread for {}"endpoint);
        }
    }
    
    /*
    /**********************************************************************
    /* Advanced accessors
    /**********************************************************************
     */
    @Override
    public int getActiveCoverage()
    {
        ArrayList<KeyRangeranges = new ArrayList<KeyRange>();
        ranges.add(.getRangeActive());
        for (ClusterPeer peer : getPeers()) {
            ranges.add(peer.getActiveRange());
        }
        return .getCoverage(ranges);
    }
    @Override
    public int getActiveCoveragePct() {
        return _coveragePct(getActiveCoverage());
    }
    @Override
    public int getTotalCoverage()
    {
        ArrayList<KeyRangeranges = new ArrayList<KeyRange>();
        ranges.add(.totalRange());
        for (ClusterPeer peer : getPeers()) {
            ranges.add(peer.getTotalRange());
        }
        return .getCoverage(ranges);
    }
    
    @Override
    public int getTotalCoveragePct() {
        return _coveragePct(getTotalCoverage());
    }
    @Override
    public ClusterStatusMessage asMessage() {
        return new ClusterStatusMessage(.currentTimeMillis(),
                getLastUpdated(), getLocalState(), getRemoteStates());
    }
    @Override
    public long getHashOverState()
    {
        int hash = .hashCode();
        hash ^= .hashCode();
        for (ClusterPeerImpl<?,?> peer : _peerImpls()) {
            hash += peer.getSyncState().hashCode();
        }
        return hash;
    }
    
    /*
    /**********************************************************************
    /* Adding cluster info in requests, responses
    /**********************************************************************
     */
    @Override
    {
        long clusterUpdated = getLastUpdated();
                clusterUpdated);
    }
    @Override
    {
        /* Since key range information will be included anyway, all we need here
         * is just the endpoint name ("caller").
         */
        requestBuilder = requestBuilder.addParameter(.,
                .getAddress().toString());
        return requestBuilder;
    }
    
    /*
    /**********************************************************************
    /* Internal methods
    /**********************************************************************
     */
    private int _coveragePct(int absCoverage) {
        int len = .getLength();
        if (absCoverage == len) {
            return 100;
        }
        return (int) ((100.0 * absCoverage) / len);
    }
    protected List<ClusterPeerImpl<K,E>> _peerImpls() {
        synchronized () {
            return new ArrayList<ClusterPeerImpl<K,E>>(.values());
        }
    }
New to GrepCode? Check out our FAQ X