Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
   //
   //  ========================================================================
   //  Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
   //  ------------------------------------------------------------------------
   //  All rights reserved. This program and the accompanying materials
   //  are made available under the terms of the Eclipse Public License v1.0
   //  and Apache License v2.0 which accompanies this distribution.
   //
   //      The Eclipse Public License is available at
  //      http://www.eclipse.org/legal/epl-v10.html
  //
  //      The Apache License v2.0 is available at
  //      http://www.opensource.org/licenses/apache2.0.php
  //
  //  You may elect to redistribute this code under either of these licenses.
  //  ========================================================================
  //
  
  package org.eclipse.jetty.io.nio;
  
  import java.util.List;
  import java.util.Set;
  
  
  
  /* ------------------------------------------------------------ */
The Selector Manager manages and number of SelectSets to allow NIO scheduling to scale to large numbers of connections.

  
  public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
  {
      public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
  
      private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue();
      private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",100000).intValue();
      private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue();
      private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue();
  
      private int _maxIdleTime;
      private int _lowResourcesMaxIdleTime;
      private long _lowResourcesConnections;
      private SelectSet[] _selectSet;
      private int _selectSets=1;
      private volatile int _set=0;
      private boolean _deferringInterestedOps0=true;
      private int _selectorPriorityDelta=0;
  
      /* ------------------------------------------------------------ */
    

Parameters:
maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed.
See also:
setLowResourcesMaxIdleTime(long)
  
      public void setMaxIdleTime(long maxIdleTime)
      {
          =(int)maxIdleTime;
      }
  
      /* ------------------------------------------------------------ */
    

Parameters:
selectSets number of select sets to create
  
      public void setSelectSets(int selectSets)
      {
          long lrc =  * ;
          =selectSets;
          =lrc/;
      }
  
      /* ------------------------------------------------------------ */
    

Returns:
the max idle time
 
     public long getMaxIdleTime()
     {
         return ;
     }
 
     /* ------------------------------------------------------------ */
    

Returns:
the number of select sets in use
 
     public int getSelectSets()
     {
         return ;
     }
 
     /* ------------------------------------------------------------ */
    

Parameters:
i
Returns:
The select set
 
     public SelectSet getSelectSet(int i)
     {
         return [i];
     }
 
     /* ------------------------------------------------------------ */
    
Register a channel

Parameters:
channel
att Attached Object
 
     public void register(SocketChannel channelObject att)
     {
         // The ++ increment here is not atomic, but it does not matter.
         // so long as the value changes sometimes, then connections will
         // be distributed over the available sets.
 
         int s=++;
         if (s<0)
             s=-s;
         s=s%;
         SelectSet[] sets=;
         if (sets!=null)
         {
             SelectSet set=sets[s];
             set.addChange(channel,att);
             set.wakeup();
         }
     }
 
 
     /* ------------------------------------------------------------ */
    
Register a channel

Parameters:
channel
 
     public void register(SocketChannel channel)
     {
         // The ++ increment here is not atomic, but it does not matter.
         // so long as the value changes sometimes, then connections will
         // be distributed over the available sets.
 
         int s=++;
         if (s<0)
             s=-s;
         s=s%;
         SelectSet[] sets=;
         if (sets!=null)
         {
             SelectSet set=sets[s];
             set.addChange(channel);
             set.wakeup();
         }
     }
 
     /* ------------------------------------------------------------ */
    
Register a java.nio.channels.ServerSocketChannel

Parameters:
acceptChannel
 
     public void register(ServerSocketChannel acceptChannel)
     {
         int s=++;
         if (s<0)
             s=-s;
         s=s%;
         SelectSet set=[s];
         set.addChange(acceptChannel);
         set.wakeup();
     }
 
     /* ------------------------------------------------------------ */
    

Returns:
delta The value to add to the selector thread priority.
 
     public int getSelectorPriorityDelta()
     {
         return ;
     }
 
     /* ------------------------------------------------------------ */
    
Set the selector thread priorty delta.

Parameters:
delta The value to add to the selector thread priority.
 
     public void setSelectorPriorityDelta(int delta)
     {
         =delta;
     }
 
 
     /* ------------------------------------------------------------ */
    

Returns:
the lowResourcesConnections
 
     public long getLowResourcesConnections()
     {
         return *;
     }
 
     /* ------------------------------------------------------------ */
    
Set the number of connections, which if exceeded places this manager in low resources state. This is not an exact measure as the connection count is averaged over the select sets.

Parameters:
lowResourcesConnections the number of connections
See also:
setLowResourcesMaxIdleTime(long)
 
     public void setLowResourcesConnections(long lowResourcesConnections)
     {
         =(lowResourcesConnections+-1)/;
     }
 
     /* ------------------------------------------------------------ */
    

Returns:
the lowResourcesMaxIdleTime
 
     public long getLowResourcesMaxIdleTime()
     {
         return ;
     }
 
     /* ------------------------------------------------------------ */
    

Parameters:
lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than getLowResourcesConnections()
See also:
setMaxIdleTime(long)
 
     public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
     {
         =(int)lowResourcesMaxIdleTime;
     }
 
 
     /* ------------------------------------------------------------------------------- */
     public abstract boolean dispatch(Runnable task);
 
     /* ------------------------------------------------------------ */
     /* (non-Javadoc)
      * @see org.eclipse.component.AbstractLifeCycle#doStart()
      */
     @Override
     protected void doStart() throws Exception
     {
          = new SelectSet[];
         for (int i=0;i<.;i++)
             [i]= new SelectSet(i);
 
         super.doStart();
 
         // start a thread to Select
         for (int i=0;i<getSelectSets();i++)
         {
             final int id=i;
             boolean selecting=dispatch(new Runnable()
             {
                 public void run()
                 {
                     String name=Thread.currentThread().getName();
                     int priority=Thread.currentThread().getPriority();
                     try
                     {
                         SelectSet[] sets=;
                         if (sets==null)
                             return;
                         SelectSet set=sets[id];
 
                         Thread.currentThread().setName(name+" Selector"+id);
                         if (getSelectorPriorityDelta()!=0)
                             Thread.currentThread().setPriority(Thread.currentThread().getPriority()+getSelectorPriorityDelta());
                         .debug("Starting {} on {}",Thread.currentThread(),this);
                         while (isRunning())
                         {
                             try
                             {
                                 set.doSelect();
                             }
                             catch(IOException e)
                             {
                                 .ignore(e);
                             }
                             catch(Exception e)
                             {
                                 .warn(e);
                             }
                         }
                     }
                     finally
                     {
                         .debug("Stopped {} on {}",Thread.currentThread(),this);
                         Thread.currentThread().setName(name);
                         if (getSelectorPriorityDelta()!=0)
                             Thread.currentThread().setPriority(priority);
                     }
                 }
 
             });
 
             if (!selecting)
                 throw new IllegalStateException("!Selecting");
         }
     }
 
 
     /* ------------------------------------------------------------------------------- */
     @Override
     protected void doStop() throws Exception
     {
         SelectSet[] sets;
         =null;
         if (sets!=null)
         {
             for (SelectSet set : sets)
             {
                 if (set!=null)
                     set.stop();
             }
         }
         super.doStop();
     }
 
     /* ------------------------------------------------------------ */
    

Parameters:
endpoint
 
     protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
 
     /* ------------------------------------------------------------ */
    

Parameters:
endpoint
 
     protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
 
     /* ------------------------------------------------------------ */
     protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection);
 
     /* ------------------------------------------------------------------------------- */
     public abstract AsyncConnection newConnection(SocketChannel channelAsyncEndPoint endpointObject attachment);
 
     /* ------------------------------------------------------------ */
    
