Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  //
  //  ========================================================================
  //  Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
  //  ------------------------------------------------------------------------
  //  All rights reserved. This program and the accompanying materials
  //  are made available under the terms of the Eclipse Public License v1.0
  //  and Apache License v2.0 which accompanies this distribution.
  //
  //      The Eclipse Public License is available at
 //      http://www.eclipse.org/legal/epl-v10.html
 //
 //      The Apache License v2.0 is available at
 //      http://www.opensource.org/licenses/apache2.0.php
 //
 //  You may elect to redistribute this code under either of these licenses.
 //  ========================================================================
 //
 
 package org.eclipse.jetty.io.nio;
 
 
 
 /* ------------------------------------------------------------ */
An Endpoint that can be scheduled by SelectorManager.
 
 {
     public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
 
     private final boolean WORK_AROUND_JVM_BUG_6346658 = System.getProperty("os.name").toLowerCase(.).contains("win");
     private final SelectorManager.SelectSet _selectSet;
     private final SelectorManager _manager;
     private  SelectionKey _key;
     private final Runnable _handler = new Runnable()
         {
             public void run() { handle(); }
         };

    
 
     private int _interestOps;

    
The connection instance is the handler for any IO activity on the endpoint. There is a different type of connection for HTTP, AJP, WebSocket and ProxyConnect. The connection may change for an SCEP as it is upgraded from HTTP to proxy connect or websocket.
 
     private volatile AsyncConnection _connection;

    
true if a thread has been dispatched to handle this endpoint
 
     private boolean _dispatched = false;

    
true if a non IO dispatch (eg async resume) is outstanding
 
     private boolean _asyncDispatch = false;

    
true if the last write operation succeed and wrote all offered bytes
 
     private volatile boolean _writable = true;


    
True if a thread has is blocked in blockReadable(long)
 
     private boolean _readBlocked;

    
True if a thread has is blocked in blockWritable(long)
 
     private boolean _writeBlocked;

    
 
     private boolean _open;
 
     private volatile long _idleTimestamp;
 
     private boolean _ishut;
 
     /* ------------------------------------------------------------ */
     public SelectChannelEndPoint(SocketChannel channelSelectSet selectSetSelectionKey keyint maxIdleTime)
         throws IOException
     {
         super(channelmaxIdleTime);
 
          = selectSet.getManager();
          = selectSet;
          = false;
          = false;
         =true;
         = key;
        setCheckForIdle(true);
    }
    /* ------------------------------------------------------------ */
    public SelectionKey getSelectionKey()
    {
        synchronized (this)
        {
            return ;
        }
    }
    /* ------------------------------------------------------------ */
    {
        return ;
    }
    /* ------------------------------------------------------------ */
    public Connection getConnection()
    {
        return ;
    }
    /* ------------------------------------------------------------ */
    public void setConnection(Connection connection)
    {
        Connection old=;
        =(AsyncConnection)connection;
        if (old!=null && old!=)
            .endPointUpgraded(this,old);
    }
    /* ------------------------------------------------------------ */
    public long getIdleTimestamp()
    {
        return ;
    }
    /* ------------------------------------------------------------ */
    
Called by selectSet to schedule handling
    public void schedule()
    {
        synchronized (this)
        {
            // If there is no key, then do nothing
            if ( == null || !.isValid())
            {
                =false;
                =false;
                this.notifyAll();
                return;
            }
            // If there are threads dispatched reading and writing
            if ( || )
            {
                // assert _dispatched;
                if ( && .isReadable())
                    =false;
                if ( && .isWritable())
                    =false;
                // wake them up is as good as a dispatched.
                this.notifyAll();
                // we are not interested in further selecting
                .interestOps(0);
                if (!)
                    updateKey();
                return;
            }
            // Remove writeable op
            {
                // Remove writeable op
                 = .interestOps() & ~.;
                .interestOps();
                 = true// Once writable is in ops, only removed with dispatch.
            }
            // If dispatched, then deregister interest
            if ()
                .interestOps(0);
            else
            {
                // other wise do the dispatch
                dispatch();
                if ( && !.getManager().isDeferringInterestedOps0())
                {
                    .interestOps(0);
                }
            }
        }
    }
    /* ------------------------------------------------------------ */
    public void asyncDispatch()
    {
        synchronized(this)
        {
            if ()
                =true;
            else
                dispatch();
        }
    }
    /* ------------------------------------------------------------ */
    public void dispatch()
    {
        synchronized(this)
        {
            if (!)
            {
                 = true;
                boolean dispatched = .dispatch();
                if(!dispatched)
                {
                     = false;
                    .warn("Dispatched Failed! "+this+" to "+);
                    updateKey();
                }
            }
        }
    }
    /* ------------------------------------------------------------ */
    
Called when a dispatched thread is no longer handling the endpoint. The selection key operations are updated.

Returns:
If false is returned, the endpoint has been redispatched and thread must keep handling the endpoint.
    protected boolean undispatch()
    {
        synchronized (this)
        {
            if ()
            {
                =false;
                return false;
            }
             = false;
            updateKey();
        }
        return true;
    }
    /* ------------------------------------------------------------ */
    public void cancelTimeout(Task task)
    {
        getSelectSet().cancelTimeout(task);
    }
    /* ------------------------------------------------------------ */
    public void scheduleTimeout(Task tasklong timeoutMs)
    {
        getSelectSet().scheduleTimeout(task,timeoutMs);
    }
    /* ------------------------------------------------------------ */
    public void setCheckForIdle(boolean check)
    {
        =check?System.currentTimeMillis():0;
    }
    /* ------------------------------------------------------------ */
    public boolean isCheckForIdle()
    {
        return !=0;
    }
    /* ------------------------------------------------------------ */
    protected void notIdle()
    {
        if (!=0)
            =System.currentTimeMillis();
    }
    /* ------------------------------------------------------------ */
    public void checkIdleTimestamp(long now)
    {
        long idleTimestamp=;
        if (idleTimestamp!=0 && >0)
        {
            final long idleForMs=now-idleTimestamp;
            if (idleForMs>)
            {
                // Don't idle out again until onIdleExpired task completes.
                setCheckForIdle(false);
                .dispatch(new Runnable()
                {
                    public void run()
                    {
                        try
                        {
                            onIdleExpired(idleForMs);
                        }
                        finally
                        {
                            setCheckForIdle(true);
                        }
                    }
                });
            }
        }
    }
    /* ------------------------------------------------------------ */
    public void onIdleExpired(long idleForMs)
    {
        .onIdleExpired(idleForMs);
    }
    /* ------------------------------------------------------------ */
    @Override
    public int fill(Buffer bufferthrows IOException
    {
        int fill=super.fill(buffer);
        if (fill>0)
            notIdle();
        return fill;
    }
    /* ------------------------------------------------------------ */
    @Override
    public int flush(Buffer headerBuffer bufferBuffer trailerthrows IOException
    {
        int l = super.flush(headerbuffertrailer);
        // If there was something to write and it wasn't written, then we are not writable.
        if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent()))
        {
            synchronized (this)
            {   
                =false;
                if (!)
                    updateKey();
            }
        }
        else if (l>0)
        {
            =true;
            notIdle();
        }
        return l;
    }
    /* ------------------------------------------------------------ */
    /*
     */
    @Override
    public int flush(Buffer bufferthrows IOException
    {
        int l = super.flush(buffer);
        // If there was something to write and it wasn't written, then we are not writable.
        if (l==0 && buffer!=null && buffer.hasContent())
        {
            synchronized (this)
            {   
                =false;
                if (!)
                    updateKey();
            }
        }
        else if (l>0)
        {
            =true;
            notIdle();
        }
        return l;
    }
    /* ------------------------------------------------------------ */
    /*
     * Allows thread to block waiting for further events.
     */
    @Override
    public boolean blockReadable(long timeoutMsthrows IOException
    {
        synchronized (this)
        {
            if (isInputShutdown())
                throw new EofException();
            long now=.getNow();
            long end=now+timeoutMs;
            boolean check=isCheckForIdle();
            setCheckForIdle(true);
            try
            {
                =true;
                while (!isInputShutdown() && )
                {
                    try
                    {
                        updateKey();
                        this.wait(timeoutMs>0?(end-now):10000);
                    }
                    catch (InterruptedException e)
                    {
                        .warn(e);
                    }
                    finally
                    {
                        now=.getNow();
                    }
                    if ( && timeoutMs>0 && now>=end)
                        return false;
                }
            }
            finally
            {
                =false;
                setCheckForIdle(check);
            }
        }
        return true;
    }
    /* ------------------------------------------------------------ */
    /*
     * Allows thread to block waiting for further events.
     */
    @Override
    public boolean blockWritable(long timeoutMsthrows IOException
    {
        synchronized (this)
        {
            if (isOutputShutdown())
                throw new EofException();
            long now=.getNow();
            long end=now+timeoutMs;
            boolean check=isCheckForIdle();
            setCheckForIdle(true);
            try
            {
                =true;
                while ( && !isOutputShutdown())
                {
                    try
                    {
                        updateKey();
                        this.wait(timeoutMs>0?(end-now):10000);
                    }
                    catch (InterruptedException e)
                    {
                        .warn(e);
                    }
                    finally
                    {
                        now=.getNow();
                    }
                    if ( && timeoutMs>0 && now>=end)
                        return false;
                }
            }
            finally
            {
                =false;
                setCheckForIdle(check);
            }
        }
        return true;
    }
    /* ------------------------------------------------------------ */
    
    public void scheduleWrite()
    {
        if ()
            .debug("Required scheduleWrite {}",this);
        =false;
        updateKey();
    }
    /* ------------------------------------------------------------ */
    public boolean isWritable()
    {
        return ;
    }
    /* ------------------------------------------------------------ */
    public boolean hasProgressed()
    {
        return false;
    }
    /* ------------------------------------------------------------ */
    
Updates selection key. Adds operations types to the selection key as needed. No operations are removed as this is only done during dispatch. This method records the new key and schedules a call to doUpdateKey to do the keyChange
    private void updateKey()
    {
        final boolean changed;
        synchronized (this)
        {
            int current_ops=-1;
            if (getChannel().isOpen())
            {
                boolean read_interest =  || (! && !.isSuspended());
                boolean write_interest || (! && !);
                 =
                    ((!.isInputShutdown() && read_interest ) ? .  : 0)
                |   ((!.isOutputShutdown()&& write_interest) ? . : 0);
                try
                {
                    current_ops = ((!=null && .isValid())?.interestOps():-1);
                }
                catch(Exception e)
                {
                    =null;
                    .ignore(e);
                }
            }
            changed=!=current_ops;
        }
        if(changed)
        {
            .addChange(this);
            .wakeup();
        }
    }
    /* ------------------------------------------------------------ */
    
Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey
    void doUpdateKey()
    {
        synchronized (this)
        {
            if (getChannel().isOpen())
            {
                if (>0)
                {
                    if (==null || !.isValid())
                    {
                        SelectableChannel sc = (SelectableChannel)getChannel();
                        if (sc.isRegistered())
                        {
                            updateKey();
                        }
                        else
                        {
                            try
                            {
                                =((SelectableChannel)getChannel()).register(.getSelector(),,this);
                            }
                            catch (Exception e)
                            {
                                .ignore(e);
                                if (!=null && .isValid())
                                {
                                    .cancel();
                                }
                                if ()
                                {
                                    .destroyEndPoint(this);
                                }
                                =false;
                                 = null;
                            }
                        }
                    }
                    else
                    {
                        .interestOps();
                    }
                }
                else
                {
                    if (!=null && .isValid())
                        .interestOps(0);
                    else
                        =null;
                }
            }
            else
            {
                if (!=null && .isValid())
                    .cancel();
                if ()
                {
                    =false;
                    .destroyEndPoint(this);
                }
                 = null;
            }
        }
    }
    /* ------------------------------------------------------------ */
    /*
     */
    protected void handle()
    {
        boolean dispatched=true;
        try
        {
            while(dispatched)
            {
                try
                {
                    while(true)
                    {
                        final AsyncConnection next = (AsyncConnection).handle();
                        if (next!=)
                        {
                            .debug("{} replaced {}",next,);
                            Connection old=;
                            =next;
                            .endPointUpgraded(this,old);
                            continue;
                        }
                        break;
                    }
                }
                catch (ClosedChannelException e)
                {
                    .ignore(e);
                }
                catch (EofException e)
                {
                    .debug("EOF"e);
                    try{close();}
                    catch(IOException e2){.ignore(e2);}
                }
                catch (IOException e)
                {
                    .warn(e.toString());
                    try{close();}
                    catch(IOException e2){.ignore(e2);}
                }
                catch (Throwable e)
                {
                    .warn("handle failed"e);
                    try{close();}
                    catch(IOException e2){.ignore(e2);}
                }
                finally
                {
                    if (! && isInputShutdown() && isOpen())
                    {
                        =true;
                        try
                        {
                            .onInputShutdown();
                        }
                        catch(Throwable x)
                        {
                            .warn("onInputShutdown failed"x);
                            try{close();}
                            catch(IOException e2){.ignore(e2);}
                        }
                        finally
                        {
                            updateKey();
                        }
                    }
                    dispatched=!undispatch();
                }
            }
        }
        finally
        {
            if (dispatched)
            {
                dispatched=!undispatch();
                while (dispatched)
                {
                    .warn("SCEP.run() finally DISPATCHED");
                    dispatched=!undispatch();
                }
            }
        }
    }
    /* ------------------------------------------------------------ */
    /*
     * @see org.eclipse.io.nio.ChannelEndPoint#close()
     */
    @Override
    public void close() throws IOException
    {
        // On unix systems there is a JVM issue that if you cancel before closing, it can 
        // cause the selector to block waiting for a channel to close and that channel can 
        // block waiting for the remote end.  But on windows, if you don't cancel before a 
        // close, then the selector can block anyway!
        // https://bugs.eclipse.org/bugs/show_bug.cgi?id=357318
        if ()
        {
            try
            {
                SelectionKey key = ;
                if (key!=null)
                    key.cancel();
            }
            catch (Throwable e)
            {
                .ignore(e);
            }
        }
        try
        {
            super.close();
        }
        catch (IOException e)
        {
            .ignore(e);
        }
        finally
        {
            updateKey();
        }
    }
    /* ------------------------------------------------------------ */
    @Override
    public String toString()
    {
        // Do NOT use synchronized (this)
        // because it's very easy to deadlock when debugging is enabled.
        // We do a best effort to print the right toString() and that's it.
        SelectionKey key = ;
        String keyString = "";
        if (key != null)
        {
            if (key.isValid())
            {
                if (key.isReadable())
                    keyString += "r";
                if (key.isWritable())
                    keyString += "w";
            }
            else
            {
                keyString += "!";
            }
        }
        else
        {
            keyString += "-";
        }
        return String.format("SCEP@%x{l(%s)<->r(%s),d=%b,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%d%s}-{%s}",
                hashCode(),
                .getRemoteSocketAddress(),
                .getLocalSocketAddress(),
                ,
                isOpen(),
                isInputShutdown(),
                isOutputShutdown(),
                ,
                ,
                ,
                ,
                keyString,
                );
    }
    /* ------------------------------------------------------------ */
    public SelectSet getSelectSet()
    {
        return ;
    }
    /* ------------------------------------------------------------ */
    
Don't set the SoTimeout

    @Override
    public void setMaxIdleTime(int timeMsthrows IOException
    {
        =timeMs;
    }
New to GrepCode? Check out our FAQ X