Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package com.fasterxml.clustermate.service.sync;
  
  import java.util.*;
  
  
 
 /**
  * Class that handles requests related to the node-to-node synchronization process.
  */
 
 public class SyncHandler<K extends EntryKey, E extends StoredEntry<K>>
     extends HandlerBase
 {
    
     /**
      * Since a 'list sync' operation can potentially scan through a sizable chunk
      * of the store, let's limit the actual time allowed to be spent on it.
      * For now, 400 msecs seems reasonable.
      */
 
     private final static long MAX_LIST_PROC_TIME_IN_MSECS = 400L;

    
     /**
      * End marker we use to signal the end of a response.
      */
 
     public final static int LENGTH_EOF = 0xFFFF;
 
     // Judging by the name, the largest header length allowed (0x7FFF == 32767).
     // NOTE(review): not referenced anywhere in the visible part of this file; presumably
     // kept below LENGTH_EOF (0xFFFF) so that a real length can never be mistaken for
     // the end marker -- confirm against the code that writes/reads these lengths.
     public final static int MAX_HEADER_LENGTH = 0x7FFF;
 
     /*
     /**********************************************************************
     /* Helper objects
     /**********************************************************************
      */
     
     // Cluster view, held as the updatable variant: the 4-argument constructor
     // down-casts the ClusterViewByServer it is given to this type.
     protected final ClusterViewByServerUpdatable _cluster;
 
     // Store access; the commented-out debug line in listEntries reads the entry
     // store through _stores.getEntryStore().
     protected final Stores<K,E> _stores;
 
     // Entry converter obtained from SharedServiceStuff.getEntryConverter().
     // NOTE(review): presumably the receiver of the '.entryFromStorable(raw)' call in
     // pullEntries, whose receiver name was lost in extraction -- confirm.
     protected final StoredEntryConverter<K,E,?> _entryConverter;
 
     // Key converter obtained from SharedServiceStuff.getKeyConverter().
     protected final EntryKeyConverter<K> _keyConverter;
     
     // Obtained from SharedServiceStuff.getFileManager(); not used in the visible code.
     protected final FileManager _fileManager;
 
     // Time source obtained from SharedServiceStuff.getTimeMaster().
     // NOTE(review): presumably the receiver of the '.currentTimeMillis()' and
     // '.realSystemTimeMillis()' calls whose receiver was lost in extraction -- confirm.
     protected final TimeMaster _timeMaster;
 
     // // Helpers for JSON/Smile:
     
     // NOTE(review): which writer/reader serves which response can only be inferred
     // from the field names here; the constructor assignments lost their targets.
     protected final ObjectWriter _syncListJsonWriter;
     
     protected final ObjectWriter _syncListSmileWriter;
 
     protected final ObjectWriter _syncPullSmileWriter;
     
     protected final ObjectWriter _errorJsonWriter;
     
     protected final ObjectReader _jsonSyncPullReader;
 
     /*
     /**********************************************************************
     /* Configuration
     /**********************************************************************
      */

    
     /**
      * We will list entries only up until N seconds before the current time; this is
      * to reduce the likelihood that we see an entry we do not yet have but that is
      * about to be sent by the client; that is, to reduce hysteresis caused by
      * differing arrival times of entries.
      */
 
     protected final long _cfgSyncGracePeriodMsecs;
 
     // NOTE(review): presumably the value subtracted from the current time to compute
     // 'minSince' in listEntries (the log message there speaks of a "maximum TTL");
     // the operand itself was lost in extraction -- confirm.
     protected final long _cfgMaxTimeToLiveMsecs;
 
     // NOTE(review): presumably the cap applied to the server-side sleep in
     // _listEntries (the second argument of Math.min was lost there) -- confirm.
     protected final long _cfgMaxLongPollTimeMsecs;
    
    
    /**
     * We will limit the number of entries listed in a couple of ways; the count
     * limitation is mostly to limit response message size. Note that this limit
     * must sometimes be relaxed when there are blocks of entries with an identical
     * last-modified timestamp.
     */
    protected final int _maxToListPerRequest;
    /*
    /**********************************************************************
    /* Life-cycle
    /**********************************************************************
     */
    /**
     * Convenience constructor that delegates to the 4-argument constructor, taking
     * the last argument from the service configuration of {@code stuff}.
     *<p>
     * NOTE(review): the separators between parameters/arguments and the name of the
     * member read from {@code stuff.getServiceConfig()} were lost in extraction;
     * presumably that member is the configured per-request listing limit -- restore
     * from the upstream source.
     */
    public SyncHandler(SharedServiceStuff stuffStores<K,E> stores,
            ClusterViewByServer cluster)
    {
        this(stuffstorescluster,
                stuff.getServiceConfig().);
    }
    /**
     * Main constructor: stores the given collaborators and pulls converters, the
     * file manager, the time master and the JSON/Smile readers and writers out of
     * {@code stuff}.
     *<p>
     * NOTE(review): every assignment below lost its left-hand side in extraction.
     * Most targets can be guessed from the field types, but only 11 assignments are
     * visible for 15 final fields (the three {@code _cfg*} fields and one
     * ObjectWriter have none), so whole lines are missing too. Restore from the
     * upstream source rather than by guesswork.
     */
    public SyncHandler(SharedServiceStuff stuffStores<K,E> stores,
            ClusterViewByServer clusterint maxToListPerRequest)
    {
         = stores;
        // down-cast: the handler needs the updatable variant of the cluster view
         = (ClusterViewByServerUpdatablecluster;
         = stuff.getEntryConverter();
         = stuff.getFileManager();
         = stuff.getTimeMaster();
         = stuff.getKeyConverter();
         = stuff.smileWriter();
         = stuff.smileWriter();
         = stuff.jsonReader(SyncPullRequest.class);
        // error responses always as JSON:
         = stuff.jsonWriter();
         = maxToListPerRequest;
    }
    /*
    /**********************************************************************
    /* Simple accessors
    /**********************************************************************
     */
    /**
     * Accessor for the cluster view this handler works with.
     *<p>
     * NOTE(review): the returned expression was lost in extraction; presumably
     * {@code _cluster}, the only field of a compatible type -- confirm.
     */
    public ClusterViewByServer getCluster() {
        return ;
    }
    
    /*
    /**********************************************************************
    /* API, file listing
    /**********************************************************************
     */
    
    
    /**
     * End point that clients use to find out metadata for entries this node has,
     * starting from the given timestamp.
     */
    // Handles a sync-list request and returns the (possibly error) response.
    //
    // Flow, as visible below:
    //  1. Reject the request if 'sinceL' or either key-range parameter is missing,
    //     or if the key range cannot be constructed.
    //  2. If the caller identified itself (getCallerQueryParam), pass it on to
    //     checkMembership.
    //  3. Raise 'since' to 'minSince' when it is older than that lower bound.
    //  4. Scan via _listEntries only if the local node's total range overlaps the
    //     requested range; otherwise answer with an empty response.
    //  5. Attach the current cluster hash, plus the full cluster status when the
    //     caller's hash is 0 or differs from ours, and return an OK response whose
    //     writer and content type are chosen by 'useSmile'.
    //
    // 'metadata' may be null; when non-null it is given the number of listed items.
    //
    // NOTE(review): this listing lost tokens in extraction (argument separators,
    // names before '.', operands after '-' and '?'); restore them from the upstream
    // source before compiling.
    @SuppressWarnings("unchecked")
    public <OUT extends ServiceResponse> OUT listEntries(ServiceRequest request, OUT response,
            Long sinceLOperationDiagnostics metadata)
        throws InterruptedExceptionStoreException
    {
        // simple validation first
        if (sinceL == null) {
            return (OUT) badRequest(response"Missing path parameter for 'list-since'");
        }
        Integer keyRangeStart = _findIntParam(request.);
        if (keyRangeStart == null) {
            return (OUT) missingArgument(response.);
        }
        Integer keyRangeLength = _findIntParam(request.);
        if (keyRangeLength == null) {
            return (OUT) missingArgument(response.);
        }
        // unlike the two range parameters, the caller's cluster hash is not null-checked
        long clusterHash = _findLongParam(request.);
        KeyRange range;
        try {
            range = .getKeySpace().range(keyRangeStartkeyRangeLength);
        } catch (Exception e) {
            // NOTE(review): the message labels the second value "end", but what is
            // passed is keyRangeLength (a length) -- the label looks wrong.
            return (OUT) badRequest(response"Invalid key-range definition (start '%s', end '%s'): %s",
                    keyRangeStartkeyRangeLengthe.getMessage());
        }
        /* 20-Nov-2012, tatu: We can now piggyback auto-registration by sending minimal
         *   info about caller...
         */
        IpAndPort caller = getCallerQueryParam(request);
        if (caller != null) {
            .checkMembership(caller, 0L, range);
        }
        boolean useSmile = _acceptSmileContentType(request);
        long currentTime = .currentTimeMillis();
        // upper time bound for listing: current time minus a (lost) operand,
        // presumably the sync grace period -- confirm
        final long upUntil = currentTime - ;
        // NOTE(review): sinceL cannot be null here (early return above), so the
        // null branch of this conditional is dead code.
        long since = (sinceL == null) ? 0L : sinceL.longValue();
        /* [Issue#8] Let's impose minimum timestamp to consider, to try to avoid
         * exposing potentially obsolete entries (ones that are due to cleanup
         * and will disappear anyway).
         */
        long minSince = currentTime - ;
        if (minSince > since) {
            // NOTE(review): three '{}' placeholders but only two arguments are
            // visible; the third (presumably the TTL value) was lost in extraction.
            .warn("Sync list 'since' argument of {} updated to {}, to use maximum TTL of {}",
                    sinceminSince);
            since = minSince;
        }
        
        /* One more thing: let's sanity check that our key range overlaps request
         * range. If not, can avoid (possibly huge) database scan.
         */
        NodeState localState = .getLocalState();
        final SyncListResponse<E> resp;
        KeyRange localRange = localState.totalRange();
        if (localRange.overlapsWith(range)) {
            // NOTE(review): _listEntries declares four parameters; the fourth
            // argument (a max count) is not visible in this call.
            resp = _listEntries(rangesinceupUntil);
    /*
System.err.println("Sync for "+_localState.getRangeActive()+" (slice of "+range+"); between "+sinceL+" and "+upUntil+", got "+entries.size()+"/"
+_stores.getEntryStore().getEntryCount()+" entries... (time: "+_timeMaster.currentTimeMillis()+")");
*/
        } else {
            .warn("Sync list request by {} for range {}; does not overlap with local range of {}; skipping",
                    callerrangelocalRange);
            resp = SyncListResponse.emptyResponse();
        }
        if (metadata != null) {
            // the reassigned parameter is not read again within this method
            metadata = metadata.setItemCount(resp.size());
        }
        long currentHash = .getHashOverState();
        resp.setClusterHash(currentHash);
        // full cluster status is only included when the caller sent no hash (0)
        // or a hash that differs from the current one
        ClusterStatusMessage clusterStatus = (clusterHash == 0L || clusterHash != currentHash) ?
                .asMessage() : null;
        resp.setClusterStatus(clusterStatus);                
        final ObjectWriter w = useSmile ?  : ;
        final String contentType = useSmile ? ..toString() : ..toString();
        
        return (OUT) response.ok(new StreamingEntityImpl(wresp))
                .setContentType(contentType);
    }
    
    /*
    /**********************************************************************
    /* API, direct content download
    /* (no (un)compression etc)
    /**********************************************************************
     */

    
    /**
     * Access endpoint used by other nodes to 'pull' data for entries they are
     * missing. Note that the request payload must be JSON; this could change to
     * Smile in the future if need be.
     */
    // Handles a sync-pull request: parses a SyncPullRequest from 'in', looks up each
    // requested key in the entry store, and returns an OK response wrapping a
    // SyncPullResponse with the converted entries in request order.
    //
    // Any exception while reading the request yields a bad-request response carrying
    // the exception message. 'metadata' may be null; when non-null it is given the
    // size of the entry list.
    //
    // NOTE(review): this listing lost tokens in extraction (separators, the closing
    // ')' of the parameter list, the '>' and space in 'List<StorableKey> ids', and
    // receivers before '.'); restore them from the upstream source before compiling.
    @SuppressWarnings("unchecked")
    public <OUT extends ServiceResponse> OUT pullEntries(ServiceRequest request, OUT response,
            InputStream in,
            OperationDiagnostics metadatathrows StoreException
    {
        SyncPullRequest requestEntity = null;
        try {
            requestEntity = .readValue(in);
        } catch (Exception e) {
            return (OUT) badRequest(response"JSON parsing error: %s"e.getMessage());
        }
        // Bit of validation, as unknown props are allowed:
        if (requestEntity.hasUnknownProperties()) {
            .warn("Unrecognized properties in SyncPullRequest: "+requestEntity.unknownProperties());
        }
        
        List<StorableKeyids = requestEntity.entries;
        ArrayList<E> entries = new ArrayList<E>(ids.size());
        StorableStore store = .getEntryStore();
        
        for (StorableKey key : ids) {
            Storable raw = store.findEntry(key);
            // note: this may give null as well; caller needs to check (converter passes null as-is)
            E entry = (E) .entryFromStorable(raw);
            // null results are added too, so the list stays aligned with the request
            entries.add(entry);
        }
        if (metadata != null) {
            // count includes any null placeholders added above
            metadata = metadata.setItemCount(entries.size());
        }
        return (OUT) response.ok(new SyncPullResponse<E>(entries));
    }
    /*
    /**********************************************************************
    /* Helper methods, accessing entries
    /**********************************************************************
     */
    
    /**
     * Collects entries for a sync-list response by iterating the entry store in
     * last-modified order through a {@link LastModLister} callback.
     *<p>
     * At most two scan rounds are made. A round ends the loop when it stops on the
     * "timeout" iteration result (which is logged) or when it has collected at
     * least one entry. If the first round collects nothing, the thread may sleep
     * before the second round; if the second round also collects nothing, a wait
     * time for the client may be set on the response instead.
     *<p>
     * NOTE(review): this listing lost tokens in extraction (argument separators,
     * receivers before '.', the IterationResult constants compared against
     * {@code r}, and several operands); restore from the upstream source.
     *
     * @param inRange key range handed to the callback
     * @param since lower timestamp bound handed to the callback and the iteration
     * @param upTo0 upper timestamp limit; must be below the real system time, and
     *   is lowered to the store's oldest in-flight timestamp when that is smaller
     * @param maxCount count limit handed to the callback; the result list's initial
     *   capacity is {@code min(100, maxCount)}
     * @return response holding the collected entries and the last-seen timestamp,
     *   plus the client wait when that is positive
     * @throws IllegalStateException if {@code upTo0} is not below the real system time
     * @throws InterruptedException if interrupted during the server-side sleep
     */
    protected SyncListResponse<E> _listEntries(final KeyRange inRange,
            final long sincelong upTo0final int maxCount)
        throws InterruptedExceptionStoreException
    {
        final StorableStore store = .getEntryStore();
        final ArrayList<E> result = new ArrayList<E>(Math.min(100, maxCount));
        long lastSeenTimestamp = 0L;
        long clientWait = 0L; // we may instruct client to do bit of waiting before retry
        
        // let's only allow single wait; hence two rounds
        for (int round = 0; round < 2; ++round) {
            /* 19-Sep-2012, tatu: Alas, it is difficult to make this work with virtual time,
             *   tests; so for now we will use actual real system time and not virtual time.
             *   May need to revisit in future.
             */
            final long realStartTime = .realSystemTimeMillis();
            if (upTo0 >= realStartTime) { // sanity check (better safe than sorry)
                // NOTE(review): the message opens a '(' before the current time but
                // never closes it.
                throw new IllegalStateException("Argument 'upTo' too high ("+upTo0+"): can not exceed current time ("
                        +realStartTime);
            }
            /* one more thing: need to make sure we can skip entries that
             * have been updated concurrently (rare, but possible).
             */
            long oldestInFlight = store.getOldestInFlightTimestamp();
            // a value of 0 is skipped, i.e. treated as "nothing in flight"
            if (oldestInFlight != 0L) {
                if (upTo0 > oldestInFlight) {
                    // since it's rare, add INFO logging to see if ever encounter this
                    .info("Oldest in-flight ({}) higher than upTo ({}), use former as limit",
                            oldestInFlightupTo0);
                    upTo0 = oldestInFlight;
                }
            }
            final long upTo = upTo0;
            // wall-clock deadline for this round's scan; the added operand was lost,
            // presumably MAX_LIST_PROC_TIME_IN_MSECS -- confirm
            final long processUntil = realStartTime + ;
    
            // NOTE(review): LastModLister's constructor declares two leading parameters
            // (timeMaster, entryConverter) that are not visible in this call.
            LastModLister<K,E> cb = new LastModLister<K,E>(inRange,
                    sinceupToprocessUntilmaxCountresult);
            IterationResult r = .getEntryStore().iterateEntriesByModifiedTime(cbsince);
            // "timeout" is indicated by termination at primary key:
            if (r == .) {
                lastSeenTimestamp = cb.getLastSeenTimestamp();
                int totals = cb.getTotal();
                long msecs = .realSystemTimeMillis() - realStartTime;
                double secs = msecs / 1000.0;
                .warn(String.format("Had to stop processing 'listEntries' after %.2f seconds; scanned through %d entries, collected %d entries",
                        secstotalsresult.size()));
                break;
            }
            // No waiting if there are results
            if (result.size() > 0) {
                if (r == .) { // means we got through all data (nothing to see)
                    // Oh. Also, if we got this far, we better update last-seen timestamp;
                    // otherwise we'll be checking same last entry over and over again
                    lastSeenTimestamp = upTo;
                } else {
                    lastSeenTimestamp = cb.getLastSeenTimestamp();
                }
                break;
            }
            
            // Behavior for empty lists differs between first and second round.
            // During first round, we will try bit of server-side sleep (only)
            if (round == 0) {
                long delay;
                if (r == .) {
                    // and running out of valid data by terminating for timestamp; if so, we have seen
                    // a later timestamp, but one that's not out of sync grace period yet
                    long targetTime = (cb.getNextTimestamp() + );
                    delay = targetTime - .currentTimeMillis();
                } else if (r == .) { // means we got through all data (nothing to see)
                    lastSeenTimestamp = upTo;
                    delay = ;
                } else { // this then should be TERMINATED_FOR_ENTRY, which means entries were added, no need for delay
                    break;
                }
                if (delay <= 0L) { // sanity check, should not occur
                    .warn("No SYNCs to list, but calculated delay is {}, which is invalid; ignoring"delay);
                } else {
//LOG.warn("Server long-poll wait: {} msecs", delay);
                    // NOTE(review): Math.min needs two arguments; the second (presumably
                    // the maximum long-poll time) was lost in extraction.
                    Thread.sleep(Math.min(delay));
                }
            } else {
                // and during second round, inform client how long it should sleep
                if (r == .) {
                    long targetTime = (cb.getNextTimestamp() + );
                    clientWait = targetTime - .currentTimeMillis();
                } else if (r == .) {
                    lastSeenTimestamp = upTo;
                    clientWait = ;
                } // otherwise should be TERMINATED_FOR_ENTRY, which means entries were added, no need for delay
//LOG.warn("Server setting clientWait at {} msecs", clientWait);
                // a negative value is only logged here; it is dropped later because
                // only positive waits are set on the response
                if (clientWait < 0L) { // sanity check, should not occur
                    .warn("No SYNCs to list, but calculated client-delay is {}, which is invalid"clientWait);
                }
            }
        }
        SyncListResponse<E> resp = new SyncListResponse<E>(result);
        // one more twist; if no entries found, can sync up to 'upUntil' time...
        if (result.size() == 0 && upTo0 > lastSeenTimestamp) {
            lastSeenTimestamp = upTo0-1;
        }
        resp.setLastSeenTimestamp(lastSeenTimestamp);
        if (clientWait > 0L) {
            resp.setClientWait(clientWait);
        }
        return resp;
    }
    
    /*
    /**********************************************************************
    /* Helper methods, other
    /**********************************************************************
     */
    /**
     * Builds the bad-request response for this handler: a {@link SyncListResponse}
     * constructed from the message is set as the entity and the content type is
     * set to JSON.
     *<p>
     * NOTE(review): the separator between the two parameters was lost in
     * extraction; the declaration should read {@code ServiceResponse response,
     * String msg}.
     *
     * @param msg message passed to the SyncListResponse constructor
     * @return the given response, after marking it as a bad request
     */
    @SuppressWarnings("unchecked")
    @Override
    protected <OUT extends ServiceResponse> OUT _badRequest(ServiceResponse responseString msg) {
        return (OUT) response.badRequest(new SyncListResponse<E>(msg)).setContentTypeJson();
    }
    /*
    /**********************************************************************
    /* Helper classes for callback handling
    /**********************************************************************
     */
    /**
     * Iteration callback used for listing entries in last-modified order. The
     * iteration consults it in three steps visible here: by timestamp
     * ({@link #verifyTimestamp}), by key ({@link #verifyKey}) and finally with the
     * full entry ({@link #processEntry}), which appends to the caller-supplied
     * result list.
     *<p>
     * NOTE(review): this listing lost tokens in extraction: field names on the
     * left of assignments and inside conditions, the IterationAction constants
     * being returned, and separators in the constructor signature. The
     * declaration {@code _since_upTo} appears to be two fields ({@code _since}
     * and {@code _upTo}) fused together. Restore from the upstream source.
     */
    static class LastModLister<K extends EntryKey, E extends StoredEntry<K>>
        extends StorableLastModIterationCallback
    {
        private final TimeMaster _timeMaster;
        private final StoredEntryConverter<K,E,?> _entryConverter;
        // // Limits
        
        private final KeyRange _inRange;
        private final EntryKeyConverter<K> _keyConverter;
        private final long _since_upTo;
        private final long _processUntil;
        private final int _maxCount;
        
        // // Temporary values
        
        // NOTE(review): presumably the key decoded in verifyKey() -- confirm
        private K key = null;
        // // Result values
        
        private int _total = 0;
        // list supplied by the caller; entries are appended in processEntry()
        private final ArrayList<E> _result;
        
        // last timestamp traversed that was in legal timestamp range
        private long _lastSeenValidTimestamp;
        // first timestamp out of valid range
        private long _nextTimestamp;
        // to ensure List advances timestamp:
        private boolean _timestampHasAdvanced = false;
        
        /**
         * Stores the given limits and collaborators. The key converter is not
         * passed in separately but taken from {@code entryConverter.keyConverter()}.
         */
        public LastModLister(TimeMaster timeMasterStoredEntryConverter<K,E,?> entryConverter,
                KeyRange inRangelong sincelong upTolong processUntilint maxCount,
                ArrayList<E> result)
        {
             = timeMaster;
             = entryConverter;
             = entryConverter.keyConverter();
             = inRange;
             = since;
             = upTo;
             = processUntil;
             = maxCount;
             = result;
        }
        
        /* We can do most efficient checks for timestamp range by
         * verifying timestamp first, right off the index we are
         * using...
         */
        @Override
        public IterationAction verifyTimestamp(long timestamp) {
            // NOTE(review): right-hand operand lost; presumably the upper limit (_upTo)
            if (timestamp > ) {
                /* 21-Sep-2012, tatu: Should we try to approximate latest
                 *  possible "lastSeen" timestamp here? As long as we avoid
                 *  in-flight-modifiable things, it would seem possible.
                 *  However, let's play this safe for now.
                 */
                 = timestamp;
                return .;
            }
            // First things first: we do want to know last seen entry that's "in range"
             = timestamp;
            // '|=' makes this flag sticky: once set by a timestamp above the (lost)
            // operand it stays set
             |= (timestamp > );
            return .;
        }
        
        // Most of filtering can actually be done with just keys...
        @Override public IterationAction verifyKey(StorableKey rawKey) {
            // check time limits every 64 entries processed
            // (a pre-incremented counter masked with 0x3F is zero once per 64 calls)
            if ((++ & 0x3F) == 0) {
                if ( &&
                        .realSystemTimeMillis() > ) {
                    return .;
                }
            }
            // and then verify that we are in range...
             = .rawToEntryKey(rawKey);
            int hash = .routingHashFor();
            if (.contains(hash)) {
                return .;
            }
            return .;
        }
        @Override
        public IterationAction processEntry(Storable storable)
        {
            E entry = .entryFromStorable(storable);
            .add(entry);
            /* One limitation, however; we MUST advance timer beyond initial
             * 'since' time. This may require including more than 'max' entries.
             */
            // NOTE(review): operands lost; presumably "timestamp has advanced AND the
            // result size has reached the max count" -- confirm
            if ( && .size() >= ) {
                return .;
            }
            return .;
        }
        // NOTE(review): the returned fields were lost in extraction; presumably _total,
        // _lastSeenValidTimestamp and _nextTimestamp respectively -- confirm.
        public int getTotal() { return ; }
        public long getLastSeenTimestamp() { return ; }
        public long getNextTimestamp() { return ; }
    }
New to GrepCode? Check out our FAQ X