Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*-
   * See the file LICENSE for redistribution information.
   *
   * Copyright (c) 2002, 2013 Oracle and/or its affiliates.  All rights reserved.
   *
   */
  package com.sleepycat.je.rep.stream;
  
  import static com.sleepycat.je.utilint.DbLsn.NULL_LSN;
 
 
The ReplicaSyncupReader scans the log backwards for requested log entries. The reader must track whether it has passed a checkpoint, and therefore can not used the vlsn index to skip over entries. The ReplicaSyncupReader is not thread safe, and can only be used serially. It will stop at the finishLsn, which should be set using the GlobalCBVLSN.
 
 public class ReplicaSyncupReader extends VLSNReader {
 
     /*
      * True if this particular record retrieval is for a syncable record.
      * False if the reader is looking for a specific VLSN
      */
     private boolean syncableSearch;
 
     private final LogEntry ckptEndLogEntry = 
 
     private final LogEntry commitLogEntry = 
 
     /* 
      * SearchResults retains the information as to whether the found
      * matchpoint is valid.
      */
     private final MatchpointSearchResults searchResults
 
     private final Logger logger1;
 
 
     public ReplicaSyncupReader(EnvironmentImpl envImpl,
                                VLSNIndex vlsnIndex,
                                long endOfLogLsn,
                                int readBufferSize,
                                NameIdPair nameIdPair,
                                VLSN startVLSN,
                                long finishLsn,
                                MatchpointSearchResults searchResults)
         throws DatabaseException {
 
         /*
          * If we go backwards, endOfFileLsn and startLsn must not be null.
          * Make them the same, so we always start at the same very end.
          */
         super(envImpl,
               vlsnIndex,
               false,           // forward
               endOfLogLsn,
               readBufferSize,
               nameIdPair,
               finishLsn);
 
         initScan(startVLSNendOfLogLsn);
         this. = searchResults;
          = LoggerUtils.getLogger(getClass());
     }

    
Set up the ReplicaSyncupReader to start scanning from this VLSN.
 
     private void initScan(VLSN startVLSNlong endOfLogLsn) {
 
         if (startVLSN.equals(.)) {
             throw EnvironmentFailureException.unexpectedState
                 ("ReplicaSyncupReader start can't be NULL_VLSN");
         }
 
          = endOfLogLsn;
         assert  != ;
 
        /*
         * Flush the log so that syncup can assume that all log entries that
         * are represented in the VLSNIndex  are safely out of the log buffers
         * and on disk. Simplifies this reader, so it can use the regular
         * ReadWindow, which only works on a file.
         */
        .getLogManager().flush();
         = startVLSN;
    }

    
Backward scanning for the replica's part in syncup.
    public OutputWireRecord scanBackwards(VLSN vlsn)
        throws DatabaseException {
         = false;
        VLSNRange range = .getRange();
        if (vlsn.compareTo(range.getFirst()) < 0) {
            /*
             * The requested VLSN is before the start of our range, we don't
             * have this record.
             */
            return null;
        }
         = vlsn;
        if (readNextEntry()) {
            return ;
        }
        return null;
    }

    
Backward scanning for finding an earlier candidate syncup matchpoint.
    public OutputWireRecord findPrevSyncEntry(boolean startAtPrev)
        throws DatabaseException {
         = null;
         = true;
        if (startAtPrev) {
            /* Start by looking at the entry before the current record. */
             = .getPrev();
        } else {
            LoggerUtils.info(
                             "Restart ReplicaSyncupReader at " +
                             "vlsn " + );
        }
        VLSNRange range = .getRange();
        if (.compareTo(range.getFirst()) < 0) {
            /*
             * We've walked off the end of the contiguous VLSN range.
             */
            return null;
        }
        if (readNextEntry() == false) {
            /*
             * We scanned all the way to the front of the log, no
             * other sync-able entry found.
             */
            return null;
        }
        assert LogEntryType.isSyncPoint(.getEntryType()) :
        "Unexpected log type= " + ;
        return ;
    }

    

Throw:
an EnvironmentFailureException if we were scanning for a particular VLSN and we have passed it by.
    private void checkForPassingTarget(int compareResult) {
        if (compareResult < 0) {
            /* Hey, we passed the VLSN we wanted. */
            throw EnvironmentFailureException.unexpectedState
                ("want to read " +  + " but reader at " +
                 .getVLSN());
        }
    }

    
Return true for ckpt entries, for syncable entries, and if we're in specific vlsn scan mode, any replicated entry. There is an additional level of filtering in processEntry.
    @Override
    protected boolean isTargetEntry()
        throws DatabaseException {
        if (.isLoggable(.)) {
            LoggerUtils.finest(,
                               " isTargetEntry " +  ); 
        }
        ++;
        /* Skip invisible entries. */
        if (.isInvisible()) {
            return false;
        }
        byte currentType = .getType();
        /* 
         * Return true if this entry is replicated. All entries need to be
         * perused by processEntry, when we are doing a vlsn based search,
         * even if they are not a sync point, because:
         *  (a) If this is a vlsn-based search, it's possible that the replica
         * and feeder are mismatched. The feeder will only propose a sync type
         * entry as a matchpoint but it might be that the replica has a non-
         * sync entry at that vlsn.
         *  (b) We need to note passed commits in processEntry.
         */
        if (entryIsReplicated()) {
            if () {
                if (LogEntryType.isSyncPoint(currentType)) {
                    return true;
                }
                 = .getVLSN().getPrev();
            } else {
                return true;
            }
        }
        /* 
         * We'll also need to read checkpoint end records to record their 
         * presence.
         */
        if (..equalsType(currentType)) {
            return true;
        }
        return false;
    }

    
ProcessEntry does additional filtering before deciding whether to return an entry as a candidate for matching. If this is a record we are submitting as a matchpoint candidate, instantiate a WireRecord to house this log entry. If this is a non-replicated entry or a txn end that follows the candidate matchpoint, record whatever status we need to, but don't use it for comparisons. For example, suppose the log is like this: VLSN entry 10 LN 11 commit 12 LN -- ckpt end 13 commit 14 abort And that the master only has VLSNs 1-12. The replica will suggest vlsn 14 as the first matchpoint. The feeder will counter with a suggestion of vlsn 11, since it does not have vlsn 14. At that point, the ReplicaSyncupReader will scan backwards in the log, looking for vlsn 11. Although the reader should only return an entry when it gets to vlsn 11. the reader must process commits and ckpts that follow 11, so that they can be recorded in the searchResults, so the number of rolled back commits can be accurately reported.
    @Override
    protected boolean processEntry(ByteBuffer entryBuffer) {
        if (.isLoggable(.)) {
            LoggerUtils.finest(,
                               " syncup reader saw " +  ); 
        }
        byte currentType = .getType();
        /*
         * CheckpointEnd entries are tracked in order to see if a rollback
         * must be done, but are not returned as possible matchpoints.
         */
        if (..equalsType(currentType)) {
            /* 
             * Read the entry, which both lets us decipher its contents and 
             * also advances the file reader position.
             */
                                      entryBuffer);
            if (.isLoggable(.)) {
                LoggerUtils.finest(,
                                   " syncup reader read " + 
                                    + );
            }
            if (((CheckpointEnd.getMainItem()).
                getCleanedFilesToDelete()) {
                .notePassedCheckpointEnd();
            }
            
            return false;
        }
        /* 
         * Setup the log entry as a wire record so we can compare it to
         * the entry from the feeder as we look for a matchpoint. Do this
         * before we change positions on the entry buffer by reading it.
         */
        ByteBuffer buffer = entryBuffer.slice();
        buffer.limit(.getItemSize());
         =
            new OutputWireRecord(buffer);
        /* 
         * All commit records must be tracked to figure out if we've exceeded
         * the txn rollback limit. For reporting reasons, we'll need to
         * unmarshal the log entry, so we can read the timestamp in the commit
         * record.
         */
        if (..equalsType(currentType)) {
            .readEntry(entryBuffer);
            TxnCommit commit = (TxnCommit.getMainItem();
            .notePassedCommits(commit.getTime(), 
                                            commit.getId(),
                                            .getVLSN(),
                                            getLastLsn());
            if (.isLoggable(.)) {
                LoggerUtils.finest(,
                                   "syncup reader read " +
                                    + );
            }
        } else {
            entryBuffer.position(entryBuffer.position() +
                                 .getItemSize());
        }
        if () {
            return true;
        } 
        /* We're looking for a particular VLSN. */
        int compareResult = .getVLSN().compareTo();
        checkForPassingTarget(compareResult);
        
        /* return true if this is the entry we want. */
        return (compareResult == 0);
    }

    
TBW
    @Override
    protected void handleGapInBackwardsScan(long prevFileNum) {
                                                  prevFileNum,
                                                  );
        LoggerUtils.info(e.getMessage());
        throw e;
    }
    /*
     * An internal exception indicating that the reader must scan across a
     * gap in the log files. The gap may have been created by cleaning.
     */
    public static class SkipGapException extends DatabaseException {
        private static final long serialVersionUID = 1L;
        private final VLSN currentVLSN;
        public SkipGapException(long currentFileNum,
                                long nextFileNum,
                                VLSN currentVLSN) {
            super("Restarting reader in order to read backwards across gap " +
                  "from file 0x" + Long.toHexString(currentFileNum) + 
                  " to file 0x" + Long.toHexString(nextFileNum) + 
                  ". Reset position to vlsn " + currentVLSN);
            this. = currentVLSN;
        }
        public VLSN getVLSN() {
            return ;
        }
    }
New to GrepCode? Check out our FAQ X