Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
 package org.apache.hadoop.hbase.replication.regionserver;
 import java.util.List;
An org.apache.hadoop.hbase.replication.ReplicationEndpoint implementation for replicating to another HBase cluster. On the slave cluster, it selects a random subset of region servers as peers, with the subset's size determined by a replication ratio. For example, if the replication ratio is 0.1 and the slave cluster has 100 region servers, 10 will be selected.

A stream is considered down when we cannot contact a region server on the peer cluster for more than 55 seconds by default.

   // Logger for this endpoint.
   private static final Log LOG = LogFactory.getLog(HBaseInterClusterReplicationEndpoint.class);
   // NOTE(review): presumably the connection created via HConnectionManager.createConnection(...)
   // in init() and released in doStop(); identifiers are stripped in this copy — confirm.
   private HConnection conn;
   // NOTE(review): presumably the configuration copy built with HBaseConfiguration.create(...)
   // in init() — confirm.
   private Configuration conf;
   // How long should we sleep for each retry (base sleep time, scaled by a sleep multiplier;
   // presumably milliseconds since it is passed to Thread.sleep — confirm)
   private long sleepForRetries;
   // Maximum number of retries before taking bold actions
   private int maxRetriesMultiplier;
   // Socket timeouts require even bolder actions since we don't want to DDOS
   private int socketTimeoutMultiplier;
   // Metrics for this source
   private MetricsSource metrics;
   // NOTE(review): a comment reading "Handles connecting to peer region servers" used to sit
   // here; it appears to describe a field missing from this copy (presumably the
   // ReplicationSinkManager built in init()), not the flag below.
   // Whether peer sinks have already been selected — TODO confirm against the complete source.
   private boolean peersSelected = false;
   public void init(Context contextthrows IOException {
     this. = HBaseConfiguration.create(.getConfiguration());
     this. = this..getInt("replication.source.maxretriesmultiplier", 10);
     this. = this..getInt("replication.source.socketTimeoutMultiplier",
     // TODO: This connection is replication specific or we should make it particular to
     // replication and make replication specific settings such as compression or codec to use
     // passing Cells.
     this. = HConnectionManager.createConnection(this.);
     this. =
         this..getLong("replication.source.sleepforretries", 1000);
     this. = context.getMetrics();
     // ReplicationQueueInfo parses the peerId out of the znode for us
     this. = new ReplicationSinkManager(.getPeerId(), thisthis.);
   private void decorateConf() {
     String replicationCodec = this..get(.);
     if (StringUtils.isNotEmpty(replicationCodec)) {
  private void connectToPeers() {
    int sleepMultiplier = 1;
    // Connect to peer cluster first, unless we have to stop
    while (this.isRunning() && .getSinks().size() == 0) {
      if (this.isRunning() && .getSinks().size() == 0) {
        if (sleepForRetries("Waiting for peers"sleepMultiplier)) {

Do the sleeping logic.

msg: why we sleep (used in the trace log message)
sleepMultiplier: the factor by which the default sleep time is multiplied
Returns: true if sleepMultiplier is < maxRetriesMultiplier
  protected boolean sleepForRetries(String msgint sleepMultiplier) {
    try {
      if (.isTraceEnabled()) {
        .trace(msg + ", sleeping " +  + " times " + sleepMultiplier);
      Thread.sleep(this. * sleepMultiplier);
    } catch (InterruptedException e) {
      .debug("Interrupted while sleeping between retries");
    return sleepMultiplier < ;

Do the shipping logic
  public boolean replicate(ReplicateContext replicateContext) {
    List<HLog.Entryentries = replicateContext.getEntries();
    int sleepMultiplier = 1;
    while (this.isRunning()) {
      if (!) {
         = true;
      if (!isPeerEnabled()) {
        if (sleepForRetries("Replication is disabled"sleepMultiplier)) {
      SinkPeer sinkPeer = null;
      try {
        sinkPeer = .getReplicationSink();
        BlockingInterface rrs = sinkPeer.getRegionServer();
        if (.isTraceEnabled()) {
          .trace("Replicating " + entries.size() +
              " entries of total size " + replicateContext.getSize());
            entries.toArray(new HLog.Entry[entries.size()]));
        // update metrics
        return true;
      } catch (IOException ioe) {
        // Didn't ship anything, but must still age the last time we did
        if (ioe instanceof RemoteException) {
          ioe = ((RemoteExceptionioe).unwrapRemoteException();
          .warn("Can't replicate because of an error on the remote cluster: "ioe);
          if (ioe instanceof TableNotFoundException) {
            if (sleepForRetries("A table is missing in the peer cluster. "
                + "Replication cannot proceed without losing data."sleepMultiplier)) {
        } else {
          if (ioe instanceof SocketTimeoutException) {
            // This exception means we waited for more than 60s and nothing
            // happened, the cluster is alive and calling it right away
            // even for a test just makes things worse.
            sleepForRetries("Encountered a SocketTimeoutException. Since the " +
              "call to the remote cluster timed out, which is usually " +
              "caused by a machine failure or a massive slowdown",
          } else if (ioe instanceof ConnectException) {
            .warn("Peer is unavailable, rechecking all sinks: "ioe);
          } else {
            .warn("Can't replicate because of a local or network error: "ioe);
        if (sinkPeer != null) {
        if (sleepForRetries("Since we are unable to replicate"sleepMultiplier)) {
    return false// in case we exited before replicating
  protected boolean isPeerEnabled() {
  protected void doStop() {
    disconnect(); //don't call super.doStop()
    if (this. != null) {
      try {
        this. = null;
      } catch (IOException e) {
        .warn("Failed to close the connection");
New to GrepCode? Check out our FAQ X