Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
   /*
    * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
    * 
    * Copyright 1997-2007 Sun Microsystems, Inc. All rights reserved.
    * 
    * The contents of this file are subject to the terms of either the GNU
    * General Public License Version 2 only ("GPL") or the Common Development
    * and Distribution License("CDDL") (collectively, the "License").  You
    * may not use this file except in compliance with the License. You can obtain
   * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
   * or glassfish/bootstrap/legal/LICENSE.txt.  See the License for the specific
   * language governing permissions and limitations under the License.
   * 
   * When distributing the software, include this License Header Notice in each
   * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
   * Sun designates this particular file as subject to the "Classpath" exception
   * as provided by Sun in the GPL Version 2 section of the License file that
   * accompanied this code.  If applicable, add the following below the License
   * Header, with the fields enclosed by brackets [] replaced by your own
   * identifying information: "Portions Copyrighted [year]
   * [name of copyright owner]"
   * 
   * Contributor(s):
   * 
   * If you wish your version of this file to be governed by only the CDDL or
   * only the GPL Version 2, indicate your decision by adding "[Contributor]
   * elects to include this software in this distribution under the [CDDL or GPL
   * Version 2] license."  If you don't indicate a single choice of license, a
   * recipient has the option to distribute your version of this file under
   * either the CDDL, the GPL Version 2 or to extend the choice of license to
   * its licensees as provided above.  However, if you add GPL Version 2 code
   * and therefore, elected the GPL Version 2 license, then the option applies
   * only if the new code is made subject to such option by the copyright
   * holder.
   */
  package com.sun.enterprise.web.connector.grizzly;
  
  import java.net.Socket;
  import java.util.Set;
  
  
  
  import java.util.List;
  import java.util.Queue;
This class implement an NIO socket HTTP Listener. This class supports three stagegy: Mode Blocking: This mode uses NIO blocking mode, and doesn't uses any of the java.nio.* classes. Mode Non-Blocking: This mode uses NIO non blocking mode and read the entire request stream before processing the request. The stragegy used is to find the content-lenght header and buffer bytes until the end of the stream is read.

