  /*
   *  Copyright (c) 2012 Jan Kotek
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *    http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   */
 
 package org.mapdb;

 import java.lang.ref.WeakReference;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.LockSupport;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
Engine wrapper which provides asynchronous serialization and asynchronous writes. This class takes an object instance and passes it to a background writer thread (through the Write Cache), where it is serialized and written to disk. Modified records are held in a small instance cache until they are written to disk.

This feature is enabled by default and can be disabled by calling DBMaker.asyncWriteDisable(). The Write Cache is flushed at regular intervals or when it becomes full. The flush interval is 100 ms by default and can be controlled by DBMaker.asyncFlushDelay(int). Increasing this interval may improve performance in scenarios where frequently modified items should be cached, typically a BTreeMap import where keys are presorted.

Asynchronous writes do not affect commit durability: the Write Cache is flushed during each commit, rollback and close call. You may also flush the Write Cache manually with the clearCache() method. A global lock prevents records from being updated while a commit is in progress.

This wrapper starts one thread named `MapDB writer #N` (where N is a static counter). The Async Writer takes modified records from the Write Cache and writes them into the store. It also preallocates new recids: finding empty `recids` takes time, so a small stash is preallocated. The thread runs as a `daemon`, so it does not prevent the JVM from exiting.

Asynchronous writes have several advantages (especially for a single-threaded user), but there are two things the user should be aware of:

* Because data are serialized on a background thread, they need to be thread-safe or, better, immutable. When you insert a record into MapDB and modify it later, the modification may happen before the item is serialized, and you cannot be sure which version was persisted.
* Asynchronous writes have some overhead and introduce a single bottleneck. This is usually not an issue for one or two threads, but in a multi-threaded environment it may decrease performance. In truly concurrent environments with many updates (network servers, parallel computing) you should disable asynchronous writes.
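
The first caveat above can be illustrated with a minimal sketch. It does not use MapDB: `DeferredSerializationSketch`, `pending` and `persisted` are hypothetical stand-ins for the engine, the Write Cache and the store, and an explicit `flush()` call plays the role of the background writer so that the outcome is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

class DeferredSerializationSketch {
    // Hypothetical stand-in for the Write Cache: it keeps references, not copies.
    private final List<StringBuilder> pending = new ArrayList<StringBuilder>();
    // Hypothetical stand-in for the store: holds the serialized form of each record.
    final List<String> persisted = new ArrayList<String>();

    // Returns immediately; the record is not serialized yet.
    void put(StringBuilder record) {
        pending.add(record);
    }

    // Plays the role of the writer thread: serialization happens only here.
    void flush() {
        for (StringBuilder record : pending) {
            persisted.add(record.toString());
        }
        pending.clear();
    }

    public static void main(String[] args) {
        DeferredSerializationSketch db = new DeferredSerializationSketch();
        StringBuilder record = new StringBuilder("v1");
        db.put(record);
        record.append("-edited"); // mutation after put, before the deferred serialization
        db.flush();
        System.out.println(db.persisted.get(0)); // prints "v1-edited"
    }
}
```

With a real background thread the mutation may land either before or after serialization, which is why immutable records are the safer choice.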

Author(s):
Jan Kotek
See also:
Engine
EngineWrapper
 
 public class AsyncWriteEngine extends EngineWrapper implements Engine {

    
ensures thread name is followed by number
 
     protected static final AtomicLong threadCounter = new AtomicLong();


    
used to signal that object was deleted
 
     protected static final Object TOMBSTONE = new Object();
 
 
 
     protected final long[] newRecids = new long[.];
     protected int newRecidsPos = 0;
     protected final ReentrantLock newRecidsLock = new ReentrantLock();


    
Associates `recid` from Write Queue with record data and serializer.
 
     protected final LongConcurrentHashMap<Fun.Tuple2<Object,Serializer>> writeCache
             = new LongConcurrentHashMap<Fun.Tuple2<Object,Serializer>>();

    
Each insert to Write Queue must hold read lock. Commit, rollback and close operations must hold write lock
 
     protected final ReentrantReadWriteLock commitLock = new ReentrantReadWriteLock();

    
number of active threads running, used to await thread termination on close
 
     protected final CountDownLatch activeThreadsCount = new CountDownLatch(1);

    
If background thread fails with exception, it is stored here, and rethrown to all callers
    protected volatile Throwable threadFailedException = null;

    
indicates that `close()` was called and background threads are being terminated
    protected volatile boolean closeInProgress = false;

    
flush Write Queue every N milliseconds
    protected final int asyncFlushDelay;
    protected final AtomicReference<CountDownLatch> action = new AtomicReference<CountDownLatch>(null);


    
Constructs a new instance and starts the background thread. The user may provide an executor in which the background task will be executed; otherwise MapDB starts its own daemon thread.

Parameters:
engine which stores data.
_asyncFlushDelay flush Write Queue every N milliseconds
executor optional executor to run tasks. If null daemon threads will be created
    public AsyncWriteEngine(Engine engine, int _asyncFlushDelay, Executor executor) {
        super(engine);
        this.asyncFlushDelay = _asyncFlushDelay;
        startThreads(executor);
    }
    public AsyncWriteEngine(Engine engine) {
        this(engine.null);
    }
    protected static final class WriterRunnable implements Runnable{
        protected final WeakReference<AsyncWriteEngine> engineRef;
        protected final long asyncFlushDelay;
        public WriterRunnable(AsyncWriteEngine engine) {
            this.engineRef = new WeakReference<AsyncWriteEngine>(engine);
            this.asyncFlushDelay = engine.asyncFlushDelay;
        }
        @Override public void run() {
            try{
                //run in loop
                for(;;){
                    //if conditions are right, slow down writes a bit
                    if(asyncFlushDelay!=0 ){
                        LockSupport.parkNanos(1000L * 1000L * asyncFlushDelay);
                    }
                    AsyncWriteEngine engine = engineRef.get();
                    if(engine==null) return; //stop thread if this engine has been GCed
                    if(engine.threadFailedException !=null) return; //other thread has failed, no reason to continue
                    if(!engine.runWriter()) return;
                }
            } catch (Throwable e) {
                AsyncWriteEngine engine = engineRef.get();
                if(engine!=null) engine.threadFailedException = e;
            }finally {
                AsyncWriteEngine engine = engineRef.get();
                if(engine!=null) engine.activeThreadsCount.countDown();
            }
        }
    }

    
Starts background threads. You may override this method if you wish to start the thread in a different way.

Parameters:
executor optional executor to run tasks. If null, daemon threads will be created
    protected void startThreads(Executor executor) {
        final Runnable writerRun = new WriterRunnable(this);
        if(executor!=null){
            executor.execute(writerRun);
            return;
        }
        final long threadNum = threadCounter.incrementAndGet();
        Thread writerThread = new Thread(writerRun,"MapDB writer #"+threadNum);
        writerThread.setDaemon(true);
        writerThread.start();
    }


    
runs on background thread. Takes records from Write Queue, serializes and writes them
    protected boolean runWriter() throws InterruptedException {
        final CountDownLatch latch = action.getAndSet(null);
        int counter=0;
        do{
            LongMap.LongMapIterator<Fun.Tuple2<Object,Serializer>> iter = writeCache.longMapIterator();
            while(iter.moveToNext()){
                //usual write
                final long recid = iter.key();
                Fun.Tuple2<Object,Serializer> item = iter.value();
                if(item == null) continue; //item was already written
                if(item.a==TOMBSTONE){
                    //item was not updated, but deleted
                    AsyncWriteEngine.super.delete(recid, item.b);
                }else{
                    //call update as usual
                    AsyncWriteEngine.super.update(recid, item.a, item.b);
                }
                //record has been written to underlying Engine, so remove it from cache with CAS
                writeCache.remove(recid, item);
                if(((++counter)&(63))==0){   //it is like modulo 64, but faster
                    preallocateRefill();
                }
            }
        }while(latch!=null && !writeCache.isEmpty());
        preallocateRefill();
        //operations such as commit, rollback, compact or close need to be executed in Writer Thread
        //for this case CountDownLatch is used, it also signals when the operation has completed
        //the latch count encodes which special operation was requested
        if(latch!=null){
            if(!writeCache.isEmpty()) throw new InternalError();
            final long count = latch.getCount();
            if(count == 0){ //close operation
                return false;
            }else if(count == 1){ //commit operation
                AsyncWriteEngine.super.commit();
                latch.countDown();
            }else if(count==2){ //rollback operation
                AsyncWriteEngine.super.rollback();
                preallocateRollback();
                latch.countDown();
                latch.countDown();
            }else if(count==3){ //compact operation
                AsyncWriteEngine.super.compact();
                latch.countDown();
                latch.countDown();
                latch.countDown();
            }else{throw new InternalError();}
        }
        return true;
    }
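
The signalling scheme used by `runWriter()` can be sketched in isolation: the caller posts a `CountDownLatch` into an `AtomicReference`, the initial count says which operation is wanted, and the worker counts the latch down to zero to release the caller. This is a simplified illustration, not the MapDB code; `LatchSignalSketch`, `runOnce`, `request` and the `log` buffer are hypothetical names, and only the close, commit and rollback codes are modelled.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

class LatchSignalSketch {
    // Mailbox between caller and worker; null means no request is pending.
    final AtomicReference<CountDownLatch> action = new AtomicReference<CountDownLatch>(null);
    // Records which operations the worker performed (StringBuffer is synchronized).
    final StringBuffer log = new StringBuffer();

    // Worker side: take the pending request, act on its count, release the caller.
    // Returns false when the worker should stop.
    boolean runOnce() {
        CountDownLatch latch = action.getAndSet(null);
        if (latch == null) return true;       // nothing requested
        long count = latch.getCount();
        if (count == 0) return false;         // count 0 stands for close
        log.append(count == 1 ? "commit;" : "rollback;");
        while (latch.getCount() > 0) latch.countDown();
        return true;
    }

    // Caller side: post a request and block until the worker has handled it.
    void request(int opcode) throws InterruptedException {
        CountDownLatch msg = new CountDownLatch(opcode);
        if (!action.compareAndSet(null, msg)) throw new IllegalStateException("request already pending");
        msg.await();
    }

    String demo() {
        Thread worker = new Thread(() -> { while (runOnce()) Thread.yield(); });
        worker.setDaemon(true);
        worker.start();
        try {
            request(1);                              // commit
            request(2);                              // rollback
            action.set(new CountDownLatch(0));       // close: worker exits its loop
            worker.join(5000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(new LatchSignalSketch().demo());
    }
}
```

Reusing the latch both as a message and as a completion signal avoids a separate queue, at the cost of allowing only one pending request at a time, which the engine guarantees by holding the commit write lock.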
    protected void preallocateRollback(){
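        newRecidsLock.lock();
        try{
            getWrappedEngine().preallocate(newRecids);
            newRecidsPos = newRecids.length;
        }finally {
            newRecidsLock.unlock();
        }
    }
    @Override
    public long preallocate() {
        newRecidsLock.lock();
        try{
            if(newRecidsPos==0){
                getWrappedEngine().preallocate(newRecids);
                newRecidsPos = newRecids.length;
            }
            return newRecids[--newRecidsPos];
        }finally {
            newRecidsLock.unlock();
        }
    }
    protected void preallocateRefill(){
        newRecidsLock.lock();
        try{
            if(newRecidsPos==0){
                getWrappedEngine().preallocate(newRecids);
                newRecidsPos = newRecids.length;
            }
        }finally {
            newRecidsLock.unlock();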
        }
    }



    
checks that background threads are ready and throws exception if not
    protected void checkState() {
        if(closeInProgress) throw new IllegalAccessError("db has been closed");
        if(threadFailedException !=null) throw new RuntimeException("Writer thread failed");
    }
    @Override
    public void preallocate(long[] recids) {
        for(int i=0;i<recids.length;i++){
            recids[i] = preallocate();
        }
    }


    
Recids are managed by underlying Engine. Finding free or allocating new recids may take some time, so for this reason recids are preallocated by Writer Thread and stored in queue. This method just takes preallocated recid from queue with minimal delay. Newly inserted records are not written synchronously, but forwarded to background Writer Thread via queue. ![async-put](async-put.png)

Uml:
async-put.png
actor user
participant "put method" as put
participant "Writer Thread" as wri
note over wri: has preallocated \n recids in queue
activate put
user -> put: User calls put method
wri-> put: takes preallocated recid
put -> wri: forward record into Write Queue
put -> user: return recid to user
deactivate put
note over wri: eventually\n writes record\n before commit
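
The preallocated recid stash described above can be sketched as a small array that is refilled in bulk and handed out one entry at a time under a lock. This is an illustration only: `RecidStash` and its `AtomicLong` counter are hypothetical stand-ins for the engine and for the underlying store that really allocates recids.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

class RecidStash {
    private final long[] stash;
    private int pos = 0; // number of unused recids left in the stash
    private final ReentrantLock lock = new ReentrantLock();
    // Hypothetical stand-in for the underlying store's recid allocator.
    private final AtomicLong nextRecid = new AtomicLong(1);

    RecidStash(int size) {
        stash = new long[size];
    }

    // Slow path: fill the whole stash in one go (assumed to be the expensive call).
    private void refill() {
        for (int i = 0; i < stash.length; i++) {
            stash[i] = nextRecid.getAndIncrement();
        }
        pos = stash.length;
    }

    // Fast path: hand out a preallocated recid, refilling only when the stash is empty.
    long take() {
        lock.lock();
        try {
            if (pos == 0) refill();
            return stash[--pos];
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        RecidStash recids = new RecidStash(4);
        for (int i = 0; i < 5; i++) {
            System.out.println(recids.take());
        }
    }
}
```

In the engine the refill is normally done by the Writer Thread ahead of time, so a caller of `put` rarely pays for the slow path.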
    @Override
    public <A> long put(A value, Serializer<A> serializer) {
        commitLock.readLock().lock();
        try{
            long recid = preallocate();
            update(recid, value, serializer);
            return recid;
        }finally{
            commitLock.readLock().unlock();
        }
    }


    
This method first checks the Write Cache for a record that has been modified but not yet written. If the record is not there, it reads from the underlying Engine as usual.
    @Override
    public <A> A get(long recid, Serializer<A> serializer) {
        commitLock.readLock().lock();
        try{
            checkState();
            Fun.Tuple2<Object,Serializer> item = writeCache.get(recid);
            if(item!=null){
                if(item.a == TOMBSTONE) return null;
                return (A) item.a;
            }
            return super.get(recid, serializer);
        }finally{
            commitLock.readLock().unlock();
        }
    }
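
The read path can be sketched without MapDB as a read-through lookup: pending writes shadow the store, and a tombstone marker makes a pending delete visible immediately. `WriteCacheSketch` and its `store` map are hypothetical stand-ins, and `flush()` is called explicitly here in place of the background writer.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class WriteCacheSketch {
    // Marker for a pending delete, compared by identity.
    static final Object TOMBSTONE = new Object();

    // Pending modifications, keyed by recid.
    final ConcurrentHashMap<Long, Object> writeCache = new ConcurrentHashMap<Long, Object>();
    // Hypothetical stand-in for the underlying store.
    final Map<Long, Object> store = new HashMap<Long, Object>();

    void update(long recid, Object value) {
        writeCache.put(recid, value);
    }

    void delete(long recid) {
        writeCache.put(recid, TOMBSTONE);
    }

    // Pending writes win over the store; a tombstone reads as "no record".
    Object get(long recid) {
        Object pending = writeCache.get(recid);
        if (pending != null) {
            return pending == TOMBSTONE ? null : pending;
        }
        return store.get(recid);
    }

    // Applies pending entries to the store, as a writer thread would.
    void flush() {
        for (Map.Entry<Long, Object> e : writeCache.entrySet()) {
            Long recid = e.getKey();
            Object value = e.getValue();
            if (value == TOMBSTONE) {
                store.remove(recid);
            } else {
                store.put(recid, value);
            }
            // Remove only if the entry was not replaced in the meantime.
            writeCache.remove(recid, value);
        }
    }

    public static void main(String[] args) {
        WriteCacheSketch db = new WriteCacheSketch();
        db.update(1L, "a");
        System.out.println(db.get(1L)); // served from the write cache
        db.flush();
        db.delete(1L);
        System.out.println(db.get(1L)); // tombstone hides the stored value
    }
}
```

The conditional `remove(recid, value)` mirrors the compare-and-remove step in `runWriter()`: an entry that was overwritten while it was being written stays in the cache for the next pass.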


    
This method forwards the record to the Writer Thread and returns immediately; the write happens asynchronously. ![async-update](async-update.png)

Uml:
async-update.png
actor user
participant "update method" as upd
participant "Writer Thread" as wri
activate upd
user -> upd: User calls update method
upd -> wri: forward record into Write Queue
upd -> user: returns
deactivate upd
note over wri: eventually\n writes record\n before commit
    @Override
    public <A> void update(long recid, A value, Serializer<A> serializer) {
        if(serializer!=..readLock().lock();
        try{
            checkState();
            writeCache.put(recid, new Fun.Tuple2(value, serializer));
        }finally{
            if(serializer!=..readLock().unlock();
        }
    }

    
This method first checks the Write Cache for a pending value of the record, falling back to the underlying Engine. A successful modification is forwarded to the Writer Thread and the method returns without waiting for the write. Asynchronicity does not affect atomicity.
    @Override
    public <A> boolean compareAndSwap(long recid, A expectedOldValue, A newValue, Serializer<A> serializer) {
        commitLock.writeLock().lock();
        try{
            checkState();
            Fun.Tuple2<Object,Serializer> existing = writeCache.get(recid);
            A oldValue = existing!=null? (A) existing.a : super.get(recid, serializer);
            if(oldValue == expectedOldValue || (oldValue!=null && oldValue.equals(expectedOldValue))){
                writeCache.put(recid, new Fun.Tuple2(newValue, serializer));
                return true;
            }else{
                return false;
            }
        }finally{
            commitLock.writeLock().unlock();
        }
    }

    
This method places 'tombstone' into Write Queue so record is eventually deleted asynchronously. However record is visible as deleted immediately.
    @Override
    public <A> void delete(long recid, Serializer<A> serializer) {
        update(recid, (A) TOMBSTONE, serializer);
    }

    
This method blocks until the Write Queue is flushed and the Writer Thread has written all records and finished. Once this method is called, `closeInProgress` is set and no record can be modified.
    @Override
    public void close() {
        commitLock.writeLock().lock();
        try {
            if(closeInProgress) return;
            checkState();
            closeInProgress = true;
            //notify background threads
            if(!action.compareAndSet(null,new CountDownLatch(0)))throw new InternalError();
            //wait for background threads to shutdown
            activeThreadsCount.await();
            //put preallocated recids back to store
            newRecidsLock.lock();
            try{
                while(newRecidsPos>0){
                    super.delete(newRecids[--newRecidsPos],.);
                }
            }finally{
                newRecidsLock.unlock();
            }
            AsyncWriteEngine.super.close();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }finally {
            commitLock.writeLock().unlock();
        }
    }



    
This method blocks until Write Queue is flushed. All put/update/delete methods are blocked while commit is in progress (via global ReadWrite Commit Lock). After this method returns, commit lock is released and other operations may continue
    @Override
    public void commit() {
        commitLock.writeLock().lock();
        try{
            checkState();
            //notify background threads
            CountDownLatch msg = new CountDownLatch(1);
            if(!action.compareAndSet(null,msg))throw new InternalError();
            //wait for response from writer thread
            while(!msg.await(1,.)){
                checkState();
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }finally {
            commitLock.writeLock().unlock();
        }
    }

    
This method blocks until Write Queue is cleared. All put/update/delete methods are blocked while rollback is in progress (via global ReadWrite Commit Lock). After this method returns, commit lock is released and other operations may continue
    @Override
    public void rollback() {
        commitLock.writeLock().lock();
        try{
            checkState();
            //notify background threads
            CountDownLatch msg = new CountDownLatch(2);
            if(!action.compareAndSet(null,msg))throw new InternalError();
            //wait for response from writer thread
            while(!msg.await(1,.)){
                checkState();
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }finally {
            commitLock.writeLock().unlock();
        }
    }

    
This method blocks all put/update/delete operations until it finishes (via global ReadWrite Commit Lock).
    @Override
    public void compact() {
        commitLock.writeLock().lock();
        try{
            checkState();
            //notify background threads
            CountDownLatch msg = new CountDownLatch(3);
            if(!action.compareAndSet(null,msg))throw new InternalError();
            //wait for response from writer thread
            while(!msg.await(1,.)){
                checkState();
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }finally {
            commitLock.writeLock().unlock();
        }
    }


    
This method blocks until Write Queue is empty (written into disk). It also blocks any put/update/delete operations until it finishes (via global ReadWrite Commit Lock).
    @Override
    public void clearCache() {
        commitLock.writeLock().lock();
        try{
            checkState();
            //wait for response from writer thread
            while(!writeCache.isEmpty()){
                checkState();
                Thread.sleep(250);
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }finally {
            commitLock.writeLock().unlock();
        }
        super.clearCache();
    }