Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   *
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.hadoop.hbase.io.hfile;
 
 import java.util.Map;
 
 
 
 public class PrefetchExecutor {
 
   private static final Log LOG = LogFactory.getLog(PrefetchExecutor.class);

  
Futures for tracking block prefetch activity
 
   private static final Map<Path,Future<?>> prefetchFutures =
     new ConcurrentSkipListMap<Path,Future<?>>();
  
Executor pool shared among all HFiles for block prefetch
 
   private static final ScheduledExecutorService prefetchExecutorPool;
  
Delay before beginning prefetch
 
   private static final int prefetchDelayMillis;
  
Variation in prefetch delay times, to mitigate stampedes
 
   private static final float prefetchDelayVariation;
   static {
     // Consider doing this on demand with a configuration passed in rather
     // than in a static initializer.
     Configuration conf = HBaseConfiguration.create();
     // 1s here for tests, consider 30s in hbase-default.xml
     // Set to 0 for no delay
      = conf.getInt("hbase.hfile.prefetch.delay", 1000);
      = conf.getFloat("hbase.hfile.prefetch.delay.variation", 0.2f);
     int prefetchThreads = conf.getInt("hbase.hfile.thread.prefetch", 4);
      = new ScheduledThreadPoolExecutor(prefetchThreads,
       new ThreadFactory() {
         @Override
         public Thread newThread(Runnable r) {
           Thread t = new Thread(r);
           t.setName("hfile-prefetch-" + System.currentTimeMillis());
           t.setDaemon(true);
           return t;
         }
     });
   }
 
   private static final Random RNG = new Random();
 
   // TODO: We want HFile, which is where the blockcache lives, to handle
   // prefetching of file blocks but the Store level is where path convention
   // knowledge should be contained
   private static final Pattern prefetchPathExclude =
       Pattern.compile(
         "(" +
           . +
             ..replace(".""\\.") +
             . +
         ")|(" +
           . +
             ..replace(".""\\.") +
             . +
         ")");
 
   public static void request(Path pathRunnable runnable) {
     if (!.matcher(path.toString()).find()) {
       long delay;
       if ( > 0) {
         delay = (long)(( * (1.0f - (/2))) +
         ( * (/2) * .nextFloat()));
       } else {
         delay = 0;
       }
      try {
        if (.isDebugEnabled()) {
          .debug("Prefetch requested for " + path + ", delay=" + delay + " ms");
        }
        .put(path.schedule(runnabledelay,
          .));
      } catch (RejectedExecutionException e) {
        .remove(path);
        .warn("Prefetch request rejected for " + path);
      }
    }
  }
  public static void complete(Path path) {
    .remove(path);
    if (.isDebugEnabled()) {
      .debug("Prefetch completed for " + path);
    }
  }
  public static void cancel(Path path) {
    Future<?> future = .get(path);
    if (future != null) {
      // ok to race with other cancellation attempts
      future.cancel(true);
      .remove(path);
      if (.isDebugEnabled()) {
        .debug("Prefetch cancelled for " + path);
      }
    }
  }
  public static boolean isCompleted(Path path) {
    Future<?> future = .get(path);
    if (future != null) {
      return future.isDone(); 
    }
    return true;
  }
New to GrepCode? Check out our FAQ X