Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   *
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.util.List;
 import java.util.UUID;
 
 
 import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
 import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY;
 import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;

Gateway to Replication. Used by org.apache.hadoop.hbase.regionserver.HRegionServer.
 
 public class Replication implements WALActionsListener
   private static final Log LOG =
       LogFactory.getLog(Replication.class);
   private boolean replication;
   private Configuration conf;
   // Hosting server
   private Server server;
  
Statistics thread schedule pool
 
   private int statsThreadPeriod;
   // ReplicationLoad to access replication metrics
   private ReplicationLoad replicationLoad;

  
Instantiate the replication management (if rep is enabled).

Parameters:
server Hosting server
fs handle to the filesystem
logDir
oldLogDir directory where logs are archived
Throws:
java.io.IOException
 
   public Replication(final Server serverfinal FileSystem fs,
       final Path logDirfinal Path oldLogDirthrows IOException{
     initialize(serverfslogDiroldLogDir);
   }

  
Empty constructor
  public Replication() {
  }
  public void initialize(final Server serverfinal FileSystem fs,
      final Path logDirfinal Path oldLogDirthrows IOException {
    this. = server;
    this. = this..getConfiguration();
    this. = isReplication(this.);
    this. = Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder()
        .setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d")
        .setDaemon(true)
        .build());
    if () {
      try {
        this. =
            ReplicationFactory.getReplicationQueues(server.getZooKeeper(), this.this.);
        this..init(this..getServerName().toString());
        this. =
            ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.this.);
        this..init();
        this. =
            ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.,
              this.this.this.);
      } catch (ReplicationException e) {
        throw new IOException("Failed replication handler create"e);
      }
      UUID clusterId = null;
      try {
        clusterId = ZKClusterId.getUUIDForCluster(this..getZooKeeper());
      } catch (KeeperException ke) {
        throw new IOException("Could not read cluster id"ke);
      }
      this. =
              this.fslogDiroldLogDirclusterId);
      this. =
          this..getInt("replication.stats.thread.period.seconds", 5 * 60);
      .debug("ReplicationStatisticsThread " + this.);
      this. = new ReplicationLoad();
    } else {
      this. = null;
      this. = null;
      this. = null;
      this. = null;
      this. = null;
    }
  }

   

Parameters:
c Configuration to look at
Returns:
True if replication is enabled.
  public static boolean isReplication(final Configuration c) {
  }
   /*
    * Returns an object to listen to new hlog changes
    **/
    return this;
  }
  
Stops replication service.
  public void stopReplicationService() {
    join();
  }

  
Join with the replication threads
  public void join() {
    if (this.) {
      this..join();
      if (this. != null) {
      }
    }
  }

  
Carry on the list of log entries down to the sink

Parameters:
entries list of entries to replicate
cells The data -- the cells -- that entries describes (the entries do not contain the Cells we are replicating; they are passed here on the side in this CellScanner).
Throws:
java.io.IOException
  public void replicateLogEntries(List<WALEntryentriesCellScanner cellsthrows IOException {
    if (this.) {
      this..replicateEntries(entriescells);
    }
  }

  
If replication is enabled and this cluster is a master, it starts

  public void startReplicationService() throws IOException {
    if (this.) {
      try {
        this..init();
      } catch (ReplicationException e) {
        throw new IOException(e);
      }
      this. = new ReplicationSink(this.this.);
    }
  }

  
Get the replication sources manager

Returns:
the manager if replication is enabled, else returns false
    return this.;
  }
  public void visitLogEntryBeforeWrite(HRegionInfo infoHLogKey logKey,
      WALEdit logEdit) {
    // Not interested
  }
  public void visitLogEntryBeforeWrite(HTableDescriptor htdHLogKey logKey,
                                       WALEdit logEdit) {
    scopeWALEdits(htdlogKeylogEdit);
  }

  
Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from compaction WAL edits and if the scope is local.

Parameters:
htd Descriptor used to find the scope to use
logKey Key that may get scoped according to its edits
logEdit Edits used to lookup the scopes
  public static void scopeWALEdits(HTableDescriptor htdHLogKey logKey,
                                   WALEdit logEdit) {
    NavigableMap<byte[], Integerscopes =
        new TreeMap<byte[], Integer>(.);
    byte[] family;
    for (KeyValue kv : logEdit.getKeyValues()) {
      family = kv.getFamily();
      // This is expected and the KV should not be replicated
      if (kv.matchingFamily(.)) continue;
      // Unexpected, has a tendency to happen in unit tests
      assert htd.getFamily(family) != null;
      int scope = htd.getFamily(family).getScope();
      if (scope !=  &&
          !scopes.containsKey(family)) {
        scopes.put(familyscope);
      }
    }
    if (!scopes.isEmpty()) {
      logKey.setScopes(scopes);
    }
  }
  public void preLogRoll(Path oldPathPath newPaththrows IOException {
  }
  public void postLogRoll(Path oldPathPath newPaththrows IOException {
  }
  public void preLogArchive(Path oldPathPath newPaththrows IOException {
    // Not interested
  }
  public void postLogArchive(Path oldPathPath newPaththrows IOException {
    // Not interested
  }

  
This method modifies the master's configuration in order to inject replication-related features

Parameters:
conf
  public static void decorateMasterConfiguration(Configuration conf) {
    if (!isReplication(conf)) {
      return;
    }
    String plugins = conf.get();
    String cleanerClass = ReplicationLogCleaner.class.getCanonicalName();
    if (!plugins.contains(cleanerClass)) {
      conf.set(plugins + "," + cleanerClass);
    }
  }
  public void logRollRequested(boolean tooFewReplicas) {
    // Not interested
  }
  public void logCloseRequested() {
    // not interested
  }
  /*
   * Statistics thread. Periodically prints the cache statistics to the log.
   */
  static class ReplicationStatisticsThread extends Thread {
    private final ReplicationSink replicationSink;
    public ReplicationStatisticsThread(final ReplicationSink replicationSink,
                            final ReplicationSourceManager replicationManager) {
      super("ReplicationStatisticsThread");
      this. = replicationManager;
      this. = replicationSink;
    }
    @Override
    public void run() {
    }
    private void printStats(String stats) {
      if (!stats.isEmpty()) {
        .info(stats);
      }
    }
  }
    if (this. == null) {
      return null;
    }
    // always build for latest data
    return this.;
  }
  private void buildReplicationLoad() {
    // get source
    List<MetricsSourcesourceMetricsList = new ArrayList<MetricsSource>();
    for (ReplicationSourceInterface source : sources) {
      if (source instanceof ReplicationSource) {
        sourceMetricsList.add(((ReplicationSourcesource).getSourceMetrics());
      }
    }
    // get sink
    MetricsSink sinkMetrics = this..getSinkMetrics();
    this..buildReplicationLoad(sourceMetricsListsinkMetrics);
  }
New to GrepCode? Check out our FAQ X