/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.flume.channel.file;

import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Set;
 
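/**
 * Rebuilds a FileChannel checkpoint directly from the data log files using
 * "fast replay": each log record is scanned once, puts and takes are paired
 * up per transaction, and the surviving committed puts are appended to the
 * FlumeEventQueue, after which a fresh checkpoint can be written.
 */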
 public class CheckpointRebuilder {
 
  private final List<File> logFiles;
  private final FlumeEventQueue queue;
  private final Set<ComparableFlumeEventPointer> committedPuts =
          Sets.newHashSet();
  private final Set<ComparableFlumeEventPointer> pendingTakes =
          Sets.newHashSet();
  private final SetMultimap<Long, ComparableFlumeEventPointer> uncommittedPuts =
          HashMultimap.create();
  private final SetMultimap<Long, ComparableFlumeEventPointer>
          uncommittedTakes = HashMultimap.create();
  private final boolean fsyncPerTransaction;
 
   private static Logger LOG =
           LoggerFactory.getLogger(CheckpointRebuilder.class);
 
  public CheckpointRebuilder(List<File> logFiles,
      FlumeEventQueue queue, boolean fsyncPerTransaction) throws
      IOException {
    this.logFiles = logFiles;
    this.queue = queue;
    this.fsyncPerTransaction = fsyncPerTransaction;
  }
 
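  /**
   * Scans every data log sequentially, buffering uncommitted puts and takes
   * per transaction, folding them into the committed set on commit and
   * discarding them on rollback. Also seeds the transaction and write-order
   * oracles with the highest IDs seen. Returns false if any record fails to
   * process, so the caller can fall back to a full replay.
   */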
  public boolean rebuild() throws IOException, Exception {
    LOG.info("Attempting to fast replay the log files.");
    List<LogFile.SequentialReader> logReaders = Lists.newArrayList();
    for (File logFile : logFiles) {
      try {
        logReaders.add(LogFileFactory.getSequentialReader(logFile, null,
            fsyncPerTransaction));
      } catch (EOFException e) {
        LOG.warn("Ignoring " + logFile + " due to EOF", e);
      }
    }
     long transactionIDSeed = 0;
     long writeOrderIDSeed = 0;
     try {
       for (LogFile.SequentialReader log : logReaders) {
         LogRecord entry;
         int fileID = log.getLogFileID();
         while ((entry = log.next()) != null) {
           int offset = entry.getOffset();
           TransactionEventRecord record = entry.getEvent();
           long trans = record.getTransactionID();
           long writeOrderID = record.getLogWriteOrderID();
          // Track the highest IDs seen to seed the ID oracles afterwards.
          transactionIDSeed = Math.max(trans, transactionIDSeed);
          writeOrderIDSeed = Math.max(writeOrderID, writeOrderIDSeed);
          if (record.getRecordType()
                  == TransactionEventRecord.Type.PUT.get()) {
            uncommittedPuts.put(record.getTransactionID(),
                    new ComparableFlumeEventPointer(
                    new FlumeEventPointer(fileID, offset),
                    record.getLogWriteOrderID()));
          } else if (record.getRecordType()
                  == TransactionEventRecord.Type.TAKE.get()) {
            Take take = (Take) record;
            uncommittedTakes.put(record.getTransactionID(),
                    new ComparableFlumeEventPointer(
                    new FlumeEventPointer(take.getFileID(), take.getOffset()),
                    record.getLogWriteOrderID()));
          } else if (record.getRecordType()
                  == TransactionEventRecord.Type.COMMIT.get()) {
            Commit commit = (Commit) record;
            if (commit.getType()
                    == TransactionEventRecord.Type.PUT.get()) {
              // A committed put either cancels a pending take for the same
              // event or becomes a committed put.
              Set<ComparableFlumeEventPointer> puts =
                      uncommittedPuts.get(record.getTransactionID());
              if (puts != null) {
                for (ComparableFlumeEventPointer put : puts) {
                  if (!pendingTakes.remove(put)) {
                    committedPuts.add(put);
                  }
                }
              }
            } else {
              // A committed take removes the matching committed put, or is
              // held as pending until that put's commit is seen.
              Set<ComparableFlumeEventPointer> takes =
                      uncommittedTakes.get(record.getTransactionID());
              if (takes != null) {
                for (ComparableFlumeEventPointer take : takes) {
                  if (!committedPuts.remove(take)) {
                    pendingTakes.add(take);
                  }
                }
              }
            }
          } else if (record.getRecordType()
                  == TransactionEventRecord.Type.ROLLBACK.get()) {
            // Rollback discards the transaction's buffered puts or takes.
            if (uncommittedPuts.containsKey(record.getTransactionID())) {
              uncommittedPuts.removeAll(record.getTransactionID());
            } else {
              uncommittedTakes.removeAll(record.getTransactionID());
            }
          }
        }
      }
    } catch (Exception e) {
      LOG.warn("Error while generating checkpoint "
              + "using fast generation logic", e);
      return false;
    } finally {
      TransactionIDOracle.setSeed(transactionIDSeed);
      WriteOrderOracle.setSeed(writeOrderIDSeed);
      for (LogFile.SequentialReader reader : logReaders) {
        reader.close();
      }
    }
    // Replay the surviving committed puts into the queue in write order.
    Set<ComparableFlumeEventPointer> sortedPuts =
            Sets.newTreeSet(committedPuts);
    int count = 0;
    for (ComparableFlumeEventPointer put : sortedPuts) {
      queue.addTail(put.pointer);
      count++;
    }
    LOG.info("Replayed {} events using fast replay logic.", count);
    return true;
  }
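
  /**
   * Checkpoints the rebuilt queue and marks each data file's metadata with
   * the checkpoint's write-order ID so a subsequent replay can resume from
   * this point.
   */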
  private void writeCheckpoint() throws IOException {
    long checkpointLogOrderID = 0;
    List<LogFile.MetaDataWriter> metaDataWriters = Lists.newArrayList();
    for (File logFile : logFiles) {
      // The log file ID is the numeric suffix of the file name.
      String name = logFile.getName();
      metaDataWriters.add(LogFileFactory.getMetaDataWriter(logFile,
          Integer.parseInt(name.substring(name.lastIndexOf('-') + 1))));
    }
    try {
      if (queue.checkpoint(true)) {
        checkpointLogOrderID = queue.getLogWriteOrderID();
        for (LogFile.MetaDataWriter metaDataWriter : metaDataWriters) {
          metaDataWriter.markCheckpoint(checkpointLogOrderID);
        }
      }
    } catch (Exception e) {
      .warn("Error while generating checkpoint "
              + "using fast generation logic"e);
    } finally {
      for (LogFile.MetaDataWriter metaDataWriter : metaDataWriters) {
        metaDataWriter.close();
      }
    }
  }
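
  /**
   * Pairs a FlumeEventPointer with its log write-order ID so pointers can be
   * ordered by commit time; equality and hashing intentionally use only the
   * pointer, since equal order IDs do not imply the same event in older logs.
   */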
  private final class ComparableFlumeEventPointer
          implements Comparable<ComparableFlumeEventPointer> {
    private final FlumeEventPointer pointer;
    private final long orderID;
    public ComparableFlumeEventPointer(FlumeEventPointer pointer, long orderID) {
      Preconditions.checkNotNull(pointer, "FlumeEventPointer cannot be "
              + "null while creating a ComparableFlumeEventPointer");
      this.pointer = pointer;
      this.orderID = orderID;
    }
    }
    @Override
    public int compareTo(ComparableFlumeEventPointer o) {
      if (orderID < o.orderID) {
        return -1;
      } else { //Unfortunately same log order id does not mean same event
        //for older logs.
        return 1;
      }
    }
    @Override
    public int hashCode(){
      return pointer.hashCode();
    }
    @Override
    public boolean equals(Object o){
      if(this == o){
        return true;
      }
      if(o == null){
        return false;
      }
      if(o.getClass() != this.getClass()){
        return false;
      }
      return pointer.equals(((ComparableFlumeEventPointer) o).pointer);
    }
  }
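
  /**
   * Command-line entry point for rebuilding a checkpoint offline.
   * Options: -c <checkpoint dir> (must not already contain a checkpoint),
   * -l <comma-separated log dirs>, -t <channel capacity>.
   */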
  public static void main(String[] args) throws Exception {
    Options options = new Options();
    Option opt = new Option("c"true"checkpoint directory");
    opt.setRequired(true);
    options.addOption(opt);
    opt = new Option("l"true"comma-separated list of log directories");
    opt.setRequired(true);
    options.addOption(opt);
    opt = new Option("t"true"capacity of the channel");
    opt.setRequired(true);
    options.addOption(opt);
    CommandLineParser parser = new GnuParser();
    CommandLine cli = parser.parse(options, args);
    File checkpointDir = new File(cli.getOptionValue("c"));
    String[] logDirs = cli.getOptionValue("l").split(",");
    List<File> logFiles = Lists.newArrayList();
    for (String logDir : logDirs) {
      logFiles.addAll(LogUtils.getLogs(new File(logDir)));
    }
    int capacity = Integer.parseInt(cli.getOptionValue("t"));
    File checkpointFile = new File(checkpointDir"checkpoint");
    if (checkpointFile.exists()) {
      LOG.error("Cannot execute fast replay",
          new IllegalStateException("Checkpoint exists " + checkpointFile));
    } else {
      EventQueueBackingStore backingStore =
          EventQueueBackingStoreFactory.get(checkpointFile,
              capacity"channel");
      FlumeEventQueue queue = new FlumeEventQueue(backingStore,
              new File(checkpointDir, "inflighttakes"),
              new File(checkpointDir, "inflightputs"),
              new File(checkpointDir, Log.QUEUE_SET));
      CheckpointRebuilder rebuilder = new CheckpointRebuilder(logFiles,
          queue, true);
      if(rebuilder.rebuild()) {
        rebuilder.writeCheckpoint();
      } else {
        .error("Could not rebuild the checkpoint due to errors.");
      }
    }
  }
}