Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
 package org.apache.hadoop.hbase.master;
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
Class to publish the cluster status to the clients. This lets them learn immediately which region servers are dead, so they can cut their connections to those servers instead of waiting on the socket to time out. This improves the mean time to recover, and also allows clients to use larger timeouts, since dead servers are detected through a separate channel.
 public class ClusterStatusPublisher extends Chore {
The implementation class used to publish the status. Default is null (no publish). Use org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher to multicast the status.
   public static final String STATUS_PUBLISHER_CLASS = "hbase.status.publisher.class";
   public static final Class<? extends ClusterStatusPublisher.Publisher>
The minimum time between two status messages, in milliseconds.
   public static final String STATUS_PUBLISH_PERIOD = "hbase.status.publish.period";
   public static final int DEFAULT_STATUS_PUBLISH_PERIOD = 10000;
   private long lastMessageTime = 0;
   private final HMaster master;
   private final int messagePeriod// time between two message
   private final ConcurrentMap<ServerNameIntegerlastSent =
       new ConcurrentHashMap<ServerNameInteger>();
   private Publisher publisher;
   private boolean connected = false;

We want to limit the size of the protobuf message sent, do fit into a single packet. a reasonable size for ip / ethernet is less than 1Kb.
   public final static int MAX_SERVER_PER_MESSAGE = 10;

If a server dies, we're sending the information multiple times in case a receiver misses the message.
  public final static int NB_SEND = 5;
  public ClusterStatusPublisher(HMaster masterConfiguration conf,
                                Class<? extends PublisherpublisherClass)
      throws IOException {
    super("HBase clusterStatusPublisher for " + master.getName(),
    this. = master;
    try {
      this. = publisherClass.newInstance();
    } catch (InstantiationException e) {
      throw new IOException("Can't create publisher " + publisherClass.getName(), e);
    } catch (IllegalAccessException e) {
      throw new IOException("Can't create publisher " + publisherClass.getName(), e);
     = true;
  // For tests only
  protected ClusterStatusPublisher() {
     = null;
     = 0;
  protected void chore() {
    if (!) {
    if (sns.isEmpty()) {
      // Nothing to send. Done.
    final long curTime = EnvironmentEdgeManager.currentTimeMillis();
    if ( > curTime - ) {
      // We already sent something less than 10 second ago. Done.
    // Ok, we're going to send something then.
     = curTime;
    // We're reusing an existing protobuf message, but we don't send everything.
    // This could be extended in the future, for example if we want to send stuff like the
    //  hbase:meta server name.
    ClusterStatus cs = new ClusterStatus(VersionInfo.getVersion(),
  protected void cleanup() {
     = false;

Create the dead server to send. A dead server is sent NB_SEND times. We send at max MAX_SERVER_PER_MESSAGE at a time. if there are too many dead servers, we send the newly dead first.
    // We're getting the message sent since last time, and add them to the list
    long since = EnvironmentEdgeManager.currentTimeMillis() -  * 2;
    for (Pair<ServerNameLongdead : getDeadServers(since)) {
      .putIfAbsent(dead.getFirst(), 0);
    // We're sending the new deads first.
    List<Map.Entry<ServerNameInteger>> entries = new ArrayList<Map.Entry<ServerNameInteger>>();
    Collections.sort(entriesnew Comparator<Map.Entry<ServerNameInteger>>() {
      public int compare(Map.Entry<ServerNameIntegero1Map.Entry<ServerNameIntegero2) {
        return o1.getValue().compareTo(o2.getValue());
    // With a limit of MAX_SERVER_PER_MESSAGE
    int max = entries.size() >  ?  : entries.size();
    List<ServerNameres = new ArrayList<ServerName>(max);
    for (int i = 0; i < maxi++) {
      Map.Entry<ServerNameIntegertoSend = entries.get(i);
      if (toSend.getValue() >= ( - 1)) {
      } else {
        .replace(toSend.getKey(), toSend.getValue(), toSend.getValue() + 1);
    return res;

Get the servers which died since a given timestamp. protected because it can be subclassed by the tests.
  protected List<Pair<ServerNameLong>> getDeadServers(long since) {
    if (.getServerManager() == null) {
      return Collections.emptyList();
  public interface Publisher extends Closeable {
    void connect(Configuration confthrows IOException;
    void publish(ClusterStatus cs);
    void close();
  public static class MulticastPublisher implements Publisher {
    private DatagramChannel channel;
    private final ExecutorService service = Executors.newSingleThreadExecutor(
    public MulticastPublisher() {
    public void connect(Configuration confthrows IOException {
      String mcAddress = conf.get(.,
      int port = conf.getInt(.,
      // Can't be NiO with Netty today => not implemented in Netty.
      b.setPipeline(Channels.pipeline(new ProtobufEncoder(),
          new ChannelUpstreamHandler() {
            public void handleUpstream(ChannelHandlerContext ctxChannelEvent e)
                throws Exception {
              // We're just writing here. Discard any incoming data. See HBASE-8466.
       = (DatagramChannelb.bind(new InetSocketAddress(0));
      InetAddress ina;
      try {
        ina = InetAddress.getByName(mcAddress);
      } catch (UnknownHostException e) {
        throw new IOException("Can't connect to " + mcAddresse);
      .connect(new InetSocketAddress(mcAddressport));
    public void publish(ClusterStatus cs) {
      ClusterStatusProtos.ClusterStatus csp = cs.convert();
    public void close() {
      if ( != null) {
New to GrepCode? Check out our FAQ X