Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
 package org.apache.hadoop.hbase.replication.regionserver;
 import java.util.List;
 import java.util.Map;
Maintains a collection of peers to replicate to, and randomly selects a single peer to replicate to per set of data to replicate. Also handles keeping track of peer availability.
 public class ReplicationSinkManager {
   private static final Log LOG = LogFactory.getLog(ReplicationSinkManager.class);

Default maximum number of times a replication sink can be reported as bad before it will no longer be provided as a sink for replication without the pool of replication sinks being refreshed.
   static final int DEFAULT_BAD_SINK_THRESHOLD = 3;

Default ratio of the total number of peer cluster region servers to consider replicating to.
   static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.1f;
   private final HConnection conn;
   private final String peerClusterId;
   private final HBaseReplicationEndpoint endpoint;
   // Count of "bad replication sink" reports per peer sink
   private final Map<ServerNameIntegerbadReportCounts;
   // Ratio of total number of potential peer region servers to be used
   private final float ratio;
   // Maximum number of times a sink can be reported as bad before the pool of
   // replication sinks is refreshed
   private final int badSinkThreshold;
   private final Random random;
   // A timestamp of the last time the list of replication peers changed
   private long lastUpdateToPeers;
   // The current pool of sinks to which replication can be performed
   private List<ServerNamesinks = Lists.newArrayList();

Instantiate for a single replication peer cluster.

conn connection to the peer cluster
peerClusterId identifier of the peer cluster
endpoint replication endpoint for inter cluster replication
conf HBase configuration, used for determining replication source ratio and bad peer threshold
   public ReplicationSinkManager(HConnection connString peerClusterId,
       HBaseReplicationEndpoint endpointConfiguration conf) {
     this. = conn;
     this. = peerClusterId;
     this. = endpoint;
     this. = Maps.newHashMap();
     this. = conf.getFloat("replication.source.ratio");
     this. = conf.getInt("replication.bad.sink.threshold",
    this. = new Random();

Get a randomly-chosen replication sink to replicate to.

a replication sink to replicate to
  public SinkPeer getReplicationSink() throws IOException {
      .info("Current list of sinks is out of date, updating");
    if (.isEmpty()) {
      throw new IOException("No replication sinks are available");
    ServerName serverName = .get(.nextInt(.size()));
    return new SinkPeer(serverName.getAdmin(serverName));

Report a SinkPeer as being bad (i.e. an attempt to replicate to it failed). If a single SinkPeer is reported as bad more than replication.bad.sink.threshold times, it will be removed from the pool of potential replication targets.

sinkPeer The SinkPeer that had a failed replication attempt on it
  public void reportBadSink(SinkPeer sinkPeer) {
    ServerName serverName = sinkPeer.getServerName();
    int badReportCount = (.containsKey(serverName)
                    ? .get(serverName) : 0) + 1;
    if (badReportCount > ) {
      if (.isEmpty()) {
  void chooseSinks() {
    List<ServerNameslaveAddresses = .getRegionServers();
    int numSinks = (int) Math.ceil(slaveAddresses.size() * );
     = slaveAddresses.subList(0, numSinks);
    return ;

Wraps a replication region server sink to provide the ability to identify it.
  public static class SinkPeer {
    private ServerName serverName;
    public SinkPeer(ServerName serverNameAdminService.BlockingInterface regionServer) {
      this. = serverName;
      this. = regionServer;
      return ;
      return ;
New to GrepCode? Check out our FAQ X