Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package org.mapdb;
  
  import java.io.IOError;
  import java.util.Arrays;

Write-Ahead-Log
  
 public class StoreWAL extends StoreDirect {
 
     protected static final byte WAL_INDEX_LONG = 101;
     protected static final byte WAL_PHYS_LONG = 102;
     protected static final byte WAL_PHYS_SIX_LONG = 103;
     protected static final byte WAL_PHYS_ARRAY = 104;
     protected static final byte WAL_SKIP_REST_OF_BLOCK = 105;

    
last instruction in log file
 
     protected static final byte WAL_SEAL = 111;
    
added to offset 8 into log file, indicates that log was synced and close
 
     protected static final long LOG_SEAL = 4566556446554645L;
 
     public static final String TRANS_LOG_FILE_EXT = ".t";
 
     protected static final long[] TOMBSTONE = new long[1];
 
     protected final Volume.Factory volFac;
     protected Volume log;
 
     protected long logSize;
 
     protected final LongConcurrentHashMap<long[]> modified = new LongConcurrentHashMap<long[]>();
     protected final LongMap<long[]> longStackPages = new LongHashMap<long[]>();
     protected final long[] indexVals = new long[/8];
     protected final boolean[] indexValsModified = new boolean[.];
 
 
     public StoreWAL(Volume.Factory volFac) {
         this(volFac,false,false);
     }
     public StoreWAL(Volume.Factory volFacboolean readOnlyboolean deleteFilesAfterClose) {
         super(volFacreadOnlydeleteFilesAfterClose);
         this. = volFac;
         this. = volFac.createTransLogVolume();
         reloadIndexFile();
         replayLogFile();
          = null;
     }
 
     protected void reloadIndexFile() {
          = 0;
         .clear();
          = .getLong();
          = .getLong();
         for(int i = ;i<;i+=8){
             [i/8] = .getLong(i);
         }
         Arrays.fill(false);
     }
 
     protected void openLogIfNeeded(){
         if( !=nullreturn;
          = .createTransLogVolume();
         .ensureAvailable(16);
         .putLong(0, );
         .putLong(8, 0L);
          = 16;
     }
 
 
 
 
     @Override
     public <A> long put(A valueSerializer<A> serializer) {
         DataOutput2 out = serialize(valueserializer);
 
         final long ioRecid;
         final long[] physPos;
         final long[] logPos;
 
         .lock();
         try{
             openLogIfNeeded();
             ioRecid = freeIoRecidTake(false);
             //first get space in phys
             physPos = physAllocate(out.pos,false);
             //now get space in log
             logPos = logAllocate(physPos);
 
         }finally{
             .unlock();
         }
 
         //write data into log
         walIndexVal((logPos[0]&) - 1-8-8-1-8, ioRecidphysPos[0]);
         walPhysArray(outphysPoslogPos);
 
         .put(ioRecid,logPos);
         return (ioRecid-)/8;
    }
    protected void walPhysArray(DataOutput2 outlong[] physPoslong[] logPos) {
        //write byte[] data
        int outPos = 0;
        for(int i=0;i<logPos.length;i++){
            int c = ccc(logPos.lengthi);
            long pos = logPos[i]&;
            int size = (int) ((logPos[i]&) >>>48);
            .putByte(pos -  8 - 1, );
            .putLong(pos -  8, physPos[i]);
            if(c>0){
                .putLong(posphysPos[i + 1]);
                pos+=8;
            }
            if(c==12){
                .putInt(posout.pos);
                pos+=4;
            }
            .putData(posout.bufoutPossize - c);
            outPos +=size-c;
        }
        if(outPos!=out.pos)throw new InternalError();
    }
    protected void walIndexVal(long logPoslong ioRecidlong indexVal) {
        .putByte(logPos);
        .putLong(logPos + 1, ioRecid);
        .putLong(logPos + 9, indexVal);
    }
    protected long[] logAllocate(long[] physPos) {
        openLogIfNeeded();
        +=1+8+8; //space used for index val
        long[] ret = new long[physPos.length];
        for(int i=0;i<physPos.length;i++){
            long size = (physPos[i]&)>>>48;
            //would overlaps Volume Block?
            +=1+8; //space used for WAL_PHYS_ARRAY
            ret[i] = (size<<48) | ;
            +=size;
            checkLogRounding();
        }
        .ensureAvailable();
        return ret;
    }
    protected void checkLogRounding() {
            .ensureAvailable(+1);
            .putByte();
             += . - %.;
        }
    }
    @Override
    public <A> A get(long recidSerializer<A> serializer) {
        final long ioRecid =  + recid*8;
        Utils.readLock(,recid);
        try{
            return get2(ioRecidserializer);
        }catch(IOException e){
            throw new IOError(e);
        }finally{
            Utils.readUnlock(,recid);
        }
    }
    @Override
    protected <A> A get2(long ioRecidSerializer<A> serializerthrows IOException {
        //check if record was modified in current transaction
        long[] r = .get(ioRecid);
        //yes, read version
        if(r==nullreturn super.get2(ioRecidserializer);
        //chech for tombstone (was deleted in current trans)
        if(r== || r.length==0) return null;
        //was modified in current transaction, so read it from trans log
        if(r.length==1){
            //single record
            final int size = (int) ((r[0]&)>>>48);
            DataInput2 in = .getDataInput(r[0]&size);
            return serializer.deserialize(insize);
        }else{
            //linked record
            int totalSize = 0;
            for(int i=0;i<r.length;i++){
                int c = ccc(r.lengthi);
                totalSize+=  (int) ((r[i]&)>>>48)-c;
            }
            byte[] b = new byte[totalSize];
            int pos = 0;
            for(int i=0;i<r.length;i++){
                int c = ccc(r.lengthi);
                int size = (int) ((r[i]&)>>>48) -c;
                .getDataInput((r[i] & ) + csize).readFully(b,pos,size);
                pos+=size;
            }
            if(pos!=totalSize)throw new InternalError();
            return serializer.deserialize(new DataInput2(b),totalSize);
        }
    }
    @Override
    public <A> void update(long recid, A valueSerializer<A> serializer) {
        DataOutput2 out = serialize(valueserializer);
        final long ioRecid =  + recid*8;
        Utils.writeLock(,recid);
        try{
            final long[] physPos;
            final long[] logPos;
            long indexVal = 0;
            long[] linkedRecords = getLinkedRecordsFromLog(ioRecid);
            if(linkedRecords==null){
                indexVal = .getLong(ioRecid);
                linkedRecords = getLinkedRecordsIndexVals(indexVal);
            }
            .lock();
            try{
                openLogIfNeeded();
                //free first record pointed from indexVal
                if(indexVal!=0)
                    freePhysPut(indexVal);
                //if there are more linked records, free those as well
                if(linkedRecords!=null){
                    for(int i=0; i<linkedRecords.length &&linkedRecords[i]!=0;i++){
                        freePhysPut(linkedRecords[i]);
                    }
                }
                //first get space in phys
                physPos = physAllocate(out.pos,false);
                //now get space in log
                logPos = logAllocate(physPos);
            }finally{
                .unlock();
            }
            //write data into log
            walIndexVal((logPos[0]&) - 1-8-8-1-8, ioRecidphysPos[0]);
            walPhysArray(outphysPoslogPos);
            .put(ioRecid,logPos);
        }finally{
            Utils.writeUnlock(,recid);
        }
    }
    @Override
    public <A> boolean compareAndSwap(long recid, A expectedOldValue, A newValueSerializer<A> serializer) {
        final long ioRecid =  + recid*8;
        Utils.writeLock(,recid);
        try{
            A oldVal = get2(ioRecid,serializer);
            if((oldVal == null && expectedOldValue!=null) || (oldVal!=null && !oldVal.equals(expectedOldValue)))
                return false;
            DataOutput2 out = serialize(newValueserializer);
            final long[] physPos;
            final long[] logPos;
            long indexVal = 0;
            long[] linkedRecords = getLinkedRecordsFromLog(ioRecid);
            if(linkedRecords==null){
                indexVal = .getLong(ioRecid);
                linkedRecords = getLinkedRecordsIndexVals(indexVal);
            }
            .lock();
            try{
                openLogIfNeeded();
                //free first record pointed from indexVal
                if(indexVal!=0)
                    freePhysPut(indexVal);
                //if there are more linked records, free those as well
                if(linkedRecords!=null){
                    for(int i=0; i<linkedRecords.length &&linkedRecords[i]!=0;i++){
                        freePhysPut(linkedRecords[i]);
                    }
                }
                //first get space in phys
                physPos = physAllocate(out.pos,false);
                //now get space in log
                logPos = logAllocate(physPos);
            }finally{
                .unlock();
            }
            //write data into log
            walIndexVal((logPos[0]&) - 1-8-8-1-8, ioRecidphysPos[0]);
            walPhysArray(outphysPoslogPos);
            .put(ioRecid,logPos);
            return true;
        }catch(IOException e){
            throw new IOError(e);
        }finally{
            Utils.writeUnlock(,recid);
        }
    }
    @Override
    public <A> void delete(long recidSerializer<A> serializer) {
        final long ioRecid =  + recid*8;
        Utils.writeLock(,recid);
        try{
            final long logPos;
            long indexVal = 0;
            long[] linkedRecords = getLinkedRecordsFromLog(ioRecid);
            if(linkedRecords==null){
                indexVal = .getLong(ioRecid);
                linkedRecords = getLinkedRecordsIndexVals(indexVal);
            }
            .lock();
            try{
                openLogIfNeeded();
                logPos = ;
                checkLogRounding();
                +=1+8+8; //space used for index val
                .ensureAvailable();
                longStackPut(ioRecid);
                //free first record pointed from indexVal
                if(indexVal!=0)
                    freePhysPut(indexVal);
                //if there are more linked records, free those as well
                if(linkedRecords!=null){
                    for(int i=0; i<linkedRecords.length &&linkedRecords[i]!=0;i++){
                        freePhysPut(linkedRecords[i]);
                    }
                }
            }finally {
                .unlock();
            }
            walIndexVal(logPos,ioRecid,0);
            .put(ioRecid,);
        }finally {
            Utils.writeUnlock(,recid);
        }
        }
    @Override
    public void commit() {
        .lock();
        Utils.writeLockAll();
        try{
            if(!.isEmpty() && ==nullopenLogIfNeeded();
            if(==null){
                return//no modifications
            }
            //update physical and logical filesize
            //dump long stack pages
            LongMap.LongMapIterator<long[]> iter = .longMapIterator();
            while(iter.moveToNext()){
                .ensureAvailable(+1+8+);
                .putByte();
                +=1;
                .putLong(, (((long))<<48)|iter.key());
                +=8;
                //first long in array
                long[] array = iter.value();
                .putLong(,array[0]);
                +=8;
                for(int i=0;i<;i++){
                    .putSixLong(,array[i+1]);
                    +=6;
                }
                checkLogRounding();
            }
            .ensureAvailable( + 17 + 17 + 1);
            walIndexVal(,);
            +=17;
            walIndexVal(,);
            +=17;
            for(int i=;i<;i+=8){
                if(![i/8]) continue;
                .ensureAvailable( + 17);
                walIndexVal(i,[i/8]);
                +=17;
            }
            //seal log file
            .putByte();
            +=1;
            //flush log file
            .sync();
            //and write mark it was sealed
            .putLong(8, );
            .sync();
            replayLogFile();
            reloadIndexFile();
        }finally {
            Utils.writeUnlockAll();
            .unlock();
        }
    }
    protected void replayLogFile(){
         = 0;
        if( !=null){
            .sync();
        }
        //read headers
        if(.isEmpty() || .getLong(0)!= || .getLong(8) !=){
            //wrong headers, discard log
            .close();
            .deleteFile();
             = null;
            return;
        }
        //all good, start replay
        =16;
        byte ins = .getByte();
        +=1;
        while(ins!=){
            if(ins == ){
                long ioRecid = .getLong();
                +=8;
                long indexVal = .getLong();
                +=8;
                .ensureAvailable(ioRecid+8);
                .putLong(ioRecidindexVal);
            }else if(ins == ){
                long offset = .getLong();
                +=8;
                long val = .getLong();
                +=8;
                .ensureAvailable(offset+8);
                .putLong(offset,val);
            }else if(ins == ){
                long offset = .getLong();
                +=8;
                long val = .getSixLong();
                +=6;
                .ensureAvailable(offset+6);
                .putSixLong(offsetval);
            }else if(ins == ){
                long offset = .getLong();
                +=8;
                final int size = (int) ((offset&)>>>48);
                offset = offset&;
                //transfer byte[] directly from log file without copying into memory
                DataInput2 input = .getDataInput(size);
                synchronized (input.buf){
                    input.buf.position(input.pos);
                    input.buf.limit(input.pos+size);
                    .ensureAvailable(offset+size);
                    .putData(offsetinput.buf);
                    input.buf.clear();
                }
                +=size;
            }else if(ins == ){
                 += .-%.;
            }else{
                throw new InternalError("unknown trans log instruction: "+ins +" at log offset: "+(-1));
            }
            ins = .getByte();
            +=1;
        }
        =0;
        //flush dbs
        .sync();
        .sync();
        //and discard log
        .putLong(0, 0);
        .putLong(8, 0); //destroy seal to prevent log file from being replayed
        .close();
        .deleteFile();
         = null;
    }
    @Override
    public void rollback() throws UnsupportedOperationException {
        .lock();
        Utils.writeLockAll();
        try{
            //discard trans log
            if( !=null){
                .close();
                .deleteFile();
                 = null;
            }
            reloadIndexFile();
        }finally {
            Utils.writeUnlockAll();
            .unlock();
        }
    }
    private long[] getLongStackPage(final long physOffsetboolean read){
        long[] buf = .get(physOffset);
        if(buf == null){
            buf = new long[+1];
            if(read){
                buf[0] = .getLong(physOffset);
                for(int i=1;i<buf.length;i++){
                    buf[i] = .getSixLong(physOffset + 2 + i * 6);
                }
            }
            .put(physOffset,buf);
        }
        return buf;
    }
    @Override
    protected long longStackTake(long ioList) {
        final long physOffset = [((int) (ioList / 8))] &;
        if(physOffset == 0) return 0; //empty
        long[] buf = getLongStackPage(physOffset,true);
        final int numberOfRecordsInPage = (int) (buf[0]>>>(8*7));
        if(numberOfRecordsInPage<=0)
            throw new InternalError();
        if(numberOfRecordsInPage>throw new InternalError();
        final long ret = buf[numberOfRecordsInPage];
        final long previousListPhysid = buf[0] & ;
        //was it only record at that page?
        if(numberOfRecordsInPage == 1){
            //yes, delete this page
            long value = previousListPhysid !=0 ?
                    previousListPhysid | (((long) << 48) :
                    0L;
            //update index so it points to previous (or none)
            int ii = ((int) (ioList / 8));
            [ii] = value;
            [ii] = true;
            //put space used by this page into free list
            .remove(physOffset); //TODO write zeroes to phys file
            freePhysPut(physOffset | (((long))<<48));
        }else{
            //no, it was not last record at this page, so just decrement the counter
            buf[0] = previousListPhysid | ((1L*numberOfRecordsInPage-1L)<<(8*7));
        }
        return ret;
    }
    @Override
    protected void longStackPut(long ioListlong offset) {
        if(offset>>>48!=0) throw new IllegalArgumentException();
        //index position was cleared, put into free index list
        final long listPhysid2 = [((int) (ioList / 8))] &;
        if(listPhysid2 == 0){ //empty list?
            //yes empty, create new page and fill it with values
            final long listPhysid = freePhysTake(,false) &;
            long[] buf = getLongStackPage(listPhysid,false);
            if(listPhysid == 0) throw new InternalError();
            //set number of free records in this page to 1
            buf[0] = 1L<<(8*7);
            //set  record
            buf[1] = offset;
            //and update index file with new page location
            int ii = ((int) (ioList / 8));
            [ii] =  (((long) << 48) | listPhysid;
            [ii] = true;
        }else{
            long[] buf = getLongStackPage(listPhysid2,true);
            final int numberOfRecordsInPage = (int) (buf[0]>>>(8*7));
            if(numberOfRecordsInPage == ){ //is current page full?
                //yes it is full, so we need to allocate new page and write our number there
                final long listPhysid = freePhysTake(false) &;
                long[] bufNew = getLongStackPage(listPhysid,false);
                if(listPhysid == 0) throw new InternalError();
                //final ByteBuffers dataBuf = dataBufs[((int) (listPhysid / BUF_SIZE))];
                //set location to previous page
                //set number of free records in this page to 1
                bufNew[0] = listPhysid2 | (1L<<(8*7));
                //set free record
                bufNew[1] = offset;
                //and update index file with new page location
                int ii = ((int) (ioList / 8));
                [ii] =  (((long) << 48) | listPhysid;
                [ii] = true;
            }else{
                //there is space on page, so just write released recid and increase the counter
                buf[1+numberOfRecordsInPage] = offset;
                buf[0] = (buf[0]&) | ((1L*numberOfRecordsInPage+1L)<<(8*7));
            }
        }
    }
    protected long[] getLinkedRecordsFromLog(long ioRecid){
        long[] ret0 = .get(ioRecid);
        if(ret0!=null){
            long[] ret = new long[ret0.length];
            for(int i=0;i<ret0.length;i++){
                long offset = ret0[i] & ;
                //offset now points to log file, read phys offset from log file
                ret[i] =  .getLong(offset-8);
            }
            return ret;
        }
        return null;
    }
    @Override
    public void close() {
        .lock();
        Utils.writeLockAll();
        try{
            if( !=null){
                .sync();
                .close();
                if(){
                    .deleteFile();
                }
            }
            .sync();
            .sync();
            .close();
            .close();
            if(){
                .deleteFile();
                .deleteFile();
            }
             = null;
             = null;
        }finally {
            Utils.writeUnlockAll();
            .unlock();
        }
    }
    @Override
    public void compact() {
        //TODO lock it down here
        if(!=null && !.isEmpty()) //TODO thread unsafe?
            throw new IllegalAccessError("WAL not empty; commit first, than compact");
        super.compact();
        reloadIndexFile();
    }
New to GrepCode? Check out our FAQ X