Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package com.fasterxml.clustermate.service.cluster;
  
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
 
 
 
 public class ClusterPeerImpl<K extends EntryKey, E extends StoredEntry<K>>
     extends ClusterPeer
     implements StartAndStoppable
 {
    
If access to peer's sync/list fails, wait for this duration before trying again.
 
     private final static long SLEEP_FOR_SYNCLIST_ERRORS_MSECS = 10000L;
 
     private final static long SLEEP_FOR_SYNCPULL_ERRORS_MSECS = 3000L;
 
     // no point trying to sleep for trivial time
     private final static long MINIMAL_SLEEP_MSECS = 10L;
    
    
If synclist is empty and server does not instruct us, simply sleep for 1 second (to try avoid congestion)
 
     private final static long SLEEP_FOR_EMPTY_SYNCLIST_MSECS = 1000L;
     
     // no real hurry; use 20 seconds to account for GC, congestion etc
     private final static TimeSpan TIMEOUT_FOR_SYNCLIST = new TimeSpan(10L, .);

    
Lowish timeout for "bye bye" message, so it won't block shutdown
 
     private final static TimeSpan TIMEOUT_FOR_BYEBYE = new TimeSpan(250L, .);
    
    
We will limit maximum estimate response size to some reasonable limit: starting with 250 megs. The idea is to use big enough sizes for efficient bulk transfer; but small enough not to cause timeouts during normal operation.
 
     private final static long MAX_TOTAL_PAYLOAD = 250 * 1000 * 1000;
    
    
During fetching of items to sync, let's cap number of failures to some number; this should make it easier to recover from cases where peer shuts down during individual sync operation (after sync list received but before all entries are fetched)
 
     private final static int MAX_SYNC_FAILURES = 8;

    
Also, let's limit maximum individual calls per sync-pull fetch portion, to avoid excessive calls.
 
     private final int MAX_FETCH_TRIES = 20;
 
     private final static Logger LOG = LoggerFactory.getLogger(ClusterPeer.class);
     
     /*
     /**********************************************************************
     /* Configuration, general helpers
     /**********************************************************************
      */
 
     protected final SharedServiceStuff _stuff;

    
This object is necessary to support "virtual time" for test cases.
 
     protected final TimeMaster _timeMaster;
    /*
    /**********************************************************************
    /* Operation state of the peer object
    /**********************************************************************
     */
    
    
Synchronization thread if (and only if) this peer shares part of keyspace with the local node; otherwise null. Note that threads may be started and stopped based on changes to cluster configuration.
    protected Thread _syncThread;

    
Flag used to request termination of the sync thread.
    protected AtomicBoolean _running = new AtomicBoolean(false);

    
Let's keep track of number of failures (as per caught exceptions); mostly so that tests can verify passing, but also potentially for monitoring.
    protected AtomicInteger _failCount = new AtomicInteger(0);
    
    protected final byte[] _readBuffer = new byte[8000];
    
    /*
    /**********************************************************************
    /* Helper objects for entry handling
    /**********************************************************************
     */

    
And to store fetched missing entities, we need the store
    protected final StorableStore _entryStore;

    
Need to construct metadata nuggets with this factory
    protected final StoredEntryConverter<K,E,?> _entryConverter;
    
    /*
    /**********************************************************************
    /* Helper objects for sync handling
    /**********************************************************************
     */
    protected final ClusterViewByServerUpdatable _cluster;
    
    
Object used to access Node State information, needed to construct view of the cluster.
    protected final ClusterStatusAccessor _statusAccessor;

    
Helper object used for doing HTTP requests
    protected final SyncListAccessor _syncListAccessor;

    
Persistent data store in which we store information regarding synchronization.
    protected final NodeStateStore _stateStore;

    
Hash code of contents of the last cluster view we received from the peer. Used as optimization: cluster view only piggy-backed on list response if hash differs.
    protected long _lastClusterHash;
    
    /*
    /**********************************************************************
    /* Local information for peer (which for us is external but...)
    /**********************************************************************
     */

    
Synchronization state of this peer
    protected ActiveNodeState _syncState;
    
    /*
    /**********************************************************************
    /* Life-cycle
    /**********************************************************************
     */
    
            NodeStateStore stateStoreStorableStore entryStore,
            ActiveNodeState state,
            ClusterStatusAccessor accessor)
    {
        super();
         = cluster;
         = stuff;
         = new SyncListAccessor(stuff);
         = state;
         = stateStore;
         = entryStore;
         = stuff.getTimeMaster();
         = stuff.getEntryConverter();
         = accessor;
    }
    // StartAndStoppable: starting this peer just means starting its sync thread
    @Override
    public void start() {
        startSyncing();
    }

    
