Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package org.mapdb;
  
Transaction factory

Author(s):
Jan Kotek
 
 public class TxMaker {

    
marker for deleted record
 
     protected static final Object DELETED = new Object();

    
parent engine under which modifications are stored
 
     protected SnapshotEngine engine;
 
     protected final ReentrantReadWriteLock commitLock = new ReentrantReadWriteLock();
 
     protected volatile boolean commitPending;
 
     public TxMaker(SnapshotEngine engine) {
         if(engine==nullthrow new IllegalArgumentException();
         if(engine.isReadOnly()) throw new IllegalArgumentException("read only");
         if(!engine.canRollback()) throw new IllegalArgumentException("no rollback");
         this. = engine;
     }
 
     
     public DB makeTx(){
         return new DB(new TxEngine(.snapshot()));
     }
 
     public void close() {
         if( == nullreturn;
         .writeLock().lock();
         try{
             .close();
              = null;
              = null;
         }finally {
             .writeLock().unlock();
         }
     }

    
Executes given block withing single transaction. If block throws TxRollbackException execution is repeated until it does not fail.

Parameters:
txBlock
 
     public void execute(TxBlock txBlock) {
         for(;;){
             DB tx = makeTx();
             try{
                 txBlock.tx(tx);
                 if(!tx.isClosed())
                     tx.commit();
                 return;
             }catch(TxRollbackException e){
                 //failed, so try again
             }
         }
     }
 
     protected class TxEngine extends EngineWrapper{

        
list of modifications made by this transaction
 
         protected LongConcurrentHashMap<Fun.Tuple2<?, Serializer>> mods =
                 new LongConcurrentHashMap<Fun.Tuple2<?, Serializer>>();
 
         protected LongConcurrentHashMap<SerializernewRecids =
                 new LongConcurrentHashMap<Serializer>();
 
 
 
         protected final ReentrantReadWriteLock[] locks = Utils.newReadWriteLocks();
 
 
 
         protected TxEngine(Engine snapshot) {
             super(snapshot);
         }
 
         @Override
         public <A> long put(A valueSerializer<A> serializer) {
             final long recid;
 
             //need to get new recid from underlying engine
             .readLock().lock();
             try{
                  = true;
                 recid = .put(..);
             }finally {
                 .readLock().unlock();
             }
             lockGlobalMods(recid);
            Lock lock = [Utils.longHash(recid)&.].writeLock();
            lock.lock();
            try{
                //update local modifications
                .put(recidserializer);
                .put(recid,Fun.t2(value,(Serializer)serializer));
            }finally {
                lock.unlock();
            }
            return recid;
        }
        protected void lockGlobalMods(long recid) {
            Engine other = .putIfAbsent(recid,this);
            if(other!=this && other!=null){
                rollback();
                throw new TxRollbackException();
            }
        }
        @Override
        public <A> A get(long recidSerializer<A> serializer) {
            Lock lock = [Utils.longHash(recid)&.].readLock();
            lock.lock();
            try{
                return getNoLock(recidserializer);
            }finally {
                lock.unlock();
            }
        }
        private <A> A getNoLock(long recidSerializer<A> serializer) {
            //try local mods
            Fun.Tuple2 t = .get(recid);
            if(t!=null){
                if(t.a == return null;
                else return (A) t.a;
            }
            return super.get(recid,serializer);
        }
        @Override
        public <A> void update(long recid, A valueSerializer<A> serializer) {
            lockGlobalMods(recid);
            Lock lock = [Utils.longHash(recid)&.].writeLock();
            lock.lock();
            try{
                //update local modifications
                .put(recid,Fun.t2(value,(Serializer)serializer));
            }finally {
                lock.unlock();
            }
        }
        @Override
        public <A> boolean compareAndSwap(long recid, A expectedOldValue, A newValueSerializer<A> serializer) {
            if(.get(recid)!=this){
                rollback();
                throw new TxRollbackException();
            }
            Lock lock = [Utils.longHash(recid)&.].writeLock();
            lock.lock();
            try{
                Object oldVal = getNoLock(recid,serializer);
                if(expectedOldValue==oldVal || (expectedOldValue!=null && expectedOldValue.equals(oldVal))){
                    lockGlobalMods(recid);
                    .put(recid,Fun.t2(newValue,(Serializer)serializer));
                    return true;
                }
                return false;
            }finally {
                lock.unlock();
            }
        }
        @Override
        public <A> void delete(long recidSerializer<A> serializer) {
            lockGlobalMods(recid);
            Lock lock = [Utils.longHash(recid)&.].writeLock();
            lock.lock();
            try{
                //add marked which indicates deleted
                .put(recid,Fun.t2(,(Serializer)serializer));
            }finally {
                lock.unlock();
            }
        }
        @Override
        public void close() {
             = null;
             = null;
            super.close();
        }
        @Override
        public void commit() {
            //replay all items in transactions
            for(ReentrantReadWriteLock lock:lock.writeLock().lock();
            try{
                super.close();
                synchronized (){
                    if(){
                        .commit();
                         = false;
                    }
                    LongMap.LongMapIterator<Fun.Tuple2<?, Serializer>> iter = .longMapIterator();
                    while(iter.moveToNext()){
                        final long recid = iter.key();
                        if(!.remove(recid,this)){
                            .rollback();
                            throw new InternalError("record was not modified by this transaction");
                        }
                        final Object value = iter.value().;
                        final Serializer serializer = iter.value().;
                        if(value==){
                            .delete(recid,serializer);
                        }else{
                            .update(recid,value,serializer);
                        }
                    }
                }
                .commit();
            }finally{
                 = null;
                 = null;
                for(ReentrantReadWriteLock lock:lock.writeLock().lock();
            }
        }
        @Override
        public void rollback() {
            for(ReentrantReadWriteLock lock:lock.writeLock().lock();
            try{
                super.close();
                .writeLock().lock();
                try{
                    if(){
                        .commit();
                         = false;
                    }
                    LongMap.LongMapIterator<Fun.Tuple2<?, Serializer>> iter2 = .longMapIterator();
                    while(iter2.moveToNext()){
                        final long recid = iter2.key();
                        if(!.remove(recid,this)){
                            .rollback();
                            throw new InternalError("record was not modified by this transaction");
                        }
                    }
                    //remove allocated recids from store
                    LongMap.LongMapIterator<Serializeriter = .longMapIterator();
                    while(iter.moveToNext()){
                        long recid = iter.key();
                        .delete(recid,iter.value());
                    }
                    .commit();
                }finally {
                    .writeLock().unlock();
                }
            }finally{
                 = null;
                 = null;
                for(ReentrantReadWriteLock lock:lock.writeLock().lock();
            }
        }
        @Override
        public boolean isReadOnly() {
            return false;
        }
    }
New to GrepCode? Check out our FAQ X