  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 package org.apache.flume.channel.file;
 
 
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import org.apache.commons.collections.MultiMap;
import org.apache.commons.collections.map.MultiValueMap;
import org.apache.flume.channel.file.encryption.KeyProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;

/**
 * Processes a set of data logs, replaying said logs into the queue.
 */
 
 class ReplayHandler {
   private static final Logger LOG = LoggerFactory
       .getLogger(ReplayHandler.class);
   private final FlumeEventQueue queue;
   private final long lastCheckpoint;
  private final Map<Integer, LogFile.SequentialReader> readers;
  private final PriorityQueue<LogRecord> logRecordBuffer;
   private final KeyProvider encryptionKeyProvider;
   private final boolean fsyncPerTransaction;
  
  /**
   * This data structure stores takes for which we found a commit in the log
   * files before we found a commit for the put. This can happen if the
   * channel is configured for multiple directories.
   *
   * Consider the following, with two directories logdir1 and logdir2:
   *
   *   Put goes to logdir2
   *   Commit of Put goes to logdir2
   *   Take goes to logdir1
   *   Commit of Take goes to logdir1
   *
   * When replaying we will start with logdir1 and find the take and its
   * commit before finding the put and its commit in logdir2.
   */
 
  private final List<Long> pendingTakes;
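  // How a pending take is eventually resolved: when the matching put's
  // commit is finally replayed, processCommit() below adds the pointer to
  // the queue, finds it in pendingTakes, and immediately removes it again,
  // which is the net effect of a committed put followed by a committed take.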
   int readCount = 0;
   int putCount = 0;
   int takeCount = 0;
   int rollbackCount = 0;
   int commitCount = 0;
   int skipCount = 0;
 
  public int getReadCount() {
    return readCount;
  }

  public int getPutCount() {
    return putCount;
  }

  public int getTakeCount() {
    return takeCount;
  }

  public int getCommitCount() {
    return commitCount;
  }

  public int getRollbackCount() {
    return rollbackCount;
  }
  ReplayHandler(FlumeEventQueue queue,
      @Nullable KeyProvider encryptionKeyProvider,
      boolean fsyncPerTransaction) {
    this.queue = queue;
    this.lastCheckpoint = queue.getLogWriteOrderID();
    pendingTakes = Lists.newArrayList();
    readers = Maps.newHashMap();
    logRecordBuffer = new PriorityQueue<LogRecord>();
    this.encryptionKeyProvider = encryptionKeyProvider;
    this.fsyncPerTransaction = fsyncPerTransaction;
  }
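  // A sketch of the expected lifecycle (the caller is assumed to be the
  // file channel's Log class, which is not shown in this file): one handler
  // is constructed per replay and handed the data files of every log
  // directory in a single call, so records can be ordered across
  // directories.
  //
  //   ReplayHandler handler =
  //       new ReplayHandler(queue, encryptionKeyProvider, fsyncPerTransaction);
  //   handler.replayLog(dataFiles);   // or replayLogv1(dataFiles) as fallback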
  
  /**
   * Replay logic from Flume 1.2 which can be activated if the v2 logic
   * is failing on old logs for some reason.
   */
  void replayLogv1(List<File> logs) throws Exception {
    int total = 0;
    int count = 0;
    MultiMap transactionMap = new MultiValueMap();
    //Read inflight puts to see if they were committed
    SetMultimap<Long, Long> inflightPuts = queue.deserializeInflightPuts();
    for (Long txnID : inflightPuts.keySet()) {
      Set<Long> eventPointers = inflightPuts.get(txnID);
      for (Long eventPointer : eventPointers) {
        transactionMap.put(txnID, FlumeEventPointer.fromLong(eventPointer));
      }
    }
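    // Transactions in the inflight-puts snapshot were open at checkpoint
    // time, so their PUT records typically predate the checkpoint and will
    // be skipped below. Preloading them into transactionMap lets a later
    // COMMIT still complete those transactions.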
    SetMultimap<Long, Long> inflightTakes = queue.deserializeInflightTakes();
    LOG.info("Starting replay of " + logs);
    for (File log : logs) {
      LOG.info("Replaying " + log);
      LogFile.SequentialReader reader = null;
      try {
        reader = LogFileFactory.getSequentialReader(log,
            encryptionKeyProvider, fsyncPerTransaction);
        LogRecord entry;
        FlumeEventPointer ptr;
        // for puts the fileId is the fileID of the file they exist in
        // for takes the fileId and offset are pointers to a put
        int fileId = reader.getLogFileID();
        while ((entry = reader.next()) != null) {
          int offset = entry.getOffset();
          TransactionEventRecord record = entry.getEvent();
          short type = record.getRecordType();
          long trans = record.getTransactionID();
          readCount++;
          if (record.getLogWriteOrderID() > lastCheckpoint) {
            if (type == TransactionEventRecord.Type.PUT.get()) {
              putCount++;
              ptr = new FlumeEventPointer(fileId, offset);
              transactionMap.put(trans, ptr);
            } else if (type == TransactionEventRecord.Type.TAKE.get()) {
              takeCount++;
              Take take = (Take) record;
              ptr = new FlumeEventPointer(take.getFileID(), take.getOffset());
              transactionMap.put(trans, ptr);
            } else if (type == TransactionEventRecord.Type.ROLLBACK.get()) {
              rollbackCount++;
              transactionMap.remove(trans);
            } else if (type == TransactionEventRecord.Type.COMMIT.get()) {
              commitCount++;
              @SuppressWarnings("unchecked")
              Collection<FlumeEventPointer> pointers =
                  (Collection<FlumeEventPointer>) transactionMap.remove(trans);
              if (((Commit) record).getType()
                      == TransactionEventRecord.Type.TAKE.get()) {
                if (inflightTakes.containsKey(trans)) {
                  if (pointers == null) {
                    pointers = Sets.newHashSet();
                  }
                  Set<Long> takes = inflightTakes.removeAll(trans);
                  Iterator<Long> it = takes.iterator();
                  while (it.hasNext()) {
                    Long take = it.next();
                    pointers.add(FlumeEventPointer.fromLong(take));
                  }
                }
              }
              if (pointers != null && pointers.size() > 0) {
                processCommit(((Commit) record).getType(), pointers);
                count += pointers.size();
              }
            } else {
              Preconditions.checkArgument(false, "Unknown record type: "
                  + Integer.toHexString(type));
            }
          } else {
            skipCount++;
          }
        }
        LOG.info("Replayed " + count + " from " + log);
        if (LOG.isDebugEnabled()) {
          LOG.debug("read: " + readCount + ", put: " + putCount + ", take: "
              + takeCount + ", rollback: " + rollbackCount + ", commit: "
              + commitCount + ", skip: " + skipCount);
        }
      } catch (EOFException e) {
        LOG.warn("Hit EOF on " + log);
      } finally {
        total += count;
        count = 0;
        if (reader != null) {
          reader.close();
        }
      }
    }
    //re-insert the events in the take map,
    //since the takes were not committed.
    int uncommittedTakes = 0;
    for (Long inflightTxnId : inflightTakes.keySet()) {
      Set<Long> inflightUncommittedTakes =
          inflightTakes.get(inflightTxnId);
      for (Long inflightUncommittedTake : inflightUncommittedTakes) {
        queue.addHead(FlumeEventPointer.fromLong(inflightUncommittedTake));
        uncommittedTakes++;
      }
    }
    inflightTakes.clear();
    count += uncommittedTakes;
    int pendingTakesSize = pendingTakes.size();
    if (pendingTakesSize > 0) {
      String msg = "Pending takes " + pendingTakesSize
          + " exist after the end of replay";
      if (LOG.isDebugEnabled()) {
        for (Long pointer : pendingTakes) {
          LOG.debug("Pending take " + FlumeEventPointer.fromLong(pointer));
        }
      } else {
        LOG.error(msg + ". Duplicate messages will exist in destination.");
      }
    }
    LOG.info("Replayed " + total);
  }
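  // Note the key difference from replayLog() below: v1 drains each data
  // file to completion before opening the next, so commit order across log
  // directories is not preserved and the pendingTakes machinery does the
  // reconciliation. v2 instead merges all files into a single stream
  // ordered by write order ID.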
  
  /**
   * Replay logs in the order records were written.
   * @param logs
   * @throws IOException
   */
  void replayLog(List<File> logs) throws Exception {
    int count = 0;
    MultiMap transactionMap = new MultiValueMap();
    // seed both with the highest known sequence of either the txnid or woid
    long transactionIDSeed = lastCheckpoint, writeOrderIDSeed = lastCheckpoint;
    LOG.info("Starting replay of " + logs);
    //Load the inflight puts into the transaction map to see if they were
    //committed in one of the logs.
    SetMultimap<Long, Long> inflightPuts = queue.deserializeInflightPuts();
    for (Long txnID : inflightPuts.keySet()) {
      Set<Long> eventPointers = inflightPuts.get(txnID);
      for (Long eventPointer : eventPointers) {
        transactionMap.put(txnID, FlumeEventPointer.fromLong(eventPointer));
      }
    }
    SetMultimap<Long, Long> inflightTakes = queue.deserializeInflightTakes();
    try {
      for (File log : logs) {
        LOG.info("Replaying " + log);
        try {
          LogFile.SequentialReader reader =
            LogFileFactory.getSequentialReader(log,
                encryptionKeyProvider, fsyncPerTransaction);
          Preconditions.checkState(!readers.containsKey(reader.getLogFileID()),
              "Readers " + readers + " already contains "
                  + reader.getLogFileID());
          readers.put(reader.getLogFileID(), reader);
          LogRecord logRecord = reader.next();
          if (logRecord == null) {
            readers.remove(reader.getLogFileID());
            reader.close();
          } else {
            logRecordBuffer.add(logRecord);
          }
        } catch (EOFException e) {
          LOG.warn("Ignoring " + log + " due to EOF", e);
        }
      }
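      // At this point logRecordBuffer holds the earliest unread record of
      // every non-empty log file; next() keeps that invariant as the loop
      // below drains the records in global write order.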
      LogRecord entry = null;
      FlumeEventPointer ptr = null;
      while ((entry = next()) != null) {
        // for puts the fileId is the fileID of the file they exist in
        // for takes the fileId and offset are pointers to a put
        int fileId = entry.getFileID();
        int offset = entry.getOffset();
        TransactionEventRecord record = entry.getEvent();
        short type = record.getRecordType();
        long trans = record.getTransactionID();
        transactionIDSeed = Math.max(transactionIDSeed, trans);
        writeOrderIDSeed = Math.max(writeOrderIDSeed,
            record.getLogWriteOrderID());
        readCount++;
        if (readCount % 10000 == 0 && readCount > 0) {
          LOG.info("read: " + readCount + ", put: " + putCount + ", take: "
              + takeCount + ", rollback: " + rollbackCount + ", commit: "
              + commitCount + ", skip: " + skipCount + ", eventCount:" + count);
        }
        }
        if (record.getLogWriteOrderID() > lastCheckpoint) {
          if (type == TransactionEventRecord.Type.PUT.get()) {
            putCount++;
            ptr = new FlumeEventPointer(fileId, offset);
            transactionMap.put(trans, ptr);
          } else if (type == TransactionEventRecord.Type.TAKE.get()) {
            takeCount++;
            Take take = (Take) record;
            ptr = new FlumeEventPointer(take.getFileID(), take.getOffset());
            transactionMap.put(trans, ptr);
          } else if (type == TransactionEventRecord.Type.ROLLBACK.get()) {
            rollbackCount++;
            transactionMap.remove(trans);
          } else if (type == TransactionEventRecord.Type.COMMIT.get()) {
            commitCount++;
            @SuppressWarnings("unchecked")
            Collection<FlumeEventPointer> pointers =
                (Collection<FlumeEventPointer>) transactionMap.remove(trans);
            if (((Commit) record).getType()
                    == TransactionEventRecord.Type.TAKE.get()) {
              if (inflightTakes.containsKey(trans)) {
              if (pointers == null) {
                pointers = Sets.newHashSet();
              }
              Set<Long> takes = inflightTakes.removeAll(trans);
              Iterator<Long> it = takes.iterator();
                while (it.hasNext()) {
                  Long take = it.next();
                  pointers.add(FlumeEventPointer.fromLong(take));
                }
              }
            }
            if (pointers != null && pointers.size() > 0) {
              processCommit(((Commit) record).getType(), pointers);
              count += pointers.size();
            }
          } else {
            Preconditions.checkArgument(false, "Unknown record type: "
                + Integer.toHexString(type));
          }
        } else {
          skipCount++;
        }
      }
      LOG.info("read: " + readCount + ", put: " + putCount + ", take: "
          + takeCount + ", rollback: " + rollbackCount + ", commit: "
          + commitCount + ", skip: " + skipCount + ", eventCount:" + count);
      queue.replayComplete();
    } finally {
      TransactionIDOracle.setSeed(transactionIDSeed);
      WriteOrderOracle.setSeed(writeOrderIDSeed);
      for (LogFile.SequentialReader reader : readers.values()) {
        if (reader != null) {
          reader.close();
        }
      }
    }
    //re-insert the events in the take map,
    //since the takes were not committed.
    int uncommittedTakes = 0;
    for (Long inflightTxnId : inflightTakes.keySet()) {
      Set<Long> inflightUncommittedTakes =
          inflightTakes.get(inflightTxnId);
      for (Long inflightUncommittedTake : inflightUncommittedTakes) {
        queue.addHead(FlumeEventPointer.fromLong(inflightUncommittedTake));
        uncommittedTakes++;
      }
    }
    inflightTakes.clear();
    count += uncommittedTakes;
    int pendingTakesSize = pendingTakes.size();
    if (pendingTakesSize > 0) {
      LOG.info("Pending takes " + pendingTakesSize + " exist after the"
          + " end of replay. Duplicate messages will exist in"
          + " destination.");
    }
  }
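  // The finally block above seeds TransactionIDOracle and WriteOrderOracle
  // with the highest transaction ID and write order ID observed during
  // replay, so IDs handed out after replay cannot collide with IDs already
  // present in the logs.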
  private LogRecord next() throws IOException, CorruptEventException {
    LogRecord resultLogRecord = logRecordBuffer.poll();
    if (resultLogRecord != null) {
      // there are more log records to read from this file; refill the
      // buffer with the file's next record
      LogFile.SequentialReader reader =
          readers.get(resultLogRecord.getFileID());
      LogRecord nextLogRecord;
      if ((nextLogRecord = reader.next()) != null) {
        logRecordBuffer.add(nextLogRecord);
      }
    }
    return resultLogRecord;
  }
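  // next() implements a k-way merge across one open reader per data file:
  // the priority queue holds at most one record per file, poll() yields the
  // globally-oldest record (LogRecord is expected to order by write order
  // ID), and the polled file's reader immediately contributes its next
  // record, if any.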
  private void processCommit(short type, Collection<FlumeEventPointer> pointers) {
    if (type == TransactionEventRecord.Type.PUT.get()) {
      for (FlumeEventPointer pointer : pointers) {
        if (!queue.addTail(pointer)) {
          throw new IllegalStateException("Unable to add "
              + pointer + ". Queue depth = " + queue.getSize()
              + ", Capacity = " + queue.getCapacity());
        }
        // the take's commit was seen before this put's commit (see
        // pendingTakes above): cancel the two out
        if (pendingTakes.remove(pointer.toLong())) {
          Preconditions.checkState(queue.remove(pointer),
              "Take was pending and pointer was successfully added to the"
                  + " queue but could not be removed: " + pointer);
        }
      }
    } else if (type == TransactionEventRecord.Type.TAKE.get()) {
      for (FlumeEventPointer pointer : pointers) {
        boolean removed = queue.remove(pointer);
        if (!removed) {
          // the matching put has not been replayed yet; remember this take
          // so the eventual put can be suppressed
          pendingTakes.add(pointer.toLong());
        }
      }
    } else {
      Preconditions.checkArgument(false,
          "Unknown record type: " + Integer.toHexString(type));
    }
  }
}