Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
Copyright (C) 2012 Ness Computing, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
 
 package com.nesscomputing.concurrent;
 
 import java.util.Set;
 
 
 
Guice bindings for a configurable, lifecycled java.util.concurrent.ExecutorService. The executor service is bound as @Named(threadPoolName) ExecutorService myService. The service will be shut down during com.nesscomputing.lifecycle.LifecycleStage.STOP_STAGE. Configuration has the prefix ness.thread-pool.[pool-name].

See also:
ThreadPoolConfiguration Thread pool configuration options
 
 public class NessThreadPoolModule extends AbstractModule
 {
     private static final Log LOG = Log.findLog();
     private final String threadPoolName;
     private final Annotation annotation;
 
 
     private boolean threadDelegatingWrapperEnabled = true;
     private boolean timingWrapperEnabled = true;
 
     NessThreadPoolModule(String threadPoolName)
     {
         this. = threadPoolName;
         this. = Names.named(threadPoolName);
     }

    
Create a default thread pool.
 
     public static NessThreadPoolModule defaultPool(String threadPoolName)
     {
         return new NessThreadPoolModule(threadPoolName);
     }

    
Create a thread pool for long-running tasks. The default settings will change to not have a run queue.
 
     public static NessThreadPoolModule longTaskPool(String threadPoolNameint poolSize)
     {
        return new NessThreadPoolModule(threadPoolName).withDefaultMaxThreads(poolSize).withDefaultQueueSize(0);
    }

    
Create a thread pool for short-running tasks. The default settings will change to have one thread available per core, plus a few to pick up slack.
    public static NessThreadPoolModule shortTaskPool(String threadPoolNameint queueSize)
    {
        return new NessThreadPoolModule(threadPoolName).withDefaultMaxThreads(Runtime.getRuntime().availableProcessors() + 2).withDefaultQueueSize(queueSize);
    }
    @Override
    protected void configure()
    {
        Multibinder.newSetBinder(binder(), CallableWrapper.class);
        PoolProvider poolProvider = new PoolProvider();
        bind (ExecutorService.class).annotatedWith().toProvider(poolProvider).in(.);
        if () {
        }
        if () {
        }
    }

    
Set the default pool core thread count.
    public NessThreadPoolModule withDefaultMinThreads(int defaultMinThreads)
    {
        this. = defaultMinThreads;
        return this;
    }

    
Set the default pool max thread count. May be 0, in which case the executor will be a com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor().
    public NessThreadPoolModule withDefaultMaxThreads(int defaultMaxThreads)
    {
        this. = defaultMaxThreads;
        return this;
    }

    
Set the default worker thread idle timeout.
    public NessThreadPoolModule withDefaultThreadTimeout(long durationTimeUnit units)
    {
         = new TimeSpan(durationunits);
        return this;
    }

    
Set the default queue length. May be 0, in which case the queue will be a java.util.concurrent.SynchronousQueue.
    public NessThreadPoolModule withDefaultQueueSize(int defaultQueueSize)
    {
        this. = defaultQueueSize;
        return this;
    }

    
Set the default rejected execution handler.
    {
        this. = defaultRejectedHandler;
        return this;
    }

    
Add a CallableWrapper that may decorate this executor service.
    {
        return Multibinder.newSetBinder(binderCallableWrapper.class).permitDuplicates().addBinding();
    }

    
Remove thread delegated wrapper.
    {
        this. = false;
        return this;
    }

    
Remove timing wrapper.
    {
        this. = false;
        return this;
    }
    private String createMBeanName()
    {
        return "com.nesscomputing.concurrent:type=ThreadPool,name=" + ;
    }
    @Singleton
    class PoolProvider implements Provider<ExecutorService>
    {
        private ThreadPoolConfiguration config;
        private volatile ExecutorService service;
        private volatile ExecutorServiceManagementBean management;
        private Set<CallableWrapperwrappers;
        @Inject
        public void inject(Config configLifecycle lifecycleInjector injector)
        {
             = injector.getInstance(Key.get(new TypeLiteral<Set<CallableWrapper>>() {}, ));
            this. = config.getBean("ness.thread-pool." + ThreadPoolConfiguration.class);
             = create();
            lifecycle.addListener(.new LifecycleListener() {
                @Override
                public void onStage(LifecycleStage lifecycleStage)
                {
                    stopExecutor();
                }
            });
        }
        @Override
        public ExecutorService get()
        {
            ExecutorService myService = ;
            Preconditions.checkState(myService != null"Thread pool %s was injected before lifecycle start or after lifecycle stop.  " +
            		"You might consider injecting a Provider instead, or maybe you forgot a Lifecycle entirely.");
            return myService;
        }
        void stopExecutor()
        {
            StopWatch stopWatch = new StopWatch();
            stopWatch.start();
            ExecutorService myService = ;
            Preconditions.checkState(myService != null"no service was ever started?");
            myService.shutdown();
            try {
                if (!myService.awaitTermination(20, .))
                {
                    .error("Executor service %s did not shut down after 20 seconds of waiting!");
                    myService.shutdownNow();
                }
            } catch (InterruptedException e) {
                .warn(e"While awaiting executor %s termination");
                Thread.currentThread().interrupt();
            }
            .info("Executor service %s shutdown after %s"stopWatch);
        }
        private ExecutorService create() {
            Preconditions.checkArgument( != null"no config injected");
            Integer queueSize = Objects.firstNonNull(.getQueueSize(), );
            Integer minThreads = Objects.firstNonNull(.getMinThreads(), );
            Integer maxThreads = Objects.firstNonNull(.getMaxThreads(), );
            TimeSpan threadTimeout = Objects.firstNonNull(.getThreadTimeout(), );
            RejectedHandler rejectedHandlerEnum = .getRejectedHandler();
            RejectedExecutionHandler rejectedHandler = rejectedHandlerEnum != null ? rejectedHandlerEnum.getHandler() : ;
            final BlockingQueue<Runnablequeue;
            final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat( + "-%d").build();
            if (queueSize == 0) {
                queue = new SynchronousQueue<Runnable>();
            } else {
                queue = new LinkedBlockingQueue<Runnable>(queueSize);
            }
            final ExecutorService result;
            if (maxThreads <= 0) {
                result = MoreExecutors.sameThreadExecutor();
                 = new GenericExecutorManagementBean(resultnew SynchronousQueue<>());
            } else {
                ThreadPoolExecutor executor = new LoggingExecutor(
                        minThreads,
                        maxThreads,
                        threadTimeout.getMillis(),
                        .,
                        queue,
                        threadFactory,
                        rejectedHandler);
                 = new ThreadPoolExecutorManagementBean(executor);
                result = executor;
            }
            return DecoratingExecutors.decorate(result, CallableWrappers.combine());
        }
        {
            return new ManagementProvider();
        }
        class ManagementProvider implements Provider<ExecutorServiceManagementBean>
        {
            @Inject
            void setInjector(Injector injector)
            {
                // Ensure that create() has been called so that management is set.
                injector.getInstance(Key.get(ExecutorService.class));
            }
            @Override
            public ExecutorServiceManagementBean get()
            {
                return ;
            }
        }
    }
New to GrepCode? Check out our FAQ X