Method called when the system is shutting down.
    @Override
    public void stop()
    {
        // stopSyncing():
        Thread t;
        synchronized (this) {
            .set(false);
            t = ;
            if (t != null) {
                 = null;
                .info("Stop requested for sync thread for peer at {}".getAddress());
            }
        }
        if (t != null) {
//            t.notify();
            t.interrupt();
        }
        .stop();
    }
    /*
    /**********************************************************************
    /* Actual synchronization task
    /**********************************************************************
     */
    
    
Method that can be called to ensure that there is a synchronization thread running to sync between the local node and this peer

Returns:
True if a new sync thread was started; false if there already was a thread
    public boolean startSyncing()
    {
        Thread t;
        
        synchronized (this) {
            t = ;
            if (t != null) { // sanity check
                return false;
            }
            .set(true);
             = t = new Thread(new Runnable() {
                @Override
                public void run() {
                    syncLoop();
                }
            });
            .setName("NodeSync-"+.getAddress());
        }
        t.start();
        return true;
    }
    
    /*
    /**********************************************************************
    /* State access
    /**********************************************************************
     */
    @Override
    public int getFailCount() { return .get(); }
    @Override
    public void resetFailCount() { .set(0); }
    @Override
    public long getSyncedUpTo() {
        return .getSyncedUpTo();
    }
    /*
    /**********************************************************************
    /* Public API
    /**********************************************************************
     */
    @Override
    public IpAndPort getAddress() {
        return .getAddress();
    }
    @Override
    public KeyRange getActiveRange() {
        return .getRangeActive();
    }
    @Override
    public KeyRange getTotalRange() {
        return .totalRange();
    }

    
Accessor for getting key range that is shared between the local node and this peer; for non-overlapping nodes this may be an empty range.
    @Override
    public KeyRange getSyncRange() {
        return .getRangeSync();
    }
    /*
    /**********************************************************************
    /* Extended accessors
    /**********************************************************************
     */
    public ActiveNodeState getSyncState() {
        return ;
    }
    public boolean isDisabled() {
        return .isDisabled();
    }
    
    /*
    /**********************************************************************
    /* Background synchronization processing
    /**********************************************************************
     */

    
