Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package com.sleepycat.je.sync.impl;
  
  import java.util.HashMap;
  import java.util.Map;
  
 
 /*
  * SyncCleanerBarrier keeps track of minimal nextSyncStart for the SyncDataSets
  * registered in this Environment.
  *
  * JE cleaners will use this minSyncStart to calculate the cleaner barrier
  * file:
  *
  * 1. On standalone side, it will return the file that minSyncStart lives as
  * the cleaner barrier file.
  * 2. On replication side, it will compare the minSyncStart and the GloalCBVLSN
  * (it's the minimal fsynced VLSN on the replicas, see SR18728), and choose the
  * file where smaller one lives as the cleaner barrier file.
  *
  * TODO: use a trigger for doing changes to the syncStarts in case the exported
  * transaction aborts.
  */
 public class SyncCleanerBarrier {
     private final EnvironmentImpl envImpl;
 
     /*
      * Saves sync start position information for all SyncDataSets in this
      * Environment. The Key is processorName + syncDataSetName, value is the
      * sync start position for the SyncDataSet.
      */
     private final Map<StringLongsyncStarts = new HashMap<StringLong>();
 
     /* The minimal sync start position for all SyncDataSets. */
     private long minSyncStart = .;

    
The init() method must be called immediately after creation. Initialization is separate to support trigger initialization and reference from the trigger to the SyncCleanerBarrier. See creation of the SyncCleanerBarrier in EnvironmentImpl.
 
     public SyncCleanerBarrier(EnvironmentImpl envImpl) {
         this. = envImpl;
     }

    
Read the SyncDB to get the LogChangeSet information. Must be called immediately after creation (see constructor).
 
     public void init(Environment env) {
         if (.isReadOnly()) {
             /* Cleaning is not possible in a read-only environment. */
             return;
         }
 
         final SyncDB syncDb;
         try {
             syncDb = new SyncDB(false /*allowCreate*/);
         } catch (DatabaseNotFoundException e) {
             return;
         }
 
         /* Read the ChangeSet information. */
         Map<StringDatabaseEntrychangeSets =
             syncDb.readDataForType(.env);
 
         /* Do nothing if there is no written ChangeSet information. */
         if (changeSets == null || changeSets.size() == 0) {
             return;
         }
 
         /* Go over the whole list. */
         LogChangeSetBinding binding = new LogChangeSetBinding();
         long minValue = .;
         for (Map.Entry<StringDatabaseEntryentry : changeSets.entrySet()) {
             LogChangeSet changeSet = binding.entryToObject(entry.getValue());
             .put(entry.getKey(), changeSet.getNextSyncStart());
             /* Find the minSyncStart. */
             if (doCompare(minValuechangeSet.getNextSyncStart()) > 0) {
                 minValue = changeSet.getNextSyncStart();
             }
        }
        /*
         * Change minSyncStart only if there exists LogChangeSet
         * records in SyncDB.
         */
        if (minValue != .) {
             = minValue;
        }
    }
    private int doCompare(long value1long value2) {
       if (.isReplicated()) {
           return (new VLSN(value1)).compareTo(new VLSN(value2));
       } else {
           return DbLsn.compareTo(value1value2);
       }
    }
    /* Update the LogChangeSet information for a SyncDataSet. */
    synchronized void updateSyncStart(String keyStartInfo startInfo) {
        final long syncStart = startInfo.getNextSyncStart();
        final boolean firstDataSet = (.size() == 0);
        /*
         * If the new syncStart is smaller than minSyncStart, should throw an
         * EnvironmentFailureException.
         */
        if (doCompare(syncStart) < 0) {
            StringTokenizer tokenizer = new StringTokenizer(key"-");
            throw EnvironmentFailureException.unexpectedState
                ("Invalid behavior, Processor: " +
                 tokenizer.nextToken() + ", SyncDataSet: " +
                 tokenizer.nextToken() +
                 " is reading log entries on a  protected file at " +
                 (.isReplicated() ?
                  ("VLSN: " + syncStart) :
                  ("lsn: " + DbLsn.getNoFormatString(syncStart))));
        }
        /*
         * Remove the nextSyncStart from the map if a SyncDataSet is removed
         * from a SyncProcessor.
         */
        if (startInfo.isDelete()) {
            .remove(key);
        } else {
            .put(keysyncStart);
        }
        /*
         * If there is no SyncDataSet in the map, set the minSyncStart to
         * LogChangeSet.NULL_POSITION.
         */
        if (.size() == 0) {
             = .;
            return;
        }
        /* Go through the map to find the minSyncStart. */
        long minValue = .;
        for (Map.Entry<StringLongentry : .entrySet()) {
            if (doCompare(minValueentry.getValue()) > 0) {
                minValue = entry.getValue();
            }
        }
        /* Set the new minimal syncStart value. */
        assert minValue != .;
         = minValue;
        /* 
         * Enable LocalCBVLSN changes on the master after adding the first 
         * SyncDataSet. 
         */
        if (.isMaster() && firstDataSet) { 
            .unfreezeLocalCBVLSN();
        }
    }
    /* Return the sync start for a SyncDataSet. */
    public synchronized long getSyncStart(String key) {
        /* Make sure the SyncDataSet is already registered. */
        assert .containsKey(key);
        return .get(key);
    }
    /* Return the value of the minSyncStart to start the LogChangeReader. */
    public synchronized long getMinSyncStart() {
        return ;
    }
    /* Return the current map size for the SyncDataSets. */
    public synchronized boolean isFirstSyncDataSet() {
        return .size() == 0;
    }
    public static class SyncTrigger
                   PersistentTrigger {
        /*
         * Used to save the nextSyncStart changes caused by transactions,
         * a transaction may commit the change tracking information changes for
         * multiple SyncDataSets, so the value of this map has to be a map.
         *
         * The key of txnIdToSyncStarts is the transaction id, the key of the
         * txnIdToSyncStarts' value is the name of a SyncDataSet.
         */
        private transient Map<LongMap<StringStartInfo>> txnIdToSyncStarts;
        /* The name of the trigger. */
        private final String triggerName;
        private transient SyncCleanerBarrier barrier;
        /* The name of the database. */
        private transient String dbName;
        public SyncTrigger(String triggerName) {
            this. = triggerName;
        }
        public String getName() {
            return ;
        }
        public Trigger setDatabaseName(String dbName) {
            this. = dbName;
            return this;
        }
        public String getDatabaseName() {
            return ;
        }
        public void addTrigger(@SuppressWarnings("unused")
                               Transaction txn) {
        }
        public void removeTrigger(@SuppressWarnings("unused")
                                  Transaction txn) {
        }
        public void open(@SuppressWarnings("unused")
                         Transaction txn,
                         Environment environment,
                         @SuppressWarnings("unused")
                         boolean isNew) {
             = DbInternal.getEnvironmentImpl(environment).
                                 getSyncCleanerBarrier();
            assert  != null;
             =
                new ConcurrentHashMap<LongMap<StringStartInfo>>();
        }
        public void close() {
        }
        public void remove(@SuppressWarnings("unused")
                           Transaction txn) {
        }
        public void truncate(@SuppressWarnings("unused")
                             Transaction txn) {
        }
        public void rename(@SuppressWarnings("unused")
                           Transaction txn,
                           @SuppressWarnings("unused")
                           String newName) {
        }
        /* Invoked when inserting new records in the SyncDB. */
        public void put(Transaction txn,
                        DatabaseEntry key,
                        @SuppressWarnings("unused")
                        DatabaseEntry oldData,
                        DatabaseEntry newData) {
            addNewMapEntry(txnkeynewDatafalse);
        }
        /* Save a new nextSyncStart information for the transaction. */
        private void addNewMapEntry(Transaction txn,
                                    DatabaseEntry key,
                                    DatabaseEntry data,
                                    boolean isDelete) {
            String dataSetName = StringBinding.entryToString(key);
            /* Only do it if it's a CHANGE_SET information. */
            if (DataType.getDataType(dataSetName) == .) {
                Map<StringStartInfosyncStartInfos =
                    .get(txn.getId());
                if (syncStartInfos == null) {
                    syncStartInfos =
                        new ConcurrentHashMap<StringStartInfo>();
                    .put(txn.getId(), syncStartInfos);
                }
                LogChangeSetBinding binding = new LogChangeSetBinding();
                LogChangeSet set = binding.entryToObject(data);
                StartInfo startInfo =
                    new StartInfo(set.getNextSyncStart(), isDelete);
                syncStartInfos.put(dataSetNamestartInfo);
            }
        }
        /* Invoked when deleting records from the SyncDB. */
        public void delete(Transaction txn,
                           DatabaseEntry key,
                           DatabaseEntry oldData) {
            addNewMapEntry(txnkeyoldDatatrue);
        }
        public void commit(Transaction txn) {
            Map<StringStartInfosyncStartInfos =
                .get(txn.getId());
            /*
             * Recalculate the minSyncStart if this transaction does put/delete
             * operations.
             */
            if (syncStartInfos != null) {
                for (Map.Entry<StringStartInfoentry :
                     syncStartInfos.entrySet()) {
                    .updateSyncStart(entry.getKey(), entry.getValue());
                }
            }
            /* Remove it from the in-memory map. */
            .remove(txn.getId());
        }
        /* Do nothing, only remove information from the map. */
        public void abort(Transaction txn) {
            .remove(txn.getId());
        }
        /* TODO: implement ReplicatedDatabaseTrigger 'repeat' methods. */
        public void repeatTransaction(Transaction txn) {
        }
        public void repeatAddTrigger(Transaction txn) {
        }
        public void repeatRemoveTrigger(Transaction txn) {
        }
        public void repeatCreate(Transaction txn) {
        }
        public void repeatRemove(Transaction txn) {
        }
        public void repeatTruncate(Transaction txn) {
        }
        public void repeatRename(Transaction txnString newName) {
        }
        public void repeatPut(Transaction txn,
                              DatabaseEntry key,
                              DatabaseEntry newData) {
        }
        public void repeatDelete(Transaction txn,
                                 DatabaseEntry key) {
        }
    }
    /* Object used to save the sync start information for a SyncDataSet. */
    static class StartInfo {
        /* The new nextSyncStart information for the SyncDataSet. */
        private final long nextSyncStart;
        /* True if it's a SyncDataSet remove operation. */
        private final boolean isDelete;
        public StartInfo(long nextSyncStartboolean isDelete) {
            this. = nextSyncStart;
            this. = isDelete;
        }
        public long getNextSyncStart() {
            return ;
        }
        public boolean isDelete() {
            return ;
        }
    }
New to GrepCode? Check out our FAQ X