Author(s):
Jean-Francois Arcand
  
  public class SelectorThread extends Thread implements MBeanRegistration{
              
      public final static String SERVER_NAME = 
              System.getProperty("product.name") != null 
                  ? System.getProperty("product.name") : "grizzly";
      
          
      private final Object[] lock = new Object[0];
  
      
 
     protected InetAddress inet;
     protected int port;
 
     protected ServerSocket serverSocket;
    
    
The ServerSocketChannel used in blocking mode.
 
     
     protected boolean initialized = false;    
     protected volatile boolean running = false;    
     // ----------------------------------------------------- JMX Support ---/
     
     
     protected String domain;
     protected ObjectName oname;
     private ObjectName keepAliveMbeanName;
     protected MBeanServer mserver;
 
 
     // ------------------------------------------------------Socket setting --/
 
     protected boolean tcpNoDelay=false;
     
     
     protected int linger=100;
     
     
     protected int socketTimeout=-1;
     
     
     
     
     protected boolean oOBInline = false;
     // ------------------------------------------------------ Compression ---/
 

    
Compression value.
 
     protected String compression = "off";
     protected String noCompressionUserAgents = null;
     protected String restrictedUserAgents = null;
     protected String compressableMimeTypes = "text/html,text/xml,text/plain";
     private volatile String[] parsedCompressableMimeTypes = null;
     private volatile int parsedComressableMimeTypesHash = -1;
     protected int compressionMinSize    = 2048;
        
     // ------------------------------------------------------ Properties----/
     
    
    
Is the socket reuse socket enabled.
 
     private boolean reuseAddress = true;


    
Buffer the response until the buffer is full.
 
     protected boolean bufferResponse = false;
    
    
    
Default HTTP header buffer size.
 
     protected int maxHttpHeaderSize = .;


    
Number of polled Read*Task instance.
 
     protected int minReadQueueLength = 10;


    
Number of polled ProcessorTask instance.
 
     protected int minProcessorQueueLength = 10;
     
     
     protected int maxPostSize = 2 * 1024 * 1024;

    
The maximum time a connection can stay open holding a WorkerThread. Default is 5 minutes like Apache.
 
     private int transactionTimeout = .;

    
The Selector used by the connector.
 
     protected Selector selector;


    
Associated adapter.
 
     protected Adapter adapter = null;

    
    
The queue shared by this thread and code>ReadTask
  
     protected Pipeline readPipeline;
    

    
The queue shared by this thread and the code>ProcessorTask.
  
     protected Pipeline processorPipeline;
    
  
    
Placeholder for Pipeline statistic.
 
     protected PipelineStatistic pipelineStat;
    
    
The default Pipeline used.
 
     protected String pipelineClassName = 
             LinkedListPipeline.class.getName();
    
    
Maximum number of WorkerThread
 
     protected int maxProcessorWorkerThreads = 5; // By default
     
    
    
Maximum number of ReadWorkerThread
 
     protected int maxReadWorkerThreads = -1; // By default
 
    
    
Minimum numbers of WorkerThread created
 
     protected int minWorkerThreads = 5;
    

    
Maximum number of Selectors in the SelectorFactory pool
 
     protected int maxSelectors = -1; // By default
 
    
    
Minimum numbers of WorkerThread before creating new thread. <implementation-note> Not used in 9.x </implementation-note>
 
     protected int minSpareThreads = 2;

    
    
The number used when increamenting the Pipeline thread pool.
 
     protected int threadsIncrement = 1;
    
    
    
The timeout used by the thread when processing a request.
 
     protected int threadsTimeout = .;

    
    
Is the ByteBuffer used by the ReadTask use direct ByteBuffer or not.
 
     protected boolean useDirectByteBuffer = false;
    
  
    
Monitoring object used to store information.
 
     protected RequestGroupInfo globalRequestProcessornew RequestGroupInfo();
    
    
    
Keep-alive stats
 
     private KeepAliveStats keepAliveStats = new KeepAliveStats();


    
If true, display the NIO configuration information.
 
     protected boolean displayConfiguration = false;
    
    
    
Is monitoring already started.
 
     protected boolean isMonitoringEnabled = false;
    

    
The current number of simulatenous connection.
 
     protected int currentConnectionNumber;


    
Is this Selector currently in Wating mode?
 
     protected volatile boolean isWaiting = false;
    

    
The input request buffer size.
 
     protected int requestBufferSize = .;
    
    
    
Create view ByteBuffer from another ByteBuffer
 
     protected boolean useByteBufferView = false;
     
 
     /*
      * Number of seconds before idle keep-alive connections expire
      */
     protected int keepAliveTimeoutInSeconds = .;

    
    
Number of seconds before idle keep-alive connections expire
 
     private int kaTimeout = . * 1000;
    
    
    
Recycle the Task after running them
 
     protected boolean recycleTasks = .;
    
    
    
The Selector timeout value. By default, it is set to 60000 miliseconds (as in the j2se 1.5 ORB).
 
     protected static int selectorTimeout = 1000;


    
Maximum pending connection before refusing requests.
 
     protected int maxQueueSizeInBytes = .;


    
The Algorithm used to predict the end of the NIO stream
 
     protected Class algorithmClass;
    
    
    
The Algorithm used to parse the NIO stream.
 
     protected String algorithmClassName = ;
    
    
    
The default NIO stream algorithm.
 
     public final static String DEFAULT_ALGORITHM =
             NoParsingAlgorithm.class.getName();

    
    
Server socket backlog.
 
     protected int ssBackLog = 4096;
    
    
    
Next time the exprireKeys() will delete keys.
     
     private long nextKeysExpiration = 0;
    
    
    
The default response-type
 
The forced request-type
 
     protected String forcedRequestType = .;
    
    
    
The root folder where application are deployed
 
     protected static String rootFolder = "";


    
Name of the property file that holds grizzly version number
 
     private static final String VERSION_FILE =
             "/com/sun/enterprise/web/connector/grizzly/version.properties";
     
     
     // ----------------------------------------------------- Collections --//
     
    
    
List of SelectionKey event to register next time the Selector wakeups. This is needed since there a bug in j2se 1.4.x that prevent registering selector event if the call is done on another thread.
 
     private Queue<SelectionKeykeysToEnable =
         new ConcurrentQueue<SelectionKey>("SelectorThread.keysToEnable");
          
     
     // ---------------------------------------------------- Object pools --//
 

    
ConcurrentLinkedQueue used as an object pool. If the list becomes empty, new ProcessorTask will be automatically added to the list.
 
     protected Queue<ProcessorTaskprocessorTasks =
         new ConcurrentQueue<ProcessorTask>("SelectorThread.processorTasks");
              
    
    
ConcurrentLinkedQueue used as an object pool. If the list becomes empty, new ReadTask will be automatically added to the list.
 
     protected Queue<ReadTaskreadTasks =
         new ConcurrentQueue<ReadTask>("SelectorThread.readTasks");

    
    
List of active ProcessorTask.
 
     protected Queue<ProcessorTaskactiveProcessorTasks =
         new ConcurrentQueue<ProcessorTask>("SelectorThread.activeProcessorTasks");
     
     // -----------------------------------------  Multi-Selector supports --//
 
    
The number of SelectorReadThread
 
     protected int multiSelectorsCount = 0;

    
    
The Selector used to register OP_READ
     
     protected MultiSelectorThread[] readThreads;
    
    
    
The current readThreads used to process OP_READ.
 
     int curReadThread;

    
    
The logger used by the grizzly classes.
 
     protected static Logger logger = Logger.getLogger("GRIZZLY");
    
    
    
Flag to disable setting a different time-out on uploads.
 
     protected boolean disableUploadTimeout = true;    
    
    
    
Maximum timeout on uploads. 5 minutes as in Apache HTTPD server.
 
     protected int uploadTimeout = 30000;
             
             
     // -----------------------------------------  Keep-Alive subsystems --//
     
     
    
Keep-Alive subsystem. If a client opens a socket but never close it, the SelectionKey will stay forever in the Selector keys, and this will eventualy produce a memory leak.
 
     
     
     // ------------------------------------------------- FileCache support --//
    
    
    
The FileCacheFactory associated with this Selector
  
     protected FileCacheFactory fileCacheFactory;
    
        
Timeout before remove the static resource from the cache.
 
     protected int secondsMaxAge = -1;
    
    
    
The maximum entries in the fileCache
 
     protected int maxCacheEntries = 1024;
    
 
    
The maximum size of a cached resources.
 
     protected long minEntrySize = 2048;
            
               
    
The maximum size of a cached resources.
 
     protected long maxEntrySize = 537600;
    
    
    
The maximum cached bytes
 
     protected long maxLargeFileCacheSize = 10485760;
 
    
    
The maximum cached bytes
 
     protected long maxSmallFileCacheSize = 1048576;
    
    
    
Is the FileCache enabled.
 
     protected boolean isFileCacheEnabled = true;
    
    
    
Is the large FileCache enabled.
 
     protected boolean isLargeFileCacheEnabled = true;    
     
     // --------------------------------------------- Asynch supports -----//
     
    
Is asynchronous mode enabled?
 
     protected boolean asyncExecution = false;
    
    
    
When the asynchronous mode is enabled, the execution of this object will be delegated to the AsyncHandler
 
     protected AsyncHandler asyncHandler;
    
    
    
Is the DEFAULT_ALGORITHM used.
 
     protected static boolean defaultAlgorithmInstalled = true;
    
    
    
The JMX Management class.
 
     private Management jmxManagement = null;
    
    
    
The Classloader used to load instance of StreamAlgorithm.
 
     private ClassLoader classLoader;
    
    
    
Grizzly own debug flag.
 
     protected boolean enableNioLogging = false;
    
    
    
Static list of current instance of this class.
 
     private final static ConcurrentHashMap<Integer,SelectorThread
             selectorThreads = new ConcurrentHashMap<Integer,SelectorThread>();
    
    
    
Banned SelectionKey registration.
 
     protected Queue<SelectionKeybannedKeys =
         new ConcurrentQueue<SelectionKey>("SelectorThread.bannedKeys");
 
     // Workaround for Issue 555
     private long lastSpinTimestamp;
     private int emptySpinCounter;
    
The threshold for detecting selector.select spin on linux, used for enabling workaround to prevent server from hanging.
 
     private final static int spinRateTreshold = 2000;

    
Enable workaround Linux spinning Selector
 
     static final boolean isLinux =
             System.getProperty("os.name").equalsIgnoreCase("linux") &&
                 !System.getProperty("java.version").startsWith("1.7");
     
     // ---------------------------------------------------- Constructor --//
     
    
    
Create the Selector object. Each instance of this class will listen to a specific port.
 
     public SelectorThread(){
     }
     
     // ------------------------------------------------------ Selector hook --/
     
    
    
Return the SelectorThread which listen on port, or null if there is no SelectorThread.
 
     public final static SelectorThread getSelector(int port){
         return .get(port);
     }
    
    
    
Return an Enumeration of the active SelectorThreads
 
     public final static Enumeration<SelectorThreadgetSelectors(){
         return .elements();
     }
     
     
     // ----------------------------------------------------------------------/
     
   
Enable all registered interestOps. Due a a NIO bug, all interestOps invokation needs to occurs on the same thread as the selector thread.
 
     public void enableSelectionKeys(){
         SelectionKey selectionKey;
         int size = .size();
         long currentTime = 0L;
         if (size > 0){
             currentTime = (Long)System.currentTimeMillis();
         }
 
         for (int i=0; i < sizei++) {
             selectionKey = .poll();
             
             // If the SelectionKey is used for continuation, do not allow
             // the key to be registered.
             if ( && !.isEmpty() 
                     && .remove(selectionKey)){
                 continue;
             }
             
             if (!selectionKey.isValid()){
                 cancelKey(selectionKey); 
                 continue;
             }
             
             if (!.trap(selectionKey)){
                
Workaround issue 3615 This code is an ugly workaround as we cannot change the current API to handle this scenario.
 
                 Object attachment = selectionKey.attachment();
                 selectionKey.attach(-1L);
                 
                 if (.expireKey(selectionKey)){                 
                     cancelKey(selectionKey);
                     continue;
                 }
                 selectionKey.attach(attachment);
             }
 
             selectionKey.interestOps(
                     selectionKey.interestOps() | .);
 
             if (selectionKey.attachment() == null)
                 selectionKey.attach(currentTime);
         } 
     } 
    
    
    
Add a SelectionKey to the banned list of SelectionKeys. A SelectionKey is banned when new registration aren't allowed on the Selector.
 
     public void addBannedSelectionKey(SelectionKey key){
         .offer(key);
     }
    
    
    
Register a SelectionKey to this Selector running of this thread.
 
     public void registerKey(SelectionKey key){
         if (key == nullreturn;
         
         if (.dropConnection()) {
             cancelKey(key);
             return;
         }
         
         if (){
             key.attach(null);
         }
         
         if (){
             .log(.,
                     "Registering SocketChannel for keep alive " +  
                     key.channel());
         }         
         // add SelectionKey & Op to list of Ops to enable
         .add(key);
         // tell the Selector Thread there's some ops to enable
         .wakeup();
         // wakeup() will force the SelectorThread to bail out
         // of select() to process your registered request
     } 
 
    // -------------------------------------------------------------- Init // 
 

    
initialized the endpoint by creating the ServerScoketChannel and by initializing the server socket.
 
     public void initEndpoint() throws IOExceptionInstantiationException {
         SelectorThreadConfig.configure(this);
         
         initFileCacheFactory();
         initAlgorithm();
         initPipeline();
         initMonitoringLevel();
         
         setName("SelectorThread-" + );
         
         try{
             // Create the socket listener
              = ServerSocketChannel.open();
              = Selector.open();
 
              = .socket();
             .setReuseAddress();
             if (  == null)
                 .bind(new InetSocketAddress(),);
             else
                 .bind(new InetSocketAddress(,),);
 
             .configureBlocking(false);
         } catch (SocketException ex){
             throw new BindException(ex.getMessage() + ": " + );
         }
         
         
         if (  > 1 ){
              = new MultiSelectorThread[];
             initMultiSelectors();
         } 
         
         if ( == -1) {
              = ;
         }
         
 
          = true;           
         .log(.,"Initializing Grizzly Non-Blocking Mode");                     
     }
     
    
    
Create a new Pipeline instance using the pipelineClassName value.
 
     protected Pipeline newPipeline(int maxThreads,
                                    int minThreads,
                                    String name
                                    int port,
                                    int priority){
         
         Class className = null;                               
         Pipeline pipeline = null;                               
         try{           
             if (  == null ){
                 className = Class.forName();
             } else {
                 className = .loadClass();
             }
             pipeline = (Pipeline)className.newInstance();
         } catch (ClassNotFoundException ex){
             .log(.,
                        "Unable to load Pipeline: " + );
             pipeline = new LinkedListPipeline();
         } catch (InstantiationException ex){
             .log(.,
                        "Unable to instantiate Pipeline: "
                        + );
             pipeline = new LinkedListPipeline();
         } catch (IllegalAccessException ex){
             .log(.,
                        "Unable to instantiate Pipeline: "
                        + );
             pipeline = new LinkedListPipeline();
         }
         
         if (.isLoggable(.)){
             .log(.,
                        "http-listener " + port + " uses pipeline: "
                        + pipeline.getClass().getName());
         }
         
         pipeline.setMaxThreads(maxThreads);
         pipeline.setMinThreads(minThreads);    
         pipeline.setName(name);
         pipeline.setPort(port);
         pipeline.setPriority(priority);
         pipeline.setQueueSizeInBytes();
         pipeline.setThreadsIncrement();
         pipeline.setThreadsTimeout();
         
         return pipeline;
     }
    
    
    
Initialize the fileCacheFactory associated with this instance
 
     protected void initFileCacheFactory(){        
         if (){
              = false;
              = false;
         }
         
          = FileCacheFactory.getFactory();
         .setMaxSmallCacheSize();         
     }
       
    
    
Injects PipelineStatistic into every Pipeline, for monitoring purposes.
 
     protected void enablePipelineStats(){
         .start();
 
         .setPipelineStatistic();       
 
         if ( != null){
         }
     }
    

    
Removes PipelineStatistic from every Pipeline, when monitoring has been turned off.
 
     protected void disablePipelineStats(){
         .stop();
         
         .setProcessorPipeline(null);
 
         if ( != null){
             .setKeepAliveStats(null);
         }
 
     }

    
    
Load using reflection the Algorithm class.
 
     protected void initAlgorithm(){
         try{    
             if ( == null){
                  = Class.forName();
             } else {
                  = .loadClass();
             }
             .log(.,
                        "Using Algorithm: " + );   
         } catch (ClassNotFoundException ex){
             .log(.,
                        "Unable to load Algorithm: " + );        
         }  finally {
             if (  == null ){
                  = NoParsingAlgorithm.class;
             }
         }
 
          = 
                 .equals() ? true:false;
     }
    
    
    
Initialize the keep-alive mechanism.
 
     protected void initKeepAlivePipeline(){
          = new KeepAlivePipeline();
         
         .setPort();
 
     }
    
    
    
Init the Pipelines used by the WorkerThreads.
 
     protected void initPipeline(){     
                 
         .put(,this);
         
         initKeepAlivePipeline();
         
                                         "http",
                                         ,.);  
         .initPipeline();
 
         if (  == 0){
              = -1;
             .log(.,
                        "http-listener " +  + 
                        " is security-enabled and needs at least 2 threads");
         }
         
         // Only creates the pipeline if the max > 0, and the async mechanism
         // must not be enabled.
         if (  > 0 && !){                        
              = newPipeline(
                                        "read"
                                        ,.);
             .initPipeline();
         } else {
              = ( == 0 ? null:);
         }
     }
    
    
    
Create a pool of ReadTask
    protected void initReadTask(int size){         
        ReadTask task;
        for (int i=0; i < sizei++){
            task = newReadTask();             
            .offer(task);    
        }
    }
    
    
    
Return a new ReadTask instance
    protected ReadTask newReadTask(){
        StreamAlgorithm streamAlgorithm = null;
        
        try{
            streamAlgorithm = (StreamAlgorithm).newInstance();
        } catch (InstantiationException ex){
            .log(.,
                       "Unable to instantiate Algorithm: ");
        } catch (IllegalAccessException ex){
            .log(.,
                       "Unable to instantiate Algorithm: " + );
        } finally {
            if ( streamAlgorithm == null)
                streamAlgorithm = new NoParsingAlgorithm();
        }       
        streamAlgorithm.setPort();
        
        ReadTask task = null;
        
        
For performance reason, we need to avoid calling newInstance() when the default configuration is used.
        if ( ! ) {
            try{
                task = (ReadTask)streamAlgorithm.getReadTask(this).newInstance();
            } catch (InstantiationException ex){
                .log(.,
                           "Unable to instantiate Algorithm: "
                        + );
            } catch (IllegalAccessException ex){
                .log(.,
                           "Unable to instantiate Algorithm: " 
                        + );
            } finally {
                if ( task == null)
                    task = new DefaultReadTask();
            }  
        } else if (  > 0 ||  ){
            task = new AsyncReadTask();
        } else {
            task = new DefaultReadTask();
        }
        task.setSelectorThread(this);     
        task.setPipeline();  
        task.setRecycle();
        task.initialize(streamAlgorithm,);
       
        return task;
    }


    
Initialize SelectorReadThread used to process OP_READ operations.
    protected void initMultiSelectors() throws IOException
                                                 InstantiationException {
        for (int i = 0; i < .i++) {
            [i] = new SelectorReadThread();
            ((SelectorReadThread)[i]). = i;
        }
         = 0;
    }
    
    
    protected void configureReadThread(SelectorThread multiSelector)
            throws IOExceptionInstantiationException {
        multiSelector.setMaxThreads();
        multiSelector.setBufferSize();
        multiSelector.setMaxKeepAliveRequests();
        multiSelector
        multiSelector.maxQueueSizeInBytes = ;
        multiSelector.fileCacheFactory = ;     
        multiSelector.maxReadWorkerThreads = ;
        multiSelector.defaultResponseType = ;
        multiSelector.forcedRequestType = ;          
        multiSelector.minReadQueueLength = ;
        multiSelector.maxHttpHeaderSize = ;
        multiSelector.isMonitoringEnabled = isMonitoringEnabled();
        multiSelector.pipelineStat = ;
        multiSelector.globalRequestProcessor = ;
        if (  ) {
            multiSelector.asyncExecution = ;
            multiSelector.asyncHandler = ;
        }
        multiSelector.threadsIncrement = ;
        multiSelector.setPort();
        multiSelector.setAdapter();
        multiSelector.processorPipeline = ;
        multiSelector.readPipeline = ;            
        multiSelector.readTasks = ;
        multiSelector.processorTasks = ;   
        multiSelector.keepAlivePipeline = ;
        multiSelector.domain = ;
        multiSelector.bufferResponse = ;
        multiSelector.initEndpoint();
        multiSelector.start();
    }


    
Return an instance of SelectorReadThread to use for registering OP_READ
    private synchronized MultiSelectorThread getSelectorReadThread() {
        if ( == .)
             = 0;
        return [++];
    }
    
    
    
Create a pool of ProcessorTask
    protected void initProcessorTask(int size){
        for (int i=0; i < sizei++){           
            .offer(newProcessorTask(false));
        }
    }  


    
Initialize ProcessorTask
    protected void rampUpProcessorTask(){
        Iterator<ProcessorTaskiterator = .iterator();
        while (iterator.hasNext()) {
            iterator.next().initialize();
        }
    }  
    

    
Create ProcessorTask objects and configure it to be ready to proceed request.
    protected ProcessorTask newProcessorTask(boolean initialize){                                                      
        DefaultProcessorTask task = 
                new DefaultProcessorTask(initialize);
        return configureProcessorTask(task);       
    }
    
    
        task.setAdapter();
        task.setBufferSize();
        task.setSelectorThread(this);               
        task.setRecycle();
        task.setMaxPostSize();
        task.setTimeout();
        if ( .dropConnection() ) {
            task.setDropConnection(true);
        }
        
        // Asynch extentions
        if (  ) {
            task.setEnableAsyncExecution();
            task.setAsyncHandler();          
        }
                
        task.setPipeline();         
        configureCompression(task);
        
        return (ProcessorTask)task;        
    }
 
    
    
