// [HTML-extraction residue, not part of the source] Start line: | End line: | Snippet Preview | Snippet HTML Code | Stack Overflow Questions
  /*
   * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
   *
   * Copyright 1997-2008 Sun Microsystems, Inc. All rights reserved.
   *
   * The contents of this file are subject to the terms of either the GNU
   * General Public License Version 2 only ("GPL") or the Common Development
   * and Distribution License("CDDL") (collectively, the "License").  You
   * may not use this file except in compliance with the License. You can obtain
  * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
  * or glassfish/bootstrap/legal/LICENSE.txt.  See the License for the specific
  * language governing permissions and limitations under the License.
  *
  * When distributing the software, include this License Header Notice in each
  * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
  * Sun designates this particular file as subject to the "Classpath" exception
  * as provided by Sun in the GPL Version 2 section of the License file that
  * accompanied this code.  If applicable, add the following below the License
  * Header, with the fields enclosed by brackets [] replaced by your own
  * identifying information: "Portions Copyrighted [year]
  * [name of copyright owner]"
  *
  * Contributor(s):
  *
  * If you wish your version of this file to be governed by only the CDDL or
  * only the GPL Version 2, indicate your decision by adding "[Contributor]
  * elects to include this software in this distribution under the [CDDL or GPL
  * Version 2] license."  If you don't indicate a single choice of license, a
  * recipient has the option to distribute your version of this file under
  * either the CDDL, the GPL Version 2 or to extend the choice of license to
  * its licensees as provided above.  However, if you add GPL Version 2 code
  * and therefore, elected the GPL Version 2 license, then the option applies
  * only if the new code is made subject to such option by the copyright
  * holder.
  *
  *
  * This file incorporates work covered by the following copyright and
  * permission notice:
  *
  * Copyright 2004 The Apache Software Foundation
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 
 
 
 package org.apache.tomcat.util.threads;
 
 import java.util.*;
A thread pool that is trying to copy the apache process management.

Author(s):
Gal Shachor
 
 public class ThreadPool  {
 
     private static Log log = LogFactory.getLog(ThreadPool.class);
 
     private static StringManager sm =
         StringManager.getManager("org.apache.tomcat.util.threads.res");
 
     private static boolean logfull=true;
 
     /*
      * Default values ...
      */
     public static final int MAX_THREADS = 200;
     public static final int MAX_THREADS_MIN = 10;
     public static final int MAX_SPARE_THREADS = 50;
     public static final int MIN_SPARE_THREADS = 4;
     public static final int WORK_WAIT_TIMEOUT = 60*1000;
 
     /*
      * Where the threads are held.
      */
     protected ControlRunnable[] pool = null;
 
     /*
      * A monitor thread that monitors the pool for idel threads.
      */
     protected MonitorRunnable monitor;
 
 
     /*
      * Max number of threads that you can open in the pool.
     */
    protected int maxThreads;
    /*
     * Min number of idel threads that you can leave in the pool.
     */
    protected int minSpareThreads;
    /*
     * Max number of idel threads that you can leave in the pool.
     */
    protected int maxSpareThreads;
    /*
     * Number of threads in the pool.
     */
    protected int currentThreadCount;
    /*
     * Number of busy threads in the pool.
     */
    protected int currentThreadsBusy;
    /*
     * Flag that the pool should terminate all the threads and stop.
     */
    protected boolean stopThePool;
    /* Flag to control if the main thread is 'daemon' */
    protected boolean isDaemon=true;

    
The threads that are part of the pool. Key is Thread, value is the ControlRunnable
    protected Hashtable threads=new Hashtable();
    protected Vector listeners=new Vector();

    
Name of the threadpool
    protected String name = "TP";

    
Sequence.
    protected int sequence = 1;


    
Constructor.
    
    public ThreadPool() {
         = ;
         = 0;
         = 0;
         = false;
    }


    
Create a ThreadPool instance.

