/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
 
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Preconditions;
 
 
/**
 * Compact region on request and then run split if appropriate.
 */
 
 public class CompactSplitThread implements CompactionRequestor {
   static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
 
   public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
       "hbase.regionserver.regionSplitLimit";
   public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT = 1000;
   
   private final HRegionServer server;
   private final Configuration conf;
 
   private final ThreadPoolExecutor largeCompactions;
   private final ThreadPoolExecutor smallCompactions;
   private final ThreadPoolExecutor splits;
   private final ThreadPoolExecutor mergePool;
   private volatile CompactionThroughputController compactionThroughputController;
 
  /**
   * Splitting should not take place if the total number of regions exceeds this.
   * This is not a hard limit to the number of regions but it is a guideline to
   * stop splitting after the number of online regions is greater than this.
   */
 
   private int regionSplitLimit;
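
  // Illustrative note (not part of the original source): regionSplitLimit is read from the
  // configuration key declared above, so an operator could tighten the guideline in
  // hbase-site.xml, or in code roughly like this (hypothetical Configuration instance):
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt(REGION_SERVER_REGION_SPLIT_LIMIT, 500);  // stop splitting past ~500 regions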

  

  /**
   * @param server
   */
  CompactSplitThread(HRegionServer server) {
    super();
    this.server = server;
    this.conf = server.getConfiguration();
    this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT,
        DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);
 
    int largeThreads = Math.max(1, conf.getInt(
        "hbase.regionserver.thread.compaction.large", 1));
    int smallThreads = conf.getInt(
        "hbase.regionserver.thread.compaction.small", 1);

    int splitThreads = conf.getInt("hbase.regionserver.thread.split", 1);
 
     // if we have throttle threads, make sure the user also specified size
     Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
 
     final String n = Thread.currentThread().getName();
 
    this.largeCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
        60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
        new ThreadFactory() {
          @Override
          public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName(n + "-largeCompactions-" + System.currentTimeMillis());
            return t;
          }
      });
    this.smallCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
        60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
        new ThreadFactory() {
          @Override
          public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName(n + "-smallCompactions-" + System.currentTimeMillis());
            return t;
          }
      });
    this.smallCompactions
        .setRejectedExecutionHandler(new Rejection());
    this.splits = (ThreadPoolExecutor)
        Executors.newFixedThreadPool(splitThreads,
            new ThreadFactory() {
          @Override
          public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName(n + "-splits-" + System.currentTimeMillis());
            return t;
          }
      });
    int mergeThreads = conf.getInt("hbase.regionserver.thread.merge", 1);
    this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
        mergeThreads, new ThreadFactory() {
          @Override
          public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName(n + "-merges-" + System.currentTimeMillis());
            return t;
          }
        });
    // compaction throughput controller
    this.compactionThroughputController =
        CompactionThroughputControllerFactory.create(server, conf);
  }
  @Override
  public String toString() {
    return "compaction_queue=("
        + largeCompactions.getQueue().size() + ":"
        + smallCompactions.getQueue().size() + ")"
        + ", split_queue=" + splits.getQueue().size()
        + ", merge_queue=" + mergePool.getQueue().size();
  }
  
  public String dumpQueue() {
    StringBuffer queueLists = new StringBuffer();
    queueLists.append("Compaction/Split Queue dump:\n");
    queueLists.append("  LargeCompation Queue:\n");
    Iterator<Runnableit = lq.iterator();
    while (it.hasNext()) {
      queueLists.append("    " + it.next().toString());
      queueLists.append("\n");
    }
    if (smallCompactions != null) {
      queueLists.append("\n");
      queueLists.append("  Small Compaction Queue:\n");
      lq = smallCompactions.getQueue();
      it = lq.iterator();
      while (it.hasNext()) {
        queueLists.append("    " + it.next().toString());
        queueLists.append("\n");
      }
    }
    queueLists.append("\n");
    queueLists.append("  Split Queue:\n");
    lq = splits.getQueue();
    it = lq.iterator();
    while (it.hasNext()) {
      queueLists.append("    " + it.next().toString());
      queueLists.append("\n");
    }
    queueLists.append("\n");
    queueLists.append("  Region Merge Queue:\n");
    lq = mergePool.getQueue();
    it = lq.iterator();
    while (it.hasNext()) {
      queueLists.append("    " + it.next().toString());
      queueLists.append("\n");
    }
    return queueLists.toString();
  }
  public synchronized void requestRegionsMerge(final HRegion a,
      final HRegion b, final boolean forcible) {
    try {
      mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible));
      if (LOG.isDebugEnabled()) {
        LOG.debug("Region merge requested for " + a + "," + b + ", forcible="
            + forcible + ".  " + this);
      }
    } catch (RejectedExecutionException ree) {
      LOG.warn("Could not execute merge for " + a + "," + b + ", forcible="
          + forcible, ree);
    }
  }
  public synchronized boolean requestSplit(final HRegion r) {
    // don't split regions that are blocking
    if (shouldSplitRegion() && r.getCompactPriority() >= Store.PRIORITY_USER) {
      byte[] midKey = r.checkSplit();
      if (midKey != null) {
        requestSplit(r, midKey);
        return true;
      }
    }
    return false;
  }
  public synchronized void requestSplit(final HRegion r, byte[] midKey) {
    if (midKey == null) {
      .debug("Region " + r.getRegionNameAsString() +
        " not splittable because midkey=null");
      if (r.shouldForceSplit()) {
        r.clearSplit();
      }
      return;
    }
    try {
      this.splits.execute(new SplitRequest(r, midKey, this.server));
      if (LOG.isDebugEnabled()) {
        LOG.debug("Split requested for " + r + ".  " + this);
      }
    } catch (RejectedExecutionException ree) {
      .info("Could not execute split for " + rree);
    }
  }
  public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why)
      throws IOException {
    return requestCompaction(r, why, null);
  }

  public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why,
      List<Pair<CompactionRequest, Store>> requests) throws IOException {
    return requestCompaction(r, why, Store.NO_PRIORITY, requests);
  }

  public synchronized CompactionRequest requestCompaction(final HRegion r, final Store s,
      final String why, CompactionRequest request) throws IOException {
    return requestCompaction(r, s, why, Store.NO_PRIORITY, request);
  }

  public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why,
      int p, List<Pair<CompactionRequest, Store>> requests) throws IOException {
    return requestCompactionInternal(r, why, p, requests, true);
  }

  private List<CompactionRequest> requestCompactionInternal(final HRegion r, final String why,
      int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow) throws IOException {
    // not a special compaction request, so make our own list
    List<CompactionRequest> ret = null;
    if (requests == null) {
      ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null;
      for (Store s : r.getStores().values()) {
        CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow);
        if (selectNow) ret.add(cr);
      }
    } else {
      Preconditions.checkArgument(selectNow); // only system requests have selectNow == false
      ret = new ArrayList<CompactionRequest>(requests.size());
      for (Pair<CompactionRequest, Store> pair : requests) {
        ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst()));
      }
    }
    return ret;
  }

  public CompactionRequest requestCompaction(final HRegion r, final Store s,
      final String why, int priority, CompactionRequest request) throws IOException {
    return requestCompactionInternal(r, s, why, priority, request, true);
  }

  public synchronized void requestSystemCompaction(
      final HRegion r, final String why) throws IOException {
    requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false);
  }

  public void requestSystemCompaction(
      final HRegion r, final Store s, final String why) throws IOException {
    requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false);
  }

  

  /**
   * @param r HRegion store belongs to
   * @param s Store to request compaction on
   * @param why Why compaction requested -- used in debug messages
   * @param priority override the default priority (NO_PRIORITY == decide)
   * @param request custom compaction request. Can be null in which case a simple
   *          compaction will be used.
   */
  private synchronized CompactionRequest requestCompactionInternal(final HRegion r, final Store s,
      final String why, int priority, CompactionRequest request, boolean selectNow)
          throws IOException {
    if (this.server.isStopped()
        || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
      return null;
    }
    CompactionContext compaction = null;
    if (selectNow) {
      compaction = selectCompaction(r, s, priority, request);
      if (compaction == null) return null; // message logged inside
    }
    // We assume that most compactions are small. So, put system compactions into small
    // pool; we will do selection there, and move to large pool if necessary.
    long size = selectNow ? compaction.getRequest().getSize() : 0;
    ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(size))
      ? largeCompactions : smallCompactions;
    pool.execute(new CompactionRunner(s, r, compaction, pool));
    if (LOG.isDebugEnabled()) {
      String type = (pool == smallCompactions) ? "Small " : "Large ";
      LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
          + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
    }
    return selectNow ? compaction.getRequest() : null;
  }
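
  // Illustrative sketch (not part of the original source): a caller that already holds an
  // online HRegion "r" and one of its stores "s" could request a user-level compaction
  // roughly like this, with "compactSplitThread" being a hypothetical reference to this class:
  //
  //   CompactionRequest cr =
  //       compactSplitThread.requestCompaction(r, s, "manual request", null);
  //   if (cr == null) {
  //     // nothing was selected, compactions are disabled for the table, or the server is stopping
  //   }
  //
  // System compactions instead go through requestSystemCompaction(...), which defers file
  // selection until the CompactionRunner actually executes in the small pool.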
  private CompactionContext selectCompaction(final HRegion r, final Store s,
      int priority, CompactionRequest request) throws IOException {
    CompactionContext compaction = s.requestCompaction(priority, request);
    if (compaction == null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Not compacting " + r.getRegionNameAsString() +
            " because compaction request was cancelled");
      }
      return null;
    }
    assert compaction.hasSelection();
    if (priority != Store.NO_PRIORITY) {
      compaction.getRequest().setPriority(priority);
    }
    return compaction;
  }

  
  /**
   * Only interrupt once it's done with a run through the work loop.
   */
  void interruptIfNecessary() {
    splits.shutdown();
    mergePool.shutdown();
    largeCompactions.shutdown();
    smallCompactions.shutdown();
  }
  private void waitFor(ThreadPoolExecutor t, String name) {
    boolean done = false;
    while (!done) {
      try {
        done = t.awaitTermination(60, TimeUnit.SECONDS);
        LOG.info("Waiting for " + name + " to finish...");
        if (!done) {
          t.shutdownNow();
        }
      } catch (InterruptedException ie) {
        .warn("Interrupted waiting for " + name + " to finish...");
      }
    }
  }
  void join() {
    waitFor("Split Thread");
    waitFor("Merge Thread");
    waitFor("Large Compaction Thread");
    waitFor("Small Compaction Thread");
  }
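
  // Descriptive note (not part of the original source): interruptIfNecessary() only calls
  // shutdown(), so queued work is allowed to drain; join() then blocks in 60-second
  // increments and escalates to shutdownNow() for any pool that has not terminated yet.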

  
  /**
   * Returns the current size of the queue containing regions that are processed.
   *
   * @return The current size of the regions queue.
   */
  public int getCompactionQueueSize() {
    return largeCompactions.getQueue().size() + smallCompactions.getQueue().size();
  }
  public int getLargeCompactionQueueSize() {
    return largeCompactions.getQueue().size();
  }
  public int getSmallCompactionQueueSize() {
    return smallCompactions.getQueue().size();
  }
  public int getSplitQueueSize() {
    return splits.getQueue().size();
  }
  private boolean shouldSplitRegion() {
    if (server.getNumberOfOnlineRegions() > 0.9 * regionSplitLimit) {
      LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". "
          + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt");
    }
    return (regionSplitLimit > server.getNumberOfOnlineRegions());
  }

  

  /**
   * @return the regionSplitLimit
   */
  public int getRegionSplitLimit() {
    return this.regionSplitLimit;
  }
  private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
    private final Store store;
    private final HRegion region;
    private CompactionContext compaction;
    private int queuedPriority;
    private ThreadPoolExecutor parent;
    public CompactionRunner(Store store, HRegion region,
        CompactionContext compaction, ThreadPoolExecutor parent) {
      super();
      this.store = store;
      this.region = region;
      this.compaction = compaction;
      this.queuedPriority = (this.compaction == null)
          ? store.getCompactPriority() : compaction.getRequest().getPriority();
      this.parent = parent;
    }
    @Override
    public String toString() {
      return (this.compaction != null) ? ("Request = " + compaction.getRequest())
          : ("Store = " + store.toString() + ", pri = " + queuedPriority);
    }
    @Override
    public void run() {
      Preconditions.checkNotNull(server);
      if (server.isStopped()
          || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
        return;
      }
      // Common case - system compaction without a file selection. Select now.
      if (this.compaction == null) {
        int oldPriority = this.queuedPriority;
        this.queuedPriority = this.store.getCompactPriority();
        if (this.queuedPriority > oldPriority) {
          // Store priority decreased while we were in queue (due to some other compaction?),
          // requeue with new priority to avoid blocking potential higher priorities.
          this.parent.execute(this);
          return;
        }
        try {
          this.compaction = selectCompaction(this.region, this.store, queuedPriority, null);
        } catch (IOException ex) {
          .error("Compaction selection failed " + thisex);
          .checkFileSystem();
          return;
        }
        if (this.compaction == null) return; // nothing to do
        // Now see if we are in correct pool for the size; if not, go to the correct one.
        // We might end up waiting for a while, so cancel the selection.
        assert this.compaction.hasSelection();
        ThreadPoolExecutor pool = store.throttleCompaction(
            compaction.getRequest().getSize()) ? largeCompactions : smallCompactions;
        if (this.parent != pool) {
          this.store.cancelRequestedCompaction(this.compaction);
          this.compaction = null;
          this.parent = pool;
          this.parent.execute(this);
          return;
        }
      }
      // Finally we can compact something.
      assert this.compaction != null;
      this.compaction.getRequest().beforeExecute();
      try {
        // Note: please don't put single-compaction logic here;
        //       put it into region/store/etc. This is CST logic.
        long start = EnvironmentEdgeManager.currentTimeMillis();
        boolean completed =
            region.compact(compaction, store, compactionThroughputController);
        long now = EnvironmentEdgeManager.currentTimeMillis();
        LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
              this + "; duration=" + StringUtils.formatTimeDiff(now, start));
        if (completed) {
          // degenerate case: blocked regions require recursive enqueues
          if (store.getCompactPriority() <= 0) {
            requestSystemCompaction(region, store, "Recursive enqueue");
          } else {
            // see if the compaction has caused us to exceed max region size
            requestSplit(region);
          }
        }
      } catch (IOException ex) {
        IOException remoteEx = RemoteExceptionHandler.checkIOException(ex);
        .error("Compaction failed " + thisremoteEx);
        if (remoteEx != ex) {
          .info("Compaction failed at original callstack: " + formatStackTrace(ex));
        }
        server.checkFileSystem();
      } catch (Exception ex) {
        .error("Compaction failed " + thisex);
        .checkFileSystem();
      } finally {
        .debug("CompactSplitThread Status: " + CompactSplitThread.this);
      }
      this.compaction.getRequest().afterExecute();
    }
    private String formatStackTrace(Exception ex) {
      StringWriter sw = new StringWriter();
      PrintWriter pw = new PrintWriter(sw);
      ex.printStackTrace(pw);
      pw.flush();
      return sw.toString();
    }
    @Override
    public int compareTo(CompactionRunner o) {
      // Only compare the underlying request (if any), for queue sorting purposes.
      int compareVal = queuedPriority - o.queuedPriority; // compare priority
      if (compareVal != 0) return compareVal;
      CompactionContext tc = this.compaction, oc = o.compaction;
      // Sort pre-selected (user?) compactions before system ones with equal priority.
      return (tc == null) ? ((oc == null) ? 0 : 1)
          : ((oc == null) ? -1 : tc.getRequest().compareTo(oc.getRequest()));
    }
  }

  
  /**
   * Cleanup class to use when rejecting a compaction request from the queue.
   */
  private static class Rejection implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
      if (runnable instanceof CompactionRunner) {
        CompactionRunner runner = (CompactionRunner)runnable;
        .debug("Compaction Rejected: " + runner);
        runner.store.cancelRequestedCompaction(runner.compaction);
      }
    }
  }
}
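
// The sketch below is not part of CompactSplitThread. It is a minimal, self-contained
// illustration of the concurrency pattern the class above is built on: a ThreadPoolExecutor fed
// by a PriorityBlockingQueue of Comparable Runnables (so queued work runs by priority rather
// than FIFO), a RejectedExecutionHandler for cleanup like Rejection, and a
// shutdown/awaitTermination sequence like interruptIfNecessary()/join(). Every name in it is
// illustrative only; JDK types are fully qualified so it needs no extra imports.
class PriorityPoolSketch {

  // A task that carries its own priority; lower values sort first, mirroring how
  // CompactionRunner.compareTo() orders queued compactions.
  static class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
    final int priority;
    final String name;

    PrioritizedTask(int priority, String name) {
      this.priority = priority;
      this.name = name;
    }

    @Override
    public void run() {
      System.out.println("running " + name + " (priority=" + priority + ")");
    }

    @Override
    public int compareTo(PrioritizedTask other) {
      return this.priority - other.priority;
    }

    @Override
    public String toString() {
      return name;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    // Single worker over a priority queue, like the large/small compaction pools above.
    java.util.concurrent.ThreadPoolExecutor pool = new java.util.concurrent.ThreadPoolExecutor(
        1, 1, 60, java.util.concurrent.TimeUnit.SECONDS,
        new java.util.concurrent.PriorityBlockingQueue<Runnable>(),
        new java.util.concurrent.RejectedExecutionHandler() {
          @Override
          public void rejectedExecution(Runnable r,
              java.util.concurrent.ThreadPoolExecutor executor) {
            // Analogous to Rejection: clean up any state tied to the dropped task.
            System.out.println("rejected " + r);
          }
        });

    // The first task may start immediately on the core thread; tasks still sitting in the
    // queue afterwards drain lowest-priority-value first, not in submission order.
    pool.execute(new PrioritizedTask(10, "system compaction"));
    pool.execute(new PrioritizedTask(5, "another store compaction"));
    pool.execute(new PrioritizedTask(1, "user-requested compaction"));

    pool.shutdown();                                                    // like interruptIfNecessary()
    pool.awaitTermination(60, java.util.concurrent.TimeUnit.SECONDS);  // like waitFor()/join()
  }
}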