Create a new end point

Parameters:
channel
selectSet
sKey the selection key
Returns:
the new endpoint SelectChannelEndPoint
Throws:
java.io.IOException
 
     protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channelSelectorManager.SelectSet selectSetSelectionKey sKeythrows IOException;
 
     /* ------------------------------------------------------------------------------- */
     protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
     {
         .warn(ex+","+channel+","+attachment);
         .debug(ex);
     }
 
     /* ------------------------------------------------------------ */
     public String dump()
     {
         return AggregateLifeCycle.dump(this);
     }
 
     /* ------------------------------------------------------------ */
     public void dump(Appendable outString indentthrows IOException
     {
         AggregateLifeCycle.dumpObject(out,this);
         AggregateLifeCycle.dump(out,indent,TypeUtil.asList());
     }
 
 
     /* ------------------------------------------------------------------------------- */
     /* ------------------------------------------------------------------------------- */
     /* ------------------------------------------------------------------------------- */
     public class SelectSet implements Dumpable
     {
         private final int _setID;
         private final Timeout _timeout;
 
         private final ConcurrentLinkedQueue<Object_changes = new ConcurrentLinkedQueue<Object>();
 
         private volatile Selector _selector;
 
         private volatile Thread _selecting;
         private int _busySelects;
         private long _monitorNext;
         private boolean _pausing;
         private boolean _paused;
         private volatile long _idleTick;
 
         /* ------------------------------------------------------------ */
         SelectSet(int acceptorIDthrows Exception
         {
             =acceptorID;
 
              = System.currentTimeMillis();
              = new Timeout(this);
             .setDuration(0L);
 
             // create a selector;
              = Selector.open();
             =System.currentTimeMillis()+;
         }
 
         /* ------------------------------------------------------------ */
         public void addChange(Object change)
         {
             .add(change);
         }
 
         /* ------------------------------------------------------------ */
         public void addChange(SelectableChannel channelObject att)
         {
             if (att==null)
                 addChange(channel);
             else if (att instanceof EndPoint)
                 addChange(att);
             else
                 addChange(new ChannelAndAttachment(channel,att));
         }
 
         /* ------------------------------------------------------------ */
        
Select and dispatch tasks found from changes and the selector.

 
         public void doSelect() throws IOException
         {
             try
             {
                 =Thread.currentThread();
                 final Selector selector=;
                 // Stopped concurrently ?
                 if (selector == null)
                     return;
 
                 // Make any key changes required
                 Object change;
                 int changes=.size();
                 while (changes-->0 && (change=.poll())!=null)
                 {
                     Channel ch=null;
                     SelectionKey key=null;
 
                     try
                     {
                         if (change instanceof EndPoint)
                         {
                             // Update the operations for a key.
                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change;
                             ch=endpoint.getChannel();
                             endpoint.doUpdateKey();
                         }
                         else if (change instanceof ChannelAndAttachment)
                         {
                             // finish accepting/connecting this connection
                             final ChannelAndAttachment asc = (ChannelAndAttachment)change;
                             final SelectableChannel channel=asc._channel;
                             ch=channel;
                             final Object att = asc._attachment;
 
                             if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
                             {
                                 key = channel.register(selector,.,att);
                                 SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key);
                                 key.attach(endpoint);
                                 endpoint.schedule();
                             }
                             else if (channel.isOpen())
                             {
                                 key = channel.register(selector,.,att);
                             }
                         }
                         else if (change instanceof SocketChannel)
                         {
                             // Newly registered channel
                             final SocketChannel channel=(SocketChannel)change;
                             ch=channel;
                             key = channel.register(selector,.,null);
                             SelectChannelEndPoint endpoint = createEndPoint(channel,key);
                             key.attach(endpoint);
                             endpoint.schedule();
                         }
                         else if (change instanceof ChangeTask)
                         {
                             ((Runnable)change).run();
                         }
                         else if (change instanceof Runnable)
                         {
                             dispatch((Runnable)change);
                         }
                         else
                             throw new IllegalArgumentException(change.toString());
                     }
                     catch (CancelledKeyException e)
                     {
                         .ignore(e);
                     }
                     catch (Throwable e)
                     {
                         if (isRunning())
                             .warn(e);
                         else
                             .debug(e);
 
                         try
                         {
                             if (ch!=null)
                                 ch.close();
                         }
                         catch(IOException e2)
                         {
                             .debug(e2);
                         }
                     }
                 }
 
 
                 // Do and instant select to see if any connections can be handled.
                 int selected=selector.selectNow();
 
                 long now=System.currentTimeMillis();
 
                 // if no immediate things to do
                 if (selected==0 && selector.selectedKeys().isEmpty())
                 {
                     // If we are in pausing mode
                     if ()
                     {
                         try
                         {
                             Thread.sleep(); // pause to reduce impact of  busy loop
                         }
                         catch(InterruptedException e)
                         {
                             .ignore(e);
                         }
                         now=System.currentTimeMillis();
                     }
 
                     // workout how long to wait in select
                     .setNow(now);
                     long to_next_timeout=.getTimeToNext();
 
                     long wait = .size()==0?:0L;
                     if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout)
                         wait = to_next_timeout;
 
                     // If we should wait with a select
                     if (wait>0)
                     {
                         long before=now;
                         selector.select(wait);
                         now = System.currentTimeMillis();
                         .setNow(now);
 
                         // If we are monitoring for busy selector
                         // and this select did not wait more than 1ms
                         if (>0 && now-before <=1)
                         {
                             // count this as a busy select and if there have been too many this monitor cycle
                             if (++>)
                             {
                                 // Start injecting pauses
                                 =true;
 
                                 // if this is the first pause
                                 if (!)
                                 {
                                     // Log and dump some status
                                     =true;
                                     .warn("Selector {} is too busy, pausing!",this);
                                 }
                             }
                         }
                     }
                 }
 
                 // have we been destroyed while sleeping
                 if (==null || !selector.isOpen())
                     return;
 
                 // Look for things to do
                 for (SelectionKey keyselector.selectedKeys())
                 {
                     SocketChannel channel=null;
 
                     try
                     {
                         if (!key.isValid())
                         {
                             key.cancel();
                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
                             if (endpoint != null)
                                 endpoint.doUpdateKey();
                             continue;
                         }
 
                         Object att = key.attachment();
                         if (att instanceof SelectChannelEndPoint)
                         {
                             if (key.isReadable()||key.isWritable())
                                 ((SelectChannelEndPoint)att).schedule();
                         }
                         else if (key.isConnectable())
                         {
                             // Complete a connection of a registered channel
                             channel = (SocketChannel)key.channel();
                             boolean connected=false;
                             try
                             {
                                 connected=channel.finishConnect();
                             }
                             catch(Exception e)
                             {
                                 connectionFailed(channel,e,att);
                             }
                             finally
                             {
                                 if (connected)
                                 {
                                     key.interestOps(.);
                                     SelectChannelEndPoint endpoint = createEndPoint(channel,key);
                                     key.attach(endpoint);
                                     endpoint.schedule();
                                 }
                                 else
                                 {
                                     key.cancel();
                                 }
                             }
                         }
                         else
                         {
                             // Wrap readable registered channel in an endpoint
                             channel = (SocketChannel)key.channel();
                             SelectChannelEndPoint endpoint = createEndPoint(channel,key);
                             key.attach(endpoint);
                             if (key.isReadable())
                                 endpoint.schedule();
                         }
                         key = null;
                     }
                     catch (CancelledKeyException e)
                     {
                         .ignore(e);
                     }
                     catch (Exception e)
                     {
                         if (isRunning())
                             .warn(e);
                         else
                             .ignore(e);
 
                         try
                         {
                             if (channel!=null)
                                 channel.close();
                         }
                         catch(IOException e2)
                         {
                             .debug(e2);
                         }
 
                         if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
                             key.cancel();
                     }
                 }
 
                 // Everything always handled
                 selector.selectedKeys().clear();
 
                 now=System.currentTimeMillis();
                 .setNow(now);
                 Task task = .expired();
                 while (task!=null)
                 {
                     if (task instanceof Runnable)
                         dispatch((Runnable)task);
                     task = .expired();
                 }
 
                 // Idle tick
                 if (now->)
                 {
                     =now;
 
                     final long idle_now=((>0 && selector.keys().size()>))
                         ?(now+-)
                         :now;
 
                     dispatch(new Runnable()
                     {
                         public void run()
                         {
                             for (SelectChannelEndPoint endp:.keySet())
                             {
                                 endp.checkIdleTimestamp(idle_now);
                             }
                         }
                         public String toString() {return "Idle-"+super.toString();}
                     });
 
                 }
 
                 // Reset busy select monitor counts
                 if (>0 && now>)
                 {
                     =0;
                     =false;
                     =now+;
 
                 }
             }
             catch (ClosedSelectorException e)
             {
                 if (isRunning())
                     .warn(e);
                 else
                     .ignore(e);
             }
             catch (CancelledKeyException e)
             {
                 .ignore(e);
             }
             finally
             {
                 =null;
             }
         }
 
 
         /* ------------------------------------------------------------ */
         private void renewSelector()
         {
             try
             {
                 synchronized (this)
                 {
                     Selector selector=;
                     if (selector==null)
                         return;
                     final Selector new_selector = Selector.open();
                     for (SelectionKey kselector.keys())
                     {
                         if (!k.isValid() || k.interestOps()==0)
                             continue;
 
                         final SelectableChannel channel = k.channel();
                         final Object attachment = k.attachment();
 
                         if (attachment==null)
                             addChange(channel);
                         else
                             addChange(channel,attachment);
                     }
                     .close();
                     =new_selector;
                 }
             }
             catch(IOException e)
             {
                 throw new RuntimeException("recreating selector",e);
             }
         }
 
         /* ------------------------------------------------------------ */
         public SelectorManager getManager()
         {
             return SelectorManager.this;
         }
 
         /* ------------------------------------------------------------ */
         public long getNow()
         {
             return .getNow();
         }
 
         /* ------------------------------------------------------------ */
        

Parameters:
task The task to timeout. If it implements Runnable, then expired will be called from a dispatched thread.
timeoutMs
 
         public void scheduleTimeout(Timeout.Task tasklong timeoutMs)
         {
             if (!(task instanceof Runnable))
                 throw new IllegalArgumentException("!Runnable");
             .schedule(tasktimeoutMs);
         }
 
         /* ------------------------------------------------------------ */
         public void cancelTimeout(Timeout.Task task)
         {
             task.cancel();
         }
 
         /* ------------------------------------------------------------ */
         public void wakeup()
         {
             try
             {
                 Selector selector = ;
                 if (selector!=null)
                     selector.wakeup();
             }
             catch(Exception e)
             {
                 addChange(new ChangeTask()
                 {
                     public void run()
                     {
                         renewSelector();
                     }
                 });
 
                 renewSelector();
             }
         }
 
         /* ------------------------------------------------------------ */
         private SelectChannelEndPoint createEndPoint(SocketChannel channelSelectionKey sKeythrows IOException
         {
             SelectChannelEndPoint endp = newEndPoint(channel,this,sKey);
             .debug("created {}",endp);
             endPointOpened(endp);
             .put(endp,this);
             return endp;
         }
 
         /* ------------------------------------------------------------ */
         public void destroyEndPoint(SelectChannelEndPoint endp)
         {
             .debug("destroyEndPoint {}",endp);
             .remove(endp);
             endPointClosed(endp);
         }
 
         /* ------------------------------------------------------------ */
         Selector getSelector()
         {
             return ;
         }
 
         /* ------------------------------------------------------------ */
         void stop() throws Exception
         {
             // Spin for a while waiting for selector to complete
             // to avoid unneccessary closed channel exceptions
             try
             {
                 for (int i=0;i<100 && !=null;i++)
                 {
                     wakeup();
                     Thread.sleep(10);
                 }
             }
             catch(Exception e)
             {
                 .ignore(e);
             }
 
             // close endpoints and selector
             synchronized (this)
             {
                 Selector selector=;
                 for (SelectionKey key:selector.keys())
                 {
                     if (key==null)
                         continue;
                     Object att=key.attachment();
                     if (att instanceof EndPoint)
                     {
                         EndPoint endpoint = (EndPoint)att;
                         try
                         {
                             endpoint.close();
                         }
                         catch(IOException e)
                         {
                             .ignore(e);
                         }
                     }
                 }
 
 
                 .cancelAll();
                 try
                 {
                     selector=;
                     if (selector != null)
                         selector.close();
                 }
                 catch (IOException e)
                 {
                     .ignore(e);
                 }
                 =null;
             }
         }
 
         /* ------------------------------------------------------------ */
         public String dump()
         {
             return AggregateLifeCycle.dump(this);
         }
 
         /* ------------------------------------------------------------ */
         public void dump(Appendable outString indentthrows IOException
         {
             out.append(String.valueOf(this)).append(" id=").append(String.valueOf()).append("\n");
 
             Thread selecting = ;
 
             Object where = "not selecting";
             StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace();
             if (trace!=null)
             {
                 for (StackTraceElement t:trace)
                     if (t.getClassName().startsWith("org.eclipse.jetty."))
                     {
                         where=t;
                         break;
                     }
             }
 
             Selector selector=;
             if (selector!=null)
             {
                 final ArrayList<Objectdump = new ArrayList<Object>(selector.keys().size()*2);
                 dump.add(where);
 
                 final CountDownLatch latch = new CountDownLatch(1);
 
                 addChange(new ChangeTask()
                 {
                     public void run()
                     {
                         dumpKeyState(dump);
                         latch.countDown();
                     }
                 });
 
                 try
                 {
                     latch.await(5,.);
                 }
                 catch(InterruptedException e)
                 {
                     .ignore(e);
                 }
 
                 AggregateLifeCycle.dump(out,indent,dump);
             }
         }
 
         /* ------------------------------------------------------------ */
         public void dumpKeyState(List<Objectdumpto)
         {
             Selector selector=;
             Set<SelectionKeykeys = selector.keys();
             dumpto.add(selector + " keys=" + keys.size());
             for (SelectionKey keykeys)
             {
                 if (key.isValid())
                     dumpto.add(key.attachment()+" iOps="+key.interestOps()+" rOps="+key.readyOps());
                 else
                     dumpto.add(key.attachment()+" iOps=-1 rOps=-1");
             }
         }
 
         /* ------------------------------------------------------------ */
         public String toString()
         {
             Selector selector=;
             return String.format("%s keys=%d selected=%d",
                     super.toString(),
                     selector != null && selector.isOpen() ? selector.keys().size() : -1,
                     selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
         }
     }
 
    /* ------------------------------------------------------------ */
    private static class ChannelAndAttachment
    {
        final SelectableChannel _channel;
        final Object _attachment;
        public ChannelAndAttachment(SelectableChannel channelObject attachment)
        {
            super();
             = channel;
             = attachment;
        }
    }
    /* ------------------------------------------------------------ */
    public boolean isDeferringInterestedOps0()
    {
        return ;
    }
    /* ------------------------------------------------------------ */
    public void setDeferringInterestedOps0(boolean deferringInterestedOps0)
    {
         = deferringInterestedOps0;
    }
    /* ------------------------------------------------------------ */
    /* ------------------------------------------------------------ */
    /* ------------------------------------------------------------ */
    private interface ChangeTask extends Runnable
    {}