Reconfigure Grizzly Asynchronous Request Processing(ARP) internal objects.
    protected void reconfigureAsyncExecution(){
        for(ProcessorTask task :){
            if (task instanceof DefaultProcessorTask) {
                ((DefaultProcessorTask)task)
                    .setEnableAsyncExecution();
                ((DefaultProcessorTask)task).setAsyncHandler();  
            }
        }
        
        .clear();
        initReadTask();   
    }
    
 
    
Return a ProcessorTask from the pool. If the pool is empty, create a new instance.
        ProcessorTask processorTask = null;
        if () {
            processorTask = .poll();
        }
        
        if (processorTask == null){
            processorTask = newProcessorTask(false);
        } 
        
        if ( isMonitoringEnabled() ){
           .offer(processorTask); 
        }
        
        return processorTask;
    }
        
    
    
Return a ReadTask from the pool. If the pool is empty, create a new instance.
    public ReadTask getReadTask(SelectionKey keythrows IOException{
        ReadTask task = null;
        if (  ) {
            task = .poll();
        }
        
        if (task == null){
            task = newReadTask(); 
        }           
        task.setSelectionKey(key);
        return task;
    }
    
    // --------------------------------------------------------- Thread run --/
    
    
    
Start the endpoint (this)
    @Override
    public void run(){
        try{
            startEndpoint();
        } catch (Exception ex){
            .log(.,"selectorThread.errorOnRequest"ex);
        }
    }
    
    // ------------------------------------------------------------Start ----/
    
    
    
Start the Acceptor Thread and wait for incoming connection, in a non blocking mode.
    public void startEndpoint() throws IOExceptionInstantiationException {
         = true;
        
         =  * 1000;
        rampUpProcessorTask();
        registerComponents();
        displayConfiguration();
        startPipelines();
        startListener();
    }
    
    
    
Starts the Pipeline used by this Selector
    protected void startPipelines(){
        if ( != null){
            .startPipeline();
        }
        .startPipeline();        
    }

    
    
Stop the Pipeline used by this Selector
    protected void stopPipelines(){
        if (  != null )
            .stopPipeline();        
        if ( != null){
            .stopPipeline();
        }
        
    }
    
    
Start a non blocking Selector object.
    protected void startListener(){
        synchronized(){
            while ( && .isOpen()) {
                doSelect();
            }       
            try{
                closeActiveConnections();
                stopPipelines();
                stopSelectorReadThread();
                try{
                    if (  != null )
                        .close();
                } catch (Throwable ex){
                    .log(.,
                            "selectorThread.closeSocketException",ex);
                }
                try{
                    if (  != null)
                        .close();
                } catch (Throwable ex){
                    .log(.,
                            "selectorThread.closeSocketException",ex);
                }
                try{
                    if (  != null)
                        .close();
                } catch (Throwable ex){
                    .log(.,
                            "selectorThread.closeSocketException",ex);
                }
                unregisterComponents();
            } catch (Throwable t){
                .log(.,"selectorThread.stopException",t);
            } 
        }
    }
    public void resetSpinCounter(){
          = 0;
    }
    public int getSpinRate(){
        if (++ == 0){
             = System.nanoTime();
        } else if ( == 1000) {
            long deltatime = System.nanoTime() - ;
            int contspinspersec = (int) (1000 * 1000000000L / deltatime);
              = 0;
            return contspinspersec;
        }
        return 0;
    }
    private void workaroundSelectorSpin() throws IOException {
        Selector newSelector = Selector.open();
        Set<SelectionKeykeys = .keys();
        for (SelectionKey key : keys) {
            try {
                key.channel().register(newSelectorkey.interestOps(), key.attachment());
            } catch (Exception e) {
            }
        }
        try {
            .close();
        } catch (Exception e) {
        }
         = newSelector;
    }
    
    
Execute a Selector.select() operation.
    protected void doSelect(){
        SelectionKey key = null;
        Set readyKeys;
        Iterator<SelectionKeyiterator;
        int selectorState
        try{
            selectorState = 0;
            enableSelectionKeys();                
            try{                
                selectorState = .select();
            } catch (CancelledKeyException ex){
                ;
            }
            if (!return;
            readyKeys = .selectedKeys();
            // JDK issue.
            if (readyKeys.size() != 0 && ) {
                resetSpinCounter();
            } else if (){
                long sr = getSpinRate();
                if (sr > ) {
                    workaroundSelectorSpin();
                }
            }
            iterator = readyKeys.iterator();
            while (iterator.hasNext()) {
                key = iterator.next();
                iterator.remove();
                if (key.isValid()) {
                    handleConnection(key);
                } else {
                    cancelKey(key);
                }
            }
            
            expireIdleKeys();
            
            if (selectorState <= 0){
                .selectedKeys().clear();
                return;
            }
        } catch (Throwable t){     
            String msg = t.getMessage();
            if (msg != null && msg.equals("Too many open files")){
                .log(.,"Running out of operating systems file descriptors. " +
                        "Cannot serve requests anymore.",t);
            }           
            
            if (key != null && key.isValid()){
                .log(.,"selectorThread.errorOnRequest",t);
            } else {
                .log(.,"selectorThread.errorOnRequest",t);
            }
            if ( key != null ){
                key.attach(null);
                key.cancel();
            }
        }
    }
    
    
    
Cancel keep-alive connections.
    protected void expireIdleKeys(){
        if (  <= 0 || !.isOpen()) return;
        long current = System.currentTimeMillis();
        if (current < ) {