/*
 * Copyright (C) 2011 the original author or authors.
 * See the notice.md file distributed with this work for additional
 * information regarding copyright ownership.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
  package org.iq80.leveldb.impl;
  
  
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.iq80.leveldb.*;
import org.iq80.leveldb.impl.Filename.FileInfo;
import org.iq80.leveldb.impl.Filename.FileType;
import org.iq80.leveldb.impl.MemTable.MemTableIterator;
import org.iq80.leveldb.table.*;
import org.iq80.leveldb.util.*;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.nio.channels.FileChannel;
import java.util.Collections;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import static com.google.common.collect.Lists.newArrayList;
import static org.iq80.leveldb.impl.DbConstants.L0_SLOWDOWN_WRITES_TRIGGER;
import static org.iq80.leveldb.impl.DbConstants.L0_STOP_WRITES_TRIGGER;
import static org.iq80.leveldb.impl.DbConstants.NUM_LEVELS;
import static org.iq80.leveldb.impl.SequenceNumber.MAX_SEQUENCE_NUMBER;
import static org.iq80.leveldb.impl.ValueType.DELETION;
import static org.iq80.leveldb.impl.ValueType.VALUE;
import static org.iq80.leveldb.util.SizeOf.SIZE_OF_INT;
import static org.iq80.leveldb.util.SizeOf.SIZE_OF_LONG;
import static org.iq80.leveldb.util.Slices.readLengthPrefixedBytes;
import static org.iq80.leveldb.util.Slices.writeLengthPrefixedBytes;
  
  // todo make thread safe and concurrent
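// A minimal usage sketch (illustrative only; in this project databases are
// typically opened through Iq80DBFactory rather than by constructing DbImpl
// directly):
//
//   Options options = new Options().createIfMissing(true);
//   DbImpl db = new DbImpl(options, new File("example-db"));
//   db.put("key".getBytes(), "value".getBytes());
//   byte[] value = db.get("key".getBytes());
//   db.close();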
  public class DbImpl implements DB
  {
      private final Options options;
      private final File databaseDir;
      private final TableCache tableCache;
      private final DbLock dbLock;
      private final VersionSet versions;
  
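    // Mutable state below is guarded by mutex; backgroundCondition is signalled
    // whenever a background compaction finishes or an immutable memtable is flushed.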
      private final AtomicBoolean shuttingDown = new AtomicBoolean();
      private final ReentrantLock mutex = new ReentrantLock();
    private final Condition backgroundCondition = mutex.newCondition();

    private final List<Long> pendingOutputs = newArrayList(); // todo
    private final InternalKeyComparator internalKeyComparator;
    private final ExecutorService compactionExecutor;

    private LogWriter log;

    private MemTable memTable;
    private MemTable immutableMemTable;

    private volatile Throwable backgroundException;
    private Future<?> backgroundCompaction;
    private ManualCompaction manualCompaction;
 
 
    public DbImpl(Options options, File databaseDir)
            throws IOException
    {
        Preconditions.checkNotNull(options, "options is null");
        Preconditions.checkNotNull(databaseDir, "databaseDir is null");
        this.options = options;

        if (this.options.compressionType() == CompressionType.SNAPPY && !Snappy.available()) {
            // Disable snappy if it's not available.
            this.options.compressionType(CompressionType.NONE);
        }

        this.databaseDir = databaseDir;

        // use custom comparator if set
        DBComparator comparator = options.comparator();
        UserComparator userComparator;
        if (comparator != null) {
            userComparator = new CustomUserComparator(comparator);
        }
        else {
            userComparator = new BytewiseComparator();
        }
        internalKeyComparator = new InternalKeyComparator(userComparator);
        memTable = new MemTable(internalKeyComparator);
        immutableMemTable = null;

         ThreadFactory compactionThreadFactory = new ThreadFactoryBuilder()
                 .setNameFormat("leveldb-compaction-%s")
                 .setUncaughtExceptionHandler(new UncaughtExceptionHandler()
                 {
                     @Override
                    public void uncaughtException(Thread t, Throwable e)
                     {
                         // todo need a real UncaughtExceptionHandler
                        System.out.printf("%s%n", t);
                         e.printStackTrace();
                     }
                 })
                 .build();
        compactionExecutor = Executors.newSingleThreadExecutor(compactionThreadFactory);
 
         // Reserve ten files or so for other uses and give the rest to TableCache.
         int tableCacheSize = options.maxOpenFiles() - 10;
        tableCache = new TableCache(databaseDir, tableCacheSize, new InternalUserComparator(internalKeyComparator), options.verifyChecksums());
 
         // create the version set
 
         // create the database dir if it does not already exist
         databaseDir.mkdirs();
        Preconditions.checkArgument(databaseDir.exists(), "Database directory '%s' does not exist and could not be created", databaseDir);
        Preconditions.checkArgument(databaseDir.isDirectory(), "Database directory '%s' is not a directory", databaseDir);
 
        mutex.lock();
         try {
             // lock the database dir
            dbLock = new DbLock(new File(databaseDir, Filename.lockFileName()));
 
             // verify the "current" file
             File currentFile = new File(databaseDir, Filename.currentFileName());
             if (!currentFile.canRead()) {
                Preconditions.checkArgument(options.createIfMissing(), "Database '%s' does not exist and the create if missing option is disabled", databaseDir);
             }
             else {
                Preconditions.checkArgument(!options.errorIfExists(), "Database '%s' exists and the error if exists option is enabled", databaseDir);
             }
 
            versions = new VersionSet(databaseDir, tableCache, internalKeyComparator);
 
            // load (and recover) current version
            versions.recover();
 
             // Recover from all newer log files than the ones named in the
             // descriptor (new log files may have been added by the previous
             // incarnation without registering them in the descriptor).
             //
             // Note that PrevLogNumber() is no longer used, but we pay
             // attention to it in case we are recovering a database
             // produced by an older version of leveldb.
            long minLogNumber = versions.getLogNumber();
            long previousLogNumber = versions.getPrevLogNumber();
            List<File> filenames = Filename.listFiles(databaseDir);
 
            List<Long> logs = Lists.newArrayList();
             for (File filename : filenames) {
                 FileInfo fileInfo = Filename.parseFileName(filename);
 
                 if (fileInfo != null &&
                        fileInfo.getFileType() == FileType.LOG &&
                         ((fileInfo.getFileNumber() >= minLogNumber) || (fileInfo.getFileNumber() == previousLogNumber))) {
                     logs.add(fileInfo.getFileNumber());
                 }
             }
 
             // Recover in the order in which the logs were generated
             VersionEdit edit = new VersionEdit();
             Collections.sort(logs);
             for (Long fileNumber : logs) {
                long maxSequence = recoverLogFile(fileNumber, edit);
                if (versions.getLastSequence() < maxSequence) {
                    versions.setLastSequence(maxSequence);
                }
                 }
             }
 
             // open transaction log
            long logFileNumber = versions.getNextFileNumber();
            this.log = Logs.createLogWriter(new File(databaseDir, Filename.logFileName(logFileNumber)), logFileNumber);
            edit.setLogNumber(log.getFileNumber());
 
             // apply recovered edits
            versions.logAndApply(edit);
 
             // cleanup unused files
             deleteObsoleteFiles();
 
             // schedule compactions
             maybeScheduleCompaction();
         }
         finally {
            mutex.unlock();
         }
     }
 
    public void close()
    {
        if (shuttingDown.getAndSet(true)) {
            return;
        }

        // wait for any in-flight background compaction to finish before tearing down
        mutex.lock();
        try {
            while (backgroundCompaction != null) {
                backgroundCondition.awaitUninterruptibly();
            }
        }
        finally {
            mutex.unlock();
        }

        compactionExecutor.shutdown();
        try {
            compactionExecutor.awaitTermination(1, TimeUnit.DAYS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        try {
            versions.destroy();
        }
        catch (IOException ignored) {
        }
        try {
            log.close();
        }
        catch (IOException ignored) {
        }
        tableCache.close();
        dbLock.release();
    }
 
     @Override
     public String getProperty(String name)
     {
         checkBackgroundException();
         return null;
     }
 
     private void deleteObsoleteFiles()
     {
        Preconditions.checkState(mutex.isHeldByCurrentThread());

        // Make a set of all of the live files
        List<Long> live = newArrayList(this.pendingOutputs);
        for (FileMetaData fileMetaData : versions.getLiveFiles()) {
            live.add(fileMetaData.getNumber());
        }

        for (File file : Filename.listFiles(databaseDir)) {
            FileInfo fileInfo = Filename.parseFileName(file);
            if (fileInfo == null) {
                continue;
            }
            long number = fileInfo.getFileNumber();
            boolean keep = true;
            switch (fileInfo.getFileType()) {
                case LOG:
                    keep = ((number >= versions.getLogNumber()) ||
                            (number == versions.getPrevLogNumber()));
                    break;
                case DESCRIPTOR:
                    // Keep my manifest file, and any newer incarnations'
                    // (in case there is a race that allows other incarnations)
                    keep = (number >= versions.getManifestFileNumber());
                    break;
                case TABLE:
                    keep = live.contains(number);
                    break;
                case TEMP:
                    // Any temp files that are currently being written to must
                    // be recorded in pending_outputs_, which is inserted into "live"
                    keep = live.contains(number);
                    break;
                case CURRENT:
                case DB_LOCK:
                case INFO_LOG:
                    keep = true;
                    break;
            }

            if (!keep) {
                if (fileInfo.getFileType() == FileType.TABLE) {
                    tableCache.evict(number);
                }
                // todo info logging system needed
                // Log(options_.info_log, "Delete type=%d #%lld\n", int(type), static_cast<unsigned long long>(number));
                file.delete();
            }
        }
     }

    public void flushMemTable()
     {
        mutex.lock();
        try {
            // force compaction
            makeRoomForWrite(true);

            // todo bg_error code
            while (immutableMemTable != null) {
                backgroundCondition.awaitUninterruptibly();
            }
        }
        finally {
            mutex.unlock();
        }
     }
 
    public void compactRange(int level, Slice start, Slice end)
    {
        Preconditions.checkArgument(level >= 0, "level is negative");
        Preconditions.checkArgument(level + 1 < NUM_LEVELS, "level is greater than or equal to %s", NUM_LEVELS);
        Preconditions.checkNotNull(start, "start is null");
        Preconditions.checkNotNull(end, "end is null");

        mutex.lock();
        try {
            while (this.manualCompaction != null) {
                backgroundCondition.awaitUninterruptibly();
            }
            ManualCompaction manualCompaction = new ManualCompaction(level, start, end);
            this.manualCompaction = manualCompaction;

            maybeScheduleCompaction();

            while (this.manualCompaction == manualCompaction) {
                backgroundCondition.awaitUninterruptibly();
            }
        }
        finally {
            mutex.unlock();
        }
    }
 
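    // At most one background compaction task is scheduled at a time; backgroundCall()
    // re-invokes this method when it finishes, so cascading compactions are picked up.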
     private void maybeScheduleCompaction()
     {
        Preconditions.checkState(mutex.isHeldByCurrentThread());

        if (backgroundCompaction != null) {
            // Already scheduled
        }
        else if (shuttingDown.get()) {
            // DB is being shutdown; no more background compactions
        }
        else if (immutableMemTable == null &&
                manualCompaction == null &&
                !versions.needsCompaction()) {
            // No work to be done
        }
        else {
            backgroundCompaction = compactionExecutor.submit(new Callable<Void>()
            {
                @Override
                public Void call()
                        throws Exception
                {
                    try {
                        backgroundCall();
                    }
                    catch (DatabaseShutdownException ignored) {
                    }
                    catch (Throwable e) {
                        backgroundException = e;
                    }
                    return null;
                }
            });
        }
     }
     
    public void checkBackgroundException()
    {
        Throwable e = backgroundException;
        if (e != null) {
            throw new BackgroundProcessingException(e);
        }
    }
 
     private void backgroundCall()
             throws IOException
     {
        mutex.lock();
        try {
            if (backgroundCompaction == null) {
                return;
            }

            try {
                if (!shuttingDown.get()) {
                    backgroundCompaction();
                }
            }
            finally {
                backgroundCompaction = null;
            }
        }
        finally {
            try {
                // Previous compaction may have produced too many files in a level,
                // so reschedule another compaction if needed.
                maybeScheduleCompaction();
            }
            finally {
                try {
                    backgroundCondition.signalAll();
                }
                finally {
                    mutex.unlock();
                }
            }
        }
     }
 
     private void backgroundCompaction()
             throws IOException
     {
        Preconditions.checkState(mutex.isHeldByCurrentThread());
 
         compactMemTableInternal();
 
        Compaction compaction;
        if (manualCompaction != null) {
            compaction = versions.compactRange(manualCompaction.level,
                    new InternalKey(manualCompaction.begin, MAX_SEQUENCE_NUMBER, VALUE),
                    new InternalKey(manualCompaction.end, 0, DELETION));
        } else {
            compaction = versions.pickCompaction();
        }
 
         if (compaction == null) {
             // no compaction
        } else if (manualCompaction == null && compaction.isTrivialMove()) {
             // Move file to next level
             Preconditions.checkState(compaction.getLevelInputs().size() == 1);
             FileMetaData fileMetaData = compaction.getLevelInputs().get(0);
             compaction.getEdit().deleteFile(compaction.getLevel(), fileMetaData.getNumber());
             compaction.getEdit().addFile(compaction.getLevel() + 1, fileMetaData);
            versions.logAndApply(compaction.getEdit());
             // log
         } else {
             CompactionState compactionState = new CompactionState(compaction);
             doCompactionWork(compactionState);
             cleanupCompaction(compactionState);
         }
 
        // manual compaction complete
        if (manualCompaction != null) {
            manualCompaction = null;
        }
     }
 
     private void cleanupCompaction(CompactionState compactionState)
     {
        Preconditions.checkState(mutex.isHeldByCurrentThread());
 
         if (compactionState.builder != null) {
             compactionState.builder.abandon();
         } else {
             Preconditions.checkArgument(compactionState.outfile == null);
         }
 
         for (FileMetaData output : compactionState.outputs) {
            pendingOutputs.remove(output.getNumber());
         }
     }
 
    private long recoverLogFile(long fileNumber, VersionEdit edit)
             throws IOException
     {
        Preconditions.checkState(mutex.isHeldByCurrentThread());
        File file = new File(databaseDir, Filename.logFileName(fileNumber));
        FileChannel channel = new FileInputStream(file).getChannel();

        LogMonitor logMonitor = LogMonitors.logMonitor();
        LogReader logReader = new LogReader(channel, logMonitor, true, 0);
 
         // Log(options_.info_log, "Recovering log #%llu", (unsigned long long) log_number);
 
         // Read all the records and add to a memtable
         long maxSequence = 0;
         MemTable memTable = null;
        for (Slice record = logReader.readRecord(); record != null; record = logReader.readRecord()) {
             SliceInput sliceInput = record.input();
             // read header
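            // (an 8-byte starting sequence number followed by a 4-byte count of updates)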
             if (sliceInput.available() < 12) {
                 logMonitor.corruption(sliceInput.available(), "log record too small");
                 continue;
             }
             long sequenceBegin = sliceInput.readLong();
             int updateSize = sliceInput.readInt();
 
             // read entries
            WriteBatchImpl writeBatch = readWriteBatch(sliceInput, updateSize);
 
             // apply entries to memTable
             if (memTable == null) {
                memTable = new MemTable(internalKeyComparator);
             }
            writeBatch.forEach(new InsertIntoHandler(memTable, sequenceBegin));
 
             // update the maxSequence
             long lastSequence = sequenceBegin + updateSize - 1;
             if (lastSequence > maxSequence) {
                 maxSequence = lastSequence;
             }
 
             // flush mem table if necessary
            if (memTable.approximateMemoryUsage() > options.writeBufferSize()) {
                writeLevel0Table(memTable, edit, null);
                 memTable = null;
             }
         }
 
         // flush mem table
         if (memTable != null && !memTable.isEmpty()) {
            writeLevel0Table(memTable, edit, null);
         }
 
         return maxSequence;
     }
 
     @Override
     public byte[] get(byte[] key)
             throws DBException
     {
        return get(key, new ReadOptions());
     }
 
     @Override
    public byte[] get(byte[] key, ReadOptions options)
             throws DBException
     {
         checkBackgroundException();
         LookupKey lookupKey;
        mutex.lock();
         try {
             SnapshotImpl snapshot = getSnapshot(options);
             lookupKey = new LookupKey(Slices.wrappedBuffer(key), snapshot.getLastSequence());
 
             // First look in the memtable, then in the immutable memtable (if any).
            LookupResult lookupResult = memTable.get(lookupKey);
             if (lookupResult != null) {
                 Slice value = lookupResult.getValue();
                 if (value == null) {
                     return null;
                 }
                 return value.getBytes();
             }
            if (immutableMemTable != null) {
                lookupResult = immutableMemTable.get(lookupKey);
                 if (lookupResult != null) {
                     return lookupResult.getValue().getBytes();
                 }
             }
         }
         finally {
            mutex.unlock();
         }
 
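        // The mutex is released here so the disk read below does not block
        // concurrent readers and writers.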
         // Not in memTables; try live files in level order
        LookupResult lookupResult = versions.get(lookupKey);
 
         // schedule compaction if necessary
        mutex.lock();
         try {
            if (versions.needsCompaction()) {
                 maybeScheduleCompaction();
             }
         }
         finally {
            mutex.unlock();
         }
 
         if (lookupResult != null) {
             Slice value = lookupResult.getValue();
             if (value != null) {
                 return value.getBytes();
             }
         }
         return null;
     }
 
     @Override
    public void put(byte[] key, byte[] value)
             throws DBException
     {
        put(key, value, new WriteOptions());
     }
 
     @Override
    public Snapshot put(byte[] key, byte[] value, WriteOptions options)
             throws DBException
     {
        return writeInternal(new WriteBatchImpl().put(key, value), options);
     }
 
     @Override
     public void delete(byte[] key)
             throws DBException
     {
         writeInternal(new WriteBatchImpl().delete(key), new WriteOptions());
     }
 
     @Override
    public Snapshot delete(byte[] key, WriteOptions options)
             throws DBException
     {
         return writeInternal(new WriteBatchImpl().delete(key), options);
     }
 
     @Override
     public void write(WriteBatch updates)
             throws DBException
     {
        writeInternal((WriteBatchImpl) updates, new WriteOptions());
     }
 
     @Override
    public Snapshot write(WriteBatch updates, WriteOptions options)
             throws DBException
     {
        return writeInternal((WriteBatchImpl) updates, options);
     }
 
    public Snapshot writeInternal(WriteBatchImpl updates, WriteOptions options)
             throws DBException
     {
         checkBackgroundException();
        mutex.lock();
         try {
             long sequenceEnd;
             if (updates.size() != 0) {
                 makeRoomForWrite(false);
 
                 // Get sequence numbers for this change set
                final long sequenceBegin = versions.getLastSequence() + 1;
                sequenceEnd = sequenceBegin + updates.size() - 1;

                // Reserve this sequence in the version set
                versions.setLastSequence(sequenceEnd);

                // Log write
                Slice record = writeWriteBatch(updates, sequenceBegin);
                try {
                    log.addRecord(record, options.sync());
                }
                catch (IOException e) {
                    throw Throwables.propagate(e);
                }

                // Update memtable
                updates.forEach(new InsertIntoHandler(memTable, sequenceBegin));
            } else {
                sequenceEnd = versions.getLastSequence();
            }

            if (options.snapshot()) {
                return new SnapshotImpl(versions.getCurrent(), sequenceEnd);
            } else {
                return null;
            }
         }
         finally {
            mutex.unlock();
         }
     }
 
     @Override
     public WriteBatch createWriteBatch()
     {
         checkBackgroundException();
         return new WriteBatchImpl();
     }
 
     @Override
     public SeekingIteratorAdapter iterator()
     {
         return iterator(new ReadOptions());
     }
 
     public SeekingIteratorAdapter iterator(ReadOptions options)
     {
         checkBackgroundException();
        mutex.lock();
         try {
             DbIterator rawIterator = internalIterator();
 
 
             // filter any entries not visible in our snapshot
             SnapshotImpl snapshot = getSnapshot(options);
            SnapshotSeekingIterator snapshotIterator = new SnapshotSeekingIterator(rawIterator, snapshot, internalKeyComparator.getUserComparator());
             return new SeekingIteratorAdapter(snapshotIterator);
         }
         finally {
            mutex.unlock();
         }
     }
 
    SeekingIterable<InternalKey, Slice> internalIterable()
    {
        return new SeekingIterable<InternalKey, Slice>()
         {
             @Override
             public DbIterator iterator()
             {
                 return internalIterator();
             }
         };
     }
 
    DbIterator internalIterator()
    {
        mutex.lock();
         try {
             // merge together the memTable, immutableMemTable, and tables in version set
             MemTableIterator iterator = null;
            if (immutableMemTable != null) {
                iterator = immutableMemTable.iterator();
             }
            Version current = versions.getCurrent();
            return new DbIterator(memTable.iterator(), iterator, current.getLevel0Files(), current.getLevelIterators(), internalKeyComparator);
         }
         finally {
            mutex.unlock();
         }
     }
 
     public Snapshot getSnapshot()
     {
         checkBackgroundException();
        mutex.lock();
        try {
            return new SnapshotImpl(versions.getCurrent(), versions.getLastSequence());
        }
        finally {
            mutex.unlock();
        }
     }
 
     private SnapshotImpl getSnapshot(ReadOptions options)
     {
         SnapshotImpl snapshot;
         if (options.snapshot() != null) {
            snapshot = (SnapshotImpl) options.snapshot();
         }
         else {
            snapshot = new SnapshotImpl(versions.getCurrent(), versions.getLastSequence());
             snapshot.close(); // To avoid holding the snapshot active..
         }
         return snapshot;
     }
 
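    // Write backpressure, mirroring the C++ implementation: delay each write by ~1ms
    // once level-0 has more than L0_SLOWDOWN_WRITES_TRIGGER files, block outright at
    // L0_STOP_WRITES_TRIGGER, and otherwise swap in a fresh memtable when the current
    // one exceeds options.writeBufferSize().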
     private void makeRoomForWrite(boolean force)
     {
        Preconditions.checkState(mutex.isHeldByCurrentThread());
 
         boolean allowDelay = !force;
 
         while (true) {
             // todo background processing system need work
 //            if (!bg_error_.ok()) {
 //              // Yield previous error
 //              s = bg_error_;
 //              break;
 //            } else
            if (allowDelay && versions.numberOfFilesInLevel(0) > L0_SLOWDOWN_WRITES_TRIGGER) {
                 // We are getting close to hitting a hard limit on the number of
                 // L0 files.  Rather than delaying a single write by several
                 // seconds when we hit the hard limit, start delaying each
                 // individual write by 1ms to reduce latency variance.  Also,
                 // this delay hands over some CPU to the compaction thread in
                 // case it is sharing the same core as the writer.
                 try {
                    mutex.unlock();
                     Thread.sleep(1);
                 }
                 catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
                     throw new RuntimeException(e);
                 } finally {
                    mutex.lock();
                 }
 
                 // Do not delay a single write more than once
                 allowDelay = false;
             }
            else if (!force && memTable.approximateMemoryUsage() <= options.writeBufferSize()) {
                 // There is room in current memtable
                 break;
             }
            else if (immutableMemTable != null) {
                 // We have filled up the current memtable, but the previous
                 // one is still being compacted, so we wait.
                backgroundCondition.awaitUninterruptibly();
             }
            else if (versions.numberOfFilesInLevel(0) >= L0_STOP_WRITES_TRIGGER) {
                 // There are too many level-0 files.
 //                Log(options_.info_log, "waiting...\n");
                backgroundCondition.awaitUninterruptibly();
             }
             else {
                 // Attempt to switch to a new memtable and trigger compaction of old
                Preconditions.checkState(versions.getPrevLogNumber() == 0);
 
                 // close the existing log
                 try {
                    log.close();
                 }
                 catch (IOException e) {
                     throw new RuntimeException("Unable to close log file " + .getFile(), e);
                 }
 
 
                 // open a new log
                long logNumber = versions.getNextFileNumber();
                 try {
                    this.log = Logs.createLogWriter(new File(databaseDir, Filename.logFileName(logNumber)), logNumber);
                 }
                 catch (IOException e) {
                     throw new RuntimeException("Unable to open new log file " +
                             new File(, Filename.logFileName(logNumber)).getAbsoluteFile(), e);
                 }
 
                // create a new mem table
                immutableMemTable = memTable;
                memTable = new MemTable(internalKeyComparator);

                // Do not force another compaction if there is space available
                force = false;
 
                 maybeScheduleCompaction();
             }
         }
     }
 
     public void compactMemTable()
             throws IOException
     {
        mutex.lock();
         try {
             compactMemTableInternal();
         }
         finally {
            mutex.unlock();
         }
     }
 
     private void compactMemTableInternal()
             throws IOException
     {
        Preconditions.checkState(mutex.isHeldByCurrentThread());
        if (immutableMemTable == null) {
             return;
         }
 
         try {
             // Save the contents of the memtable as a new Table
             VersionEdit edit = new VersionEdit();
            Version base = versions.getCurrent();
            writeLevel0Table(immutableMemTable, edit, base);
 
            if (shuttingDown.get()) {
                 throw new DatabaseShutdownException("Database shutdown during memtable compaction");
             }
 
             // Replace immutable memtable with the generated Table
             edit.setPreviousLogNumber(0);
            edit.setLogNumber(log.getFileNumber());  // Earlier logs no longer needed
            versions.logAndApply(edit);
 
            immutableMemTable = null;
 
             deleteObsoleteFiles();
         }
         finally {
            backgroundCondition.signalAll();
         }
     }
 
    private void writeLevel0Table(MemTable mem, VersionEdit edit, Version base)
             throws IOException
     {
        Preconditions.checkState(mutex.isHeldByCurrentThread());
 
         // skip empty mem table
         if (mem.isEmpty()) {
             return;
         }
 
        // write the memtable to a new sstable
        long fileNumber = versions.getNextFileNumber();
        pendingOutputs.add(fileNumber);

        // release the mutex while building the table; the new file is protected from
        // cleanup by its presence in pendingOutputs
        mutex.unlock();
        FileMetaData meta;
        try {
            meta = buildTable(mem, fileNumber);
        }
        finally {
            mutex.lock();
        }
        pendingOutputs.remove(fileNumber);
 
         // Note that if file size is zero, the file has been deleted and
         // should not be added to the manifest.
         int level = 0;
         if (meta != null && meta.getFileSize() > 0) {
             Slice minUserKey = meta.getSmallest().getUserKey();
             Slice maxUserKey = meta.getLargest().getUserKey();
             if (base != null) {
                level = base.pickLevelForMemTableOutput(minUserKey, maxUserKey);
             }
            edit.addFile(level, meta);
         }
     }
 
    private FileMetaData buildTable(SeekingIterable<InternalKey, Slice> data, long fileNumber)
             throws IOException
     {
        File file = new File(databaseDir, Filename.tableFileName(fileNumber));
         try {
            FileChannel channel = new FileOutputStream(file).getChannel();
            TableBuilder tableBuilder = new TableBuilder(options, channel, new InternalUserComparator(internalKeyComparator));
 
             InternalKey smallest = null;
             InternalKey largest = null;
            for (Entry<InternalKey, Slice> entry : data) {
                 // update keys
                 InternalKey key = entry.getKey();
                 if (smallest == null) {
                     smallest = key;
                 }
                 largest = key;
 
                 tableBuilder.add(key.encode(), entry.getValue());
             }
 
             tableBuilder.finish();
 
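            // sync file contents (and metadata) to disk before the table becomes visible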
             channel.force(true);
             channel.close();
 
             if (smallest == null) {
                 return null;
             }
            FileMetaData fileMetaData = new FileMetaData(fileNumber, file.length(), smallest, largest);
 
             // verify table can be opened
            tableCache.newIterator(fileMetaData);
 
            pendingOutputs.remove(fileNumber);
 
             return fileMetaData;
         }
         catch (IOException e) {
             file.delete();
             throw e;
         }
     }
 
     private void doCompactionWork(CompactionState compactionState)
             throws IOException
     {
        Preconditions.checkState(mutex.isHeldByCurrentThread());
        Preconditions.checkArgument(versions.numberOfBytesInLevel(compactionState.getCompaction().getLevel()) > 0);
        Preconditions.checkArgument(compactionState.builder == null);
        Preconditions.checkArgument(compactionState.outfile == null);

        // todo track snapshots
        compactionState.smallestSnapshot = versions.getLastSequence();

        // Release mutex while we're actually doing the compaction work
        mutex.unlock();
        try {
            MergingIterator iterator = versions.makeInputIterator(compactionState.compaction);

            Slice currentUserKey = null;
            boolean hasCurrentUserKey = false;

            long lastSequenceForKey = MAX_SEQUENCE_NUMBER;
            while (iterator.hasNext() && !shuttingDown.get()) {
                // always give priority to compacting the current mem table
                mutex.lock();
                try {
                    compactMemTableInternal();
                }
                finally {
                    mutex.unlock();
                }
                InternalKey key = iterator.peek().getKey();
                if (compactionState.compaction.shouldStopBefore(key) && compactionState.builder != null) {
                    finishCompactionOutputFile(compactionState);
                }

                // Handle key/value, add to state, etc.
                boolean drop = false;
                // todo if key doesn't parse (it is corrupted),
                if (false /*!ParseInternalKey(key, &ikey)*/) {
                    // do not hide error keys
                    currentUserKey = null;
                    hasCurrentUserKey = false;
                    lastSequenceForKey = MAX_SEQUENCE_NUMBER;
                }
                else {
                    if (!hasCurrentUserKey || internalKeyComparator.getUserComparator().compare(key.getUserKey(), currentUserKey) != 0) {
                        // First occurrence of this user key
                        currentUserKey = key.getUserKey();
                        hasCurrentUserKey = true;
                        lastSequenceForKey = MAX_SEQUENCE_NUMBER;
                    }

                    if (lastSequenceForKey <= compactionState.smallestSnapshot) {
                        // Hidden by a newer entry for same user key
                        drop = true; // (A)
                    }
                    else if (key.getValueType() == DELETION &&
                            key.getSequenceNumber() <= compactionState.smallestSnapshot &&
                            compactionState.compaction.isBaseLevelForKey(key.getUserKey())) {
                        // For this user key:
                        // (1) there is no data in higher levels
                        // (2) data in lower levels will have larger sequence numbers
                        // (3) data in layers that are being compacted here and have
                        //     smaller sequence numbers will be dropped in the next
                        //     few iterations of this loop (by rule (A) above).
                        // Therefore this deletion marker is obsolete and can be dropped.
                        drop = true;
                    }

                    lastSequenceForKey = key.getSequenceNumber();
                }

                if (!drop) {
                    // Open output file if necessary
                    if (compactionState.builder == null) {
                        openCompactionOutputFile(compactionState);
                    }
                    if (compactionState.builder.getEntryCount() == 0) {
                        compactionState.currentSmallest = key;
                    }
                    compactionState.currentLargest = key;
                    compactionState.builder.add(key.encode(), iterator.peek().getValue());

                    // Close output file if it is big enough
                    if (compactionState.builder.getFileSize() >=
                            compactionState.compaction.getMaxOutputFileSize()) {
                        finishCompactionOutputFile(compactionState);
                    }
                }
                iterator.next();
            }

            if (shuttingDown.get()) {
                throw new DatabaseShutdownException("DB shutdown during compaction");
            }
            if (compactionState.builder != null) {
                finishCompactionOutputFile(compactionState);