Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package org.mapdb;
  
  import java.io.IOError;
  import java.util.Arrays;
 /** Write-Ahead-Log. */
 
 public class StoreWAL extends StoreDirect {
 
     /**
      * Mask with the low 48 bits set. Packed longs in this class keep a size in the upper
      * 16 bits (extracted with {@code >>>48}); presumably this mask extracts the remaining
      * offset part -- TODO confirm, the mask name is missing at the use sites in this copy.
      */
     protected static final long LOG_MASK_OFFSET = 0x0000FFFFFFFFFFFFL;
 
     // Single-byte markers. Presumably the instruction codes written into the transaction log
     // and dispatched on during replay -- TODO confirm against the (name-less) use sites.
     protected static final byte WAL_INDEX_LONG = 101;
     protected static final byte WAL_PHYS_LONG = 102;
     protected static final byte WAL_PHYS_SIX_LONG = 103;
     protected static final byte WAL_PHYS_ARRAY = 104;
     protected static final byte WAL_SKIP_REST_OF_BLOCK = 105;

    
     /** Last instruction in the log file. */
 
     protected static final byte WAL_SEAL = 111;
    
     /** Written at offset 8 of the log file; indicates that the log was synced and closed. */
 
     protected static final long LOG_SEAL = 4566556446554645L;
 
     /** File extension string for the transaction log file. */
     public static final String TRANS_LOG_FILE_EXT = ".t";
 
     /** Shared one-element array; presumably the marker stored for records deleted in the current transaction. */
     protected static final long[] TOMBSTONE = new long[1];
 
     /** Factory given to the constructor; it exposes {@code createTransLogVolume()}. */
     protected final Volume.Factory volFac;
     /** Transaction log volume; presumably null while no log is open -- TODO confirm. */
     protected Volume log;
 
     /** Presumably the current write position / used size of the log -- TODO confirm. */
     protected long logSize;
 
     // Maps keyed by long, holding long[] values. Presumably 'modified' tracks records changed
     // in the current transaction and 'longStackPages' caches long-stack pages -- TODO confirm.
     protected final LongConcurrentHashMap<long[]> modified = new LongConcurrentHashMap<long[]>();
     protected final LongMap<long[]> longStackPages = new LongHashMap<long[]>();
     // NOTE(review): the array-size expressions below are incomplete in this copy
     // ("[/8]" and "[.]"); restore them from the upstream source before compiling.
     protected final long[] indexVals = new long[/8];
     protected final boolean[] indexValsModified = new boolean[.];
 
 
     /**
      * Convenience constructor. Delegates to the nine-argument constructor, passing
      * {@code false, false, 5, false, 0L, false, false, null} for the remaining arguments.
      *
      * @param volFac volume factory forwarded to the full constructor
      */
     public StoreWAL(Volume.Factory volFac) {
         this(volFac,false,false,5,false,0L,false,false,null);
     }
     /**
      * Full constructor. Forwards all arguments to the superclass constructor, stores the
      * factory, obtains a transaction-log volume from it, then calls
      * {@link #reloadIndexFile()} followed by {@link #replayLogFile()} and finally sets a
      * field to {@code null}.
      *
      * NOTE(review): the commas between parameters/arguments and the names of the assigned
      * fields are missing in this copy; restore them from the upstream source.
      * NOTE(review): reloadIndexFile() and replayLogFile() are protected (overridable) and are
      * invoked from the constructor; a subclass override would run before its own fields
      * are initialized.
      */
     public StoreWAL(Volume.Factory volFacboolean readOnlyboolean deleteFilesAfterClose,
                     int spaceReclaimModeboolean syncOnCommitDisabledlong sizeLimit,
                     boolean checksumboolean compressbyte[] password) {
         super(volFacreadOnlydeleteFilesAfterClosespaceReclaimMode,syncOnCommitDisabled,sizeLimit,
                 checksum,compress,password);
         this. = volFac;
         this. = volFac.createTransLogVolume();
 
         // bring in-memory state up to date, then apply any log left over from a previous run
         reloadIndexFile();
         replayLogFile();
          = null;
     }
 
     /**
      * Resets in-memory state and re-reads cached values: sets a counter to 0, clears two
      * collections, reads three long values, then reloads an array of longs in 8-byte steps
      * (slot index is {@code i/8}) and finally fills an array with {@code false}.
      *
      * NOTE(review): every field/receiver name on these lines is missing in this copy.
      * Presumably the values come from the index volume and the last line resets the
      * "modified" flags -- TODO confirm against the upstream source.
      */
     protected void reloadIndexFile() {
          = 0;
         .clear();
         .clear();
          = .getLong();
          = .getLong();
          = .getLong();
         // one cached long per 8-byte slot
         for(int i = ;i<;i+=8){
             [i/8] = .getLong(i);
         }
         Arrays.fill(false);
     }
 
     /**
      * Lazily creates the transaction log. Returns immediately when the checked reference is
      * already non-null; otherwise creates a volume via {@code createTransLogVolume()},
      * reserves 16 bytes, writes a long at offset 0 and {@code 0L} at offset 8, and sets a
      * position field to 16 (just past that 16-byte header).
      *
      * NOTE(review): the field names and the value written at offset 0 are missing here.
      */
     protected void openLogIfNeeded(){
         if( !=nullreturn;
          = .createTransLogVolume();
         .ensureAvailable(16);
         .putLong(0, );
         // offset 8 starts as zero; commit() later overwrites it once the log is sealed
         .putLong(8, 0L);
          = 16;
     }
 
 
 
 
     /**
      * Inserts a new record. The value is serialized up front; then, while a lock is held,
      * the log is opened if needed and an index slot ({@code freeIoRecidTake}), physical
      * space ({@code physAllocate}) and log space ({@code logAllocate}) are reserved. The
      * index entry and the payload are written into the log after that lock is released.
      *
      * @param serializer used to turn the value into bytes
      * @return {@code ioRecid} minus a base offset (name missing in this copy), divided by 8
      */
     @Override
     public <A> long put(A valueSerializer<A> serializer) {
         assert(value!=null);
         DataOutput2 out = serialize(valueserializer);
 
         final long ioRecid;
         final long[] physPos;
         final long[] logPos;
 
         .lock();
         try{
             openLogIfNeeded();
             ioRecid = freeIoRecidTake(false);
             //first get space in phys
             physPos = physAllocate(out.pos,false,false);
             //now get space in log
            logPos = logAllocate(physPos);
        }finally{
            .unlock();
        }
        //write data into log
        // logAllocate() reserves 1+8+8 bytes for the index entry and 1+8 bytes for the array
        // header ahead of the first payload position, hence the "- 1-8-8-1-8" below
        walIndexVal((logPos[0]&) - 1-8-8-1-8, ioRecidphysPos[0]|);
        walPhysArray(outphysPoslogPos);
        // remember where in the log this record now lives
        .put(ioRecid,logPos);
        // hands the output buffer to a queue; presumably for reuse -- TODO confirm
        .offer(out);
        return (ioRecid-)/8;
    }
    /**
     * Writes the serialized bytes of {@code out} into the log, one chunk per entry of
     * {@code logPos}. Each entry packs the chunk size in its upper 16 bits
     * ({@code >>>48}) and a log position in the remaining bits. For every chunk a marker
     * byte is written 9 bytes before the position and {@code physPos[i]} 8 bytes before it.
     * Every chunk except the last starts with an 8-byte link holding {@code physPos[i + 1]},
     * so it carries 8 fewer payload bytes.
     *
     * @throws InternalError if the number of bytes copied differs from {@code out.pos}
     */
    protected void walPhysArray(DataOutput2 outlong[] physPoslong[] logPos) {
        //write byte[] data
        int outPos = 0;
        for(int i=0;i<logPos.length;i++){
            // c = bytes reserved for the link to the next chunk (none on the last chunk)
            int c =  i==logPos.length-1 ? 0: 8;
            long pos = logPos[i]&;
            int size = (int) (logPos[i]>>>48);
            .putByte(pos -  8 - 1, );
            .putLong(pos -  8, physPos[i]);
            if(c>0){
                .putLong(posphysPos[i + 1]);
                pos+=8;
            }
            .putData(posout.bufoutPossize - c);
            outPos +=size-c;
        }
        // sanity check: every serialized byte must have been written exactly once
        if(outPos!=out.pos)throw new InternalError();
    }
    /**
     * Writes one 17-byte entry into the log: a marker byte at {@code logPos}, the
     * {@code ioRecid} at {@code logPos + 1} and the {@code indexVal} at {@code logPos + 9}.
     *
     * NOTE(review): the receiver and the marker byte argument are missing in this copy.
     */
    protected void walIndexVal(long logPoslong ioRecidlong indexVal) {
        .putByte(logPos);
        .putLong(logPos + 1, ioRecid);
        .putLong(logPos + 9, indexVal);
    }
    /**
     * Reserves log space for one index entry plus one chunk per element of {@code physPos}.
     * The chunk size is taken from the upper 16 bits of each {@code physPos} element.
     *
     * @param physPos packed size/offset values of the physical chunks
     * @return array of the same length; each element has the chunk size in its upper 16 bits,
     *         OR-ed with a value whose name is missing here (presumably the log position
     *         where the chunk payload starts -- TODO confirm)
     */
    protected long[] logAllocate(long[] physPos) {
        openLogIfNeeded();
        +=1+8+8; //space used for index val
        long[] ret = new long[physPos.length];
        for(int i=0;i<physPos.length;i++){
            long size = physPos[i]>>>48;
            //would overlaps Volume Block?
            +=1+8; //space used for WAL_PHYS_ARRAY
            ret[i] = (size<<48) | ;
            +=size;
            checkLogRounding();
        }
        .ensureAvailable();
        return ret;
    }
    /**
     * Reserves one more byte in the log, writes a byte, and advances a position by the
     * distance to a boundary ({@code X - (pos & mask)}).
     *
     * NOTE(review): there are two closing braces for one opening brace, so a guarding
     * {@code if(...){} line appears to be missing from this copy, along with all field and
     * constant names. Presumably the skip is only performed when the current position is
     * too close to the end of a volume block -- TODO confirm against the upstream source.
     */
    protected void checkLogRounding() {
            .ensureAvailable(+1);
            .putByte();
             += . - (&.);
        }
    }
    /**
     * Reads a record. Computes {@code ioRecid} as a base offset plus {@code recid*8}, takes
     * the read lock selected by a hash of {@code recid}, and delegates to {@code get2}.
     *
     * @throws IOError wrapping any {@code IOException} raised while reading
     */
    @Override
    public <A> A get(long recidSerializer<A> serializer) {
        assert(recid>0);
        final long ioRecid =  + recid*8;
        final Lock lock  = [Utils.longHash(recid)&.].readLock();
        lock.lock();
        try{
            return get2(ioRecidserializer);
        }catch(IOException e){
            throw new IOError(e);
        }finally{
            lock.unlock();
        }
    }
    /**
     * Reads a record without taking a lock itself. Looks {@code ioRecid} up in a map first:
     * when absent it falls back to {@code super.get2}; when the entry is the compared marker
     * or an empty array it returns {@code null}; otherwise the value is deserialized from
     * the positions stored in the entry (one position for a single record, several for a
     * linked record).
     *
     * @throws IOException propagated from reading/deserialization
     * @throws InternalError if the bytes gathered for a linked record do not add up
     */
    @Override
    protected <A> A get2(long ioRecidSerializer<A> serializerthrows IOException {
        //check if record was modified in current transaction
        long[] r = .get(ioRecid);
        //no, not modified here: read the stored version from the parent store
        if(r==nullreturn super.get2(ioRecidserializer);
        //check for tombstone (was deleted in current trans)
        if(r== || r.length==0) return null;
        //was modified in current transaction, so read it from trans log
        if(r.length==1){
            //single record
            final int size = (int) (r[0]>>>48);
            DataInput2 in = .getDataInput(r[0]&size);
            return deserialize(serializer,size,in);
        }else{
            //linked record
            // first pass: total payload size; every chunk but the last gives up 8 bytes to a link
            int totalSize = 0;
            for(int i=0;i<r.length;i++){
                int c =  i==r.length-1 ? 0: 8;
                totalSize+=  (int) (r[i]>>>48)-c;
            }
            byte[] b = new byte[totalSize];
            int pos = 0;
            // second pass: copy each chunk's payload, skipping the 8-byte link where present
            for(int i=0;i<r.length;i++){
                int c =  i==r.length-1 ? 0: 8;
                int size = (int) (r[i]>>>48) -c;
                .getDataInput((r[i] & ) + csize).readFully(b,pos,size);
                pos+=size;
            }
            if(pos!=totalSize)throw new InternalError();
            return deserialize(serializer,totalSizenew DataInput2(b));
        }
    }
    /**
     * Replaces the value of an existing record. Under the record's write lock it collects
     * the physical chunks currently used by the record (from the log if present, otherwise
     * from the stored index value), then under a second lock frees them and reserves new
     * physical and log space, and finally writes the index entry and payload into the log.
     *
     * NOTE(review): the free/allocate/write sequence is duplicated verbatim in
     * compareAndSwap(); a shared private helper would keep the two in sync.
     */
    @Override
    public <A> void update(long recid, A valueSerializer<A> serializer) {
        assert(recid>0);
        assert(value!=null);
        DataOutput2 out = serialize(valueserializer);
        final long ioRecid =  + recid*8;
        final Lock lock  = [Utils.longHash(recid)&.].writeLock();
        lock.lock();
        try{
            final long[] physPos;
            final long[] logPos;
            long indexVal = 0;
            long[] linkedRecords = getLinkedRecordsFromLog(ioRecid);
            // not touched in this transaction yet: fall back to the stored index value
            if(linkedRecords==null){
                indexVal = .getLong(ioRecid);
                linkedRecords = getLinkedRecordsIndexVals(indexVal);
            }
            .lock();
            try{
                openLogIfNeeded();
                //free first record pointed from indexVal
                if(indexVal!=0)
                    freePhysPut(indexVal,false);
                //if there are more linked records, free those as well
                if(linkedRecords!=null){
                    for(int i=0; i<linkedRecords.length &&linkedRecords[i]!=0;i++){
                        freePhysPut(linkedRecords[i],false);
                    }
                }
                //first get space in phys
                physPos = physAllocate(out.pos,false,false);
                //now get space in log
                logPos = logAllocate(physPos);
            }finally{
                .unlock();
            }
            //write data into log
            // index entry lives 1+8+8 (entry) + 1+8 (array header) bytes before the payload
            walIndexVal((logPos[0]&) - 1-8-8-1-8, ioRecidphysPos[0]|);
            walPhysArray(outphysPoslogPos);
            .put(ioRecid,logPos);
        }finally{
            lock.unlock();
        }
        .offer(out);
    }
    /**
     * Atomically replaces a record's value when the current value equals
     * {@code expectedOldValue}. The current value is read with {@code get2} under the
     * record's write lock and compared with {@code equals}; on mismatch nothing is written.
     * On match the old chunks are freed and the new value is written to the log in the same
     * way as in update().
     *
     * @return {@code true} if the value was replaced, {@code false} if the comparison failed
     * @throws IOError wrapping any {@code IOException} raised while reading the old value
     *
     * NOTE(review): the free/allocate/write sequence duplicates update(); consider a
     * shared private helper.
     */
    @Override
    public <A> boolean compareAndSwap(long recid, A expectedOldValue, A newValueSerializer<A> serializer) {
        assert(recid>0);
        assert(expectedOldValue!=null && newValue!=null);
        final long ioRecid =  + recid*8;
        final Lock lock  = [Utils.longHash(recid)&.].writeLock();
        lock.lock();
        DataOutput2 out;
        try{
            A oldVal = get2(ioRecid,serializer);
            if((oldVal == null && expectedOldValue!=null) || (oldVal!=null && !oldVal.equals(expectedOldValue)))
                return false;
            // serialize only after the comparison succeeded
            out = serialize(newValueserializer);
            final long[] physPos;
            final long[] logPos;
            long indexVal = 0;
            long[] linkedRecords = getLinkedRecordsFromLog(ioRecid);
            // not touched in this transaction yet: fall back to the stored index value
            if(linkedRecords==null){
                indexVal = .getLong(ioRecid);
                linkedRecords = getLinkedRecordsIndexVals(indexVal);
            }
            .lock();
            try{
                openLogIfNeeded();
                //free first record pointed from indexVal
                if(indexVal!=0)
                    freePhysPut(indexVal,false);
                //if there are more linked records, free those as well
                if(linkedRecords!=null){
                    for(int i=0; i<linkedRecords.length &&linkedRecords[i]!=0;i++){
                        freePhysPut(linkedRecords[i],false);
                    }
                }
                //first get space in phys
                physPos = physAllocate(out.pos,false,false);
                //now get space in log
                logPos = logAllocate(physPos);
            }finally{
                .unlock();
            }
            //write data into log
            walIndexVal((logPos[0]&) - 1-8-8-1-8, ioRecidphysPos[0]|);
            walPhysArray(outphysPoslogPos);
            .put(ioRecid,logPos);
        }catch(IOException e){
            throw new IOError(e);
        }finally{
            lock.unlock();
        }
        .offer(out);
        return true;
    }
    /**
     * Deletes a record. Under the record's write lock it gathers the record's physical
     * chunks, then under a second lock reserves 1+8+8 bytes of log space for an index
     * entry, pushes {@code ioRecid} onto a long stack via {@code longStackPut} and frees
     * the chunks. Afterwards an index entry is written to the log and a marker is stored
     * in a map for this {@code ioRecid}.
     *
     * NOTE(review): {@code logPos} is assigned before {@code checkLogRounding()} is called.
     * If the assigned value is the current log position (its name is missing in this copy)
     * and checkLogRounding() advances that position, the entry is later written at the
     * stale position rather than inside the 17 bytes reserved after rounding -- confirm the
     * intended order against the upstream source.
     */
    @Override
    public <A> void delete(long recidSerializer<A> serializer) {
        assert(recid>0);
        final long ioRecid =  + recid*8;
        final Lock lock  = [Utils.longHash(recid)&.].writeLock();
        lock.lock();
        try{
            final long logPos;
            long indexVal = 0;
            long[] linkedRecords = getLinkedRecordsFromLog(ioRecid);
            // not touched in this transaction yet: fall back to the stored index value
            if(linkedRecords==null){
                indexVal = .getLong(ioRecid);
                linkedRecords = getLinkedRecordsIndexVals(indexVal);
            }
            .lock();
            try{
                openLogIfNeeded();
                logPos = ;
                checkLogRounding();
                +=1+8+8; //space used for index val
                .ensureAvailable();
                longStackPut(ioRecid,false);
                //free first record pointed from indexVal
                if(indexVal!=0)
                    freePhysPut(indexVal,false);
                //if there are more linked records, free those as well
                if(linkedRecords!=null){
                    for(int i=0; i<linkedRecords.length &&linkedRecords[i]!=0;i++){
                        freePhysPut(linkedRecords[i],false);
                    }
                }
            }finally {
                .unlock();
            }
            walIndexVal(logPos,ioRecid,0|);
            .put(ioRecid,);
        }finally {
            lock.unlock();
        }
        }
    /**
     * Commits the current transaction. After taking a lock plus every write lock in a
     * collection of {@code ReentrantReadWriteLock}s, it:
     * 1) returns early when there is nothing to commit;
     * 2) appends every cached long-stack page to the log (marker byte, packed size/offset
     *    long, the first long of the page, then six-byte values);
     * 3) appends three index entries followed by one entry for each modified slot;
     * 4) writes a seal byte, syncs, writes a long at offset 8 and syncs again;
     * 5) calls {@code replayLogFile()} and then {@code reloadIndexFile()}.
     * All locks are released in the {@code finally} block.
     *
     * NOTE(review): most receivers, field names and constants are missing in this copy,
     * including the loop headers over the lock array and the conditions guarding sync().
     * NOTE(review): the locks are acquired before the {@code try}; if acquiring one of them
     * failed part-way, the ones already taken would not be released.
     */
    @Override
    public void commit() {
        .lock();
        for(ReentrantReadWriteLock lock:lock.writeLock().lock();
        try{
            if(!.isEmpty() && ==nullopenLogIfNeeded();
            if(==null){
                return//no modifications
            }
            //update physical and logical filesize
            //dump long stack pages
            LongMap.LongMapIterator<long[]> iter = .longMapIterator();
            while(iter.moveToNext()){
                // page size is kept in the upper 16 bits of the page's first long
                long pageSize = iter.value()[0]>>>48;
                .ensureAvailable(+1+8+pageSize);
                .putByte();
                +=1;
                .putLong(, (pageSize<<48)|iter.key());
                +=8;
                //first long in array
                long[] array = iter.value();
                .putLong(,array[0]);
                +=8;
                // remaining page content: six-byte values after the 8-byte page header
                int numItems = (int) ((pageSize-8)/6);
                for(int i=0;i<numItems;i++){
                    .putSixLong(,array[i+1]);
                    +=6;
                }
                checkLogRounding();
            }
            // three fixed index entries (17 bytes each) plus one byte for the seal
            .ensureAvailable( + 17 + 17 + 17 + 1);
            walIndexVal(,);
            +=17;
            walIndexVal(,);
            +=17;
            walIndexVal(,);
            +=17;
            // one more entry for every slot flagged as modified
            for(int i=;i<;i+=8){
                if(![i/8]) continue;
                .ensureAvailable( + 17);
                walIndexVal(i,[i/8]);
                +=17;
            }
            //seal log file
            .putByte();
            +=1;
            //flush log file
            if(!.sync();
            //and write mark it was sealed
            .putLong(8, );
            if(!.sync();
            // apply the sealed log, then refresh the in-memory state
            replayLogFile();
            reloadIndexFile();
        }finally {
            for(ReentrantReadWriteLock lock:lock.writeLock().unlock();
            .unlock();
        }
    }
    /**
     * Applies the transaction log and then discards it. If the header check fails (empty
     * log, or unexpected longs at offsets 0 and 8) the log is closed and deleted without
     * being applied. Otherwise instructions are read one byte at a time starting at offset
     * 16 until the terminating instruction: each branch reads its operands from the log
     * and writes a long, a six-byte long or a byte range to a target, or skips ahead to a
     * boundary. After replay the targets are synced (conditionally), the two header longs
     * are zeroed so the log cannot be replayed again, and the log is closed and deleted.
     *
     * @throws InternalError on an instruction byte that matches none of the branches
     *
     * NOTE(review): the instruction constants, receivers and position field are missing in
     * this copy, so which volume each branch writes to cannot be told from here.
     */
    protected void replayLogFile(){
         = 0;
        if( !=null && !){
            .sync();
        }
        //read headers
        if(.isEmpty() || .getLong(0)!= || .getLong(8) !=){
            //wrong headers, discard log
            .close();
            .deleteFile();
             = null;
            return;
        }
        //all good, start replay
        // 16 = size of the two header longs
        =16;
        byte ins = .getByte();
        +=1;
        while(ins!=){
            if(ins == ){
                // operands: 8-byte ioRecid, 8-byte index value
                long ioRecid = .getLong();
                +=8;
                long indexVal = .getLong();
                +=8;
                .ensureAvailable(ioRecid+8);
                .putLong(ioRecidindexVal);
            }else if(ins == ){
                // operands: 8-byte offset, 8-byte value
                long offset = .getLong();
                +=8;
                long val = .getLong();
                +=8;
                .ensureAvailable(offset+8);
                .putLong(offset,val);
            }else if(ins == ){
                // operands: 8-byte offset, 6-byte value
                long offset = .getLong();
                +=8;
                long val = .getSixLong();
                +=6;
                .ensureAvailable(offset+6);
                .putSixLong(offsetval);
            }else if(ins == ){
                // operand: packed long (size in upper 16 bits, offset in the rest), then the bytes
                long offset = .getLong();
                +=8;
                final int size = (int) (offset>>>48);
                offset = offset&;
                //transfer byte[] directly from log file without copying into memory
                DataInput2 input = .getDataInput(size);
                ByteBuffer buf = input.buf.duplicate();
                buf.position(input.pos);
                buf.limit(input.pos+size);
                .ensureAvailable(offset+size);
                .putData(offsetbuf);
                +=size;
            }else if(ins == ){
                // jump ahead to a boundary (counterpart of checkLogRounding)
                 += .-(&.);
            }else{
                throw new InternalError("unknown trans log instruction: "+ins +" at log offset: "+(-1));
            }
            ins = .getByte();
            +=1;
        }
        =0;
        //flush dbs
        if(!){
            .sync();
            .sync();
        }
        //and discard log
        .putLong(0, 0);
        .putLong(8, 0); //destroy seal to prevent log file from being replayed
        .close();
        .deleteFile();
         = null;
    }
    /**
     * Discards the current transaction. With all locks held, closes and deletes the log if
     * one exists, clears the reference, and calls {@code reloadIndexFile()} to reset the
     * in-memory state.
     *
     * NOTE(review): the {@code throws UnsupportedOperationException} clause is declared,
     * but nothing visible in this body throws it.
     */
    @Override
    public void rollback() throws UnsupportedOperationException {
        .lock();
        for(ReentrantReadWriteLock lock:lock.writeLock().lock();
        try{
            //discard trans log
            if( !=null){
                .close();
                .deleteFile();
                 = null;
            }
            reloadIndexFile();
        }finally {
            for(ReentrantReadWriteLock lock:lock.writeLock().unlock();
            .unlock();
        }
    }
    /**
     * Returns the cached long[] for the page at {@code physOffset}, creating and caching it
     * on first use.
     *
     * @param physOffset key of the page
     * @param read when {@code true}, a newly created buffer is filled from storage: element
     *        0 is read as a long at {@code physOffset}, elements 1..n as six-byte values at
     *        {@code physOffset + 2 + i*6}; when {@code false} the new buffer stays zeroed
     * @return the cached (possibly just created) buffer
     */
    private long[] getLongStackPage(final long physOffsetboolean read){
        long[] buf = .get(physOffset);
        if(buf == null){
            buf = new long[+1];
            if(read){
                buf[0] = .getLong(physOffset);
                for(int i=1;i<buf.length;i++){
                    buf[i] = .getSixLong(physOffset + 2 + i * 6);
                }
            }
            .put(physOffset,buf);
        }
        return buf;
    }
    /**
     * Pops one value from the long stack selected by {@code ioList}. The slot at
     * {@code ioList / 8} holds a packed long: position within the page in the upper 16
     * bits, page offset in the rest. If the popped value was the only one on its page
     * ({@code pos == 8}) the slot is re-pointed to the previous page (or set to 0), the
     * page is dropped from the cache and its space is released with {@code freePhysPut};
     * otherwise the position is decreased by 6.
     *
     * @return the popped value, or 0 when the stack is empty
     * @throws InternalError if the stored position is below 8
     */
    @Override
    protected long longStackTake(long ioListboolean recursive) {
//        if(recursive) throw new InternalError();
        final int ii = ((int) (ioList / 8));
        long dataOffset = [ii];
        if(dataOffset == 0) return 0; //empty
        long pos = dataOffset>>>48;
        dataOffset&=;
        if(pos<8) throw new InternalError();
        long[] buf = getLongStackPage(dataOffset,true);
        // byte position -> array index (8-byte header at index 0, six-byte values after it)
        final long ret = buf[((int) ((pos - 2) / 6))];
        //was it only record at that page?
        if(pos==8){
            //yes, delete this page
            long next = buf[0]&;
            long size = buf[0]>>>48;
            if(next != 0){
                //update index so it points to previous page
                long nextSize = getLongStackPage(next,true)[0]>>>48;
                [ii] = ((nextSize-6)<<48)|next;
                [ii] = true;
            }else{
                [ii] = 0;
                [ii] = true;
            }
            //put space used by this page into free list
            .remove(dataOffset); //TODO write zeroes to phys file
            freePhysPut((size<<48)|dataOffset,true);
        }else{
            //no, it was not last record at this page, so just decrement the counter
            pos-=6;
            [ii] = (pos<<48)|dataOffset;
            [ii] = true;
        }
        return ret;
    }
    /**
     * Pushes {@code offset} onto the long stack selected by {@code ioList}. When the stack
     * is empty, or its current page is full ({@code pos+6 == size}), a new page is taken
     * with {@code freePhysTake}, linked to the previous page, and receives the value as its
     * first entry (slot position 8). Otherwise the value is stored in the next free
     * six-byte entry of the current page and the position grows by 6.
     *
     * @throws IllegalArgumentException if {@code offset} uses any of its upper 16 bits
     * @throws InternalError if the newly taken page offset is 0
     *
     * NOTE(review): {@code next} and {@code numberOfRecordsInPage} are computed in the
     * non-empty branch but never used in the visible code.
     * NOTE(review): in both branches getLongStackPage() is called with listPhysid before
     * the {@code listPhysid == 0} check, so a zero offset is cached before the error.
     */
    @Override
    protected void longStackPut(long ioListlong offset,boolean recursive) {
//        if(recursive) throw new InternalError();
        if(offset>>>48!=0) throw new IllegalArgumentException();
        //index position was cleared, put into free index list
        final int ii = ((int) (ioList / 8));
        long dataOffset = [ii];
        long pos = dataOffset>>>48;
        dataOffset &= ;
        if(dataOffset == 0){ //empty list?
            //yes empty, create new page and fill it with values
            final long listPhysid = freePhysTake((int,false,true) &;
            long[] buf = getLongStackPage(listPhysid,false);
            if(listPhysid == 0) throw new InternalError();
            //set size and link to old page
            buf[0] = (<<48) | dataOffset;
            //set  record
            buf[1] = offset;
            //and update index file with new page location
            [ii] =  (8L << 48) | listPhysid;
            [ii] = true;
        }else{
            //non empty list
            long[] buf = getLongStackPage(dataOffset,true);
            final long next = buf[0]&;
            final long size = buf[0]>>>48;
            final int numberOfRecordsInPage = (int) (buf[0]>>>(8*7));
            if(pos+6==size){ //is current page full?
                //yes it is full, so we need to allocate new page and write our number there
                final long listPhysid = freePhysTake((intfalse,true) &;
                long[] bufNew = getLongStackPage(listPhysid,false);
                if(listPhysid == 0) throw new InternalError();
                //set location to previous page and set current page size
                bufNew[0]=(<<48)|dataOffset;
                //set the value itself
                bufNew[1] = offset;
                //and update index file with new page location and number of records
                [ii] =  (8L<<48) | listPhysid;
                [ii] = true;
            }else{
                //there is space on page, so just write released recid and increase the counter
                pos+=6;
                buf[((int) ((pos - 2) / 6))] = offset;
                [ii] = (pos<<48)|dataOffset;
                [ii] = true;
            }
        }
    }
    /**
     * Looks up the log positions recorded for {@code ioRecid} and translates each into the
     * long stored 8 bytes before that position (walPhysArray() writes {@code physPos[i]}
     * there).
     *
     * @return one value per recorded chunk, or {@code null} when nothing is recorded for
     *         {@code ioRecid}
     */
    protected long[] getLinkedRecordsFromLog(long ioRecid){
        long[] ret0 = .get(ioRecid);
        if(ret0!=null){
            long[] ret = new long[ret0.length];
            for(int i=0;i<ret0.length;i++){
                long offset = ret0[i] & ;
                //offset now points to log file, read phys offset from log file
                ret[i] =  .getLong(offset-8);
            }
            return ret;
        }
        return null;
    }
    /**
     * Closes the store. With all locks held: if a log exists it is synced and closed (and
     * deleted when a flag is set); then two further resources are synced and closed,
     * deleted when a flag is set, and their references cleared. Locks are released in the
     * {@code finally} block.
     *
     * NOTE(review): receiver and flag names are missing in this copy. Presumably the two
     * resources are the index and physical volumes and the flag is the
     * delete-files-after-close option -- TODO confirm.
     */
    @Override
    public void close() {
        .lock();
        for(ReentrantReadWriteLock lock:lock.writeLock().lock();
        try{
            if( !=null){
                .sync();
                .close();
                if(){
                    .deleteFile();
                }
            }
            .sync();
            .sync();
            .close();
            .close();
            if(){
                .deleteFile();
                .deleteFile();
            }
             = null;
             = null;
        }finally {
            for(ReentrantReadWriteLock lock:lock.writeLock().unlock();
            .unlock();
        }
    }
    /**
     * Compacts the store by delegating to {@code super.compact()} and then calling
     * {@code reloadIndexFile()}. Refuses to run while the checked collection is non-empty.
     *
     * @throws IllegalAccessError if the guarded reference is non-null and not empty
     *
     * NOTE(review): no lock is taken here (see the TODOs below), so the emptiness check
     * is not protected against concurrent writers.
     */
    @Override
    public void compact() {
        //TODO lock it down here
        if(!=null && !.isEmpty()) //TODO thread unsafe?
            throw new IllegalAccessError("WAL not empty; commit first, than compact");
        super.compact();
        reloadIndexFile();
    }
    /**
     * Reports that this store supports rollback.
     *
     * @return always {@code true}
     */
    @Override
    public boolean canRollback(){
        return true;
    }
New to GrepCode? Check out our FAQ X