Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 package org.apache.flume.channel.file;
 
 
 import  javax.annotation.Nullable;
 import java.io.File;
Represents a single data file on disk. Has methods to write, read sequentially (replay), and read randomly (channel takes).
 
 public class LogFileV3 extends LogFile {
   protected static final Logger LOGGER =
       LoggerFactory.getLogger(LogFileV3.class);
 
   private LogFileV3() {}
 
   static class MetaDataWriter extends LogFile.MetaDataWriter {
     private final File metaDataFile;
     protected MetaDataWriter(File logFileint logFileIDthrows IOException {
       super(logFilelogFileID);
        = Serialization.getMetaDataFile(logFile);
       MetaDataReader metaDataReader = new MetaDataReader(logFilelogFileID);
        = metaDataReader.read();
       int version = .getVersion();
       if(version != getVersion()) {
         throw new IOException("Version is " + Integer.toHexString(version) +
             " expected " + Integer.toHexString(getVersion())
             + " file: " + logFile);
       }
     }
 
     @Override
     int getVersion() {
       return .;
     }
 
     @Override
     void markCheckpoint(long currentPositionlong logWriteOrderID)
         throws IOException {
       ProtosFactory.LogFileMetaData.Builder metaDataBuilder =
           ProtosFactory.LogFileMetaData.newBuilder();
       metaDataBuilder.setCheckpointPosition(currentPosition);
       metaDataBuilder.setCheckpointWriteOrderID(logWriteOrderID);
       /*
        * Set the previous checkpoint position and write order id so that it
        * would be possible to recover from a backup.
        */
       metaDataBuilder.setBackupCheckpointPosition(
         .getCheckpointPosition());
         .getCheckpointWriteOrderID());
        = metaDataBuilder.build();
     }
   }
  static class MetaDataReader {
    private final File logFile;
    private final File metaDataFile;
    private final int logFileID;
    protected MetaDataReader(File logFileint logFileIDthrows IOException {
      this. = logFile;
       = Serialization.getMetaDataFile(logFile);
      this. = logFileID;
    }
      FileInputStream inputStream = new FileInputStream();
      try {
        ProtosFactory.LogFileMetaData metaData = Preconditions.checkNotNull(
          ProtosFactory.LogFileMetaData.
            parseDelimitedFrom(inputStream), "Metadata cannot be null");
        if (metaData.getLogFileID() != ) {
          throw new IOException("The file id of log file: "
              +  + " is different from expected "
              + " id: expected = " +  + ", found = "
              + metaData.getLogFileID());
        }
        return metaData;
      } finally {
        try {
          inputStream.close();
        } catch(IOException e) {
          .warn("Unable to close " + e);
        }
      }
    }
  }

  
Writes a GeneratedMessage to a temp file, synchronizes it to disk and then renames the file over file.

Parameters:
msg GeneratedMessage to write to the file
file destination file
Throws:
IOException if a write error occurs or the File.renameTo method returns false meaning the file could not be overwritten.
  public static void writeDelimitedTo(GeneratedMessage msgFile file)
  throws IOException {
    File tmp = Serialization.getMetaDataTempFile(file);
    FileOutputStream outputStream = new FileOutputStream(tmp);
    boolean closed = false;
    try {
      msg.writeDelimitedTo(outputStream);
      outputStream.getChannel().force(true);
      outputStream.close();
      closed = true;
      if(!tmp.renameTo(file)) {
        //Some platforms don't support moving over an existing file.
        //So:
        //log.meta -> log.meta.old
        //log.meta.tmp -> log.meta
        //delete log.meta.old
        File oldFile = Serialization.getOldMetaDataFile(file);
        if(!file.renameTo(oldFile)){
          throw new IOException("Unable to rename " + file + " to " + oldFile);
        }
        if(!tmp.renameTo(file)) {
          throw new IOException("Unable to rename " + tmp + " over " + file);
        }
        oldFile.delete();
      }
    } finally {
      if(!closed) {
        try {
          outputStream.close();
        } catch(IOException e) {
          .warn("Unable to close " + tmpe);
        }
      }
    }
  }
  static class Writer extends LogFile.Writer {
    Writer(File fileint logFileIDlong maxFileSize,
        @Nullable Key encryptionKey,
        @Nullable String encryptionKeyAlias,
        @Nullable String encryptionCipherProvider,
        long usableSpaceRefreshIntervalboolean fsyncPerTransaction,
        int fsyncIntervalthrows IOException {
      super(filelogFileIDmaxFileSize, CipherProviderFactory.
          getEncrypter(encryptionCipherProviderencryptionKey),
          usableSpaceRefreshIntervalfsyncPerTransactionfsyncInterval);
      ProtosFactory.LogFileMetaData.Builder metaDataBuilder =
          ProtosFactory.LogFileMetaData.newBuilder();
      if(encryptionKey != null) {
        Preconditions.checkNotNull(encryptionKeyAlias"encryptionKeyAlias");
        Preconditions.checkNotNull(encryptionCipherProvider,
            "encryptionCipherProvider");
        ProtosFactory.LogFileEncryption.Builder logFileEncryptionBuilder =
            ProtosFactory.LogFileEncryption.newBuilder();
        logFileEncryptionBuilder.setCipherProvider(encryptionCipherProvider);
        logFileEncryptionBuilder.setKeyAlias(encryptionKeyAlias);
        logFileEncryptionBuilder.setParameters(
            ByteString.copyFrom(getEncryptor().getParameters()));
        metaDataBuilder.setEncryption(logFileEncryptionBuilder);
      }
      metaDataBuilder.setVersion(getVersion());
      metaDataBuilder.setLogFileID(logFileID);
      metaDataBuilder.setCheckpointPosition(0L);
      metaDataBuilder.setCheckpointWriteOrderID(0L);
      metaDataBuilder.setBackupCheckpointPosition(0L);
      metaDataBuilder.setBackupCheckpointWriteOrderID(0L);
      File metaDataFile = Serialization.getMetaDataFile(file);
      writeDelimitedTo(metaDataBuilder.build(), metaDataFile);
    }
    @Override
    int getVersion() {
      return .;
    }
  }
  static class RandomReader extends LogFile.RandomReader {
    private volatile boolean initialized;
    private volatile boolean encryptionEnabled;
    private volatile Key key;
    private volatile String cipherProvider;
    private volatile byte[] parameters;
    RandomReader(File file, @Nullable KeyProvider encryptionKeyProvider,
      boolean fsyncPerTransactionthrows IOException {
      super(fileencryptionKeyProviderfsyncPerTransaction);
    }
    private void initialize() throws IOException {
      File metaDataFile = Serialization.getMetaDataFile(getFile());
      FileInputStream inputStream = new FileInputStream(metaDataFile);
      try {
        ProtosFactory.LogFileMetaData metaData =
            Preconditions.checkNotNull(ProtosFactory.LogFileMetaData.
                parseDelimitedFrom(inputStream), "MetaData cannot be null");
        int version = metaData.getVersion();
        if(version != getVersion()) {
          throw new IOException("Version is " + Integer.toHexString(version) +
              " expected " + Integer.toHexString(getVersion())
              + " file: " + getFile().getCanonicalPath());
        }
         = false;
        if(metaData.hasEncryption()) {
          if(getKeyProvider() == null) {
            throw new IllegalStateException("Data file is encrypted but no " +
                " provider was specified");
          }
          ProtosFactory.LogFileEncryption encryption = metaData.getEncryption();
           = getKeyProvider().getKey(encryption.getKeyAlias());
           = encryption.getCipherProvider();
           = encryption.getParameters().toByteArray();
           = true;
        }
      } finally {
        try {
          inputStream.close();
        } catch(IOException e) {
          .warn("Unable to close " + metaDataFilee);
        }
      }
    }
      CipherProvider.Decryptor decryptor = .poll();
      if(decryptor == null) {
        decryptor = CipherProviderFactory.getDecrypter(,
            );
      }
      return decryptor;
    }
    @Override
    int getVersion() {
      return .;
    }
    @Override
    protected TransactionEventRecord doGet(RandomAccessFile fileHandle)
        throws IOExceptionCorruptEventException {
      // readers are opened right when the file is created and thus
      // empty. As such we wait to initialize until there is some
      // data before we we initialize
      synchronized (this) {
        if(!) {
           = true;
          initialize();
        }
      }
      boolean success = false;
      CipherProvider.Decryptor decryptor = null;
      try {
        byte[] buffer = readDelimitedBuffer(fileHandle);
        if() {
          decryptor = getDecryptor();
          buffer = decryptor.decrypt(buffer);
        }
        TransactionEventRecord event = TransactionEventRecord.
            fromByteArray(buffer);
        success = true;
        return event;
      } catch(DecryptionFailureException ex) {
        throw new CorruptEventException("Error decrypting event"ex);
      } finally {
        if(success &&  && decryptor != null) {
          .offer(decryptor);
        }
      }
    }
  }
  public static class SequentialReader extends LogFile.SequentialReader {
    private final boolean fsyncPerTransaction;
    public SequentialReader(File file, @Nullable KeyProvider
      encryptionKeyProviderboolean fsyncPerTransactionthrows EOFException,
      IOException {
      super(fileencryptionKeyProvider);
      this. = fsyncPerTransaction;
      File metaDataFile = Serialization.getMetaDataFile(file);
      FileInputStream inputStream = new FileInputStream(metaDataFile);
      try {
        ProtosFactory.LogFileMetaData metaData = Preconditions.checkNotNull(
            ProtosFactory.LogFileMetaData.parseDelimitedFrom(inputStream),
            "MetaData cannot be null");
        int version = metaData.getVersion();
        if(version != getVersion()) {
          throw new IOException("Version is " + Integer.toHexString(version) +
              " expected " + Integer.toHexString(getVersion())
              + " file: " + file.getCanonicalPath());
        }
        if(metaData.hasEncryption()) {
          if(getKeyProvider() == null) {
            throw new IllegalStateException("Data file is encrypted but no " +
                " provider was specified");
          }
          ProtosFactory.LogFileEncryption encryption = metaData.getEncryption();
          Key key = getKeyProvider().getKey(encryption.getKeyAlias());
           = CipherProviderFactory.
              getDecrypter(encryption.getCipherProvider(), key,
                  encryption.getParameters().toByteArray());
        }
        setLogFileID(metaData.getLogFileID());
          metaData.getBackupCheckpointWriteOrderID());
      } finally {
        try {
          inputStream.close();
        } catch(IOException e) {
          .warn("Unable to close " + metaDataFilee);
        }
      }
    }
    @Override
    public int getVersion() {
      return .;
    }
    @Override
    LogRecord doNext(int offsetthrows IOExceptionCorruptEventException,
      byte[] buffer = null;
      TransactionEventRecord event = null;
      try {
        buffer = readDelimitedBuffer(getFileHandle());
        if ( != null) {
          buffer = .decrypt(buffer);
        }
        event = TransactionEventRecord.fromByteArray(buffer);
      } catch (CorruptEventException ex) {
        .warn("Corrupt file found. File id: log-" + this.getLogFileID(),
          ex);
        // Return null so that replay handler thinks all events in this file
        // have been taken.
        if (!) {
          return null;
        }
        throw ex;
      } catch (DecryptionFailureException ex) {
        if (!) {
          .warn("Could not decrypt even read from channel. Skipping " +
            "event."ex);
          return null;
        }
        throw ex;
      }
      return new LogRecord(getLogFileID(), offsetevent);
    }
  }
New to GrepCode? Check out our FAQ X