Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 package org.apache.flume.channel.file;
 
 
 import  javax.annotation.Nullable;
 import java.io.File;
 import java.util.List;
 
 public abstract class LogFile {
 
   private static final Logger LOG = LoggerFactory
       .getLogger(LogFile.class);


  
This class preallocates the data files 1MB at time to avoid the updating of the inode on each write and to avoid the disk filling up during a write. It's also faster, so there.
 
   private static final ByteBuffer FILL = DirectMemoryUtils.
       allocate(1024 * 1024); // preallocation, 1MB
 
   public static final byte OP_RECORD = .;
   public static final byte OP_NOOP = (. + .)/2;
   public static final byte OP_EOF = .;
 
   static {
     for (int i = 0; i < .capacity(); i++) {
       .put();
     }
   }
 
   protected static void skipRecord(RandomAccessFile fileHandle,
     int offsetthrows IOException {
     fileHandle.seek(offset);
     int length = fileHandle.readInt();
     fileHandle.skipBytes(length);
   }
 
   abstract static class MetaDataWriter {
     private final File file;
     private final int logFileID;
     private final RandomAccessFile writeFileHandle;
 
     private long lastCheckpointOffset;
     private long lastCheckpointWriteOrderID;
 
     protected MetaDataWriter(File fileint logFileIDthrows IOException {
       this. = file;
       this. = logFileID;
        = new RandomAccessFile(file"rw");
 
     }
     protected RandomAccessFile getFileHandle() {
       return ;
     }
     protected void setLastCheckpointOffset(long lastCheckpointOffset) {
      this. = lastCheckpointOffset;
    }
    protected void setLastCheckpointWriteOrderID(long lastCheckpointWriteOrderID) {
      this. = lastCheckpointWriteOrderID;
    }
    protected long getLastCheckpointOffset() {
      return ;
    }
    protected long getLastCheckpointWriteOrderID() {
      return ;
    }
    protected File getFile() {
      return ;
    }
    protected int getLogFileID() {
      return ;
    }
    void markCheckpoint(long logWriteOrderID)
        throws IOException {
      markCheckpoint(logWriteOrderID);
    }
    abstract void markCheckpoint(long currentPositionlong logWriteOrderID)
        throws IOException;
    abstract int getVersion();
    void close() {
      try {
        .close();
      } catch (IOException e) {
        .warn("Unable to close " + e);
      }
    }
  }
  static class CachedFSUsableSpace {
    private final File fs;
    private final long interval;
    private final AtomicLong lastRefresh;
    private final AtomicLong value;
    CachedFSUsableSpace(File fslong interval) {
      this. = fs;
      this. = interval;
      this. = new AtomicLong(fs.getUsableSpace());
      this. = new AtomicLong(System.currentTimeMillis());
    }
    void decrement(long numBytes) {
      Preconditions.checkArgument(numBytes >= 0, "numBytes less than zero");
      .addAndGet(-numBytes);
    }
    long getUsableSpace() {
      long now = System.currentTimeMillis();
      if(now -  > .get()) {
        .set(.getUsableSpace());
        .set(now);
      }
      return Math.max(.get(), 0L);
    }
  }
  static abstract class Writer {
    private final int logFileID;
    private final File file;
    private final long maxFileSize;
    private final RandomAccessFile writeFileHandle;
    private final FileChannel writeFileChannel;
    private final CipherProvider.Encryptor encryptor;
    private final CachedFSUsableSpace usableSpace;
    private volatile boolean open;
    private long lastCommitPosition;
    private long lastSyncPosition;
    private final boolean fsyncPerTransaction;
    private final int fsyncInterval;
    private final ScheduledExecutorService syncExecutor;
    private volatile boolean dirty = false;
    // To ensure we can count the number of fsyncs.
    private long syncCount;
    Writer(File fileint logFileIDlong maxFileSize,
        CipherProvider.Encryptor encryptorlong usableSpaceRefreshInterval,
        boolean fsyncPerTransactionint fsyncIntervalthrows IOException {
      this. = file;
      this. = logFileID;
      this. = Math.min(maxFileSize,
      this. = encryptor;
       = new RandomAccessFile(file"rw");
      this. = fsyncPerTransaction;
      this. = fsyncInterval;
      if(!fsyncPerTransaction) {
        .info("Sync interval = " + fsyncInterval);
         = Executors.newSingleThreadScheduledExecutor();
          @Override
          public void run() {
            try {
              sync();
            } catch (Throwable ex) {
              .error("Data file, " + getFile().toString() + " could not " +
                "be synced to disk due to an error."ex);
            }
          }
        }, fsyncIntervalfsyncInterval.);
      } else {
         = null;
      }
       = new CachedFSUsableSpace(fileusableSpaceRefreshInterval);
      .info("Opened " + file);
       = true;
    }
    abstract int getVersion();
    protected CipherProvider.Encryptor getEncryptor() {
      return ;
    }
    int getLogFileID() {
      return ;
    }
    File getFile() {
      return ;
    }
    String getParent() {
      return .getParent();
    }
    long getUsableSpace() {
      return .getUsableSpace();
    }
    long getMaxSize() {
      return ;
    }
    long getLastCommitPosition(){
      return ;
    }
    long getLastSyncPosition() {
      return ;
    }
    long getSyncCount() {
      return ;
    }
    synchronized long position() throws IOException {
      return getFileChannel().position();
    }
    // encrypt and write methods may not be thread safe in the following
    // methods, so all methods need to be synchronized.
    synchronized FlumeEventPointer put(ByteBuffer bufferthrows IOException {
      if( != null) {
        buffer = ByteBuffer.wrap(.encrypt(buffer.array()));
      }
      Pair<IntegerIntegerpair = write(buffer);
      return new FlumeEventPointer(pair.getLeft(), pair.getRight());
    }
    synchronized void take(ByteBuffer bufferthrows IOException {
      if( != null) {
        buffer = ByteBuffer.wrap(.encrypt(buffer.array()));
      }
      write(buffer);
    }
    synchronized void rollback(ByteBuffer bufferthrows IOException {
      if( != null) {
        buffer = ByteBuffer.wrap(.encrypt(buffer.array()));
      }
      write(buffer);
    }
    synchronized void commit(ByteBuffer bufferthrows IOException {
      if ( != null) {
        buffer = ByteBuffer.wrap(.encrypt(buffer.array()));
      }
      write(buffer);
       = true;
       = position();
    }
    private Pair<IntegerIntegerwrite(ByteBuffer buffer)
      throws IOException {
      if(!isOpen()) {
        throw new LogFileRetryableIOException("File closed " + );
      }
      long length = position();
      long expectedLength = length + (longbuffer.limit();
      if(expectedLength > ) {
        throw new LogFileRetryableIOException(expectedLength + " > " +
            );
      }
      int offset = (int)length;
      Preconditions.checkState(offset >= 0, String.valueOf(offset));
      // OP_RECORD + size + buffer
      int recordLength = 1 + (int). + buffer.limit();
      .decrement(recordLength);
      preallocate(recordLength);
      ByteBuffer toWrite = ByteBuffer.allocate(recordLength);
      toWrite.put();
      writeDelimitedBuffer(toWritebuffer);
      toWrite.position(0);
      int wrote = getFileChannel().write(toWrite);
      Preconditions.checkState(wrote == toWrite.limit());
      return Pair.of(getLogFileID(), offset);
    }
    synchronized boolean isRollRequired(ByteBuffer bufferthrows IOException {
      return isOpen() && position() + (longbuffer.limit() > getMaxSize();
    }

    
Sync the underlying log file to disk. Expensive call, should be used only on commits. If a sync has already happened after the last commit, this method is a no-op

Throws:
IOException
LogFileRetryableIOException - if this log file is closed.
    synchronized void sync() throws IOException {
      if (! && !) {
        if(.isDebugEnabled()) {
          .debug(
            "No events written to file, " + getFile().toString() +
              " in last " +  + " or since last commit.");
        }
        return;
      }
      if (!isOpen()) {
        throw new LogFileRetryableIOException("File closed " + );
      }
      if ( < ) {
        getFileChannel().force(false);
         = position();
        ++;
         = false;
      }
    }
    protected boolean isOpen() {
      return ;
    }
    protected RandomAccessFile getFileHandle() {
      return ;
    }
    protected FileChannel getFileChannel() {
      return ;
    }
    synchronized void close() {
      if() {
         = false;
        if (!) {
          // Shutdown the executor before attempting to close.
          if( != null) {
            // No need to wait for it to shutdown.
            .shutdown();
          }
        }
        if(.isOpen()) {
          .info("Closing " + );
          try {
            .force(true);
          } catch (IOException e) {
            .warn("Unable to flush to disk " + e);
          }
          try {
            .close();
          } catch (IOException e) {
            .warn("Unable to close " + e);
          }
        }
      }
    }
    protected void preallocate(int sizethrows IOException {
      long position = position();
      if(position + size > getFileChannel().size()) {
        .debug("Preallocating at position " + position);
        synchronized () {
          .position(0);
          getFileChannel().write(position);
        }
      }
    }
  }

  
This is an class meant to be an internal Flume API, and can change at any time. Intended to be used only from File Channel Integrity test tool. Not to be used for any other purpose.
  public static class OperationRecordUpdater {
    private final RandomAccessFile fileHandle;
    private final File file;
    public OperationRecordUpdater(File filethrows FileNotFoundException {
      Preconditions.checkState(file.exists(), "File to update, " +
        file.toString() + " does not exist.");
      this. = file;
       = new RandomAccessFile(file"rw");
    }
    public void markRecordAsNoop(long offsetthrows IOException {
      // First ensure that the offset actually is an OP_RECORD. There is a
      // small possibility that it still is OP_RECORD,
      // but is not actually the beginning of a record. Is there anything we
      // can do about it?
      .seek(offset);
      byte byteRead = .readByte();
      Preconditions.checkState(byteRead ==  || byteRead == ,
        "Expected to read a record but the byte read indicates EOF");
      .seek(offset);
      .info("Marking event as " +  + " at " + offset + " for file " +
        .toString());
    }
    public void close() {
      try {
        .getFD().sync();
        .close();
      } catch (IOException e) {
        .error("Could not close file handle to file " +
          .toString(), e);
      }
    }
  }
  static abstract class RandomReader {
    private final File file;
        new ArrayBlockingQueue<RandomAccessFile>(50, true);
    private final KeyProvider encryptionKeyProvider;
    private final boolean fsyncPerTransaction;
    private volatile boolean open;
    public RandomReader(File file, @Nullable KeyProvider
      encryptionKeyProviderboolean fsyncPerTransaction)
        throws IOException {
      this. = file;
      this. = encryptionKeyProvider;
      .add(open());
      this. = fsyncPerTransaction;
       = true;
    }
    protected abstract TransactionEventRecord doGet(RandomAccessFile fileHandle)
        throws IOExceptionCorruptEventException;
    abstract int getVersion();
    File getFile() {
      return ;
    }
    protected KeyProvider getKeyProvider() {
      return ;
    }
    FlumeEvent get(int offsetthrows IOExceptionInterruptedException,
      Preconditions.checkState("File closed");
      RandomAccessFile fileHandle = checkOut();
      boolean error = true;
      try {
        fileHandle.seek(offset);
        byte operation = fileHandle.readByte();
        if(operation == ) {
          throw new NoopRecordException("No op record found. Corrupt record " +
            "may have been repaired by File Channel Integrity tool");
        }
        if (operation != ) {
          throw new CorruptEventException(
            "Operation code is invalid. File " +
              "is corrupt. Please run File Channel Integrity tool.");
        }
        TransactionEventRecord record = doGet(fileHandle);
        if(!(record instanceof Put)) {
          Preconditions.checkState(false"Record is " +
              record.getClass().getSimpleName());
        }
        error = false;
        return ((Put)record).getEvent();
      } finally {
        if(error) {
          close(fileHandle);
        } else {
          checkIn(fileHandle);
        }
      }
    }
    synchronized void close() {
      if() {
         = false;
        .info("Closing RandomReader " + );
        List<RandomAccessFilefileHandles = Lists.newArrayList();
        while(.drainTo(fileHandles) > 0) {
          for(RandomAccessFile fileHandle : fileHandles) {
            synchronized (fileHandle) {
              try {
                fileHandle.close();
              } catch (IOException e) {
                .warn("Unable to close fileHandle for " + e);
              }
            }
          }
          fileHandles.clear();
          try {
            Thread.sleep(5L);
          } catch (InterruptedException e) {
            // this is uninterruptable
          }
        }
      }
    }
    private RandomAccessFile open() throws IOException {
      return new RandomAccessFile("r");
    }
    private void checkIn(RandomAccessFile fileHandle) {
      if(!.offer(fileHandle)) {
        close(fileHandle);
      }
    }
    private RandomAccessFile checkOut()
        throws IOExceptionInterruptedException {
      RandomAccessFile fileHandle = .poll();
      if(fileHandle != null) {
        return fileHandle;
      }
      int remaining = .remainingCapacity();
      if(remaining > 0) {
        .info("Opening " +  + " for read, remaining number of file " +
          "handles available for reads of this file is " + remaining);
        return open();
      }
      return .take();
    }
    private static void close(RandomAccessFile fileHandleFile file) {
      if(fileHandle != null) {
        try {
          fileHandle.close();
        } catch (IOException e) {
          .warn("Unable to close " + filee);
        }
      }
    }
  }
  public static abstract class SequentialReader {
    private final RandomAccessFile fileHandle;
    private final FileChannel fileChannel;
    private final File file;
    private final KeyProvider encryptionKeyProvider;
    private int logFileID;
    private long lastCheckpointPosition;
    private long lastCheckpointWriteOrderID;
    private long backupCheckpointPosition;
    private long backupCheckpointWriteOrderID;

    
Construct a Sequential Log Reader object

Parameters:
file
Throws:
IOException if an I/O error occurs
EOFException if the file is empty
    SequentialReader(File file, @Nullable KeyProvider encryptionKeyProvider)
        throws IOExceptionEOFException {
      this. = file;
      this. = encryptionKeyProvider;
       = new RandomAccessFile(file"r");
    }
    abstract LogRecord doNext(int offsetthrows IOExceptionCorruptEventException;
    abstract int getVersion();
    protected void setLastCheckpointPosition(long lastCheckpointPosition) {
      this. = lastCheckpointPosition;
    }
    protected void setLastCheckpointWriteOrderID(long lastCheckpointWriteOrderID) {
      this. = lastCheckpointWriteOrderID;
    }
    protected void setPreviousCheckpointPosition(
      long backupCheckpointPosition) {
      this. = backupCheckpointPosition;
    }
    protected void setPreviousCheckpointWriteOrderID(
      long backupCheckpointWriteOrderID) {
      this. = backupCheckpointWriteOrderID;
    }
    protected void setLogFileID(int logFileID) {
      this. = logFileID;
      Preconditions.checkArgument(logFileID >= 0, "LogFileID is not positive: "
          + Integer.toHexString(logFileID));
    }
    protected KeyProvider getKeyProvider() {
      return ;
    }
    protected RandomAccessFile getFileHandle() {
      return ;
    }
    int getLogFileID() {
      return ;
    }
    void skipToLastCheckpointPosition(long checkpointWriteOrderID)
      throws IOException {
      if ( > 0L) {
        long position = 0;
        if ( <= checkpointWriteOrderID) {
          position = ;
        } else if ( <= checkpointWriteOrderID
          &&  > 0) {
          position = ;
        }
        .position(position);
        .info("fast-forward to checkpoint position: " + position);
      } else {
        .info("Checkpoint for file(" + .getAbsolutePath() + ") "
          + "is: " +  + ", which is beyond the "
          + "requested checkpoint time: " + checkpointWriteOrderID
          + " and position " + );
      }
    }
    public LogRecord next() throws IOExceptionCorruptEventException {
      int offset = -1;
      try {
        long position = .position();
        if (position > .) {
          .info("File position exceeds the threshold: "
                + .
                + ", position: " + position);
        }
        offset = (intposition;
        Preconditions.checkState(offset >= 0);
        while (offset < .length()) {
          byte operation = .readByte();
          if (operation == ) {
            break;
          } else if (operation == ) {
            .info("Encountered EOF at " + offset + " in " + );
            return null;
          } else if (operation == ) {
            .info("No op event found in file: " + .toString() +
              " at " + offset + ". Skipping event.");
            skipRecord(offset + 1);
            offset = (int.getFilePointer();
            continue;
          } else {
            .error("Encountered non op-record at " + offset + " " +
              Integer.toHexString(operation) + " in " + );
            return null;
          }
        }
        if(offset >= .length()) {
          return null;
        }
        return doNext(offset);
      } catch(EOFException e) {
        return null;
      } catch (IOException e) {
        throw new IOException("Unable to read next Transaction from log file " +
            .getCanonicalPath() + " at offset " + offsete);
      }
    }
    public long getPosition() throws IOException {
      return .position();
    }
    public void close() {
      if( != null) {
        try {
          .close();
        } catch (IOException e) {}
      }
    }
  }
  protected static void writeDelimitedBuffer(ByteBuffer outputByteBuffer buffer)
      throws IOException {
    output.putInt(buffer.limit());
    output.put(buffer);
  }
  protected static byte[] readDelimitedBuffer(RandomAccessFile fileHandle)
      throws IOExceptionCorruptEventException {
    int length = fileHandle.readInt();
    if (length < 0) {
      throw new CorruptEventException("Length of event is: " + String.valueOf
        (length) + ". Event must have length >= 0. Possible corruption of " +
        "data or partial fsync.");
    }
    byte[] buffer = new byte[length];
    try {
      fileHandle.readFully(buffer);
    } catch (EOFException ex) {
      throw new CorruptEventException("Remaining data in file less than " +
        "expected size of event."ex);
    }
    return buffer;
  }
  public static void main(String[] argsthrows EOFExceptionIOExceptionCorruptEventException {
    File file = new File(args[0]);
    LogFile.SequentialReader reader = null;
    try {
      reader = LogFileFactory.getSequentialReader(filenullfalse);
      LogRecord entry;
      FlumeEventPointer ptr;
      // for puts the fileId is the fileID of the file they exist in
      // for takes the fileId and offset are pointers to a put
      int fileId = reader.getLogFileID();
      int count = 0;
      int readCount = 0;
      int putCount = 0;
      int takeCount = 0;
      int rollbackCount = 0;
      int commitCount = 0;
      while ((entry = reader.next()) != null) {
        int offset = entry.getOffset();
        TransactionEventRecord record = entry.getEvent();
        short type = record.getRecordType();
        long trans = record.getTransactionID();
        long ts = record.getLogWriteOrderID();
        readCount++;
        ptr = null;
        if (type == ...get()) {
          putCount++;
          ptr = new FlumeEventPointer(fileIdoffset);
        } else if (type == ...get()) {
          takeCount++;
          Take take = (Takerecord;
          ptr = new FlumeEventPointer(take.getFileID(), take.getOffset());
        } else if (type == ...get()) {
          rollbackCount++;
        } else if (type == ...get()) {
          commitCount++;
        } else {
          Preconditions.checkArgument(false"Unknown record type: "
              + Integer.toHexString(type));
        }
        ..println(Joiner.on(", ").skipNulls().join(
            transtsfileIdoffset, TransactionEventRecord.getName(type), ptr));
      }
      ..println("Replayed " + count + " from " + file + " read: " + readCount
          + ", put: " + putCount + ", take: "
          + takeCount + ", rollback: " + rollbackCount + ", commit: "
          + commitCount);
    } catch (EOFException e) {
      ..println("Hit EOF on " + file);
    } finally {
      if (reader != null) {
        reader.close();
      }
    }
  }
New to GrepCode? Check out our FAQ X