Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   *
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.util.List;
 import java.util.UUID;
 
Class that handles the source of a replication stream. Currently does not handle more than 1 slave For each slave cluster it selects a random number of peers using a replication ratio. For example, if replication ration = 0.1 and slave cluster has 100 region servers, 10 will be selected.

A stream is considered down when we cannot contact a region server on the peer cluster for more than 55 seconds by default.

 
 public class ReplicationSource extends Thread
     implements ReplicationSourceInterface {
 
   public static final Log LOG = LogFactory.getLog(ReplicationSource.class);
   // Queue of logs to process
 
   private Configuration conf;
   // id of the peer cluster this source replicates to
   private String peerId;
   // The manager of all sources to which we ping back our progress
   // Should we stop everything?
   private Stoppable stopper;
   // How long should we sleep for each retry
   private long sleepForRetries;
   // Max size in bytes of entriesArray
   private long replicationQueueSizeCapacity;
   // Max number of entries in entriesArray
   private int replicationQueueNbCapacity;
   // Our reader for the current log
   private HLog.Reader reader;
   // Last position in the log that we sent to ZooKeeper
   private long lastLoggedPosition = -1;
   // Path of the current log
   private volatile Path currentPath;
  private FileSystem fs;
  // id of this cluster
  private UUID clusterId;
  // id of the other cluster
  private UUID peerClusterId;
  // total number of edits we replicated
  private long totalReplicatedEdits = 0;
  // total number of edits we replicated
  private long totalReplicatedOperations = 0;
  // The znode we currently play with
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Current number of operations (Put/Delete) that we need to replicate
  private int currentNbOperations = 0;
  // Current size of data we need to replicate
  private int currentSize = 0;
  // Indicates if this particular source is running
  private volatile boolean running = true;
  // Metrics for this source
  private MetricsSource metrics;
  // Handle on the log reader helper
  //WARN threshold for the number of queued logs, defaults to 2
  private int logQueueWarnThreshold;
  // ReplicationEndpoint which will handle the actual replication
  // A filter (or a chain of filters) for the WAL entries.
  // Context for ReplicationEndpoint#replicate()
  // throttler
Instantiation method used by region servers

Parameters:
conf configuration to use
fs file system to use
manager replication manager to ping to
stopper the atomic boolean to use to stop the regionserver
peerClusterZnode the name of our znode
clusterId unique UUID for the cluster
replicationEndpoint the replication endpoint implementation
metrics metrics for replication source
Throws:
java.io.IOException
  public void init(final Configuration conffinal FileSystem fs,
      final ReplicationSourceManager managerfinal ReplicationQueues replicationQueues,
      final ReplicationPeers replicationPeersfinal Stoppable stopper,
      final String peerClusterZnodefinal UUID clusterIdReplicationEndpoint replicationEndpoint,
      final MetricsSource metrics)
          throws IOException {
    this. = stopper;
    this. = conf;
    decorateConf();
        this..getLong("replication.source.size.capacity", 1024*1024*64);
        this..getInt("replication.source.nb.capacity", 25000);
    this. = this..getInt("replication.source.maxretriesmultiplier", 10);
    this. =
        new PriorityBlockingQueue<Path>(
            this..getInt("hbase.regionserver.maxlogs", 32),
            new LogsComparator());
    long bandwidth = this..getLong("replication.source.per.peer.node.bandwidth", 0);
    this. = new ReplicationThrottler((double)bandwidth/10.0);
    this. = replicationQueues;
    this. = replicationPeers;
    this. = manager;
    this. =
        this..getLong("replication.source.sleepforretries", 1000);
    this. = fs;
    this. = metrics;
    this. = new ReplicationHLogReaderManager(this.this.);
    this. = clusterId;
    this. = peerClusterZnode;
    this. = new ReplicationQueueInfo(peerClusterZnode);
    // ReplicationQueueInfo parses the peerId out of the znode for us
    this. = this..getPeerId();
    this. = this..getInt("replication.source.log.queue.warn", 2);
    this. = replicationEndpoint;
  }
  private void decorateConf() {
    String replicationCodec = this..get(.);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this..set(.replicationCodec);
    }
  }
  public void enqueueLog(Path log) {
    this..put(log);
    int queueSize = .size();
    this..setSizeOfLogQueue(queueSize);
    // This will log a warning for each new log that gets created above the warn threshold
    if (queueSize > this.) {
      .warn("Queue size: " + queueSize +
        " exceeds value of replication.source.log.queue.warn: " + );
    }
  }
  private void uninitialize() {
    .debug("Source exiting " + this.);
    .clear();
        || .state() == ..) {
    }
  }
  public void run() {
    // We were stopped while looping to connect to sinks, just abort
    if (!this.isActive()) {
      uninitialize();
      return;
    }
    try {
      // start the endpoint, connect to the cluster
      Service.State state = .start().get();
      if (state != ..) {
        .warn("ReplicationEndpoint was not started. Exiting");
        uninitialize();
        return;
      }
    } catch (Exception ex) {
      .warn("Error starting ReplicationEndpoint, exiting"ex);
      throw new RuntimeException(ex);
    }
    // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
    ArrayList<WALEntryFilterfilters = Lists.newArrayList(
    WALEntryFilter filterFromEndpoint = this..getWALEntryfilter();
    if (filterFromEndpoint != null) {
      filters.add(filterFromEndpoint);
    }
    this. = new ChainWALEntryFilter(filters);
    int sleepMultiplier = 1;
    // delay this until we are in an asynchronous thread
    while (this.isActive() && this. == null) {
      if (this.isActive() && this. == null) {
        if (sleepForRetries("Cannot contact the peer's zk ensemble"sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
    // We were stopped while looping to contact peer's zk ensemble, just abort
    if (!this.isActive()) {
      uninitialize();
      return;
    }
    // resetting to 1 to reuse later
    sleepMultiplier = 1;
    // In rare case, zookeeper setting may be messed up. That leads to the incorrect
    // peerClusterId value, which is the same as the source clusterId
      this.terminate("ClusterId " +  + " is replicating to itself: peerClusterId "
          +  + " which is not allowed by ReplicationEndpoint:"
          + .getClass().getName(), nullfalse);
    }
    .info("Replicating "+ + " -> " + );
    // If this is recovered, the queue is already full and the first log
    // normally has a position (unless the RS failed between 2 logs)
      try {
          this..peek().getName()));
        if (.isTraceEnabled()) {
          .trace("Recovered queue started with log " + this..peek() +
              " at position " + this..getPosition());
        }
      } catch (ReplicationException e) {
        this.terminate("Couldn't get the position of this recovered queue " +
            this.e);
      }
    }
    // Loop until we close down
    while (isActive()) {
      // Sleep until replication is enabled again
      if (!isPeerEnabled()) {
        if (sleepForRetries("Replication is disabled"sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      Path oldPath = getCurrentPath(); //note that in the current scenario,
                                       //oldPath will be null when a log roll
                                       //happens.
      // Get a new path
      boolean hasCurrentPath = getNextPath();
      if (getCurrentPath() != null && oldPath == null) {
        sleepMultiplier = 1; //reset the sleepMultiplier on a path change
      }
      if (!hasCurrentPath) {
        if (sleepForRetries("No log to process"sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      boolean currentWALisBeingWrittenTo = false;
      //For WAL files we own (rather than recovered), take a snapshot of whether the
      //current WAL file (this.currentPath) is in use (for writing) NOW!
      //Since the new WAL paths are enqueued only after the prev WAL file
      //is 'closed', presence of an element in the queue means that
      //the previous WAL file was closed, else the file is in use (currentPath)
      //We take the snapshot now so that we are protected against races
      //where a new file gets enqueued while the current file is being processed
      //(and where we just finished reading the current file).
      if (!this..isQueueRecovered() && .size() == 0) {
        currentWALisBeingWrittenTo = true;
      }
      // Open a reader on it
      if (!openReader(sleepMultiplier)) {
        // Reset the sleep multiplier, else it'd be reused for the next file
        sleepMultiplier = 1;
        continue;
      }
      // If we got a null reader but didn't continue, then sleep and continue
      if (this. == null) {
        if (sleepForRetries("Unable to open a reader"sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      boolean gotIOE = false;
       = 0;
      List<HLog.Entryentries = new ArrayList<HLog.Entry>(1);
       = 0;
      try {
        if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenToentries)) {
          continue;
        }
      } catch (IOException ioe) {
        .warn(this. + " Got: "ioe);
        gotIOE = true;
        if (ioe.getCause() instanceof EOFException) {
          boolean considerDumping = false;
          if (this..isQueueRecovered()) {
            try {
              FileStatus stat = this..getFileStatus(this.);
              if (stat.getLen() == 0) {
                .warn(this. + " Got EOF and the file was empty");
              }
              considerDumping = true;
            } catch (IOException e) {
              .warn(this. + " Got while getting file size: "e);
            }
          }
          if (considerDumping &&
              sleepMultiplier == this. &&
              processEndOfFile()) {
            continue;
          }
        }
      } finally {
        try {
          this. = null;
          this..closeReader();
        } catch (IOException e) {
          gotIOE = true;
          .warn("Unable to finalize the tailing of a file"e);
        }
      }
      // If we didn't get anything to replicate, or if we hit a IOE,
      // wait a bit and retry.
      // But if we need to stop, don't bother sleeping
      if (this.isActive() && (gotIOE || entries.isEmpty())) {
        if (this. != this..getPosition()) {
          this..logPositionAndCleanOldLogs(this.,
              this.this..getPosition(),
              this..isQueueRecovered(), currentWALisBeingWrittenTo);
          this. = this..getPosition();
        }
        // Reset the sleep multiplier if nothing has actually gone wrong
        if (!gotIOE) {
          sleepMultiplier = 1;
          // if there was nothing to ship and it's not an error
          // set "ageOfLastShippedOp" to <now> to indicate that we're current
          this..setAgeOfLastShippedOp(System.currentTimeMillis());
        }
        if (sleepForRetries("Nothing to replicate"sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      sleepMultiplier = 1;
      shipEdits(currentWALisBeingWrittenToentries);
    }
    uninitialize();
  }

  
Read all the entries from the current log files and retain those that need to be replicated. Else, process the end of the current file.

Parameters:
currentWALisBeingWrittenTo is the current WAL being written to
entries resulting entries to be replicated
Returns:
true if we got nothing and went to the next file, false if we got entries
Throws:
java.io.IOException
  protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
      List<HLog.Entryentriesthrows IOException{
    long seenEntries = 0;
    if (.isTraceEnabled()) {
      .trace("Seeking in " + this. + " at position "
          + this..getPosition());
    }
    this..seek();
    long positionBeforeRead = this..getPosition();
    HLog.Entry entry =
        this..readNextAndSetPosition();
    while (entry != null) {
      this..incrLogEditsRead();
      seenEntries++;
      // don't replicate if the log entries have already been consumed by the cluster
          || !entry.getKey().getClusterIds().contains()) {
        // Remove all KVs that should not be replicated
        entry = .filter(entry);
        WALEdit edit = null;
        HLogKey logKey = null;
        if (entry != null) {
          edit = entry.getEdit();
          logKey = entry.getKey();
        }
        if (edit != null && edit.size() != 0) {
          //Mark that the current cluster has the change
          logKey.addClusterId();
           += countDistinctRowKeys(edit);
          entries.add(entry);
           += entry.getEdit().heapSize();
        } else {
          this..incrLogEditsFiltered();
        }
      }
      // Stop if too many entries or too big
      if ( >= this. ||
          entries.size() >= this.) {
        break;
      }
      try {
        entry = this..readNextAndSetPosition();
      } catch (IOException ie) {
        .debug("Break on IOE: " + ie.getMessage());
        break;
      }
    }
    .incrLogReadInBytes(this..getPosition() - positionBeforeRead);
    if (currentWALisBeingWrittenTo) {
      return false;
    }
    // If we didn't get anything and the queue has an object, it means we
    // hit the end of the file for sure
    return seenEntries == 0 && processEndOfFile();
  }

  
Poll for the next path

Returns:
true if a path was obtained, false if not
  protected boolean getNextPath() {
    try {
      if (this. == null) {
        this. = .poll(this..);
        this..setSizeOfLogQueue(.size());
        if (this. != null) {
          this..cleanOldLogs(this..getName(),
              this.,
              this..isQueueRecovered());
          if (.isTraceEnabled()) {
            .trace("New log: " + this.);
          }
        }
      }
    } catch (InterruptedException e) {
      .warn("Interrupted while reading edits"e);
    }
    return this. != null;
  }

  
Open a reader on the current path

Parameters:
sleepMultiplier by how many times the default sleeping time is augmented
Returns:
true if we should continue with that file, false if we are over with it
  protected boolean openReader(int sleepMultiplier) {
    try {
      try {
        if (.isTraceEnabled()) {
          .trace("Opening log " + this.);
        }
        this. = .openReader(this.);
      } catch (FileNotFoundException fnfe) {
        if (this..isQueueRecovered()) {
          // We didn't find the log in the archive directory, look if it still
          // exists in the dead RS folder (there could be a chain of failures
          // to look at)
          List<StringdeadRegionServers = this..getDeadRegionServers();
          .info("NB dead servers : " + deadRegionServers.size());
          for (String curDeadServerName : deadRegionServers) {
            Path deadRsDirectory =
                new Path(.getLogDir().getParent(), curDeadServerName);
            Path[] locs = new Path[] {
                new Path(deadRsDirectory.getName()),
                new Path(deadRsDirectory.suffix(.),
                                          .getName()),
            };
            for (Path possibleLogLocation : locs) {
              .info("Possible location " + possibleLogLocation.toUri().toString());
              if (this..getFs().exists(possibleLogLocation)) {
                // We found the right new location
                .info("Log " + this. + " still exists at " +
                    possibleLogLocation);
                // Breaking here will make us sleep since reader is null
                return true;
              }
            }
          }
          // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data
          // from .logs to .oldlogs. Loop into .logs folders and check whether a match exists
          if ( instanceof ReplicationSyncUp.DummyServer) {
            FileStatus[] rss = .listStatus(.getLogDir());
            for (FileStatus rs : rss) {
              Path p = rs.getPath();
              FileStatus[] logs = .listStatus(p);
              for (FileStatus log : logs) {
                p = new Path(plog.getPath().getName());
                if (p.getName().equals(.getName())) {
                   = p;
                  .info("Log " + this. + " exists under " + .getLogDir());
                  // Open the log at the new location
                  this.openReader(sleepMultiplier);
                  return true;
                }
              }
            }
          }
          // TODO What happens if the log was missing from every single location?
          // Although we need to check a couple of times as the log could have
          // been moved by the master between the checks
          // It can also happen if a recovered queue wasn't properly cleaned,
          // such that the znode pointing to a log exists but the log was
          // deleted a long time ago.
          // For the moment, we'll throw the IO and processEndOfFile
          throw new IOException("File from recovered queue is " +
              "nowhere to be found"fnfe);
        } else {
          // If the log was archived, continue reading from there
          Path archivedLogLocation =
              new Path(.getOldLogDir(), .getName());
          if (this..getFs().exists(archivedLogLocation)) {
             = archivedLogLocation;
            .info("Log " + this. + " was moved to " +
                archivedLogLocation);
            // Open the log at the new location
            this.openReader(sleepMultiplier);
          }
          // TODO What happens the log is missing in both places?
        }
      }
    } catch (IOException ioe) {
      if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
      .warn(this. + " Got: "ioe);
      this. = null;
      if (ioe.getCause() instanceof NullPointerException) {
        // Workaround for race condition in HDFS-4380
        // which throws a NPE if we open a file before any data node has the most recent block
        // Just sleep and retry. Will require re-reading compressed HLogs for compressionContext.
        .warn("Got NPE opening reader, will retry.");
      } else if (sleepMultiplier == this.) {
        // TODO Need a better way to determine if a file is really gone but
        // TODO without scanning all logs dir
        .warn("Waited too long for this file, considering dumping");
        return !processEndOfFile();
      }
    }
    return true;
  }
  /*
   * Checks whether the current log file is empty, and it is not a recovered queue. This is to
   * handle scenario when in an idle cluster, there is no entry in the current log and we keep on
   * trying to read the log file and get EOFException. In case of a recovered queue the last log
   * file may be empty, and we don't want to retry that.
   */
  private boolean isCurrentLogEmpty() {
    return (this..getPosition() == 0 &&
        !this..isQueueRecovered() && .size() == 0);
  }

  
Do the sleeping logic

Parameters:
msg Why we sleep
sleepMultiplier by how many times the default sleeping time is augmented
Returns:
True if sleepMultiplier is < maxRetriesMultiplier
  protected boolean sleepForRetries(String msgint sleepMultiplier) {
    try {
      if (.isTraceEnabled()) {
        .trace(msg + ", sleeping " +  + " times " + sleepMultiplier);
      }
      Thread.sleep(this. * sleepMultiplier);
    } catch (InterruptedException e) {
      .debug("Interrupted while sleeping between retries");
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < ;
  }

  
Count the number of different row keys in the given edit because of mini-batching. We assume that there's at least one KV in the WALEdit.

Parameters:
edit edit to count row keys from
Returns:
number of different row keys
  private int countDistinctRowKeys(WALEdit edit) {
    List<KeyValuekvs = edit.getKeyValues();
    int distinctRowKeys = 1;
    KeyValue lastKV = kvs.get(0);
    for (int i = 0; i < edit.size(); i++) {
      if (!kvs.get(i).matchingRow(lastKV)) {
        distinctRowKeys++;
      }
    }
    return distinctRowKeys;
  }

  
Do the shipping logic

Parameters:
currentWALisBeingWrittenTo was the current WAL being (seemingly) written to when this method was called
  protected void shipEdits(boolean currentWALisBeingWrittenToList<HLog.Entryentries) {
    int sleepMultiplier = 1;
    if (entries.isEmpty()) {
      .warn("Was given 0 edits to ship");
      return;
    }
    while (this.isActive()) {
      try {
        if (this..isEnabled()) {
          long sleepTicks = this..getNextSleepInterval();
          if (sleepTicks > 0) {
            try {
              if (.isTraceEnabled()) {
                .trace("To sleep " + sleepTicks + "ms for throttling control");
              }
              Thread.sleep(sleepTicks);
            } catch (InterruptedException e) {
              .debug("Interrupted while sleeping for throttling control");
              Thread.currentThread().interrupt();
              // current thread might be interrupted to terminate
              // directly go back to while() for confirm this
              continue;
            }
            // reset throttler's cycle start tick when sleep for throttling occurs
            this..resetStartTick();
          }
        }
        // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
        boolean replicated = .replicate();
        if (!replicated) {
          continue;
        }
        if (this. != this..getPosition()) {
          this..logPositionAndCleanOldLogs(this.,
              this.this..getPosition(),
              this..isQueueRecovered(), currentWALisBeingWrittenTo);
          this. = this..getPosition();
        }
        if (this..isEnabled()) {
          this..addPushSize();
        }
        this. += entries.size();
        this..shipBatch(this.this./1024);
        this..setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
        if (.isTraceEnabled()) {
          .trace("Replicated " + this. + " entries in total, or "
              + this. + " operations");
        }
        break;
      } catch (Exception ex) {
        .warn(.getClass().getName() + " threw unknown exception:" + ex);
        if (sleepForRetries("ReplicationEndpoint threw exception"sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }
  }

  
check whether the peer is enabled or not

Returns:
true if the peer is enabled, otherwise false
  protected boolean isPeerEnabled() {
    return this..getStatusOfPeer(this.);
  }

  
If the queue isn't empty, switch to the next one Else if this is a recovered queue, it means we're done! Else we'll just continue to try reading the log file

Returns:
true if we're done with the current file, false if we should continue trying to read from it
  protected boolean processEndOfFile() {
    if (this..size() != 0) {
      if (.isTraceEnabled()) {
        String filesize = "N/A";
        try {
          FileStatus stat = this..getFileStatus(this.);
          filesize = stat.getLen()+"";
        } catch (IOException ex) {}
        .trace("Reached the end of a log, stats: " + getStats() +
            ", and the length of the file is " + filesize);
      }
      this. = null;
      this. = null;
      return true;
    } else if (this..isQueueRecovered()) {
      this..closeRecoveredQueue(this);
      .info("Finished recovering the queue with the following stats " + getStats());
      this. = false;
      return true;
    }
    return false;
  }
  public void startup() {
    String n = Thread.currentThread().getName();
        new Thread.UncaughtExceptionHandler() {
          @Override
          public void uncaughtException(final Thread tfinal Throwable e) {
            .error("Unexpected exception in ReplicationSource," +
              " currentPath=" + e);
          }
        };
    Threads.setDaemonThreadRunning(
        thisn + ".replicationSource," +
        this.handler);
  }
  public void terminate(String reason) {
    terminate(reasonnull);
  }
  public void terminate(String reasonException cause) {
    terminate(reasoncausetrue);
  }
  public void terminate(String reasonException causeboolean join) {
    if (cause == null) {
      .info("Closing source "
          + this. + " because: " + reason);
    } else {
      .error("Closing source " + this.
          + " because an error occurred: " + reasoncause);
    }
    this. = false;
    this.interrupt();
    ListenableFuture<Service.Statefuture = null;
    if (this. != null) {
      future = this..stop();
    }
    if (join) {
      Threads.shutdown(thisthis.);
      if (future != null) {
        try {
          future.get();
        } catch (Exception e) {
          .warn("Got exception:" + e);
        }
      }
    }
  }
  public String getPeerClusterZnode() {
    return this.;
  }
  public String getPeerClusterId() {
    return this.;
  }
  public Path getCurrentPath() {
    return this.;
  }
  private boolean isActive() {
    return !this..isStopped() && this. && !isInterrupted();
  }

  
Comparator used to compare logs together based on their start time
  public static class LogsComparator implements Comparator<Path> {
    @Override
    public int compare(Path o1Path o2) {
      return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
    }

    
Split a path to get the start time For example: 10.20.20.171%3A60020.1277499063250

Parameters:
p path to split
Returns:
start time
    private long getTS(Path p) {
      String[] parts = p.getName().split("\\.");
      return Long.parseLong(parts[parts.length-1]);
    }
  }
  public String getStats() {
    long position = this..getPosition();
    return "Total replicated edits: " +  +
      ", currently replicating from: " + this. +
      " at position: " + position;
  }

  
Get Replication Source Metrics

Returns:
sourceMetrics
    return this.;
  }
New to GrepCode? Check out our FAQ X