Main synchronization loop
    protected void syncLoop()
    {
        .info("Starting sync thread for peer at {}".getAddress());
        
        // For testing (and only testing!), let's add little bit of
        // virtual sleep (TimeMaster will block threads) before starting
        // the loop; this to stabilize situation
        if (.isRunningTests()) {
            try {
                .sleep(1L);
            } catch (InterruptedException e) { }
        }
        /* At high level, we have two kinds of tasks, depending on whether
         * there is any overlap:
         * 
         * 1. If ranges overlap, we need to do proper sync list/pull handling
         * 2. If no overlap, we just need to keep an eye towards changes, to
         *   try to keep whole cluster view up to date (since clients need it)
         */
        while (.get()) {
            try {
                if (hasOverlap(.getLocalState(), )) {
                    doRealSync();
                } else {
                    doMinimalSync();
                }
            } catch (InterruptedException e) {
                if (.get()) {
                    .warn("syncLoop() interrupted without clearing '_running' flag; ignoring");
                }
            } catch (Exception e) {
                .warn("Uncaught processing exception during syncLoop(): ({}) {}",
                        e.getClass().getName(), e.getMessage());
                if (.get()) {
                    // Ignore failures during shutdown, so only increase here
                    .addAndGet(1);
                    try {
                        .sleep();
                    } catch (InterruptedException e2) { }
                }
            }
        }
        if (.isRunningTests()) {
            .info("Stopped sync thread for peer at {} -- testing, all done!".getAddress());
            return;
        }
        // And send the byebye message if peer is NOT (known to be) disabled
        if (.isDisabled()) {
            .info("Stopped sync thread for peer at {}: is disabled, no need to send bye-bye".getAddress());
            return;
        }
        .info("Stopped sync thread for peer at {}: let's send bye-bye".getAddress());
        long start = System.currentTimeMillis();
                .getAddress(), .);
        .info("Bye-bye message to {} sent in {} msec".getAddress(), System.currentTimeMillis()-start);
    }
    protected void doRealSync() throws Exception
    {
        /* Sequence for each iterations consists of:
         * 
         * 1. Fetch list of newly inserted/deleted entries from peer (sync/list)
         * 2a. Find subset of entries unknown to this node, if any
         * 2b. Fetch unknown entries, possibly with multiple requests
         * 
         * and we will also add bit of sleep between requests, depending on how many
         * entries we get in step 1.
         */
        
        long listTime = .currentTimeMillis();
        SyncListResponse<?> syncResp = _fetchSyncList();
        if (syncResp == null) { // only for hard errors
            return;
        }
        if (!.get()) { // short-circuit during shutdown
            return;
        }
        // First things first:
        if (syncResp.clusterStatus != null) {
            .updateWith(syncResp.clusterStatus);
             = syncResp.clusterHash;
        } else {
            // This is fine, as long as hashes match
            if (syncResp.clusterHash != ) {
                .warn("Did not get cluster status from {} even though hashes differ 0x{} (old) vs 0x{} (response)",
                        .getAddress(),
                        Long.toHexString(), Long.toHexString(syncResp.clusterHash));
            }
        }
        final long lastSeenTimestamp = syncResp.lastSeen();
        // Sanity check; should never happen
        if (lastSeenTimestamp <= 0L) {
            .error("Invalid lastSeen timestamp value {} for SyncList, from {}",
                    lastSeenTimestamp.getAddress());
            // should we sleep bit extra? Probably good to avoid flooding logs, if we end up here
            Thread.sleep(100L);
        }
        // comment out or remove for production; left here during testing:
//long diff = (listTime - syncResp.lastSeen()) >> 10; // in seconds
//LOG.warn("Received syncList with {} responses; last timestamp {} secs ago", syncResp.size(), diff);
        
        List<SyncListResponseEntrynewEntries = syncResp.entries;
        int insertedEntryCount = newEntries.size();
        if (insertedEntryCount == 0) { // nothing to update
            // may still need to update timestamp?
            _updatePersistentState(listTimesyncResp.lastSeen());
            // Ok: maybe server instructed us as to how long to sleep?
            long sleepMsecs = syncResp.clientWait;
            if (sleepMsecs < ) { // if not, use some lowish default amount
                sleepMsecs = ;
            }
//            long timeSpent = _timeMaster.currentTimeMillis() - listTime;
            .sleep(sleepMsecs);
            return;
        }
        // Ok, we got something, good.
        // First: handle tombstones we may be getting:
        @SuppressWarnings("unused")
        int tombstoneCount = _handleTombstones(newEntries);
        // then filter out entries that we already have:
        _filterSeen(newEntries);
        if (!.get()) { // short-circuit during shutdown
            return;
        }
        if (newEntries.isEmpty()) { // nope: just update state then
            /*
            long msecs = syncResp.lastSeen() - _syncState.syncedUpTo;
            if (!_stuff.isRunningTests()) {
                LOG.warn("No unseen entries out of {} entries: timestamp = {} (+{} sec)",
                    new Object[] { insertedEntryCount, syncResp.lastSeen(), String.format("%.1f", msecs/1000.0)});
            }
            */
            _updatePersistentState(listTimesyncResp.lastSeen());
        } else { // yes: need to do batch updates
            // but can at least update syncUpTo to first entry, right?
            int newCount = newEntries.size();
            AtomicInteger rounds = new AtomicInteger(0);
            long lastProcessed = _fetchMissing(newEntriesrounds);
            int fetched = newCount - newEntries.size();
            double secs = (.currentTimeMillis() - listTime) / 1000.0;
            String timeDesc = String.format("%.2f"secs);
            .info("Fetched {}/{} missing entries from {} in {} seconds ({} rounds)",
                    new Object[] { fetchednewCountgetAddress(), timeDescrounds.get()});
            _updatePersistentState(listTimelastProcessed);
        }
        // And then sleep a bit, before doing next round of syncing
        double secsBehind = (.currentTimeMillis() - .getSyncedUpTo()) / 1000.0;
        long delay = _calculateSleepBetweenSync(insertedEntryCount, (intsecsBehind);
        
        if (delay > 0L) {
            // only bother informing if above 50 msec sleep
            if (delay >= 50L) {
                .info("With {} listed entries, {} seconds behind, will do {} msec sleep",
                        new Object[] { insertedEntryCount, String.format("%.2f"secsBehind), delay});
            }
            .sleep(delay);
        }
    }

    
Method called when there is no key range overlap, and at most we want to synchronize cluster view occasionally.
    protected void doMinimalSync() throws Exception
    {
        .info("doMinimalSync(): let's just... Sleep for a bit (TBD)");
        Thread.sleep(30 * 1000L);
        
        // !!! TODO: do something!
    }
    
    /*
    /**********************************************************************
    /* Internal methods, cluster state updates
    /**********************************************************************
     */

    
Method called to indicate that the node should (or should not) be disabled.
    public void markDisabled(long timestampboolean isDisabled)
    {
        if (timestamp <= 0L) { // optional
            timestamp = .getDisabledUpdated();
        }
        ActiveNodeState state = .withDisabled(timestampisDisabled);
        if (state != ) {
             = state;
            try {
                .upsertEntry(state);
            } catch (Exception e) {
                .error("Failed to update node state (disabled to {}) for {}. Problem ({}): {}",
                        isDisablede.getClass().getName(), e.getMessage());
            }
        }
    }
    
    /*
    /**********************************************************************
    /* Internal methods, other
    /**********************************************************************
     */
    
    
Helper method called to update persistent state, based on sync list information.

Parameters:
syncStartTime Timestamp of when sync attempt was done
lastSeen
    private void _updatePersistentState(long syncStartTimelong lastSeen)
    {
        ActiveNodeState orig = ;
         = .withLastSyncAttempt(syncStartTime);
        if (lastSeen > .getSyncedUpTo()) {
             = .withSyncedUpTo(lastSeen);
        }
        if ( != orig) {
//LOG.warn("Saving sync state ({}) (args: {}, {}): lastStartTime {}, lastSeen {}", orig.address, syncStartTime, lastSeen, _syncState.lastSyncAttempt, _syncState.syncedUpTo);
            try {
                .upsertEntry();
            } catch (Exception e) {
                .error("Failed to update node state for {}. Problem ({}): {}",
                        e.getClass().getName(), e.getMessage());
            }
        }
    }
    
    {
        try {
            return .fetchSyncList(,
                    );
        } catch (InterruptedException e) {
            // no point in complaining if we are being shut down:
            if (.get()) {
                .warn("Failed to fetch syncList from {} ({}): {}",
                        new Object[] { .getAddress(), e.getClass().getName(), e.getMessage()});
            }
        }
        return null;
    }

    
Helper method called to handle removal of entries, by handling tombstones received and converting existing non-deleted local entries into tombstones.

Returns:
Number of tombstone entries found on the list
    protected int _handleTombstones(List<SyncListResponseEntryentries)
        throws IOExceptionStoreException
    {
        int count = 0;
        Iterator<SyncListResponseEntryit = entries.iterator();
        while (it.hasNext()) {
            SyncListResponseEntry entry = it.next();
            // Tombstone: if we have an entry, convert to a tombstone.
            /* 06-Jul-2012, tatu: But if we don't have one, should we create one?
             *   Could think of it either way; but for now, let's not waste time and space
             */
            if (entry.deleted()) {
                ++count;
                it.remove();
                .softDelete(entry.keytruetrue);
            }
        }
        return count;
    }
    protected void _filterSeen(List<SyncListResponseEntryentriesthrows StoreException
    {
        Iterator<SyncListResponseEntryit = entries.iterator();
        while (it.hasNext()) {
            SyncListResponseEntry entry = it.next();
            /* We can skip ALL entries, even if states do not match. Why?
             * because incoming tombstones should have been already handled;
             * and since reverse (getting non-deleted entry whereas we already
             * have tombstone) is something we want to skip anyway.
             */
            // either needs to have been seen
            if (.hasEntry(entry.key)) {
                it.remove();
            }
        }
    }

    
Helper method that handles actual fetching of missing entries, to synchronize content.

Parameters:
missingEntries Entries to try to fetch
Returns:
Timestamp to use as the new 'syncedUpTo' value
Rounds:
Integer to update with number of rounds done to sync things completely
    private long _fetchMissing(List<SyncListResponseEntrymissingEntriesAtomicInteger rounds)
        throws InterruptedException
    {
        // initially create as big batches as possible
        int maxToFetch = missingEntries.size();
        int tries = 0;
        int fails = 0;
        long syncedUpTo = 0L;
        do {
            ++tries;
            final long startTime = .currentTimeMillis();
            AtomicInteger payloadSize = new AtomicInteger(0);
            SyncPullRequest req = _buildSyncPullRequest(missingEntriesmaxToFetchpayloadSize);
            final int expCount = req.size();
            if (expCount == 0) { // sanity check, shouldn't happen but...
                throw new IllegalStateException("Internal error: empty syncPullRequest list ("+missingEntries.size()+" missing entries)");
            }
            rounds.addAndGet(1);
            AtomicInteger status = new AtomicInteger(0);
            InputStream in = null;
            try {
                in = .readSyncPullResponse(req,
                         getAddress(), statuspayloadSize.get());
//            } catch (org.apache.http.conn.HttpHostConnectException e) {
            } catch (java.net.ConnectException e) {
                ++fails;
                .warn("Failed to connect server "+getAddress()+" to fetch missing entries"e);
                .sleep();
            } catch (Exception e) {
                .warn("Problem trying to make syncPull call to fetch "+expCount+" entries: ("
                        +e.getClass().getName() + ") " + e.getMessage(), e);
                ++fails;
                .sleep();
            }
            if (in == null) {
                .warn("Problem trying to fetch {} entries, received status code of {}",
                        expCountstatus.get());
                .sleep();
                ++fails;
                continue;
            }
            Iterator<SyncListResponseEntryit = missingEntries.iterator();
            int count = 0;
            int headerLength = 0;
            long payloadLength = 0;
            try {
                // let's see if we can correlate entries nicely
                headerLength = -1;
                payloadLength = -1;
                for (; it.hasNext(); ++countit.remove()) {
                    SyncListResponseEntry reqEntry = it.next();
                    headerLength = SyncPullResponse.readHeaderLength(in);
                    // Service will indicate end-of-response with marker length
                    if (headerLength == .) {
                        break;
                    }
                    // sanity check:
                    if (count == expCount) {
                        .warn("Server returned more than expected {} entries; ignoring rest!"expCount);
                        break;
                    }
                    // missing header? Unexpected, but not illegal
                    if (headerLength == 0) {
                        .warn("Missing entry {}/{}, id {}: expired?",
                                new Object[] { countexpCountreqEntry.key});
                        continue;
                    }
                    
                    byte[] headerBytes = new byte[headerLength];
                    int len = IOUtil.readFully(inheaderBytes);
                    if (len < headerLength) {
                        throw new IOException("Unexpected end-of-input: got "+len+" bytes; needed "+headerLength);
                    }
                    SyncPullEntry header = .decodePullEntry(headerBytes);
                    payloadLength = header.storageSize;
                    // and then create the actual entry:
                    _pullEntry(reqEntryheaderin);
                    syncedUpTo = reqEntry.insertionTime
                }
                if (count < expCount) {
                    // let's consider 0 entries to be an error, to prevent infinite loops
                    if (count == 0) {
                        .warn("Server returned NO entries, when requested {}"expCount);
                        ++fails;
                    }
                    .warn("Server returned fewer entries than requested for sync pull: {} vs {} (in {} msecs)",
                            new Object[] { countexpCount, (.currentTimeMillis() - startTime)});
                }
            } catch (Exception e) {
                .warn("Problem trying to fetch syncPull entry {}/{} (header-length: {}, length: {}): ({}) {}",
                        new Object[] { count+1, expCountheaderLengthpayloadLengthe.getClass().getName(), e.getMessage() } );
                .sleep();
                ++fails;
            } finally {
                if (in != null) {
                    try { in.close(); } catch (Exception e) { // shouldn't really happen
                        .warn("Failed to close HTTP stream: {}"e.getMessage());
                    }
                }
            }
        } while (fails <  && !missingEntries.isEmpty() && tries < );
        if (fails > 0) {
            .addAndGet(fails);
        }
        
        return syncedUpTo;
    }
            int maxEntriesAtomicInteger expectedPayloadSize)
    {
        SyncPullRequest req = new SyncPullRequest();
        Iterator<SyncListResponseEntryit = missingEntries.iterator();
        SyncListResponseEntry entry = it.next();
        req.addEntry(entry.key);
        long expSize = entry.size;
        while (it.hasNext() && req.size() < maxEntries) {
            entry = it.next();
            expSize += entry.size;
            if (expSize > ) {
                expSize -= entry.size;
                break;
            }
            req.addEntry(entry.key);
        }
        expectedPayloadSize.set((intexpSize);
        return req;
    }
    
    
Helper method called to figure out how long to sleep before doing next syncList call. Note that sleep times are rather arbitrary: we hope to be able to better tune these in future.

Parameters:
listedCount number of 'newly inserted' entries that were returned
Number of second that we are "behind" current time (note: due to grace period, will never be zero, but more like a minute or so at minimum)
    private long _calculateSleepBetweenSync(int listedCountlong secondsBehind)
    {
        // if we are behind by more than 5 minutes, or get "full" response, no delay:
        if ((secondsBehind >= 300)
                || (listedCount >= .getServiceConfig().)) {
            return 0L;
        }
        // otherwise nominal delay
        return 50L;
    }
    
    
Method that does the heavy lifting of pulling a single synchronized entry.
    private void _pullEntry(SyncListResponseEntry reqEntrySyncPullEntry header,
            InputStream in)
        throws IOException
    {
        final StorableKey key = header.key;
        /* first things first: either read things in memory (for inline inclusion),
         * or pipe into a file.
         */
        long expSize = header.storageSize;
        // Sanity check: although rare, deletion could have occured after we got
        // the initial sync list, so:
        if (header.isDeleted) {
            .softDelete(keytruetrue);
            return;
        }
        StorableCreationResult result;
        StorableCreationMetadata stdMetadata = new StorableCreationMetadata(header.compression,
                header.checksumheader.checksumForCompressed);
        stdMetadata.uncompressedSize = header.size;
        stdMetadata.storageSize = header.storageSize;
                header.lastAccessMethodheader.minTTLSecsheader.maxTTLSecs);
        // although not 100% required, we can simplify handling of smallest entries
        if (expSize <= .getServiceConfig()..) { // inlineable
            ByteContainer data;
            if (expSize == 0) {
                data = ByteContainer.emptyContainer();
            } else {
                byte[] bytes = new byte[(intexpSize];
                int len = IOUtil.readFully(inbytes);
                if (len < expSize) {
                    throw new IOException("Unexpected end-of-input: got "+len+" bytes; needed "+expSize);
                }
                data = ByteContainer.simple(bytes);
            }
            result = .insert(keydatastdMetadatacustomMetadata);
        } else {
            /* 21-Sep-2012, tatu: Important -- we must ensure that store only reads
             *   bytes that belong to the entry payload. The easiest way is by adding
             *   a wrapper stream that ensures this...
             */
            BoundedInputStream bin = new BoundedInputStream(instdMetadata.storageSizefalse);
            result = .insert(keybinstdMetadatacustomMetadata);
            if (result.succeeded() && !bin.isCompletelyRead()) { // error or warning?
                Storable entry = result.getNewEntry();
                long ssize = (entry == null) ? -1L : entry.getStorageLength();
                .warn("Problems with sync-pull for '{}': read {} bytes, should have read {} more; entry storageSize: {}",
                        new Object[] { header.keybin.bytesRead(), bin.bytesLeft(), ssize });
            }
        }
        // should we care whether this was redundant or not?
        if (!result.succeeded()) {
            if (result.getPreviousEntry() != null) {
                // most likely ok: already had the entry
                .info("Redundant sync-pull for '{}': entry already existed locally"header.key);
            } else {
                // should this add to 'failCount'? For now, don't
                .warn("Failed sync-pull for '{}': no old entry. Strange!"header.key);
            }
        }
    }
    protected final boolean hasOverlap(NodeState state1NodeState state2)
    {
        return state1.totalRange().overlapsWith(state2.totalRange());
    }
New to GrepCode? Check out our FAQ X