Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  //
  //  ========================================================================
  //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
  //  ------------------------------------------------------------------------
  //  All rights reserved. This program and the accompanying materials
  //  are made available under the terms of the Eclipse Public License v1.0
  //  and Apache License v2.0 which accompanies this distribution.
  //
  //      The Eclipse Public License is available at
 //      http://www.eclipse.org/legal/epl-v10.html
 //
 //      The Apache License v2.0 is available at
 //      http://www.opensource.org/licenses/apache2.0.php
 //
 //  You may elect to redistribute this code under either of these licenses.
 //  ========================================================================
 //
 
 
 package org.eclipse.jetty.util.thread;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 
 public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPoolExecutorDumpable
 {
     private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
 
     private final AtomicInteger _threadsStarted = new AtomicInteger();
     private final AtomicInteger _threadsIdle = new AtomicInteger();
     private final AtomicLong _lastShrink = new AtomicLong();
     private final Object _joinLock = new Object();
     private BlockingQueue<Runnable_jobs;
     private String _name;
     private int _maxIdleTimeMs=60000;
     private int _maxThreads=254;
     private int _minThreads=8;
     private int _maxQueued=-1;
     private int _priority=.;
     private boolean _daemon=false;
     private int _maxStopTime=100;
     private boolean _detailedDump=false;
 
     /* ------------------------------------------------------------------- */
    
     /** Construct */
 
     public QueuedThreadPool()
     {
         ="qtp"+super.hashCode();
     }
 
     /* ------------------------------------------------------------------- */
    
     /** Construct */
 
     public QueuedThreadPool(int maxThreads)
     {
         this();
         setMaxThreads(maxThreads);
     }
 
     /* ------------------------------------------------------------------- */
    
     /** Construct */
 
     public QueuedThreadPool(BlockingQueue<RunnablejobQ)
     {
         this();
         =jobQ;
         .clear();
     }
 
 
     /* ------------------------------------------------------------ */
     @Override
     protected void doStart() throws Exception
     {
         super.doStart();
         .set(0);
 
         if (==null)
        {
            =>0 ?new ArrayBlockingQueue<Runnable>()
                :new BlockingArrayQueue<Runnable>(,);
        }
        int threads=.get();
        while (isRunning() && threads<)
        {
            startThread(threads);
            threads=.get();
        }
    }
    /* ------------------------------------------------------------ */
    @Override
    protected void doStop() throws Exception
    {
        super.doStop();
        long start=System.currentTimeMillis();
        // let jobs complete naturally for a while
        while (.get()>0 && (System.currentTimeMillis()-start) < (/2))
            Thread.sleep(1);
        // kill queued jobs and flush out idle jobs
        .clear();
        Runnable noop = new Runnable(){public void run(){}};
        for  (int i=.get();i-->0;)
            .offer(noop);
        Thread.yield();
        // interrupt remaining threads
        if (.get()>0)
            for (Thread thread : )
                thread.interrupt();
        // wait for remaining threads to die
        while (.get()>0 && (System.currentTimeMillis()-start) < )
        {
            Thread.sleep(1);
        }
        Thread.yield();
        int size=.size();
        if (size>0)
        {
            .warn(size+" threads could not be stopped");
            if (size==1 || .isDebugEnabled())
            {
                for (Thread unstopped : )
                {
                    .info("Couldn't stop "+unstopped);
                    for (StackTraceElement element : unstopped.getStackTrace())
                    {
                        .info(" at "+element);
                    }
                }
            }
        }
        synchronized ()
        {
            .notifyAll();
        }
    }
    /* ------------------------------------------------------------ */
    
    /**
     * Delegated to the named or anonymous Pool.
     */
    public void setDaemon(boolean daemon)
    {
        =daemon;
    }
    /* ------------------------------------------------------------ */
    
    /**
     * Set the maximum thread idle time.
     * Threads that are idle for longer than this period may be stopped.
     * Delegated to the named or anonymous Pool.
     *
     * @param maxIdleTimeMs Max idle time in ms.
     * @see #getMaxIdleTimeMs()
     */
    public void setMaxIdleTimeMs(int maxIdleTimeMs)
    {
        =maxIdleTimeMs;
    }
    /* ------------------------------------------------------------ */
    

    /**
     * @param stopTimeMs maximum total time that stop() will wait for threads to die.
     */
    public void setMaxStopTimeMs(int stopTimeMs)
    {
         = stopTimeMs;
    }
    /* ------------------------------------------------------------ */
    
    /**
     * Set the maximum number of threads.
     * Delegated to the named or anonymous Pool.
     *
     * @param maxThreads maximum number of threads.
     * @see #getMaxThreads()
     */
    public void setMaxThreads(int maxThreads)
    {
        =maxThreads;
        if (>)
            =;
    }
    /* ------------------------------------------------------------ */
    
    /**
     * Set the minimum number of threads.
     * Delegated to the named or anonymous Pool.
     *
     * @param minThreads minimum number of threads
     * @see #getMinThreads()
     */
    public void setMinThreads(int minThreads)
    {
        =minThreads;
        if (>)
            =;
        int threads=.get();
        while (isStarted() && threads<)
        {
            startThread(threads);
            threads=.get();
        }
    }
    /* ------------------------------------------------------------ */
    

    /**
     * @param name Name of the thread pool to use when naming Threads.
     */
    public void setName(String name)
    {
        if (isRunning())
            throw new IllegalStateException("started");
        name;
    }
    /* ------------------------------------------------------------ */
    
    /**
     * Set the priority of the pool threads.
     *
     * @param priority the new thread priority.
     */
    public void setThreadsPriority(int priority)
    {
        =priority;
    }
    /* ------------------------------------------------------------ */
    

    /**
     * @return maximum queue size
     */
    public int getMaxQueued()
    {
        return ;
    }
    /* ------------------------------------------------------------ */
    

    /**
     * @param max job queue size
     */
    public void setMaxQueued(int max)
    {
        if (isRunning())
            throw new IllegalStateException("started");
        =max;
    }
    /* ------------------------------------------------------------ */
    
    /**
     * Get the maximum thread idle time.
     * Delegated to the named or anonymous Pool.
     *
     * @return Max idle time in ms.
     * @see #setMaxIdleTimeMs(int)
     */
    public int getMaxIdleTimeMs()
    {
        return ;
    }
    /* ------------------------------------------------------------ */
    

    /**
     * @return maximum total time that stop() will wait for threads to die.
     */
    public int getMaxStopTimeMs()
    {
        return ;
    }
    /* ------------------------------------------------------------ */
    
    /**
     * Get the maximum number of threads.
     * Delegated to the named or anonymous Pool.
     *
     * @return maximum number of threads.
     * @see #setMaxThreads(int)
     */
    public int getMaxThreads()
    {
        return ;
    }
    /* ------------------------------------------------------------ */
    
    /**
     * Get the minimum number of threads.
     * Delegated to the named or anonymous Pool.
     *
     * @return minimum number of threads.
     * @see #setMinThreads(int)
     */
    public int getMinThreads()
    {
        return ;
    }
    /* ------------------------------------------------------------ */
    

    /**
     * @return The name of the thread pool.
     */
    public String getName()
    {
        return ;
    }
    /* ------------------------------------------------------------ */
    
    /**
     * Get the priority of the pool threads.
     *
     * @return the priority of the pool threads.
     */
    public int getThreadsPriority()
    {
        return ;
    }
    /* ------------------------------------------------------------ */
    
    /**
     * Delegated to the named or anonymous Pool.
     */
    public boolean isDaemon()
    {
        return ;
    }
    /* ------------------------------------------------------------ */
    public boolean isDetailedDump()
    {
        return ;
    }
    /* ------------------------------------------------------------ */
    public void setDetailedDump(boolean detailedDump)
    {
         = detailedDump;
    }
    /* ------------------------------------------------------------ */
    public boolean dispatch(Runnable job)
    {
        if (isRunning())
        {
            final int jobQ = .size();
            final int idle = getIdleThreads();
            if(.offer(job))
            {
                // If we had no idle threads or the jobQ is greater than the idle threads
                if (idle==0 || jobQ>idle)
                {
                    int threads=.get();
                    if (threads<)
                        startThread(threads);
                }
                return true;
            }
        }
        .debug("Dispatched {} to stopped {}",job,this);
        return false;
    }
    /* ------------------------------------------------------------ */
    public void execute(Runnable job)
    {
        if (!dispatch(job))
            throw new RejectedExecutionException();
    }
    /* ------------------------------------------------------------ */
    
    /**
     * Blocks until the thread pool is stopped.
     */
    public void join() throws InterruptedException
    {
        synchronized ()
        {
            while (isRunning())
                .wait();
        }
        while (isStopping())
            Thread.sleep(1);
    }
    /* ------------------------------------------------------------ */
    

    /**
     * @return The total number of threads currently in the pool
     */
    public int getThreads()
    {
        return .get();
    }
    /* ------------------------------------------------------------ */
    

    /**
     * @return The number of idle threads in the pool
     */
    public int getIdleThreads()
    {
        return .get();
    }
    /* ------------------------------------------------------------ */
    

    /**
     * @return True if the pool is at maxThreads and there are not more idle threads than queued jobs
     */
    public boolean isLowOnThreads()
    {
        return .get()== && .size()>=.get();
    }
    /* ------------------------------------------------------------ */
    private boolean startThread(int threads)
    {
        final int next=threads+1;
        if (!.compareAndSet(threads,next))
            return false;
        boolean started=false;
        try
        {
            Thread thread=newThread();
            thread.setDaemon();
            thread.setPriority();
            thread.setName(+"-"+thread.getId());
            .add(thread);
            thread.start();
            started=true;
        }
        finally
        {
            if (!started)
                .decrementAndGet();
        }
        return started;
    }
    /* ------------------------------------------------------------ */
    protected Thread newThread(Runnable runnable)
    {
        return new Thread(runnable);
    }
    /* ------------------------------------------------------------ */
    public String dump()
    {
        return AggregateLifeCycle.dump(this);
    }
    /* ------------------------------------------------------------ */
    public void dump(Appendable outString indentthrows IOException
    {
        List<Objectdump = new ArrayList<Object>(getMaxThreads());
        for (final Thread thread)
        {
            final StackTraceElement[] trace=thread.getStackTrace();
            boolean inIdleJobPoll=false;
            // trace can be null on early java 6 jvms
            if (trace != null)
            {
                for (StackTraceElement t : trace)
                {
                    if ("idleJobPoll".equals(t.getMethodName()))
                    {
                        inIdleJobPoll = true;
                        break;
                    }
                }
            }
            final boolean idle=inIdleJobPoll;
            if ()
            {
                dump.add(new Dumpable()
                {
                    public void dump(Appendable outString indentthrows IOException
                    {
                        out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle?" IDLE":"").append('\n');
                        if (!idle)
                            AggregateLifeCycle.dump(out,indent,Arrays.asList(trace));
                    }
                    public String dump()
                    {
                        return null;
                    }
                });
            }
            else
            {
                dump.add(thread.getId()+" "+thread.getName()+" "+thread.getState()+" @ "+(trace.length>0?trace[0]:"???")+(idle?" IDLE":""));
            }
        }
        AggregateLifeCycle.dumpObject(out,this);
        AggregateLifeCycle.dump(out,indent,dump);
    }
    /* ------------------------------------------------------------ */
    @Override
    public String toString()
    {
        return +"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(==null?-1:.size())+"}";
    }
    /* ------------------------------------------------------------ */
    private Runnable idleJobPoll() throws InterruptedException
    {
    }
    /* ------------------------------------------------------------ */
    private Runnable _runnable = new Runnable()
    {
        public void run()
        {
            boolean shrink=false;
            try
            {
                Runnable job=.poll();
                while (isRunning())
                {
                    // Job loop
                    while (job!=null && isRunning())
                    {
                        runJob(job);
                        job=.poll();
                    }
                    // Idle loop
                    try
                    {
                        .incrementAndGet();
                        while (isRunning() && job==null)
                        {
                            if (<=0)
                                job=.take();
                            else
                            {
                                // maybe we should shrink?
                                final int size=.get();
                                if (size>)
                                {
                                    long last=.get();
                                    long now=System.currentTimeMillis();
                                    if (last==0 || (now-last)>)
                                    {
                                        shrink=.compareAndSet(last,now) &&
                                        .compareAndSet(size,size-1);
                                        if (shrink)
                                            return;
                                    }
                                }
                                job=idleJobPoll();
                            }
                        }
                    }
                    finally
                    {
                        .decrementAndGet();
                    }
                }
            }
            catch(InterruptedException e)
            {
                .ignore(e);
            }
            catch(Exception e)
            {
                .warn(e);
            }
            finally
            {
                if (!shrink)
                    .decrementAndGet();
                .remove(Thread.currentThread());
            }
        }
    };
    /* ------------------------------------------------------------ */
    

    /**
     * <p>Runs the given job in the current thread.</p>
     * <p>Subclasses may override to perform pre/post actions before/after the job is run.</p>
     *
     * @param job the job to run
     */
    protected void runJob(Runnable job)
    {
        job.run();
    }
    /* ------------------------------------------------------------ */
    

    /**
     * @return the job queue
     */
    protected BlockingQueue<RunnablegetQueue()
    {
        return ;
    }
    /* ------------------------------------------------------------ */
    

    /**
     * @param id The thread ID to stop.
     * @return true if the thread was found and stopped.
     * @deprecated Use {@link #interruptThread(long)} in preference
     */
    @Deprecated
    public boolean stopThread(long id)
    {
        for (Thread thread)
        {
            if (thread.getId()==id)
            {
                thread.stop();
                return true;
            }
        }
        return false;
    }
    /* ------------------------------------------------------------ */
    

    /**
     * @param id The thread ID to interrupt.
     * @return true if the thread was found and interrupted.
     */
    public boolean interruptThread(long id)
    {
        for (Thread thread)
        {
            if (thread.getId()==id)
            {
                thread.interrupt();
                return true;
            }
        }
        return false;
    }
    /* ------------------------------------------------------------ */
    

    /**
     * @param id The thread ID to dump.
     * @return the thread's id, name, state and stack trace as text, or null if the thread was not found.
     */
    public String dumpThread(long id)
    {
        for (Thread thread)
        {
            if (thread.getId()==id)
            {
                StringBuilder buf = new StringBuilder();
                buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
                for (StackTraceElement element : thread.getStackTrace())
                    buf.append("  at ").append(element.toString()).append('\n');
                return buf.toString();
            }
        }
        return null;
    }
New to GrepCode? Check out our FAQ X