Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 package org.apache.flume.channel.file;
 
 import java.io.File;
 import java.util.Map;
 import java.util.Set;
 
 
 
 
   private static final Logger LOG = LoggerFactory
   private static final int MAX_ALLOC_BUFFER_SIZE = 2*1024*1024; // 2MB
   protected static final int HEADER_SIZE = 1029;
   protected static final int INDEX_VERSION = 0;
   protected static final int INDEX_WRITE_ORDER_ID = 1;
   protected static final int INDEX_CHECKPOINT_MARKER = 4;
   protected static final int CHECKPOINT_COMPLETE = 0;
   protected static final int CHECKPOINT_INCOMPLETE = 1;
 
   protected static final String COMPRESSED_FILE_EXTENSION = ".snappy";
 
   protected LongBuffer elementsBuffer;
   protected final Map<IntegerLongoverwriteMap = new HashMap<IntegerLong>();
   protected final Map<IntegerAtomicIntegerlogFileIDReferenceCounts = Maps.newHashMap();
   protected final MappedByteBuffer mappedBuffer;
   protected final RandomAccessFile checkpointFileHandle;
   protected final File checkpointFile;
   private final Semaphore backupCompletedSema = new Semaphore(1);
   protected final boolean shouldBackup;
   protected final boolean compressBackup;
   private final File backupDir;
 
   protected EventQueueBackingStoreFile(int capacityString name,
       File checkpointFilethrows IOException,
       BadCheckpointException {
     this(capacitynamecheckpointFilenullfalsefalse);
   }
 
   protected EventQueueBackingStoreFile(int capacityString name,
       File checkpointFileFile checkpointBackupDir,
       boolean backupCheckpointboolean compressBackup)
     throws IOExceptionBadCheckpointException {
     super(capacityname);
     this. = checkpointFile;
     this. = backupCheckpoint;
     this. = compressBackup;
     this. = checkpointBackupDir;
      = new RandomAccessFile(checkpointFile"rw");
     long totalBytes = (capacity + ) * .;
     if(.length() == 0) {
       allocate(checkpointFiletotalBytes);
       .getChannel().force(true);
       .info("Preallocated " + checkpointFile + " to " + .length()
           + " for capacity " + capacity);
     }
     if(checkpointFile.length() != totalBytes) {
       String msg = "Configured capacity is " + capacity + " but the "
          + " checkpoint file capacity is " +
          ((checkpointFile.length() / .) - )
          + ". See FileChannel documentation on how to change a channels" +
          " capacity.";
      throw new BadCheckpointException(msg);
    }
        checkpointFile.length());
    long version = .get();
    if(version != (longgetVersion()) {
      throw new BadCheckpointException("Invalid version: " + version + " " +
              name + ", expected " + getVersion());
    }
    long checkpointComplete = .get();
    if(checkpointComplete != (long) {
      throw new BadCheckpointException("Checkpoint was not completed correctly,"
              + " probably because the agent stopped while the channel was"
              + " checkpointing.");
    }
    if () {
        new ThreadFactoryBuilder().setNameFormat(
          getName() + " - CheckpointBackUpThread").build());
    } else {
       = null;
    }
  }
  protected long getCheckpointLogWriteOrderID() {
  }
  protected abstract void writeCheckpointMetaData() throws IOException;

  
This method backs up the checkpoint and its metadata files. This method is called once the checkpoint is completely written and is called from a separate thread which runs in the background while the file channel continues operation.

Parameters:
backupDirectory - the directory to which the backup files should be copied.
Throws:
java.io.IOException - if the copy failed, or if there is not enough disk space to copy the checkpoint files over.
  protected void backupCheckpoint(File backupDirectorythrows IOException {
    int availablePermits = .drainPermits();
    Preconditions.checkState(availablePermits == 0,
      "Expected no permits to be available in the backup semaphore, " +
        "but " + availablePermits + " permits were available.");
    if () {
      try {
        ..sleep(10);
      } catch (Exception ex) {
        Throwables.propagate(ex);
      }
    }
    File backupFile = new File(backupDirectory);
    if (backupExists(backupDirectory)) {
      if (!backupFile.delete()) {
        throw new IOException("Error while doing backup of checkpoint. Could " +
          "not remove" + backupFile.toString() + ".");
      }
    }
    Serialization.deleteAllFiles(backupDirectory.);
    File checkpointDir = .getParentFile();
    File[] checkpointFiles = checkpointDir.listFiles();
    Preconditions.checkNotNull(checkpointFiles"Could not retrieve files " +
      "from the checkpoint directory. Cannot complete backup of the " +
      "checkpoint.");
    for (File origFile : checkpointFiles) {
      if(..contains(origFile.getName())) {
        continue;
      }
      if ( && origFile.equals()) {
        Serialization.compressFile(origFilenew File(backupDirectory,
          origFile.getName() + ));
      } else {
        Serialization.copyFile(origFilenew File(backupDirectory,
          origFile.getName()));
      }
    }
    Preconditions.checkState(!backupFile.exists(), "The backup file exists " +
      "while it is not supposed to. Are multiple channels configured to use " +
      "this directory: " + backupDirectory.toString() + " as backup?");
    if (!backupFile.createNewFile()) {
      .error("Could not create backup file. Backup of checkpoint will " +
        "not be used during replay even if checkpoint is bad.");
    }
  }

  
Restore the checkpoint, if it is found to be bad.

Returns:
true - if the previous backup was successfully completed and restore was successfully completed.
Throws:
java.io.IOException - If restore failed due to IOException
  public static boolean restoreBackup(File checkpointDirFile backupDir)
    throws IOException {
    if (!backupExists(backupDir)) {
      return false;
    }
    Serialization.deleteAllFiles(checkpointDir.);
    File[] backupFiles = backupDir.listFiles();
    if (backupFiles == null) {
      return false;
    } else {
      for (File backupFile : backupFiles) {
        String fileName = backupFile.getName();
        if (!fileName.equals() &&
          !fileName.equals(.)) {
          if (fileName.endsWith()){
            Serialization.decompressFile(
              backupFilenew File(checkpointDir,
              fileName.substring(0, fileName.lastIndexOf("."))));
          } else {
            Serialization.copyFile(backupFilenew File(checkpointDir,
              fileName));
          }
        }
      }
      return true;
    }
  }
  void beginCheckpoint() throws IOException {
    .info("Start checkpoint for " +  +
        ", elements to sync = " + .size());
    if () {
      int permits = .drainPermits();
      Preconditions.checkState(permits <= 1, "Expected only one or less " +
        "permits to checkpoint, but got " + String.valueOf(permits) +
        " permits");
      if(permits < 1) {
        // Force the checkpoint to not happen by throwing an exception.
        throw new IOException("Previous backup of checkpoint files is still " +
          "in progress. Will attempt to checkpoint only at the end of the " +
          "next checkpoint interval. Try increasing the checkpoint interval " +
          "if this error happens often.");
      }
    }
    // Start checkpoint
  }
  void checkpoint()  throws IOException {
    setLogWriteOrderID(WriteOrderOracle.next());
    .info("Updating checkpoint metadata: logWriteOrderID: "
        + getLogWriteOrderID() + ", queueSize: " + getSize() + ", queueHead: "
          + getHead());
    try {
    } catch (IOException e) {
      throw new IOException("Error writing metadata"e);
    }
    while (it.hasNext()) {
      int index = it.next();
      long value = .get(index);
      .put(indexvalue);
      it.remove();
    }
    Preconditions.checkState(.isEmpty(),
        "concurrent update detected ");
    // Finish checkpoint
    if () {
      startBackupThread();
    }
  }

  
This method starts backing up the checkpoint in the background.
  private void startBackupThread() {
      "Expected the checkpoint backup exector to be non-null, " +
        "but it is null. Checkpoint will not be backed up.");
    .info("Attempting to back up checkpoint.");
      @Override
      public void run() {
        boolean error = false;
        try {
          backupCheckpoint();
        } catch (Throwable throwable) {
          error = true;
          .error("Backing up of checkpoint directory failed."throwable);
        } finally {
          .release();
        }
        if (!error) {
          .info("Checkpoint backup completed.");
        }
      }
    });
  }
  void close() {
    try {
    } catch (IOException e) {
      .info("Error closing " + e);
    }
      .isShutdown()) {
      try {
        // Wait till the executor dies.
        while (!.awaitTermination(1,
          .));
      } catch (InterruptedException ex) {
        .warn("Interrupted while waiting for checkpoint backup to " +
          "complete");
      }
    }
  }
  long get(int index) {
    int realIndex = getPhysicalIndex(index);
    long result = ;
    if (.containsKey(realIndex)) {
      result = .get(realIndex);
    } else {
      result = .get(realIndex);
    }
    return result;
  }
    return ImmutableSortedSet.copyOf(.keySet());
  }
  void put(int indexlong value) {
    int realIndex = getPhysicalIndex(index);
    .put(realIndexvalue);
  }
  boolean syncRequired() {
    return .size() > 0;
  }
  protected void incrementFileID(int fileID) {
    AtomicInteger counter = .get(fileID);
    if(counter == null) {
      counter = new AtomicInteger(0);
      .put(fileIDcounter);
    }
    counter.incrementAndGet();
  }
  protected void decrementFileID(int fileID) {
    AtomicInteger counter = .get(fileID);
    Preconditions.checkState(counter != null"null counter ");
    int count = counter.decrementAndGet();
    if(count == 0) {
      .remove(fileID);
    }
  }
  protected int getPhysicalIndex(int index) {
    return  + (getHead() + index) % getCapacity();
  }
  protected static void allocate(File filelong totalBytesthrows IOException {
    RandomAccessFile checkpointFile = new RandomAccessFile(file"rw");
    boolean success = false;
    try {
      if (totalBytes <= ) {
        /*
         * totalBytes <= MAX_ALLOC_BUFFER_SIZE, so this can be cast to int
         * without a problem.
         */
        checkpointFile.write(new byte[(int)totalBytes]);
      } else {
        byte[] initBuffer = new byte[];
        long remainingBytes = totalBytes;
        while (remainingBytes >= ) {
          checkpointFile.write(initBuffer);
          remainingBytes -= ;
        }
        /*
         * At this point, remainingBytes is < MAX_ALLOC_BUFFER_SIZE,
         * so casting to int is fine.
         */
        if (remainingBytes > 0) {
          checkpointFile.write(initBuffer, 0, (int)remainingBytes);
        }
      }
      success = true;
    } finally {
      try {
        checkpointFile.close();
      } catch (IOException e) {
        if(success) {
          throw e;
        }
      }
    }
  }
  public static boolean backupExists(File backupDir) {
    return new File(backupDir).exists();
  }
  public static void main(String[] argsthrows Exception {
    File file = new File(args[0]);
    File inflightTakesFile = new File(args[1]);
    File inflightPutsFile = new File(args[2]);
    File queueSetDir = new File(args[3]);
    if (!file.exists()) {
      throw new IOException("File " + file + " does not exist");
    }
    if (file.length() == 0) {
      throw new IOException("File " + file + " is empty");
    }
    int capacity = (int) ((file.length() - ( * 8L)) / 8L);
        EventQueueBackingStoreFactory.get(file,capacity"debug"false);
    ..println("File Reference Counts"
            + backingStore.logFileIDReferenceCounts);
    ..println("Queue Capacity " + backingStore.getCapacity());
    ..println("Queue Size " + backingStore.getSize());
    ..println("Queue Head " + backingStore.getHead());
    for (int index = 0; index < backingStore.getCapacity(); index++) {
      long value = backingStore.get(backingStore.getPhysicalIndex(index));
      int fileID = (int) (value >>> 32);
      int offset = (intvalue;
      ..println(index + ":" + Long.toHexString(value) + " fileID = "
              + fileID + ", offset = " + offset);
    }
    FlumeEventQueue queue =
        new FlumeEventQueue(backingStoreinflightTakesFileinflightPutsFile,
            queueSetDir);
    SetMultimap<LongLongputMap = queue.deserializeInflightPuts();
    ..println("Inflight Puts:");
    for (Long txnID : putMap.keySet()) {
      Set<Longputs = putMap.get(txnID);
      ..println("Transaction ID: " + String.valueOf(txnID));
      for (long value : puts) {
        int fileID = (int) (value >>> 32);
        int offset = (intvalue;
        ..println(Long.toHexString(value) + " fileID = "
                + fileID + ", offset = " + offset);
      }
    }
    SetMultimap<LongLongtakeMap = queue.deserializeInflightTakes();
    ..println("Inflight takes:");
    for (Long txnID : takeMap.keySet()) {
      Set<Longtakes = takeMap.get(txnID);
      ..println("Transaction ID: " + String.valueOf(txnID));
      for (long value : takes) {
        int fileID = (int) (value >>> 32);
        int offset = (intvalue;
        ..println(Long.toHexString(value) + " fileID = "
                + fileID + ", offset = " + offset);
      }
    }
  }
New to GrepCode? Check out our FAQ X