Parameters:
jmx True if you want a pool with JMX support. A regular pool will be returned if JMX or the modeler are not available.
Returns:
ThreadPool instance. If JMX support is requested, you need to call register() in order to set a name.
    public static ThreadPool createThreadPool(boolean jmx) {
//        if( jmx ) {
//            try {
//                Class.forName( "com.sun.org.apache.commons.modeler.Registry");
//                Class tpc=Class.forName( "org.apache.tomcat.util.threads.ThreadPoolMX");
//                ThreadPool res=(ThreadPool)tpc.newInstance();
//                return res;
//            } catch( Exception ex ) {
//            }
//        }
        return new ThreadPool();
    }
    public synchronized void start() {
          = 0;
          = 0;
        adjustLimits();
         = new ControlRunnable[];
         = new MonitorRunnable(this);
        if ( < ) {
             = new MonitorRunnable(this);
        }
    }
    public MonitorRunnable getMonitor() {
        return ;
    }
    public void setMaxThreads(int maxThreads) {
        this. = maxThreads;
    }
    public int getMaxThreads() {
        return ;
    }
    public void setMinSpareThreads(int minSpareThreads) {
        this. = minSpareThreads;
    }
    public int getMinSpareThreads() {
        return ;
    }
    public void setMaxSpareThreads(int maxSpareThreads) {
        this. = maxSpareThreads;
    }
    public int getMaxSpareThreads() {
        return ;
    }
    public int getCurrentThreadCount() {
        return ;
    }
    public int getCurrentThreadsBusy() {
        return ;
    }
    public boolean isDaemon() {
        return ;
    }
    public static int getDebug() {
        return 0;
    }

    
The default is true - the created threads will be in daemon mode. If set to false, the control thread will not be daemon - and will keep the process alive.
    public void setDaemonboolean b ) {
        =b;
    }
    
    public boolean getDaemon() {
        return ;
    }
    public void setName(String name) {
        this. = name;
    }
    public String getName() {
        return ;
    }
    public int getSequence() {
        return ++;
    }
    public void addThreadThread tControlRunnable cr ) {
        .puttcr );
        forint i=0; i<.size(); i++ ) {
            tpl.threadStart(thist);
        }
    }
    public void removeThreadThread t ) {
        .remove(t);
        forint i=0; i<.size(); i++ ) {
            tpl.threadEnd(thist);
        }
    }
    public void addThreadPoolListenerThreadPoolListener tpl ) {
        .addElementtpl );
    }
    public Enumeration getThreads(){
        return .keys();
    }
    public void run(Runnable r) {
        ControlRunnable c = findControlRunnable();
        c.runIt(r);
    }    
    
    //
    // You may wonder what you see here ... basically I am trying
    // to maintain a stack of threads. This way locality in time
    // is kept and there is a better chance to find residues of the
    // thread in memory next time it runs.
    //

    
Executes a given Runnable on a thread in the pool, block if needed.
    public void runIt(ThreadPoolRunnable r) {
        if(null == r) {
            throw new NullPointerException();
        }
        ControlRunnable c = findControlRunnable();
        c.runIt(r);
    }
        ControlRunnable c=null;
        if (  ) {
            throw new IllegalStateException();
        }
        // Obtain a free thread from the pool.
        synchronized(this) {
            while ( == ) {
                 // All threads are busy
                if ( < ) {
                    // Not all threads were open,
                    // Open new threads up to the max number of idel threads
                    int toOpen =  + ;
                    openThreads(toOpen);
                } else {
                    logFull();
                    // Wait for a thread to become idel.
                    try {
                        this.wait();
                    }
                    // was just catch Throwable -- but no other
                    // exceptions can be thrown by wait, right?
                    // So we catch and ignore this one, since
                    // it'll never actually happen, since nowhere
                    // do we say pool.interrupt().
                    catch(InterruptedException e) {
                        .error("Unexpected exception"e);
                    }
		    if.isDebugEnabled() ) {
			.debug("Finished waiting: CTC="+ +
				  ", CTB=" + );
                    }
                    // Pool was stopped. Get away of the pool.
                    if) {
                        break;
                    }
                }
            }
            // Pool was stopped. Get away of the pool.
            if(0 ==  || ) {
                throw new IllegalStateException();
            }
                    
            // If we are here it means that there is a free thread. Take it.
            int pos =  -  - 1;
            c = [pos];
            [pos] = null;
            ++;
        }
        return c;
    }
    private static void logFull(Log loghelperint currentThreadCount,
                                int maxThreads) {
	if ) {
            .error(.getString("threadpool.busy",
                                   Integer.valueOf(currentThreadCount),
                                   Integer.valueOf(maxThreads)));
            =false;
        } else if.isDebugEnabled() ) {
            .debug("All threads are busy " + currentThreadCount + " " +
                      maxThreads );
        }
    }

    
Stop the thread pool
    public synchronized void shutdown() {
        if(!) {
             = true;
            .terminate();
             = null;
            for(int i = 0 ; i <  - i++) {
                try {
                    [i].terminate();
                } catch(Throwable t) {
                    /*
		     * Do nothing... The show must go on, we are shutting
		     * down the pool and nothing should stop that.
		     */
		    .error("Ignored exception while shutting down thread pool"t);
                }
            }
             =  = 0;
             = null;
            notifyAll();
        }
    }

    
