Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 package org.apache.flume.channel.file;
 
 import java.io.File;
 import java.util.Set;
 
 import org.mapdb.DB;
 
 /**
  * Queue of events in the channel. This queue stores only
  * {@link FlumeEventPointer} objects, which are represented as 8-byte longs
  * internally. Additionally, the queue of longs itself is stored as a
  * memory-mapped file with a fixed header and circular queue semantics. The
  * header of the queue contains the timestamp of the last sync, the queue
  * size and the head position.
  */
 
 final class FlumeEventQueue {
   // NOTE(review): this initializer is cut off after `LoggerFactory` (no call,
   // no terminating semicolon) in this copy; restore it from upstream.
   private static final Logger LOG = LoggerFactory
   // Zero sentinel; presumably the value the checkArgument/checkState calls in
   // this class compare pointers against (their right-hand sides are missing).
   private static final int EMPTY = 0;
   // Store queried for capacity, size and name in the constructor.
   private final EventQueueBackingStore backingStore;
   // Presumably the "[channel=<name>]" string built in the constructor — confirm.
   private final String channelNameDescriptor;
   // Wrappers built in the constructor from the inflight takes / puts files.
   private final InflightEventWrapper inflightTakes;
   private final InflightEventWrapper inflightPuts;
   // Search/copy statistics; presumably the values logged by replayComplete().
   private long searchTime = 0;
   private long searchCount = 0;
   private long copyTime = 0;
   private long copyCount = 0;
   // MapDB handle (org.mapdb.DB); presumably assigned from DBMaker in the
   // constructor — the assignment target is missing there.
   private DB db;
   // NOTE(review): the generic type is garbled here; presumably
   // `Set<Long> queueSet` — confirm.
   private Set<LongqueueSet;

  

   /**
    * @param capacity max event capacity of queue
    * @throws java.io.IOException
    */
 
   // Builds the queue: validates the arguments, creates the two inflight
   // wrappers, recreates queueSetDBDir, opens a MapDB file database in it and
   // inserts every value currently in the backing store into a hash set.
   // NOTE(review): the parameter list and most assignment targets were lost in
   // extraction (e.g. `this. = ...`); restore from upstream before compiling.
   FlumeEventQueue(EventQueueBackingStore backingStoreFile inflightTakesFile,
           File inflightPutsFileFile queueSetDBDirthrows Exception {
     // NOTE(review): backingStore is dereferenced here before the null check
     // two statements below, so a null argument fails with a bare NPE.
     Preconditions.checkArgument(backingStore.getCapacity() > 0,
         "Capacity must be greater than zero");
     Preconditions.checkNotNull(backingStore"backingStore");
     this. = "[channel=" + backingStore.getName() + "]";
     Preconditions.checkNotNull(inflightTakesFile"inflightTakesFile");
     Preconditions.checkNotNull(inflightPutsFile"inflightPutsFile");
     Preconditions.checkNotNull(queueSetDBDir"queueSetDBDir");
     this. = backingStore;
     try {
       // One wrapper per inflight file; the puts wrapper is created first.
        = new InflightEventWrapper(inflightPutsFile);
        = new InflightEventWrapper(inflightTakesFile);
     } catch (Exception e) {
       // Log and rethrow unchanged.
       .error("Could not read checkpoint."e);
       throw e;
     }
     // Start from an empty directory: remove an existing directory, or a
     // plain file occupying the same path.
     if(queueSetDBDir.isDirectory()) {
       FileUtils.deleteDirectory(queueSetDBDir);
     } else if(queueSetDBDir.isFile() && !queueSetDBDir.delete()) {
       throw new IOException("QueueSetDir " + queueSetDBDir + " is a file and"
           + " could not be deleted");
     }
     if(!queueSetDBDir.mkdirs()) {
       throw new IllegalStateException("Could not create QueueSet Dir "
           + queueSetDBDir);
    }
    File dbFile = new File(queueSetDBDir"db");
    // File-backed MapDB instance configured through the builder calls below;
    // the variable it is assigned to is missing (presumably `db`).
     = DBMaker.newFileDB(dbFile)
        .closeOnJvmShutdown()
        .transactionDisable()
        .syncOnCommitDisable()
        .deleteFilesAfterClose()
        .cacheDisable()
        .mmapFileEnableIfSupported()
        .make();
    // Hash set named after the backing store; target presumably `queueSet`.
     =
      .createHashSet("QueueSet " + " - " + backingStore.getName()).make();
    long start = System.currentTimeMillis();
    // Copy every queue entry into the set and log how long that took.
    for (int i = 0; i < backingStore.getSize(); i++) {
      .add(get(i));
    }
    .info("QueueSet population inserting " + backingStore.getSize()
        + " took " + (System.currentTimeMillis() - start));
  }
          // NOTE(review): the method header (name, return type, parameters) for
          // this body is missing from this copy; only the throws clause and a
          // delegating `deserialize()` call survive. The receiver of the call is
          // missing too — presumably one of the inflight wrappers; confirm.
          throws IOExceptionBadCheckpointException{
    return .deserialize();
  }
          // NOTE(review): second headerless fragment with the same shape as the
          // one above; presumably the counterpart for the other inflight wrapper.
          throws IOExceptionBadCheckpointException{
    return .deserialize();
  }
  /**
   * Returns the log write order ID.
   *
   * NOTE(review): the body is empty in this copy, so this long-returning
   * method has no return statement. The original statement was lost in
   * extraction; presumably it delegates to the backing store — confirm.
   */
  synchronized long getLogWriteOrderID() {
  }
  /**
   * Reports whether a checkpoint was performed: returns false after a debug
   * log when the skip condition holds, true otherwise.
   *
   * NOTE(review): this copy is damaged by extraction. The parameter list is
   * fused with the throws clause, the opening line of the skip condition is
   * gone, and no checkpoint work is visible before the final return. Restore
   * from upstream before relying on it.
   *
   * @param force when true the skip branch is never taken
   * @return false if the checkpoint was skipped, true otherwise
   * @throws Exception declared by the signature; the throwing calls are not
   *         visible in this copy
   */
  synchronized boolean checkpoint(boolean forcethrows Exception {
            && !.syncRequired()
            && !force) { //No need to check inflight puts, since that would
                         //cause elements.syncRequired() to return true.
      .debug("Checkpoint not required");
      return false;
    }
    return true;
  }

  
  /**
   * Retrieve and remove the head of the queue.
   *
   * @return FlumeEventPointer or null if queue is empty
   */
  // Removes the element at index 0 on behalf of transactionID and returns it
  // as a FlumeEventPointer, or null when the size check finds nothing.
  // NOTE(review): the receiver of getSize() and both operands after `!=` /
  // `+` in the checkState call are missing from this copy.
  synchronized FlumeEventPointer removeHead(long transactionID) {
    if(.getSize()  == 0) {
      return null;
    }
    long value = remove(0, transactionID);
    // Sanity check on the removed value; presumably compares against EMPTY
    // and appends the channel descriptor to the message — confirm.
    Preconditions.checkState(value != "Empty value "
          + );
    FlumeEventPointer ptr = FlumeEventPointer.fromLong(value);
    return ptr;
  }

  
  /**
   * Add a FlumeEventPointer to the head of the queue.
   * Called during rollbacks.
   *
   * @param e FlumeEventPointer to be added
   * @return true if space was available and pointer was added to the queue
   */
  // Inserts the pointer's long value at index 0 and returns true; the
  // surviving error branch logs and returns false.
  // NOTE(review): the `if (...) {` line that opens the error branch is missing
  // from this copy, which leaves the braces unbalanced.
  synchronized boolean addHead(FlumeEventPointer e) {
    //Called only during rollback, so should not consider inflight takes' size,
    //because normal puts through addTail method already account for these
    //events since they are in the inflight takes. So puts will not happen
    //in such a way that these takes cannot go back in. If this if returns true,
    //there is a buuuuuuuug!
      .error("Could not reinsert to queue, events which were taken but "
              + "not committed. Please report this issue.");
      return false;
    }
    long value = e.toLong();
    // Right-hand side of the comparison is missing; presumably EMPTY.
    Preconditions.checkArgument(value != );
    add(0, value);
    return true;
  }


  
  /**
   * Add a FlumeEventPointer to the tail of the queue.
   *
   * @param e FlumeEventPointer to be added
   * @return true if space was available and pointer was added to the queue
   */
  // Appends the pointer's long value at the current end of the queue.
  // Returns false without adding when getSize() equals the capacity.
  // NOTE(review): the receivers of getCapacity()/getSize() and the operand
  // after `!=` are missing from this copy; other statements may be missing too.
  synchronized boolean addTail(FlumeEventPointer e) {
    // getSize() is this class's own method, not the bare store size.
    if (getSize() == .getCapacity()) {
      return false;
    }
    long value = e.toLong();
    Preconditions.checkArgument(value != );
    add(.getSize(), value);
    return true;
  }

  
  /**
   * Must be called when a put happens to the log. This ensures that put
   * commits after checkpoints will retrieve all events committed in that txn.
   *
   * @param e
   * @param transactionID
   */
  // Records the pointer's long value against transactionID via addEvent.
  // NOTE(review): the parameter list and call arguments are fused and the
  // receiver is missing; presumably the inflight puts wrapper — confirm.
  synchronized void addWithoutCommit(FlumeEventPointer elong transactionID) {
    .addEvent(transactionIDe.toLong());
  }

  
  /**
   * Remove FlumeEventPointer from queue; will only be used when recovering
   * from a crash. It is not legal to call this method after replayComplete
   * has been called.
   *
   * @param e FlumeEventPointer to be removed
   * @return true if the FlumeEventPointer was found and removed
   */
  // Looks the pointer's long value up by linear scan and removes the first
  // match with transaction id 0. Throws IllegalStateException when the
  // null-checked reference is null. Returns false when the membership check
  // fails or the scan finds nothing.
  // NOTE(review): every field reference in this body was stripped by
  // extraction (null check, contains, counters, getSize, decrementFileID).
  synchronized boolean remove(FlumeEventPointer e) {
    long value = e.toLong();
    Preconditions.checkArgument(value != );
    if ( == null) {
     throw new IllegalStateException("QueueSet is null, thus replayComplete"
         + " has been called which is illegal");
    }
    // Cheap membership test before paying for the linear scan below.
    if (!.contains(value)) {
      return false;
    }
    // Presumably searchCount++ — confirm.
    ++;
    long start = System.currentTimeMillis();
    for (int i = 0; i < .getSize(); i++) {
      if(get(i) == value) {
        remove(i, 0);
        FlumeEventPointer ptr = FlumeEventPointer.fromLong(value);
        .decrementFileID(ptr.getFileID());
        // Presumably accumulates into searchTime — confirm.
         += System.currentTimeMillis() - start;
        return true;
      }
    }
     += System.currentTimeMillis() - start;
    return false;
  }
  

  /**
   * @return a copy of the set of fileIDs which are currently on the queue;
   * normally used when deciding which data files can be deleted
   */
  // Collects file IDs from two sources into one sorted set and returns it.
  // NOTE(review): the return type is fused with the method name, the
  // expression that constructs the set is missing, and both addAll receivers
  // were stripped — restore from upstream.
  synchronized SortedSet<IntegergetFileIDs() {
    //Java implements clone pretty well. The main place this is used
    //in checkpointing and deleting old files, so best
    //to use a sorted set implementation.
    SortedSet<IntegerfileIDs =
    fileIDs.addAll(.getFileIDs());
    fileIDs.addAll(.getFileIDs());
    return fileIDs;
  }
  protected long get(int index) {
    if (index < 0 || index > .getSize() - 1) {
      throw new IndexOutOfBoundsException(String.valueOf(index)
          + );
    }
    return .get(index);
  }
  private void set(int indexlong value) {
    if (index < 0 || index > .getSize() - 1) {
      throw new IndexOutOfBoundsException(String.valueOf(index)
          + );
    }
    .put(indexvalue);
  }
  /**
   * Inserts a value at the given position by shifting the nearer half of the
   * queue and writing the value into the freed slot; index may equal the
   * current size (append).
   *
   * NOTE(review): this copy is damaged by extraction. The parameter list is
   * fused, receivers are missing, the guard that precedes the early
   * {@code return false} is gone, and one {@code if} has an empty body. The
   * statements that grow the queue or move the head before shifting are not
   * visible, so the shift loops cannot be verified from here.
   *
   * @return true once the value has been stored; false from the early exit
   *         whose condition is missing
   * @throws IndexOutOfBoundsException if index is negative or above the size
   */
  protected boolean add(int indexlong value) {
    if (index < 0 || index > .getSize()) {
      throw new IndexOutOfBoundsException(String.valueOf(index)
          + );
    }
      return false;
    }
    if (index <= .getSize()/2) {
      // Shift left
      if (.getHead() < 0) {
      }
      for (int i = 0; i < indexi++) {
        set(iget(i+1));
      }
    } else {
      // Shift right
      for (int i = .getSize() - 1; i > indexi--) {
        set(iget(i-1));
      }
    }
    set(indexvalue);
    // Mirror the insert into a nullable collection; presumably queueSet.
    if ( != null) {
      .add(value);
    }
    return true;
  }

  
  /**
   * Must be called when a transaction is being committed or rolled back.
   *
   * @param transactionID
   */
  // Tries completeTransaction on one receiver and, only if that returns
  // false, on a second one.
  // NOTE(review): both receivers were stripped; presumably the two inflight
  // wrappers, but their order cannot be determined from this copy.
  synchronized void completeTransaction(long transactionID) {
    if (!.completeTransaction(transactionID)) {
      .completeTransaction(transactionID);
    }
  }
  /**
   * Removes and returns the value at the given position, closing the gap by
   * shifting whichever side of the queue is shorter.
   *
   * NOTE(review): the parameter list is fused and every field reference was
   * stripped by extraction (counters, nullable set, inflight wrapper,
   * getSize/getHead/setHead receivers, the value written into the vacated
   * slot). Statements that shrink the size or advance the head are not
   * visible and may be missing entirely.
   *
   * @return the value that was stored at index
   * @throws IndexOutOfBoundsException if index is outside [0, size - 1]
   */
  protected synchronized long remove(int indexlong transactionID) {
    if (index < 0 || index > .getSize() - 1) {
      throw new IndexOutOfBoundsException("index = " + index
          + ", queueSize " + .getSize() +" " + );
    }
    // Presumably copyCount++ — confirm.
    ++;
    long start = System.currentTimeMillis();
    long value = get(index);
    if ( != null) {
      .remove(value);
    }
    //if txn id = 0, we are recovering from a crash.
    if(transactionID != 0) {
      // Track the removed value under its transaction; presumably on the
      // inflight takes wrapper — confirm.
      .addEvent(transactionIDvalue);
    }
    if (index > .getSize()/2) {
      // Move tail part to left
      for (int i = indexi < .getSize() - 1; i++) {
        long rightValue = get(i+1);
        set(irightValue);
      }
      // Clear the vacated last slot; second argument presumably EMPTY.
      set(.getSize() - 1, );
    } else {
      // Move head part to right
      for (int i = index - 1; i >= 0; i--) {
        long leftValue = get(i);
        set(i+1, leftValue);
      }
      // Clear the vacated first slot; second argument presumably EMPTY.
      set(0, );
      // Wrap the head back to 0 once it reaches the capacity.
      if (.getHead() == .getCapacity()) {
        .setHead(0);
      }
    }
    // Presumably accumulates into copyTime — confirm.
     += System.currentTimeMillis() - start;
    return value;
  }
  /**
   * Returns the sum of two getSize() results.
   *
   * NOTE(review): both receivers were stripped by extraction. The comment in
   * addHead suggests the sum is the backing store size plus the inflight
   * takes size — confirm against upstream.
   */
  protected synchronized int getSize() {
    return .getSize() + .getSize();
  }

  

  /**
   * @return max capacity of the queue
   */
  public int getCapacity() {
    return .getCapacity();
  }
  /**
   * Closes a nullable resource, then three further resources. Failures in
   * either step are logged at warn level and not rethrown.
   *
   * NOTE(review): all receivers were stripped by extraction; judging by the
   * log messages the first is the db and the others are the backing store
   * and, presumably, the two inflight wrappers. The three closes share one
   * try block, so an IOException from an earlier close skips the later ones.
   * Although the signature declares IOException, the visible code never lets
   * one escape.
   */
  synchronized void close() throws IOException {
    try {
      if ( != null) {
        .close();
      }
    } catch(Exception ex) {
      .warn("Error closing db"ex);
    }
    try {
      .close();
      .close();
      .close();
    } catch (IOException e) {
      .warn("Error closing backing store"e);
    }
  }

  
  /**
   * Called when ReplayHandler has completed and thus
   * remove(FlumeEventPointer) will no longer be called.
   */
  // Logs the search/copy statistics, closes a nullable resource and then
  // nulls two references.
  // NOTE(review): every field reference was stripped by extraction. The
  // message labels suggest searchCount, searchTime, copyCount and copyTime in
  // that order; the closed and nulled references are presumably db and
  // queueSet — confirm.
  synchronized void replayComplete() {
    String msg = "Search Count = " +  + ", Search Time = " +
         + ", Copy Count = " +  + ", Copy Time = " +
        ;
    .info(msg);
    if( != null) {
      .close();
    }
     = null;
     = null;
  }
  long getSearchCount() {
    return ;
  }
  long getCopyCount() {
    return ;
  }

  
  /**
   * A representation of in-flight events which have not yet been committed.
   * None of the methods are thread safe, and they should be called from
   * thread-safe methods only.
   */
    // NOTE(review): the enclosing class header (presumably
    // `class InflightEventWrapper {`) is missing from this copy, and the
    // generic arguments of both multimaps are fused with the field names.
    // Presumably transaction ID -> event pointers.
    private SetMultimap<LongLonginflightEvents = HashMultimap.create();
    // Both these are volatile for safe publication, they are never accessed by
    // more than 1 thread at a time.
    private volatile RandomAccessFile file;
    private volatile java.nio.channels.FileChannel fileChannel;
    // Created in the constructor via MessageDigest.getInstance("MD5").
    private final MessageDigest digest;
    private final File inflightEventsFile;
    private volatile boolean syncRequired = false;
    // Presumably transaction ID -> file IDs of that transaction's events.
    private SetMultimap<LongIntegerinflightFileIDs = HashMultimap.create();
    /**
     * Creates the inflight events file if it does not exist, opens it
     * read-write and obtains an MD5 digest instance.
     *
     * NOTE(review): the parameter list is fused with the throws clause and
     * the assignment targets were stripped; presumably inflightEventsFile,
     * file, fileChannel and digest in that order — confirm.
     *
     * @throws Exception declared by the signature (file creation, opening the
     *         file and the digest lookup can all fail)
     */
    public InflightEventWrapper(File inflightEventsFilethrows Exception{
      if(!inflightEventsFile.exists()){
        // NOTE(review): the concatenated message lacks a space between "not"
        // and "create".
        Preconditions.checkState(inflightEventsFile.createNewFile(),"Could not"
                + "create inflight events file: "
                + inflightEventsFile.getCanonicalPath());
      }
      this. = inflightEventsFile;
       = new RandomAccessFile(inflightEventsFile"rw");
       = .getChannel();
       = MessageDigest.getInstance("MD5");
    }

    
    /**
     * Complete the transaction, and remove all events from inflight list.
     *
     * @param transactionID
     */
    // Returns false when transactionID is not a known key; otherwise removes
    // all of its entries from two collections, sets a flag to true and
    // returns true.
    // NOTE(review): receivers and the flag name were stripped; presumably
    // inflightEvents, inflightFileIDs and syncRequired — confirm.
    public boolean completeTransaction(Long transactionID) {
      if(!.containsKey(transactionID)) {
        return false;
      }
      .removeAll(transactionID);
      .removeAll(transactionID);
       = true;
      return true;
    }

    
    /**
     * Add an event pointer to the inflights list.
     *
     * @param transactionID
     * @param pointer
     */
    // Records the pointer under transactionID, records the file ID decoded
    // from the pointer under the same key, then sets a flag to true.
    // NOTE(review): the parameter list is fused and the receivers/flag were
    // stripped; presumably inflightEvents, inflightFileIDs and syncRequired.
    public void addEvent(Long transactionIDLong pointer){
      .put(transactionIDpointer);
      .put(transactionID,
              FlumeEventPointer.fromLong(pointer).getFileID());
       = true;
    }

    
    /**
     * Serialize the set of in flights into a byte longBuffer.
     *
     * @return the checksum of the buffer that is being asynchronously
     * written to disk.
     */
    // Writes a 16-byte checksum followed by, for each transaction, its ID,
    // its event count and its event pointers (all as longs), then forces the
    // write to disk and clears a flag.
    // NOTE(review): receivers, the flag name and some generic/cast syntax were
    // stripped by extraction; the RandomAccessFile constructor call has lost
    // its file argument.
    public void serializeAndWrite() throws Exception {
      Collection<Longvalues = .values();
      // Reopen the file and channel if the previously held one was closed.
      if(!.isOpen()){
         = new RandomAccessFile("rw");
         = .getChannel();
      }
      // NOTE(review): execution continues past this truncation, so a checksum
      // and header-sized payload are still written for an empty map.
      if(values.isEmpty()){
        .setLength(0L);
      }
      //What is written out?
      //Checksum - 16 bytes
      //and then each key-value pair from the map:
      //transactionid numberofeventsforthistxn listofeventpointers
      try {
        int expectedFileSize = (((.keySet().size() * 2) //For transactionIDs and events per txn ID
                + values.size()) * 8) //Event pointers
                + 16; //Checksum
        //There is no real need of filling the channel with 0s, since we
        //will write the exact number of bytes as expected file size.
        .setLength(expectedFileSize);
        Preconditions.checkState(.length() == expectedFileSize,
                "Expected File size of inflight events file does not match the "
                + "current file size. Checkpoint is incomplete.");
        .seek(0);
        // The payload buffer is sized with expectedFileSize, which already
        // includes 16 bytes for the checksum, so it ends with 16 unused zero
        // bytes that are digested and written along with the data.
        final ByteBuffer buffer = ByteBuffer.allocate(expectedFileSize);
        LongBuffer longBuffer = buffer.asLongBuffer();
        for (Long txnID : .keySet()) {
          Set<Longpointers = .get(txnID);
          longBuffer.put(txnID);
          longBuffer.put((longpointers.size());
          .debug("Number of events inserted into "
                  + "inflights file: " + String.valueOf(pointers.size())
                  + " file: " + .getCanonicalPath());
          long[] written = ArrayUtils.toPrimitive(
                  pointers.toArray(new Long[0]));
          longBuffer.put(written);
        }
        // Checksum covers the whole payload buffer and is written first.
        byte[] checksum = .digest(buffer.array());
        .write(checksum);
        buffer.position(0);
        .write(buffer);
        .force(true);
         = false;
      } catch (IOException ex) {
        .error("Error while writing checkpoint to disk."ex);
        throw ex;
      }
    }

    
    /**
     * Read the inflights file and return a
     * {@link com.google.common.collect.SetMultimap}
     * of transactionIDs to events that were inflight.
     *
     * @return map of inflight events per txnID.
     */
    // Reads the 16-byte checksum and the remaining payload, verifies the
    // payload's digest against the checksum, then decodes
    // (txnID, count, pointers...) records into a multimap. Returns an empty
    // multimap for a zero-length file and throws BadCheckpointException on a
    // checksum mismatch.
    // NOTE(review): receivers, separators in the throws clause and argument
    // lists, and the RandomAccessFile constructor's file argument were
    // stripped by extraction.
    public SetMultimap<LongLongdeserialize()
            throws IOExceptionBadCheckpointException {
      SetMultimap<LongLonginflights = HashMultimap.create();
      if (!.isOpen()) {
         = new RandomAccessFile("rw");
         = .getChannel();
      }
      if(.length() == 0) {
        return inflights;
      }
      .seek(0);
      byte[] checksum = new byte[16];
      // NOTE(review): the return values of both read calls are ignored, so a
      // short read would go unnoticed until the checksum comparison.
      .read(checksum);
      ByteBuffer buffer = ByteBuffer.allocate(
              (int)(.length() - .getFilePointer()));
      .read(buffer);
      byte[] fileChecksum = .digest(buffer.array());
      if (!Arrays.equals(checksumfileChecksum)) {
        throw new BadCheckpointException("Checksum of inflights file differs"
                + " from the checksum expected.");
      }
      buffer.position(0);
      LongBuffer longBuffer = buffer.asLongBuffer();
      try {
        // The loop has no exit condition of its own: it ends when a get()
        // runs past the buffer and throws BufferUnderflowException.
        while (true) {
          long txnID = longBuffer.get();
          int numEvents = (int)(longBuffer.get());
          for(int i = 0; i < numEventsi++) {
            long val = longBuffer.get();
            inflights.put(txnIDval);
          }
        }
      } catch (BufferUnderflowException ex) {
        .debug("Reached end of inflights buffer. Long buffer position ="
                + String.valueOf(longBuffer.position()));
      }
      return  inflights;
    }
    /**
     * Returns the size() of one of this wrapper's collections.
     *
     * NOTE(review): the receiver was stripped by extraction; presumably
     * inflightEvents, but inflightFileIDs would also type-check — confirm.
     */
    public int getSize() {
      return .size();
    }
    public boolean syncRequired(){
      return ;
    }
    public Collection<IntegergetFileIDs(){
      return .values();
    }
    //Needed for testing.
    public Collection<LonggetInFlightPointers() {
      return .values();
    }
    /**
     * Closes one underlying resource.
     *
     * NOTE(review): the receiver was stripped by extraction; presumably the
     * RandomAccessFile (file) or its channel — confirm against upstream.
     *
     * @throws IOException if the close call fails
     */
    public void close() throws IOException {
      .close();
    }
  }
New to GrepCode? Check out our FAQ X