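// Web-page residue from the code-browsing site this listing was copied from; not part of the source:
// "Start line", "End line", "Snippet Preview", "Snippet HTML Code", "Stack Overflow Questions"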
  package org.mapdb;
  
  import java.lang.ref.Reference;
  import java.lang.ref.ReferenceQueue;
  import java.lang.ref.WeakReference;
  import java.util.Set;
  import java.util.concurrent.CopyOnWriteArraySet;
  import java.util.concurrent.locks.Lock;
  import java.util.concurrent.locks.ReentrantReadWriteLock;
 /**
  * Naive implementation of snapshots on top of a storage engine.
  * On update it takes the old value and stores it aside.
  * <p>
  * TODO merge snapshots down with Storage for best performance
  *
  * @author Jan Kotek
  */
 
 public class TxEngine extends EngineWrapper{
 
     protected static final Object TOMBSTONE = new Object();

    
write lock: - `TX.modified` and `TX.old` are updated - commits and rollback read lock on everything else
 
     protected final ReentrantReadWriteLock[] locks = Utils.newReadWriteLocks();
 
     protected final Set<Reference<TX>> txs = new CopyOnWriteArraySet<Reference<TX>>();
     private ReferenceQueue<TXtxsQueue = new ReferenceQueue<TX>();
 
     protected void txsCleanup(){
         for(Reference ref=.poll();ref!=null;ref=.poll()){
             .remove(ref);
         }
     }


    
true if there are data which can be rolled back
 
     protected boolean uncommitedData = false;
 
     protected final boolean fullTx;
 
 
     protected TxEngine(Engine engineboolean fullTx) {
         super(engine);
         this. = fullTx;
     }
 
     @Override
     public <A> long put(A valueSerializer<A> serializer) {
         final Lock lock = [(int)(10000*Math.random())&.].writeLock();
         lock.lock();
         try{
              = true;
             long recid = super.put(valueserializer);
             txsCleanup();
             for(Reference<TXr:){
                 TX tx = r.get();
                 if(tx==nullcontinue//TODO remove expired refs
                 tx.old.putIfAbsent(recid,);
             }
             return recid;
         }finally{
             lock.unlock();
         }
     }
 
 
     @Override
     public <A> A get(long recidSerializer<A> serializer) {
 //        lock.readLock().lock();
 //        try{
             return super.get(recidserializer);
 //        }finally{
 //            lock.readLock().unlock();
 //        }
     }
 
     @Override
     public <A> void update(long recid, A valueSerializer<A> serializer) {
         final Lock lock = [Utils.longHash(recid)&.].writeLock();
         lock.lock();
         try{
             updateNoLock(recidvalueserializer);
         }finally{
             lock.unlock();
         }
     }
 
     private <A> void updateNoLock(long recid, A valueSerializer<A> serializer) {
          = true;
         Object old = get(recid,serializer);
         if(old == nullold=;
         txsCleanup();
         for(Reference<TXr:){
             TX tx = r.get();
             if(tx==nullcontinue//TODO remove expired refs
            tx.old.putIfAbsent(recid,old);
        }
        super.update(recidvalueserializer);
    }
    @Override
    public <A> boolean compareAndSwap(long recid, A expectedOldValue, A newValueSerializer<A> serializer) {
        final Lock lock = [Utils.longHash(recid)&.].writeLock();
        lock.lock();
        try{
            boolean ret =  super.compareAndSwap(recidexpectedOldValuenewValueserializer);
            if(ret){
                 = true;
                for(Reference<TXr:){
                    TX tx = r.get();
                    if(tx==nullcontinue//TODO remove expired refs
                    tx.old.putIfAbsent(recid,expectedOldValue);
                }
            }
            return ret;
        }finally{
            lock.unlock();
        }
    }
    @Override
    public <A> void delete(long recidSerializer<A> serializer) {
        final Lock lock = [Utils.longHash(recid)&.].writeLock();
        lock.lock();
        try{
            deleteNoLock(recidserializer);
        }finally{
            lock.unlock();
        }
    }
    private <A> void deleteNoLock(long recidSerializer<A> serializer) {
         = true;
        Object old = get(recid,serializer);
        if(old == nullold=;
        txsCleanup();
        for(Reference<TXr:){
            TX tx = r.get();
            if(tx==nullcontinue//TODO remove expired refs
            tx.old.putIfAbsent(recid,old);
        }
        super.delete(recidserializer);
    }
    @Override
    public void close() {
        for(ReentrantReadWriteLock lock:)lock.writeLock().lock();
        try{
            super.close();
        }finally{
            for(ReentrantReadWriteLock lock:)lock.writeLock().unlock();
        }
    }
    @Override
    public void commit() {
        for(ReentrantReadWriteLock lock:)lock.writeLock().lock();
        try{
            commitNoLock();
        }finally{
            for(ReentrantReadWriteLock lock:)lock.writeLock().unlock();
        }
    }
    private void commitNoLock() {
        super.commit();
         = false;
    }
    @Override
    public void rollback() {
        for(ReentrantReadWriteLock lock:)lock.writeLock().lock();
        try{
            super.rollback();
             = false;
        }finally{
            for(ReentrantReadWriteLock lock:)lock.writeLock().unlock();
        }
    }
    public static Engine createSnapshotFor(Engine engine) {
        while(true){
            if(engine instanceof TxEngine){
                return ((TxEngineengine).snapshot();
            }else if(engine instanceof EngineWrapper){
                engine = ((EngineWrapper)engine).getWrappedEngine();
            }else{
                throw new IllegalArgumentException("Could not create Snapshot for Engine: "+engine);
            }
        }
    }
    public Engine snapshot() {
        for(ReentrantReadWriteLock lock:)lock.writeLock().lock();
        try{
            if( && canRollback()){
                //TODO we can not create snapshot if user can rollback data, it would ruin consistency
                throw new IllegalAccessError("Can not create snapshot with uncommited data");
            }
            return new TX();
        }finally{
            for(ReentrantReadWriteLock lock:)lock.writeLock().unlock();
        }
    }
    public class TX implements Engine{
        public TX(){
            TxEngine.this..add();
        }
        protected final Reference<TXref = new WeakReference<TX>(TX.this);
        protected final LongConcurrentHashMap<Objectold = new LongConcurrentHashMap<Object>();
        protected LongConcurrentHashMap<Fun.Tuple2<Object,Serializer>> modified =
                 ? new LongConcurrentHashMap<Fun.Tuple2<Object,Serializer>>() : null;
        protected LongConcurrentHashMap<TXread =
                 ? new LongConcurrentHashMap<TX>() : null;
        @Override
        public long preallocate() {
            //TODO does not respect TX
            return TxEngine.this.preallocate();
        }
        @Override
        public void preallocate(long[] recids) {
            //TODO does not respect TX
            TxEngine.this.preallocate(recids);
        }
        @Override
        public <A> long put(A valueSerializer<A> serializer) {
            checkFullTx();
            final Lock lock = [(int)(10000*Math.random())&.].writeLock();
            lock.lock();
            try{
                //put null into underlying engine
                long recid = TxEngine.this.preallocate();
                .put(recid,Fun.t2((Object)value,(Serializer)serializer));
                return recid;
                //TODO remove empty recid on rollback
            }finally{
                lock.unlock();
            }
        }
        @Override
        public <A> A get(long recidSerializer<A> serializer) {
            final Lock lock = [Utils.longHash(recid)&.].readLock();
            lock.lock();
            try{
                return getNoLock(recidserializer);
            }finally{
                lock.unlock();
            }
        }
        private <A> A getNoLock(long recidSerializer<A> serializer) {
            if(.put(recid,this);
            Fun.Tuple2 o =  ? .get(recid) : null;
            if(o!=nullreturn o.a== ? null : (A) o.a;
            Object o2 = .get(recid);
            if(o2 == return null;
            if(o2!=nullreturn (A) o2;
            return TxEngine.this.get(recidserializer);
        }
        @Override
        public <A> void update(long recid, A valueSerializer<A> serializer) {
            checkFullTx();
            final Lock lock = [Utils.longHash(recid)&.].writeLock();
            lock.lock();
            try{
                if(.containsKey(recid)){
                    close();
                    throw new TxRollbackException();
                }
                .put(recid,Fun.t2((Object)value,(Serializer)serializer));
            }finally{
                lock.unlock();
            }
        }
        @Override
        public <A> boolean compareAndSwap(long recid, A expectedOldValue, A newValueSerializer<A> serializer) {
            checkFullTx();
            final Lock lock = [Utils.longHash(recid)&.].writeLock();
            lock.lock();
            try{
                Object oldObj = getNoLock(recid,serializer);
                if(oldObj==expectedOldValue || (oldObj!=null && oldObj.equals(expectedOldValue))){
                    if(.containsKey(recid)){
                        close();
                        throw new TxRollbackException();
                    }
                    .put(recid,Fun.t2((Object)newValue,(Serializer)serializer));
                    return true;
                }
                return false;
            }finally{
                lock.unlock();
            }
        }
        @Override
        public <A> void delete(long recidSerializer<A> serializer) {
            checkFullTx();
            final Lock lock = [Utils.longHash(recid)&.].writeLock();
            lock.lock();
            try{
                if(.containsKey(recid)){
                    close();
                    throw new TxRollbackException();
                }
                .put(recid,Fun.t2(,(Serializer)serializer));
            }finally{
                lock.unlock();
            }
        }
        protected void checkFullTx() {
            if(!throw new UnsupportedOperationException("read-only snapshot");
        }
        @Override
        public void close() {
            .clear();
            TxEngine.this..remove();
            if(){
                 = null;
                 = null;
            }
            .clear();
        }
        @Override
        public boolean isClosed() {
            return .get()!=null;
        }
        @Override
        public void commit() {
            checkFullTx();
            txsCleanup();
            for(ReentrantReadWriteLock lock:)lock.writeLock().lock();
            try{
                //check no other transactions has modified our data
                LongMap.LongMapIterator readIter = .longMapIterator();
                while(readIter.moveToNext()){
                    long recid = readIter.key();
                    for(Reference<TXref2:){
                        TX tx = ref2.get();
                        if(tx==this||tx==nullcontinue;
                        if(tx.modified.containsKey(recid)){
                            close();
                            throw new TxRollbackException();
                        }
                    }
                }
                LongMap.LongMapIterator<Fun.Tuple2<Object,Serializer>> iter = .longMapIterator();
                while(iter.moveToNext()){
                    long recid = iter.key();
                    if(.containsKey(recid)){
                        TxEngine.this.rollback();
                        close();
                        throw new TxRollbackException();
                    }
                    Fun.Tuple2<Object,Serializerval = iter.value();
                    if(val.a==){
                        TxEngine.this.deleteNoLock(recid,val.b);
                    }else {
                        TxEngine.this.updateNoLock(recidval.aval.b);
                    }
                }
                TxEngine.this.commitNoLock();
                close();
            }finally{
                for(ReentrantReadWriteLock lock:)lock.writeLock().unlock();
            }
        }
        @Override
        public void rollback() throws UnsupportedOperationException {
            checkFullTx();
            for(ReentrantReadWriteLock lock:)lock.writeLock().lock();
            try{
                close();
            }finally{
                for(ReentrantReadWriteLock lock:)lock.writeLock().unlock();
            }
        }
        @Override
        public boolean isReadOnly() {
            return !;
        }
        @Override
        public boolean canRollback() {
            return !;
        }
        @Override
        public void clearCache() {
            //nothing to do
        }
        @Override
        public void compact() {
            //nothing to do
        }
        @Override
        public SerializerPojo getSerializerPojo() {
            //TODO make readonly if needed
            return TxEngine.this.getSerializerPojo();
        }
    }
// Web-page residue, not part of the source: "New to GrepCode? Check out our FAQ"