Called by the monitor thread to harvest idle threads.
    protected synchronized void checkSpareControllers() {
        if() {
            return;
        }
            int toFree =  -
                          -
                         ;
            for(int i = 0 ; i < toFree ; i++) {
                ControlRunnable c = [ -  - 1];
                c.terminate();
                [ -  - 1] = null;
                 --;
            }
        }
    }

    
Returns the thread to the pool. Called by threads as they are becoming idel.
    protected synchronized void returnController(ControlRunnable c) {
        if(0 ==  || ) {
            c.terminate();
            return;
        }
        // atomic
        --;
        [ -  - 1] = c;
        notify();
    }

    
Inform the pool that the specific thread finish. Called by the ControlRunnable.run() when the runnable throws an exception.
    protected synchronized void notifyThreadEnd(ControlRunnable c) {
        --;
         --;
        notify();
    }
    /*
     * Checks for problematic configuration and fix it.
     * The fix provides reasonable settings for a single CPU
     * with medium load.
     */
    protected void adjustLimits() {
        if( <= 0) {
             = ;
        } else if ( < ) {
            .warn(.getString("threadpool.max_threads_too_low",
                                  Integer.valueOf(),
                                  Integer.valueOf()));
             = ;
        }
        if( >= ) {
             = ;
        }
        if( <= 0) {
            if(1 == ) {
                 = 1;
            } else {
                 = /2;
            }
        }
        if( >  ) {
             =  ;
        }
        if( <= 0) {
            if(1 == ) {
                 = 1;
            } else {
                 = /2;
            }
        }
    }

    
Create missing threads.

Parameters:
toOpen Total number of threads we'll have open
    protected void openThreads(int toOpen) {
        if(toOpen > ) {
            toOpen = ;
        }
        for(int i =  ; i < toOpen ; i++) {
            [i - ] = new ControlRunnable(this);
        }
         = toOpen;
    }

    

Deprecated:
    void logString s ) {
	//loghelper.flush();
    }
    
    
Periodically execute an action - cleanup in this case
    public static class MonitorRunnable implements Runnable {
        ThreadPool p;
        Thread     t;
        int interval=;
        boolean    shouldTerminate;
        MonitorRunnable(ThreadPool p) {
            this.=p;
            this.start();
        }
        public void start() {
             = false;
             = new Thread(this);
            .setDaemon(.getDaemon() );
	    .setName(.getName() + "-Monitor");
            .start();
        }
        public void setInterval(int i ) {
            this.=i;
        }
        public void run() {
            while(true) {
                try {
                    // Sleep for a while.
                    synchronized(this) {
                        this.wait();
                    }
                    // Check if should terminate.
                    // termination happens when the pool is shutting down.
                    if() {
                        break;
                    }
                    // Harvest idle threads.
                    .checkSpareControllers();
                } catch(Throwable t) {
		    ..error("Unexpected exception"t);
                }
            }
        }
        public void stop() {
            this.terminate();
        }

Stop the monitor
        public synchronized void terminate() {
             = true;
            this.notify();
        }
    }

    
