Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 package org.apache.flume.channel.file;
 
 import java.io.File;
 import java.util.Map;
 
 
 
   private static final Logger LOG = LoggerFactory
   private final File metaDataFile;
 
   EventQueueBackingStoreFileV3(File checkpointFileint capacity,
       String namethrows IOExceptionBadCheckpointException {
     this(checkpointFilecapacitynamenullfalsefalse);
   }
 
   EventQueueBackingStoreFileV3(File checkpointFileint capacity,
       String nameFile checkpointBackupDir,
       boolean backupCheckpointboolean compressBackup)
       throws IOExceptionBadCheckpointException {
     super(capacitynamecheckpointFilecheckpointBackupDirbackupCheckpoint,
       compressBackup);
     Preconditions.checkArgument(capacity > 0,
         "capacity must be greater than 0 " + capacity);
      = Serialization.getMetaDataFile(checkpointFile);
     .info("Starting up with " + checkpointFile + " and " + );
     if(.exists()) {
       FileInputStream inputStream = new FileInputStream();
       try {
         .info("Reading checkpoint metadata from " + );
         ProtosFactory.Checkpoint checkpoint =
             ProtosFactory.Checkpoint.parseDelimitedFrom(inputStream);
         if(checkpoint == null) {
           throw new BadCheckpointException("The checkpoint metadata file does "
                   + "not exist or has zero length");
         }
         int version = checkpoint.getVersion();
         if(version != getVersion()) {
           throw new BadCheckpointException("Invalid version: " + version +
                   " " + name + ", expected " + getVersion());
         }
         long logWriteOrderID = checkpoint.getWriteOrderID();
         if(logWriteOrderID != getCheckpointLogWriteOrderID()) {
           String msg = "Checkpoint and Meta files have differing " +
               "logWriteOrderIDs " + getCheckpointLogWriteOrderID() + ", and "
               + logWriteOrderID;
           .warn(msg);
           throw new BadCheckpointException(msg);
         }
         WriteOrderOracle.setSeed(logWriteOrderID);
         setLogWriteOrderID(logWriteOrderID);
         setSize(checkpoint.getQueueSize());
         setHead(checkpoint.getQueueHead());
         for(ProtosFactory.ActiveLog activeLog : checkpoint.getActiveLogsList()) {
           Integer logFileID = activeLog.getLogFileID();
           Integer count = activeLog.getCount();
           .put(logFileIDnew AtomicInteger(count));
         }
       } catch (InvalidProtocolBufferException ex) {
         throw new BadCheckpointException("Checkpoint metadata file is invalid. "
                 + "The agent might have been stopped while it was being "
                 + "written"ex);
       } finally {
         try {
           inputStream.close();
         } catch (IOException e) {
           .warn("Unable to close " + e);
         }
       }
     } else {
      if(backupExists(checkpointBackupDir) && ) {
        // If a backup exists, then throw an exception to recover checkpoint
        throw new BadCheckpointException("The checkpoint metadata file does " +
            "not exist, but a backup exists");
      }
      ProtosFactory.Checkpoint.Builder checkpointBuilder =
          ProtosFactory.Checkpoint.newBuilder();
      checkpointBuilder.setVersion(getVersion());
      checkpointBuilder.setQueueHead(getHead());
      checkpointBuilder.setQueueSize(getSize());
      checkpointBuilder.setWriteOrderID(getLogWriteOrderID());
      FileOutputStream outputStream = new FileOutputStream();
      try {
        checkpointBuilder.build().writeDelimitedTo(outputStream);
        outputStream.getChannel().force(true);
      } finally {
        try {
          outputStream.close();
        } catch (IOException e) {
          .warn("Unable to close " + e);
        }
      }
    }
  }
    return ;
  }
  protected int getVersion() {
    return .;
  }
  protected void writeCheckpointMetaData() throws IOException {
    ProtosFactory.Checkpoint.Builder checkpointBuilder =
        ProtosFactory.Checkpoint.newBuilder();
    checkpointBuilder.setVersion(getVersion());
    checkpointBuilder.setQueueHead(getHead());
    checkpointBuilder.setQueueSize(getSize());
    checkpointBuilder.setWriteOrderID(getLogWriteOrderID());
    for(Integer logFileID : .keySet()) {
      int count = .get(logFileID).get();
      if(count != 0) {
         ProtosFactory.ActiveLog.Builder activeLogBuilder =
             ProtosFactory.ActiveLog.newBuilder();
         activeLogBuilder.setLogFileID(logFileID);
         activeLogBuilder.setCount(count);
         checkpointBuilder.addActiveLogs(activeLogBuilder.build());
      }
    }
    FileOutputStream outputStream = new FileOutputStream();
    try {
      checkpointBuilder.build().writeDelimitedTo(outputStream);
      outputStream.getChannel().force(true);
    } finally {
      try {
        outputStream.close();
      } catch (IOException e) {
        .warn("Unable to close " + e);
      }
    }
  }
  static void upgrade(EventQueueBackingStoreFileV2 backingStoreV2,
      File checkpointFileFile metaDataFile)
          throws IOException {
    int head = backingStoreV2.getHead();
    int size = backingStoreV2.getSize();
    long writeOrderID = backingStoreV2.getLogWriteOrderID();
    Map<IntegerAtomicIntegerreferenceCounts =
        backingStoreV2.logFileIDReferenceCounts;
    ProtosFactory.Checkpoint.Builder checkpointBuilder =
        ProtosFactory.Checkpoint.newBuilder();
    checkpointBuilder.setVersion(.);
    checkpointBuilder.setQueueHead(head);
    checkpointBuilder.setQueueSize(size);
    checkpointBuilder.setWriteOrderID(writeOrderID);
    for(Integer logFileID : referenceCounts.keySet()) {
      int count = referenceCounts.get(logFileID).get();
      if(count > 0) {
         ProtosFactory.ActiveLog.Builder activeLogBuilder =
             ProtosFactory.ActiveLog.newBuilder();
         activeLogBuilder.setLogFileID(logFileID);
         activeLogBuilder.setCount(count);
         checkpointBuilder.addActiveLogs(activeLogBuilder.build());
      }
    }
    FileOutputStream outputStream = new FileOutputStream(metaDataFile);
    try {
      checkpointBuilder.build().writeDelimitedTo(outputStream);
      outputStream.getChannel().force(true);
    } finally {
      try {
        outputStream.close();
      } catch (IOException e) {
        .warn("Unable to close " + metaDataFilee);
      }
    }
    RandomAccessFile checkpointFileHandle =
        new RandomAccessFile(checkpointFile"rw");
    try {
      checkpointFileHandle.seek( * .);
      checkpointFileHandle.writeLong(.);
      checkpointFileHandle.getChannel().force(true);
    } finally {
      try {
        checkpointFileHandle.close();
      } catch (IOException e) {
        .warn("Unable to close " + checkpointFilee);
      }
    }
  }
New to GrepCode? Check out our FAQ X