Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 
 package org.apache.flume.channel.file;
 
 
 import java.io.File;

A durable org.apache.flume.Channel implementation that uses the local file system for its storage.

FileChannel works by writing all transactions to a set of directories specified in the configuration. Additionally, when a commit occurs the transaction is synced to disk.

FileChannel is marked org.apache.flume.annotations.InterfaceAudience.Private because it should only be instantiated via a configuration. For example, users should certainly use FileChannel but not by instantiating FileChannel objects. Meaning the label Private applies to user-developers not user-operators. In cases where a Channel is required by instantiated by user-developers org.apache.flume.channel.MemoryChannel should be used.

 
 public class FileChannel extends BasicChannelSemantics {
 
   private static final Logger LOG = LoggerFactory
       .getLogger(FileChannel.class);
 
   private Integer capacity = 0;
   private int keepAlive;
   protected Integer transactionCapacity = 0;
   private Long checkpointInterval = 0L;
   private long maxFileSize;
   private long minimumRequiredSpace;
   private File checkpointDir;
   private File backupCheckpointDir;
   private File[] dataDirs;
   private Log log;
   private volatile boolean open;
   private volatile Throwable startupError;
   private Semaphore queueRemaining;
       new ThreadLocal<FileBackedTransaction>();
   private String channelNameDescriptor = "[channel=unknown]";
   private boolean useLogReplayV1;
   private boolean useFastReplay = false;
   private String encryptionActiveKey;
   private boolean useDualCheckpoints;
  private boolean compressBackupCheckpoint;
  private boolean fsyncPerTransaction;
  private int fsyncInterval;
  public synchronized void setName(String name) {
     = "[channel=" + name + "]";
    super.setName(name);
  }
  public void configure(Context context) {
     = context.getBoolean(
    String homePath = System.getProperty("user.home").replace('\\''/');
    String strCheckpointDir =
            homePath + "/.flume/file-channel/checkpoint").trim();
    String strBackupCheckpointDir = context.getString
    String[] strDataDirs = Iterables.toArray(
        Splitter.on(",").trimResults().omitEmptyStrings().split(
            context.getString(.,
                homePath + "/.flume/file-channel/data")), String.class);
     = new File(strCheckpointDir);
    if () {
      Preconditions.checkState(!strBackupCheckpointDir.isEmpty(),
        "Dual checkpointing is enabled, but the backup directory is not set. " +
          "Please set " + . + " " +
          "to enable dual checkpointing");
       = new File(strBackupCheckpointDir);
      /*
       * If the backup directory is the same as the checkpoint directory,
       * then throw an exception and force the config system to ignore this
       * channel.
       */
        "Could not configure " + getName() + ". The checkpoint backup " +
          "directory and the checkpoint directory are " +
          "configured to be the same.");
    }
     = new File[strDataDirs.length];
    for (int i = 0; i < strDataDirs.lengthi++) {
      [i] = new File(strDataDirs[i]);
    }
    if( <= 0) {
      .warn("Invalid capacity specified, initializing channel to "
              + "default capacity of {}");
    }
     =
    if( <= 0) {
       =
      .warn("Invalid transaction capacity specified, " +
          "initializing channel to default " +
          "capacity of {}");
    }
    Preconditions.checkState( <= ,
      "File Channel transaction capacity cannot be greater than the " +
        "capacity of the channel.");
    if ( <= 0) {
      .warn("Checkpoint interval is invalid: " + 
              + ", using default: "
       =
    }
    // cannot be over FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE
     = Math.min(
     = Math.max(
     = context.getBoolean(
     = context.getBoolean(
    Context encryptionContext = new Context(
            "."));
    String encryptionKeyProviderName = encryptionContext.getString(
     = encryptionContext.getString(
     = encryptionContext.getString(
    if(encryptionKeyProviderName != null) {
      Preconditions.checkState(!Strings.isNullOrEmpty(),
          "Encryption configuration problem: " +
              . + " is missing");
      Preconditions.checkState(!Strings.isNullOrEmpty(),
          "Encryption configuration problem: " +
              . + " is missing");
      Context keyProviderContext = new Context(encryptionContext.
       = KeyProviderFactory.
          getInstance(encryptionKeyProviderNamekeyProviderContext);
    } else {
      Preconditions.checkState( == null,
          "Encryption configuration problem: " +
              . + " is present while key " +
          "provider name is not.");
      Preconditions.checkState( == null,
          "Encryption configuration problem: " +
              . + " is present while " +
          "key provider name is not.");
    }
    if( == null) {
       = new Semaphore(true);
    }
    if( != null) {
    }
    if ( == null) {
       = new ChannelCounter(getName());
    }
  }
  public synchronized void start() {
    .info("Starting {}..."this);
    try {
      Builder builder = new Log.Builder();
      builder.setMaxFileSize();
      builder.setQueueSize();
      builder.setCheckpointDir();
      builder.setLogDirs();
      builder.setChannelName(getName());
      builder.setUseLogReplayV1();
      builder.setUseFastReplay();
      builder.setFsyncInterval();
       = builder.build();
      .replay();
       = true;
      int depth = getDepth();
      Preconditions.checkState(.tryAcquire(depth),
          "Unable to acquire " + depth + " permits " + );
      .info("Queue Size after replay: " + depth + " "
           + );
    } catch (Throwable t) {
       = false;
       = t;
      .error("Failed to start the file channel " + t);
      if (t instanceof Error) {
        throw (Errort;
      }
    }
    if () {
      .start();
    }
    super.start();
  }
  public synchronized void stop() {
    .info("Stopping {}..."this);
     = null;
    int size = getDepth();
    close();
    if (!) {
      .stop();
    }
    super.stop();
  }
  public String toString() {
    return "FileChannel " + getName() + " { dataDirs: " +
        Arrays.toString() + " }";
  }
    if(!) {
      String msg = "Channel closed " + ;
      if( != null) {
        msg += ". Due to " + .getClass().getName() + ": " +
            .getMessage();
        throw new IllegalStateException(msg);
      }
      throw new IllegalStateException(msg);
    }
    if(trans != null && !trans.isClosed()) {
      Preconditions.checkState(false,
          "Thread has transaction which is still open: " +
              trans.getStateAsString()  + );
    }
    trans = new FileBackedTransaction(, TransactionIDOracle.next(),
    .set(trans);
    return trans;
  }
  protected int getDepth() {
    Preconditions.checkState("Channel closed"  + );
    Preconditions.checkNotNull("log");
    Preconditions.checkNotNull(queue"queue");
    return queue.getSize();
  }
  void close() {
    if() {
       = false;
      try {
        .close();
      } catch (Exception e) {
        .error("Error while trying to close the log."e);
        Throwables.propagate(e);
      }
       = null;
       = null;
    }
  }
  boolean didFastReplay() {
    return .didFastReplay();
  }
  }
  public boolean isOpen() {
    return ;
  }

  
Did this channel recover a backup of the checkpoint to restart?

Returns:
true if the channel recovered using a backup.
  boolean checkpointBackupRestored() {
    if( != null) {
      return .backupRestored();
    }
    return false;
  }
  Log getLog() {
    return ;
  }

  
Transaction backed by a file. This transaction supports either puts or takes but not both.
  static class FileBackedTransaction extends BasicTransactionSemantics {
    private final long transactionID;
    private final int keepAlive;
    private final Log log;
    private final FlumeEventQueue queue;
    private final Semaphore queueRemaining;
    private final String channelNameDescriptor;
    private final ChannelCounter channelCounter;
    private final boolean fsyncPerTransaction;
    public FileBackedTransaction(Log loglong transactionID,
        int transCapacityint keepAliveSemaphore queueRemaining,
        String nameboolean fsyncPerTransactionChannelCounter
      counter) {
      this. = log;
       = log.getFlumeEventQueue();
      this. = transactionID;
      this. = keepAlive;
      this. = queueRemaining;
       = new LinkedBlockingDeque<FlumeEventPointer>(transCapacity);
       = new LinkedBlockingDeque<FlumeEventPointer>(transCapacity);
      this. = fsyncPerTransaction;
       = "[channel=" + name + "]";
      this. = counter;
    }
    private boolean isClosed() {
      return ..equals(getState());
    }
    private String getStateAsString() {
      return String.valueOf(getState());
    }
    @Override
    protected void doPut(Event eventthrows InterruptedException {
      if(.remainingCapacity() == 0) {
        throw new ChannelException("Put queue for FileBackedTransaction " +
            "of capacity " + .size() + " full, consider " +
            "committing more frequently, increasing capacity or " +
            "increasing thread count. " + );
      }
      // this does not need to be in the critical section as it does not
      // modify the structure of the log or queue.
        throw new ChannelFullException("The channel has reached it's capacity. "
            + "This might be the result of a sink on the channel having too "
            + "low of batch size, a downstream system running slower than "
            + "normal, or that the channel capacity is just too low. "
            + );
      }
      boolean success = false;
      .lockShared();
      try {
        FlumeEventPointer ptr = .put(event);
        Preconditions.checkState(.offer(ptr), "putList offer failed "
          + );
        .addWithoutCommit(ptr);
        success = true;
      } catch (IOException e) {
        throw new ChannelException("Put failed due to IO error "
                + e);
      } finally {
        .unlockShared();
        if(!success) {
          // release slot obtained in the case
          // the put fails for any reason
          .release();
        }
      }
    }
    @Override
    protected Event doTake() throws InterruptedException {
      if(.remainingCapacity() == 0) {
        throw new ChannelException("Take list for FileBackedTransaction, capacity " +
            .size() + " full, consider committing more frequently, " +
            "increasing capacity, or increasing thread count. "
               + );
      }
      .lockShared();
      /*
       * 1. Take an event which is in the queue.
       * 2. If getting that event does not throw NoopRecordException,
       *    then return it.
       * 3. Else try to retrieve the next event from the queue
       * 4. Repeat 2 and 3 until queue is empty or an event is returned.
       */
      try {
        while (true) {
          FlumeEventPointer ptr = .removeHead();
          if (ptr == null) {
            return null;
          } else {
            try {
              // first add to takeList so that if write to disk
              // fails rollback actually does it's work
              Preconditions.checkState(.offer(ptr),
                "takeList offer failed "
                  + );
              .take(ptr); // write take to disk
              Event event = .get(ptr);
              return event;
            } catch (IOException e) {
              throw new ChannelException("Take failed due to IO error "
                + e);
            } catch (NoopRecordException e) {
              .warn("Corrupt record replaced by File Channel Integrity " +
                "tool found. Will retrieve next event"e);
              .remove(ptr);
            } catch (CorruptEventException ex) {
              if () {
                throw new ChannelException(ex);
              }
              .warn("Corrupt record found. Event will be " +
                "skipped, and next event will be read."ex);
              .remove(ptr);
            }
          }
        }
      } finally {
        .unlockShared();
      }
    }
    @Override
    protected void doCommit() throws InterruptedException {
      int puts = .size();
      int takes = .size();
      if(puts > 0) {
        Preconditions.checkState(takes == 0, "nonzero puts and takes "
                + );
        .lockShared();
        try {
          .commitPut();
          .addToEventPutSuccessCount(puts);
          synchronized () {
            while(!.isEmpty()) {
              if(!.addTail(.removeFirst())) {
                StringBuilder msg = new StringBuilder();
                msg.append("Queue add failed, this shouldn't be able to ");
                msg.append("happen. A portion of the transaction has been ");
                msg.append("added to the queue but the remaining portion ");
                msg.append("cannot be added. Those messages will be consumed ");
                msg.append("despite this transaction failing. Please report.");
                msg.append();
                .error(msg.toString());
                Preconditions.checkState(falsemsg.toString());
              }
            }
            .completeTransaction();
          }
        } catch (IOException e) {
          throw new ChannelException("Commit failed due to IO error "
                  + e);
        } finally {
          .unlockShared();
        }
      } else if (takes > 0) {
        .lockShared();
        try {
          .commitTake();
          .addToEventTakeSuccessCount(takes);
        } catch (IOException e) {
          throw new ChannelException("Commit failed due to IO error "
              + e);
        } finally {
          .unlockShared();
        }
        .release(takes);
      }
      .clear();
      .clear();
    }
    @Override
    protected void doRollback() throws InterruptedException {
      int puts = .size();
      int takes = .size();
      .lockShared();
      try {
        if(takes > 0) {
          Preconditions.checkState(puts == 0, "nonzero puts and takes "
              + );
          synchronized () {
            while (!.isEmpty()) {
              Preconditions.checkState(.addHead(.removeLast()),
                  "Queue add failed, this shouldn't be able to happen "
                      + );
            }
          }
        }
        .clear();
        .clear();
        .rollback();
      } catch (IOException e) {
        throw new ChannelException("Commit failed due to IO error "
            + e);
      } finally {
        .unlockShared();
        // since rollback is being called, puts will never make it on
        // to the queue and we need to be sure to release the resources
        .release(puts);
      }
    }
  }
New to GrepCode? Check out our FAQ X