A Thread object that executes various actions ( ThreadPoolRunnable ) under control of ThreadPool
    public static class ControlRunnable implements Runnable {
        
ThreadPool where this thread will be returned
        private ThreadPool p;

The thread that executes the actions
        private ThreadWithAttributes     t;

The method that is executed in this thread
        
        private ThreadPoolRunnable   toRun;
        private Runnable toRunRunnable;

Stop this thread
	private boolean    shouldTerminate;

Activate the execution of the action
        private boolean    shouldRun;

Per thread data - can be used only if all actions are of the same type. A better mechanism is possible ( that would allow association of thread data with action type ), but right now it's enough.
	private boolean noThData;

Start a new thread, with no method in it
        ControlRunnable(ThreadPool p) {
             = null;
             = false;
             = false;
            this. = p;
             = new ThreadWithAttributes(pthis);
            .setDaemon(true);
            .setName(p.getName() + "-Processor" + p.getSequence());
            p.addThreadthis );
	    =true;
            .start();
        }
        public void run() {
            boolean _shouldRun = false;
            boolean _shouldTerminate = false
            ThreadPoolRunnable _toRun = null;
          try {
            while(true) {
                try {
                    /* Wait for work. */
                    synchronized(this) {
                        while (! && !) {
                            this.wait();
                        }
                        _shouldRun = ;
                        _shouldTerminate = ;
                        _toRun = ;
                    }
                    if_shouldTerminate ) {
                            if..isDebugEnabled())
                                ..debug"Terminate");
                            break;
                    }
                    /* Check if should execute a runnable.  */
                    try {
                        if() {
                            if_toRun != null ) {
                                Object thData[]=_toRun.getInitData();
                                .setThreadData(thData);
                                if(..isDebugEnabled())
                                    ..debug"Getting new thread data");
                            }
                             = false;
                        }
                        if(_shouldRun) {
			    if_toRun != null ) { 
                                _toRun.runIt(.getThreadData());
                            } else if != null ) { 
                                .run();
                            } else {
                                if..isDebugEnabled())
                                    ..debug"No toRun ???");
                            }
                        }
                    } catch(Throwable t) {
			..error(.getString("threadpool.thread_error",
                                                 t.toString()));
                       /*
                        * The runnable throw an exception (can be even a ThreadDeath),
                        * signalling that the thread die.
                        *
			* The meaning is that we should release the thread from
			* the pool.
			*/
                         = true;
                         = false;
                        .notifyThreadEnd(this);
                    } finally {
                        if(_shouldRun) {
                             = false;
                            /*
			     * Notify the pool that the thread is now idle.
                             */
                            .returnController(this);
                        }
                    }
                    /*
		     * Check if should terminate.
		     * termination happens when the pool is shutting down.
		     */
                    if(_shouldTerminate) {
                        break;
                    }
                } catch(InterruptedException ie) { /* for the wait operation */
		    // can never happen, since we don't call interrupt
    		    ..error("Unexpected exception"ie);
                }
            }
          } finally {
              .removeThread(Thread.currentThread());
          }
        }

        
Run a task

Parameters:
toRun
        public synchronized void runIt(Runnable toRun) {
	    this. = toRun;
	    // Do not re-init, the whole idea is to run init only once per
	    // thread - the pool is supposed to run a single task, that is
	    // initialized once.
            // noThData = true;
             = true;
            this.notify();
        }

        
Run a task

Parameters:
toRun
        public synchronized void runIt(ThreadPoolRunnable toRun) {
	    this. = toRun;
	    // Do not re-init, the whole idea is to run init only once per
	    // thread - the pool is supposed to run a single task, that is
	    // initialized once.
            // noThData = true;
             = true;
            this.notify();
        }
        public void stop() {
            this.terminate();
        }
        public void kill() {
            .stop();
        }
        public synchronized void terminate() {
             = true;
            this.notify();
        }
    }

    
Debug display of the stage of each thread. The return is html style, for display in the console ( it can be easily parsed too )

Returns:
    public String threadStatusString() {
        StringBuffer sb=new StringBuffer();
        Iterator it=.keySet().iterator();
        sb.append("<ul>");
        whileit.hasNext()) {
            sb.append("<li>");
            ThreadWithAttributes twa=(ThreadWithAttributes)
                    it.next();
            sb.append(twa.getCurrentStage(this) ).append(" ");
            sb.appendtwa.getParam(this));
            sb.append"</li>\n");
        }
        sb.append("</ul>");
        return sb.toString();
    }

    
Return an array with the status of each thread. The status indicates the current request processing stage ( for tomcat ) or whatever the thread is doing ( if the application using TP provide this info )

Returns:
    public String[] getThreadStatus() {
        String status[]=new String.size()];
        Iterator it=.keySet().iterator();
        forint i=0; ( i<status.length && it.hasNext()); i++ ) {
            ThreadWithAttributes twa=(ThreadWithAttributes)
                    it.next();
            status[i]=twa.getCurrentStage(this);
        }
        return status;
    }

    
Return an array with the current "param" ( XXX better name ? ) of each thread. This is typically the last request.

Returns:
    public String[] getThreadParam() {
        String status[]=new String.size()];
        Iterator it=.keySet().iterator();
        forint i=0; ( i<status.length && it.hasNext()); i++ ) {
            ThreadWithAttributes twa=(ThreadWithAttributes)
                    it.next();
            Object o=twa.getParam(this);
            status[i]=(o==null)? null : o.toString();
        }
        return status;
    }
    
    
Interface to allow applications to be notified when a threads are created and stopped.
    public static interface ThreadPoolListener {
        public void threadStartThreadPool tpThread t);
        public void threadEndThreadPool tpThread t);
    }
// [HTML-extraction residue, not part of the source] New to GrepCode? Check out our FAQ X