   /*-
    * See the file LICENSE for redistribution information.
    *
    * Copyright (c) 2002, 2013 Oracle and/or its affiliates.  All rights reserved.
    *
    */
   
   package com.sleepycat.je.rep.impl.node;
   
  import static com.sleepycat.je.rep.impl.node.ReplicaStatDefinition.N_LAG_CONSISTENCY_WAITS;
  import static com.sleepycat.je.rep.impl.node.ReplicaStatDefinition.N_LAG_CONSISTENCY_WAIT_MS;
  import static com.sleepycat.je.rep.impl.node.ReplicaStatDefinition.N_VLSN_CONSISTENCY_WAITS;
  import static com.sleepycat.je.rep.impl.node.ReplicaStatDefinition.N_VLSN_CONSISTENCY_WAIT_MS;
  
  import java.net.Socket;
  import java.util.TreeMap;
  
/**
 * The Replica class is the locus of the replay operations and replica
 * transaction consistency tracking and management operations at a replica
 * node. A single instance of this class is created when the replication node
 * is created and exists for the lifetime of the replication node, although it
 * is only really used when the node is operating as a Replica. Note that the
 * Replica (like the FeederManager) does not have its own independent thread
 * of control; it runs in the RepNode's thread.
 */
  
  public class Replica {
  
      /* The Node to which the Replica belongs. */
      private final RepNode repNode;
      private final RepImpl repImpl;
  
      /* The replay component of the Replica */
      private final Replay replay;
  
      /* The exception that provoked the replica exit. */
      private Exception shutdownException = null;
  
    /* It's non-null when the loop is active. */
      private NamedChannelWithTimeout replicaFeederChannel = null;
  
      /* The consistency component. */
      private final ConsistencyTracker consistencyTracker;

    
    /**
     * The latest txn-ending (commit or abort) VLSN that we have on this
     * replica.
     */
     private VLSN txnEndVLSN;
 
     /*
      * A test delay introduced in the replica loop to simulate a loaded
      * replica. The replica waits this amount of time before processing each
      * message.
      */
     private int testDelayMs = 0;
 
     /* For testing only - mimic a network partition. */
     private boolean dontProcessStream = false;
 
     /* Number of times to retry on a network connection failure. */
    private static final int NETWORK_RETRIES = 2;
 
     /*
      * Service unavailable retries. These are typically the result of service
     * requests being made before the node is ready to provide them. For
      * example, the feeder service is only available after a node has
      * transitioned to becoming the master.
      */
     private static final int SERVICE_UNAVAILABLE_RETRIES = 10;
 
     /*
      * The number of ms to wait between above retries, allowing time for the
      * master to assume its role, and start listening on its port.
      */
     private static final int CONNECT_RETRY_SLEEP_MS = 1000;
 
     /* The protocol instance if one is currently in use by the Replica. */
     private Protocol protocol = null;
 
     /*
      * Protocol statistics aggregated across all past protocol instantiations.
      * It does not include the statistics for the current Protocol object in
     * use. A node can potentially go through the Replica state multiple times
     * during its lifetime. This instance aggregates replica statistics
      * across all transitions into and out of the Replica state.
      */
     private final StatGroup aggProtoStats;
 
     /*
      * Holds the exception that is thrown to indicate that an election is
      * needed before a hard recovery can proceed. It's set to a non-null value
      * when the need for a hard recovery is first discovered and is
      * subsequently cleared after an election is held and before the next
      * attempt at a syncup with the newly elected master. The election ensures
      * that the master being used for an actual rollback is current and is not
      * an isolated master that is out of date, due to a network partition that
      * has since been resolved.
     */
    private HardRecoveryElectionException hardRecoveryElectionException;

    /* For testing only. */
    private TestHook<Message> replayHook;

     /*
      * A cache of DatabaseImpls for the Replay to speed up DbTree.getId().
      * Cleared/invalidated by a heartbeat or if je.rep.dbIdCacheOpCount
      * operations have gone by, or if any replay operations on Name LNs are
      * executed.
      */
     private final DbCache dbCache;
 
     private final Logger logger;
    
    /**
     * The number of times a message entry could not be inserted into the
     * queue within the poll period and had to be retried.
     */
     private final LongStat nMessageQueueOverflows;
 
    Replica(RepNode repNode, Replay replay) {
        this.repNode = repNode;
        this.repImpl = repNode.getRepImpl();
        DbConfigManager configManager = repNode.getConfigManager();
        dbCache = new DbCache(repImpl.getDbTree(),
                              configManager.getInt
                                  (RepParams.REPLAY_MAX_OPEN_DB_HANDLES),
                              configManager.getDuration
                                  (RepParams.REPLAY_DB_HANDLE_TIMEOUT));

        consistencyTracker = new ConsistencyTracker();
        this.replay = replay;
        logger = LoggerUtils.getLogger(getClass());
        aggProtoStats =
            new StatGroup(BinaryProtocolStatDefinition.GROUP_NAME,
                          BinaryProtocolStatDefinition.GROUP_DESC);
        nMessageQueueOverflows = replay.getMessageQueueOverflows();
        testDelayMs =
            repNode.getConfigManager().getInt(RepParams.TEST_REPLICA_DELAY);
        /* Assumption: the final stripped assignment initializes the hook. */
        replayHook = null;
    }

    
    /**
     * Shutdown the Replica, freeing any threads that may have been waiting
     * for the replica to reach some degree of consistency. This method is
     * only invoked as part of the RepNode shutdown. If the shutdown is being
     * executed from a different thread, it attempts to interrupt the replica
     * thread by first shutting down the channel it may be waiting on for
     * input from the feeder. The replica thread should notice the channel
     * shutdown and/or the shutdown state of the rep node itself. The caller
     * will use harsher methods, like an interrupt, if the rep node thread
     * (Replica or Feeder) is still active.
     */
     public void shutdown() {
        if (!repNode.isShutdown()) {
             throw EnvironmentFailureException.unexpectedState
                 ("Rep node must have initiated the shutdown.");
         }
        replay.shutdown();
        if (Thread.currentThread() == repNode) {
             return;
         }
 
         /*
          * Perform the actions to provoke a "soft" shutdown.
          *
          * Since the replica shares the RepNode thread, it will take care of
          * the actual thread shutdown itself.
          */
 
         /*
          * Shutdown the channel as an attempt to interrupt just the socket
          * read/write operation.
          */
        RepUtils.shutdownChannel(replicaFeederChannel);
 
         /*
          * Clear the latch in case the replica loop is waiting for the outcome
          * of an election.
          */
     }

    
    /** For unit testing only! */
     public void setTestDelayMs(int testDelayMs) {
        this.testDelayMs = testDelayMs;
     }

    
    /** For unit testing only! */
     public void setDontProcessStream() {
        dontProcessStream = true;
     }
 
     public Replay replay() {
        return replay;
     }
 
     public DbCache getDbCache() {
        return dbCache;
     }
 
    public ConsistencyTracker getConsistencyTracker() {
        return consistencyTracker;
    }

    SocketChannel getReplicaFeederChannel() {
        return replicaFeederChannel.getChannel();
    }
 
     Protocol getProtocol() {
        return protocol;
     }

    
    /**
     * Returns the last commit VLSN at the master, as known at the replica.
     *
     * @return the commit VLSN
     */
    public long getMasterTxnEndVLSN() {
        return consistencyTracker.getMasterTxnEndVLSN();
    }

    
    /**
     * The core control loop when the node is serving as a Replica. Note that
     * if a Replica is also serving the role of a feeder, it will run
     * additional feeder loops in separate threads. The loop exits when it
     * encounters one of the following possible conditions:
     *
     * 1) The connection to the master can no longer be maintained, due to
     * connectivity issues, or because the master has explicitly shut down its
     * connections due to an election.
     *
     * 2) The node becomes aware of a change in master, that is, assertSync()
     * fails.
     *
     * 3) The loop is interrupted, which is interpreted as a request to shut
     * down the replication node as a whole.
     *
     * 4) It fails to establish its node information in the master as it
     * attempts to join the replication group for the first time.
     *
     * Normal exit from this run loop results in the rep node retrying an
     * election and continuing in its new role as determined by the outcome of
     * the election. A thrown exception, on the other hand, results in the rep
     * node as a whole terminating its operation and no longer participating
     * in the replication group, that is, it enters the DETACHED state.
     *
     * Note that the in/out streams are handled synchronously on the replica,
     * while they are handled asynchronously by the Feeder.
     *
     * @throws InterruptedException
     * @throws RestoreFailedException
     * @throws DatabaseException if the environment cannot be closed for a
     * re-init
     * @throws GroupShutdownException
     */
     void runReplicaLoop()
         throws InterruptedException,
                DatabaseException,
                GroupShutdownException {
 
        Class<? extends RetryException> retryExceptionClass = null;
         int retryCount = 0;
         try {
 
             while (true) {
                 try {
                     runReplicaLoopInternal();
                     /* Normal exit */
                     break;
                 } catch (RetryException e) {
                    if (!repNode.getMasterStatus().inSync()) {
                        LoggerUtils.fine(logger, repImpl,
                                         "Retry terminated, out of sync.");
                         break;
                     }
                     if ((e.getClass() == retryExceptionClass) ||
                         (e.retries == 0)) {
                         if (++retryCount >= e.retries) {
                             /* Exit replica retry elections */
                            LoggerUtils.info
                                (logger, repImpl,
                                  "Failed to recover from exception: " +
                                  e.getMessage() + ", despite " + e.retries +
                                  " retries.\n" +
                                  LoggerUtils.getStackTrace(e));
                             break;
                         }
                     } else {
                         retryCount = 0;
                         retryExceptionClass = e.getClass();
                     }
                     LoggerUtils.info("Retry #: " +
                                      retryCount + "/" + e.retries +
                                      " Will retry replica loop after " +
                                      e.retrySleepMs + "ms. ");
                     Thread.sleep(e.retrySleepMs);
                    if (!repNode.getMasterStatus().inSync()) {
                         break;
                     }
                 }
             }
         } finally {
             /*
              * Reset the rep node ready latch unless the replica is not ready
              * because it's going to hold an election before proceeding with
              * hard recovery and joining the group.
              */
            if (hardRecoveryElectionException == null) {
                repNode.resetReadyLatch(shutdownException);
             }
         }
         /* Exit use elections to try a different master. */
     }
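
    /*
     * Worked example of the retry accounting above, using the constants
     * defined in this class: the first ConnectRetryException caught resets
     * retryCount to 0 and records the exception class; each subsequent
     * ConnectRetryException of the same class increments retryCount after the
     * retrySleepMs pause, and the loop gives up once retryCount reaches
     * e.retries (NETWORK_RETRIES = 2), falling back to elections. A
     * RetryException of a different class restarts the count for that class.
     */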
 
     private void runReplicaLoopInternal()
         throws RestartRequiredException,
                InterruptedException,
                RetryException,
                InsufficientLogException {
 
        shutdownException = null;

        LoggerUtils.info(logger, repImpl,
                         "Replica loop started with master: " +
                         repNode.getMasterStatus().getNodeMasterNameId());

        if (testDelayMs > 0) {
            LoggerUtils.info(logger, repImpl,
                             "Test delay of: " + testDelayMs + "ms." +
                             " after each message sent");
         }
         try {
             initReplicaLoop();
             doRunReplicaLoopInternalWork();
         } catch (RestartRequiredException rre) {
            shutdownException = rre;
             throw rre;
         } catch (ClosedByInterruptException closedByInterruptException) {
            if (repNode.isShutdown()) {
                LoggerUtils.info(logger, repImpl,
                                  "Replica loop interrupted for shutdown.");
                 return;
             }
            LoggerUtils.warning(logger, repImpl,
                                 "Replica loop unexpected interrupt.");
             throw new InterruptedException
                 (closedByInterruptException.getMessage());
         } catch (IOException e) {
 
             /*
              * Master may have changed with the master shutting down its
              * connection as a result. Normal course of events, log it and
              * return to the outer node level loop.
              */
            LoggerUtils.info(logger, repImpl,
                              "Replica IO exception: " + e.getMessage() +
                              "\n" + LoggerUtils.getStackTrace(e));
         } catch (RetryException e) {
             /* Propagate it outwards. Node does not need to shutdown. */
             throw e;
         } catch (GroupShutdownException e) {
            shutdownException = e;
             throw e;
         } catch (RuntimeException e) {
            shutdownException = e;
            LoggerUtils.severe(logger, repImpl,
                                "Replica unexpected exception " + e +
                                 " " + LoggerUtils.getStackTrace(e));
             throw e;
         } catch (MasterSyncException e) {
             /* expected change in masters from an election. */
            LoggerUtils.info(logger, repImpl, e.getMessage());
         } catch (HardRecoveryElectionException e) {
             /*
              * Exit the replica loop so that elections can be held and the
              * master confirmed.
              */
            hardRecoveryElectionException = e;
            LoggerUtils.info(logger, repImpl, e.getMessage());
         } catch (Exception e) {
            shutdownException = e;
            LoggerUtils.severe(logger, repImpl,
                                "Replica unexpected exception " + e +
                                " " + LoggerUtils.getStackTrace(e));
             throw EnvironmentFailureException.unexpectedException(e);
         } finally {
             loopExitCleanup();
         }
     }
 
     protected void doRunReplicaLoopInternalWork()
        throws Exception {
 
        final int timeoutMs = repNode.getConfigManager().
            getDuration(RepParams.REPLICA_TIMEOUT);
        replicaFeederChannel.setTimeoutMs(timeoutMs);
 
        ReplayThread replayThread = new ReplayThread(repImpl, "ReplayThread");
         replayThread.start();
 
         try {
             while (true) {
                Message message = protocol.read(replicaFeederChannel);
 
                if (repNode.isShutdown() || (message == null)) {
                     return;
                 }
 
                while (!replayThread.messageQueue.
                        offer(message,
                              ReplayThread.QUEUE_POLL_INTERVAL_SEC,
                              TimeUnit.SECONDS)) {
                     /* Offer timed out. */
                     if (!replayThread.isAlive()) {
                         return;
                     }
                     /* Retry the offer */
                    nMessageQueueOverflows.increment();
                 }
             }
         } catch (IOException ioe) {
             /*
              * Make sure messages in the queue are processed. Ensure, in
              * particular, that shutdown requests are processed and not ignored
             * due to the IOException resulting from a closed connection.
             */
            replayThread.exitRequest = ReplayExitType.SOFT;
         } finally {
 
            if (replayThread.exitRequest == ReplayExitType.SOFT) {
                 /*
                  * Drain all queued messages, exceptions may be generated
                  * in the process. They logically precede IO exceptions.
                  */
                 replayThread.join();
             }
 
             if (replayThread.exception != null) {
                 /* replay thread is dead or exiting. */
                 throw replayThread.exception;
             }
 
             /* Ensure thread has exited in all circumstances */
            replayThread.exitRequest = ReplayExitType.IMMEDIATE;
             replayThread.join();
         }
     }
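
    /*
     * Note on the exit choreography above: a SOFT exit request lets the
     * replay thread drain the messages already queued, so that a shutdown
     * request from the master is still processed rather than lost behind the
     * IOException from the closed connection. Any exception recorded by the
     * replay thread is then rethrown in this thread, and the final IMMEDIATE
     * request plus join() ensures the thread has exited on every path.
     */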

    
    /**
     * Process the shutdown message from the master and return the
     * GroupShutdownException that must be thrown to exit the Replica loop.
     *
     * @return the GroupShutdownException
     */
    private GroupShutdownException processShutdown(ShutdownRequest shutdown)
        throws IOException {
 
         /*
          * Acknowledge the shutdown message right away, since the checkpoint
          * operation can take a long time to complete. Long enough to exceed
          * the feeder timeout on the master. The master only needs to know that
          * the replica has received the message.
         */
        protocol.write(protocol.new ShutdownResponse(), replicaFeederChannel);

         /*
          * Turn off network timeouts on the replica, since we don't want the
          * replica to timeout the connection. The connection itself is no
          * longer used past this point and will be reclaimed as part of normal
          * replica exit cleanup.
         */
        replicaFeederChannel.setTimeoutMs(Integer.MAX_VALUE);

         /*
          * TODO: Share the following code with the standalone Environment
          * shutdown, or better yet, call EnvironmentImpl.doClose here.
          */
 
         /*
         * Begin shutdown of the daemons before checkpointing. Cleaning during
         * the checkpoint is wasted and slows down the checkpoint, plus it may
         * cause additional checkpoints.
         */
        repNode.getRepImpl().requestShutdownDaemons();

         /*
          * Now start a potentially long running checkpoint.
          */
         LoggerUtils.info("Checkpoint initiated.");
         CheckpointConfig config = new CheckpointConfig();
         config.setForce(true);
         config.setMinimizeRecoveryTime(true);
        repNode.getRepImpl().invokeCheckpoint(config, "Group Shutdown");
         /* Force final shutdown of the daemons. */
        repNode.getRepImpl().shutdownDaemons();
        LoggerUtils.info(logger, repImpl, "Checkpoint completed.");
 
        return new GroupShutdownException(logger,
                                          repNode,
                                          shutdown.getShutdownTimeMs());
     }

    
    /**
     * Initialize for replica loop entry, which involves completing the
     * following steps successfully:
     *
     * 1) The replica feeder handshake.
     * 2) The replica feeder syncup.
     * 3) Processing the first heartbeat request from the feeder.
     */
     private void initReplicaLoop()
         throws IOException,
                ConnectRetryException,
                DatabaseException,
                ProtocolException,
                InterruptedException,
                HardRecoveryElectionException {
 
         createReplicaFeederChannel();
        ReplicaFeederHandshake handshake =
            new ReplicaFeederHandshake(repNode, replicaFeederChannel);
        protocol = handshake.execute();
        repNode.notifyReplicaConnected();
 
         final boolean hardRecoveryNeedsElection;
 
        if (hardRecoveryElectionException != null) {
            LoggerUtils.info(logger, repImpl,
                             "Replica syncup after election to verify master:"+
                             hardRecoveryElectionException.getMaster() +
                             " elected master:" +
                             repNode.getMasterStatus().getNodeMasterNameId());
             hardRecoveryNeedsElection = false;
         } else {
             hardRecoveryNeedsElection = true;
         }
        hardRecoveryElectionException = null;
 
        ReplicaFeederSyncup syncup =
            new ReplicaFeederSyncup(repNode, replay, replicaFeederChannel,
                                    protocol, hardRecoveryNeedsElection);
        syncup.execute(repNode.getCBVLSNTracker());
 
        txnEndVLSN = syncup.getMatchedVLSN();
        long matchedTxnEndTime = syncup.getMatchedVLSNTime();
        consistencyTracker.reinit(txnEndVLSN.getSequence(),
                                  matchedTxnEndTime);
        Protocol.Heartbeat heartbeat =
            protocol.read(replicaFeederChannel.getChannel(),
                          Protocol.Heartbeat.class);
        processHeartbeat(replicaFeederChannel, heartbeat);
        long replicaDelta = consistencyTracker.getMasterTxnEndVLSN() -
            txnEndVLSN.getSequence();
        LoggerUtils.info(logger, repImpl, String.format
                         ("Replica initialization completed. Replica VLSN: %s "
                          + " Heartbeat master commit VLSN: %,d " +
                          "VLSN delta: %,d",
                          txnEndVLSN,
                          consistencyTracker.getMasterTxnEndVLSN(),
                          replicaDelta));
 
         /*
          * The replica is ready for business, indicate that the node is
          * ready by counting down the latch and releasing any waiters.
          */
        repNode.getReadyLatch().countDown();
     }

    
    /**
     * Process a heartbeat message. It sends back a response and updates the
     * consistency tracker with the information in the heartbeat.
     *
     * @param namedChannel
     * @param heartbeat
     * @throws java.io.IOException
     */
     private void processHeartbeat(NamedChannel namedChannel,
                                   Heartbeat heartbeat)
         throws IOException {
 
        protocol.write(protocol.new HeartbeatResponse
                       (repNode.getCBVLSNTracker().getBroadcastCBVLSN(),
                        replay.getLastReplayedVLSN()),
                       namedChannel);
        consistencyTracker.trackHeartbeat(heartbeat);
     }

    
    /**
     * Performs the cleanup actions upon exit from the internal replica loop.
     */
     private void loopExitCleanup() {
 
        if (shutdownException != null) {
            if (shutdownException instanceof RetryException) {
                LoggerUtils.info(logger, repImpl,
                                 "Retrying connection to feeder. Message: " +
                                 shutdownException.getMessage());
            } else if (shutdownException instanceof GroupShutdownException) {
                LoggerUtils.info(logger, repImpl,
                                 "Exiting inner Replica loop." +
                                 " Master requested shutdown.");
            } else {
                LoggerUtils.warning
                    (logger, repImpl,
                     "Exiting inner Replica loop with exception " +
                     shutdownException + "\n" +
                     LoggerUtils.getStackTrace(shutdownException));
            }
        } else {
            LoggerUtils.info(logger, repImpl, "Exiting inner Replica loop.");
         }
 
         clearDbTreeCache();
        RepUtils.shutdownChannel(replicaFeederChannel);
 
        if (consistencyTracker != null) {
            consistencyTracker.logStats();
        }

        /* Sum up statistics for the loop. */
        if (protocol != null) {
            aggProtoStats.addAll(protocol.getStats(StatsConfig.DEFAULT));
        }
        protocol = null;
     }
 
     /*
      * Clear the DatabaseId -> DatabaseImpl cache used to speed up DbTree
      * lookup operations.
      */
     void clearDbTreeCache() {
        dbCache.clear();
     }

    
    /**
     * Invoked when this node transitions to the master state. Aborts all
     * in-flight replay transactions outstanding from a previous state as a
     * Replica, because they were initiated by a different master and will
     * never complete. Also, releases any Replica transactions that were
     * waiting on consistency policy requirements.
     */
     void masterTransitionCleanup()
         throws DatabaseException {
        hardRecoveryElectionException = null;
        replay.abortOldTxns();
        consistencyTracker.forceTripLatches
            (new MasterStateException(repNode.getRepImpl().
                                      getStateChangeEvent()));
     }

    
    /**
     * Returns a channel used by the Replica to connect to the Feeder. The
     * socket is configured with a read timeout that's a multiple of the
     * heartbeat interval to help detect, or initiate, a change in master.
     */
    private void createReplicaFeederChannel()
        throws IOException, ConnectRetryException {
 
         SocketChannel channel = SocketChannel.open();
 
        final DbConfigManager configManager = repNode.getConfigManager();
        final int timeoutMs = configManager.
            getDuration(RepParams.PRE_HEARTBEAT_TIMEOUT);

        replicaFeederChannel =
            new NamedChannelWithTimeout(repNode, channel, timeoutMs);
 
         final Socket socket = channel.socket();
 
         final int receiveBufferSize =
            configManager.getInt(RepParams.REPLICA_RECEIVE_BUFFER_SIZE);
 
         if (receiveBufferSize > 0) {
             socket.setReceiveBufferSize(receiveBufferSize);
         }
 
         /*
          * Note that soTimeout is not set since it's a blocking channel and
          * setSoTimeout has no effect on a blocking nio channel.
          */
         channel.configureBlocking(true);
 
         /*
          * Push responses out rapidly, they are small (heart beat or commit
          * response) and need timely delivery to the master.
          */
         socket.setTcpNoDelay(true);
 
         try {
            final int openTimeout = configManager.
                getDuration(RepParams.REPSTREAM_OPEN_TIMEOUT);
            socket.connect(repNode.getMasterStatus().getNodeMaster(),
                           openTimeout);
            ServiceDispatcher.doServiceHandshake
                (channel, FeederManager.FEEDER_SERVICE);
         } catch (ConnectException e) {
 
             /*
              * A network problem, or the node went down between the time we
              * learned it was the master and we tried to connect.
              */
            throw new ConnectRetryException(e.getMessage(),
                                            NETWORK_RETRIES,
                                            CONNECT_RETRY_SLEEP_MS);
         } catch (ServiceConnectFailedException e) {
 
             /*
              * The feeder may not have established the Feeder Service
              * as yet. For example, the transition to the master may not have
              * been completed. Wait longer.
              */
           if (e.getResponse() == Response.UNKNOWN_SERVICE) {
               throw new ConnectRetryException(e.getMessage(),
                                               SERVICE_UNAVAILABLE_RETRIES,
                                               CONNECT_RETRY_SLEEP_MS);
            }
            throw EnvironmentFailureException.unexpectedException(e);
         }
     }

    
    /**
     * Returns the replay statistics associated with the Replica.
     *
     * @return the statistics
     */
     public StatGroup getReplayStats(StatsConfig config) {
        return replay.getStats(config);
     }
 
    /* Get the protocol statistics for this replica. */
     public StatGroup getProtocolStats(StatsConfig config) {
        StatGroup protoStats = aggProtoStats.cloneGroup(config.getClear());
 
         /* Guard against concurrent modification. */
        Protocol prot = this.protocol;
        if (prot != null) {
            /* These statistics are not yet a part of the agg statistics. */
             protoStats.addAll(prot.getStats(config));
         }
 
         return protoStats;
     }
 
     /* Get the consistency tracker stats for this replica. */
     public StatGroup getTrackerStats(StatsConfig config) {
        return consistencyTracker.getStats(config);
     }
 
     /* Reset the stats associated with this Replica. */
     public void resetStats() {
        replay.resetStats();
        aggProtoStats.clear();
        if (protocol != null) {
            protocol.resetStats();
        }
        consistencyTracker.resetStats();
     }

    
    /**
     * Defines the possible types of exits that can be requested from the
     * ReplayThread.
     */
     private enum ReplayExitType {
         IMMEDIATE, /* An immediate exit; ignore queued requests. */
         SOFT       /* Process pending requests in queue, then exit */
     }

    
    /**
     * The thread responsible for the replay of messages delivered over the
     * replication stream. Reading and replay are done in separate threads for
     * two reasons:
     *
     * 1) It allows the two activities to make independent progress. The
     * network can be read and messages assembled even if the replay activity
     * has stalled.
     *
     * 2) The two threads permit use of two cores to perform the replay, thus
     * making it less likely that the CPU is the replay bottleneck.
     */
     class ReplayThread extends StoppableThread {

        
        /**
         * The message queue used for communications between the network read
         * thread and the replay thread.
         */
        private final BlockingQueue<Message> messageQueue;

        
        /**
         * Thread exit exception. It's non-null if the thread exited due to an
         * exception. It's the responsibility of the main replica thread to
         * propagate the exception across the thread boundary in this case.
         */
        private volatile Exception exception;

        
        /** Set asynchronously when a shutdown is being requested. */
        volatile ReplayExitType exitRequest = null;
 
         /* The queue poll interval. */
         private final static int QUEUE_POLL_INTERVAL_SEC = 1;
 
         protected ReplayThread(EnvironmentImpl envImpl,
                                String threadName) {
            super(envImpl, threadName);

            final int messageQueueSize = repImpl.getConfigManager().
                getInt(RepParams.REPLICA_MESSAGE_QUEUE_SIZE);
            LoggerUtils.info(logger, repImpl,
                             "Replay thread started. Message queue size:" +
                              messageQueueSize);
            messageQueue = new ArrayBlockingQueue<Message>(messageQueueSize);
         }
 
         @Override
         protected int initiateSoftShutdown() {
            /* Use immediate, since the stream will continue to be read. */
            exitRequest = ReplayExitType.IMMEDIATE;
            return 0;
         }
 
         @Override
         public void run() {
 
            final int dbTreeCacheClearingOpCount =
                replay.getDbTreeCacheClearingOpCount();
 
             long opCount = 0;
 
             try {
                 while (true)  {
                    final Message message =
                        messageQueue.poll(QUEUE_POLL_INTERVAL_SEC,
                                          TimeUnit.SECONDS);

                    if ((exitRequest == ReplayExitType.IMMEDIATE) ||
                        ((exitRequest == ReplayExitType.SOFT) &&
                         (message == null)) ||
                         repNode.isShutdown()) {
                         return;
                     }
 
                    repNode.getMasterStatus().assertSync();
 
                     if (message == null) {
                         /* Timeout on poll. */
                         continue;
                     }
                    assert TestHookExecute.doHookIfSet(replayHook, message);
 
                     final MessageOp messageOp = message.getOp();
 
                    if (messageOp == Protocol.SHUTDOWN_REQUEST) {
                        throw processShutdown((ShutdownRequest) message);
                    }

                    if (messageOp == Protocol.HEARTBEAT) {
                        processHeartbeat(replicaFeederChannel,
                                         (Protocol.Heartbeat) message);
                        dbCache.tick();
                     } else {
                         /* For testing only! */
                        if (dontProcessStream) {
                            LoggerUtils.info(logger, repImpl,
                                             "Not processing " + message);
                             continue;
                         }
 
                        replay.replayEntry(replicaFeederChannel,
                                           protocol,
                                           (Protocol.Entry) message);
 
                         /*
                          * Note: the consistency tracking is more obscure than
                          * it needs to be, because the commit/abort VLSN is set
                          * in Replay.replayEntry() and is then used below. An
                          * alternative would be to promote the following
                          * conditional to a level above, so commit/abort
                          * operations get their own replay method which does
                          * the consistency tracking.
                          */
                        if (((Protocol.Entry) message).isTxnEnd()) {
                            txnEndVLSN = replay.getLastReplayedVLSN();
                            consistencyTracker.trackTxnEnd();
                        }
                        consistencyTracker.trackVLSN();
                     }
 
                    if (testDelayMs > 0) {
                        Thread.sleep(testDelayMs);
                     }
 
                     if (opCount++ % dbTreeCacheClearingOpCount == 0) {
                         clearDbTreeCache();
                     }
                 }
             } catch (Exception e) {
                exception = e;
                 /*
                  * Bring it to the attention of the main thread by freeing
                  * up the "offer" wait right away.
                  */
                messageQueue.clear();
 
                 /*
                  * Get the attention of the main replica thread in case it's
                  * waiting in a read on the socket channel.
                  */
                RepUtils.shutdownChannel(replicaFeederChannel);
 
                LoggerUtils.info(logger, repImpl,
                                  "Replay thread exiting with exception:" +
                                   e.getMessage());
             }
         }
 
         @Override
         protected Logger getLogger() {
            return logger;
         }
     }
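
    /*
     * For illustration only: the network-read/replay hand-off above is a
     * bounded producer/consumer exchange. A minimal sketch of the pattern
     * (hypothetical names, not part of this class):
     *
     *   BlockingQueue<Message> queue = new ArrayBlockingQueue<>(queueSize);
     *
     *   // Network thread (producer): bounded offer with liveness check.
     *   while (!queue.offer(msg, 1, TimeUnit.SECONDS)) {
     *       if (!consumer.isAlive()) {
     *           break;             // consumer died; stop producing
     *       }
     *       overflows.increment(); // count the stall and retry
     *   }
     *
     *   // Replay thread (consumer): bounded poll so the exit flags are
     *   // checked at least once a second even when the stream is idle.
     *   Message m = queue.poll(1, TimeUnit.SECONDS);
     */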

    
    /**
     * Tracks the consistency of this replica with respect to the master. It
     * provides the mechanisms that will cause a beginTransaction() or a
     * joinGroup() to wait until the specified consistency policy is
     * satisfied.
     */
     public class ConsistencyTracker {
        private final long NULL_VLSN_SEQUENCE = VLSN.NULL_VLSN.getSequence();
 
         /*
          * Initialized by the Feeder handshake and updated by commit replays.
          * All access to lastReplayedXXXX must be synchronized on the
          * ConsistencyTracker itself.
          */
        private long lastReplayedTxnVLSN = NULL_VLSN_SEQUENCE;
        private VLSN lastReplayedVLSN = VLSN.NULL_VLSN;
        private long masterTxnEndTime = 0L;

        /* Updated by heartbeats */
        private long masterTxnEndVLSN;
        private long masterNow = 0L;

        private final StatGroup stats =
            new StatGroup(ReplicaStatDefinition.GROUP_NAME,
                          ReplicaStatDefinition.GROUP_DESC);

        private final LongStat nLagConsistencyWaits =
            new LongStat(stats, N_LAG_CONSISTENCY_WAITS);
        private final LongStat nLagConsistencyWaitMs =
            new LongStat(stats, N_LAG_CONSISTENCY_WAIT_MS);
        private final LongStat nVLSNConsistencyWaits =
            new LongStat(stats, N_VLSN_CONSISTENCY_WAITS);
        private final LongStat nVLSNConsistencyWaitMs =
            new LongStat(stats, N_VLSN_CONSISTENCY_WAIT_MS);

        private final OrderedLatches vlsnLatches =
            new OrderedLatches(repNode.getRepImpl()) {
                /*
                 * Note that this assumes that NULL_VLSN is -1, and that
                 * the VLSNs ascend.
                 */
                @Override
                boolean tripPredicate(long keyVLSN, long tripVLSN) {
                    return keyVLSN <= tripVLSN;
                }
            };

        private final OrderedLatches lagLatches =
            new OrderedLatches(repNode.getRepImpl()) {
                @Override
                boolean tripPredicate(long keyLag, long currentLag) {
                    return currentLag <= keyLag;
                }
            };

        
        /**
         * Invoked each time after a replica syncup so that the Replica can
         * re-establish its consistency vis-à-vis the master and what part of
         * the replication stream it considers as having been replayed.
         *
         * @param matchedTxnVLSN the replica state corresponds to this txn
         * @param matchedTxnEndTime the time at which this txn was committed
         * or aborted on the master
         */
        void reinit(long matchedTxnVLSN, long matchedTxnEndTime) {
            this.lastReplayedVLSN = new VLSN(matchedTxnVLSN);
            this.lastReplayedTxnVLSN = matchedTxnVLSN;
            this.masterTxnEndTime = matchedTxnEndTime;
        }
        public long getMasterTxnEndVLSN() {
            return masterTxnEndVLSN;
        }
        void close() {
            logStats();
        }
        void logStats() {
            if (logger.isLoggable(Level.INFO)) {
                LoggerUtils.info
                    (logger, repImpl,
                     "Replica stats - Lag waits: " +
                     nLagConsistencyWaits.get() +
                     " Lag wait time: " + nLagConsistencyWaitMs.get() +
                     "ms. " +
                     " VLSN waits: " + nVLSNConsistencyWaits.get() +
                     " VLSN wait time: " + nVLSNConsistencyWaitMs.get() +
                     "ms.");
            }
        }

        
        /**
         * Calculates the time lag in ms at the Replica.
         */
        private long currentLag() {
            if (masterNow == 0L) {

                /*
                 * Have not seen a heartbeat, can't determine the time lag in
                 * its absence. It's the first message sent by the feeder after
                 * completion of the handshake.
                 */
                return Integer.MAX_VALUE;
            }
            long lag;
            if (lastReplayedTxnVLSN < masterTxnEndVLSN) {
                lag = System.currentTimeMillis() - masterTxnEndTime;
            } else if (lastReplayedTxnVLSN == masterTxnEndVLSN) {

                /*
                 * The lag is determined by the transactions (if any) that are
                 * further downstream, assume the worst.
                 */
                lag = System.currentTimeMillis() - masterNow;
            } else {
                /* Commit leapfrogged the heartbeat. */
                lag = System.currentTimeMillis() - masterNow;
            }
            return lag;
        }
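
        /*
         * Worked example (illustrative numbers): suppose a heartbeat reported
         * masterTxnEndVLSN=100 at masterNow=T, and the last replayed txn has
         * VLSN 90 with a master commit time of T-2000ms. Then
         * lastReplayedTxnVLSN < masterTxnEndVLSN, so the lag is
         * currentTimeMillis() - (T-2000), i.e. the 2s the replica was already
         * behind plus the wall-clock time elapsed since the heartbeat. Once
         * txn 100 has been replayed, the lag collapses to
         * currentTimeMillis() - masterNow.
         */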

        
        /**
         * Frees all the threads that are waiting on latches.
         *
         * @param exception the exception to be thrown to explain the reason
         * behind the latches being forced.
         */
        synchronized void forceTripLatches(DatabaseException exception) {
            assert (exception != null);
            vlsnLatches.trip(Long.MAX_VALUE, exception);
            lagLatches.trip(0, exception);
        }

        synchronized void trackTxnEnd() {
            Replay.TxnInfo lastReplayedTxn = replay.getLastReplayedTxn();
            lastReplayedTxnVLSN = lastReplayedTxn.getTxnVLSN().getSequence();
            masterTxnEndTime = lastReplayedTxn.getMasterTxnEndTime();

            if ((lastReplayedTxnVLSN > masterTxnEndVLSN) &&
                (masterTxnEndTime >= masterNow)) {
                masterTxnEndVLSN = lastReplayedTxnVLSN;
                masterNow = masterTxnEndTime;
            }

            /*
             * Advances both replica VLSN and commit time, trip qualifying
             * latches in both sets.
             */
            vlsnLatches.trip(lastReplayedTxnVLSN, null);
            lagLatches.trip(currentLag(), null);
        }
        synchronized void trackVLSN() {
            lastReplayedVLSN = replay.getLastReplayedVLSN();
            vlsnLatches.trip(lastReplayedVLSN.getSequence(), null);
        }
        synchronized void trackHeartbeat(Protocol.Heartbeat heartbeat) {
            masterTxnEndVLSN = heartbeat.getCurrentTxnEndVLSN();
            masterNow = heartbeat.getMasterNow();

            /* Trip just the time lag latches. */
            lagLatches.trip(currentLag(), null);
        }
        public void lagAwait(TimeConsistencyPolicy consistencyPolicy)
            throws InterruptedException,
                   ReplicaConsistencyException,
                   DatabaseException {
            long currentLag = currentLag();
            long lag =
                consistencyPolicy.getPermissibleLag(TimeUnit.MILLISECONDS);
            if (currentLag <= lag) {
                return;
            }
            long waitStart = System.currentTimeMillis();
            ExceptionAwareCountDownLatch waitLagLatch =
                lagLatches.getOrCreate(lag);
            await(waitLagLatch, consistencyPolicy);
            nLagConsistencyWaits.increment();
            nLagConsistencyWaitMs.add(System.currentTimeMillis() - waitStart);
        }
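
        /*
         * For illustration (hypothetical client code, not part of this
         * class): a transaction begun on the replica with
         *
         *   TransactionConfig tc = new TransactionConfig();
         *   tc.setConsistencyPolicy(
         *       new TimeConsistencyPolicy(100, TimeUnit.MILLISECONDS,
         *                                 5, TimeUnit.SECONDS));
         *   Transaction txn = repEnv.beginTransaction(null, tc);
         *
         * ends up waiting in lagAwait() above, blocking until currentLag()
         * drops to 100ms or less, or throwing ReplicaConsistencyException if
         * that does not happen within the 5 second timeout.
         */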

        
        /**
         * Wait until the log record identified by the VLSN has gone by.
         */
        public void awaitVLSN(long vlsn,
                              ReplicaConsistencyPolicy consistencyPolicy)
            throws InterruptedException,
                   ReplicaConsistencyException,
                   DatabaseException {
            long waitStart = System.currentTimeMillis();
            ExceptionAwareCountDownLatch waitVLSNLatch = null;
            synchronized(this) {
                final long compareVLSN =
                   (consistencyPolicy instanceof CommitPointConsistencyPolicy) ?
                    masterTxnEndVLSN :
                    lastReplayedVLSN.getSequence();
                if (vlsn <= compareVLSN) {
                    return;
                }
                waitVLSNLatch = vlsnLatches.getOrCreate(vlsn);
            }
            await(waitVLSNLatch, consistencyPolicy);
            /* Stats after the await, so the counts and times are related. */
            nVLSNConsistencyWaits.increment();
            nVLSNConsistencyWaitMs.add(System.currentTimeMillis() - waitStart);
        }

        
        /**
         * Wait on the given countdown latch and generate the appropriate
         * exception upon timeout.
         */
        private void await(ExceptionAwareCountDownLatch consistencyLatch,
                           ReplicaConsistencyPolicy consistencyPolicy)
            throws ReplicaConsistencyException,
                   DatabaseException,
                   InterruptedException {
            if (!consistencyLatch.awaitOrException
                 (consistencyPolicy.getTimeout(TimeUnit.MILLISECONDS),
                  TimeUnit.MILLISECONDS)) {
                /* Timed out. */
                final boolean detached =
                    repNode.getRepImpl().getState().isDetached();
                throw new ReplicaConsistencyException(consistencyPolicy,
                                                      detached);
            }
        }
        private StatGroup