// Start line:
// End line:
//
// Snippet Preview
//
// Snippet HTML Code
//
// Stack Overflow Questions
  /*
   *  Copyright (c) xlightweb.org, 2008 - 2010. All rights reserved.
   *
   *  This library is free software; you can redistribute it and/or
   *  modify it under the terms of the GNU Lesser General Public
   *  License as published by the Free Software Foundation; either
   *  version 2.1 of the License, or (at your option) any later version.
   *
   *  This library is distributed in the hope that it will be useful,
  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  *  Lesser General Public License for more details.
  *
  *  You should have received a copy of the GNU Lesser General Public
  *  License along with this library; if not, write to the Free Software
  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  *
  * Please refer to the LGPL license at: http://www.gnu.org/copyleft/lesser.txt
  * The latest copy of this software may be found on http://www.xlightweb.org/
  */
 package org.xlightweb;
 
 
 
import java.io.IOException;
import java.net.InetAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.xlightweb.AbstractHttpConnection.IMultimodeExecutor;
import org.xlightweb.client.HttpClientConnection;
import org.xlightweb.server.HttpServerConnection;
import org.xsocket.Execution;
import org.xsocket.MaxReadSizeExceededException;
import org.xsocket.connection.IConnectHandler;
import org.xsocket.connection.IDataHandler;
import org.xsocket.connection.IDisconnectHandler;
import org.xsocket.connection.IHandler;
import org.xsocket.connection.INonBlockingConnection;
import org.xsocket.connection.NonBlockingConnection;
import org.xsocket.connection.NonBlockingConnectionPool;




NonBlockingWebSocketConnection

This is an experimental implementation of the Web Socket protocol draft and subject to change

Author(s):
grro
 
 public final class WebSocketConnection implements IWebSocketConnection {
 
     private static final Logger LOG = Logger.getLogger(WebSocketConnection.class.getName());
     
     private INonBlockingConnection tcpConnection;
     
 
     // receive timeout
     public static final int DEFAULT_RECEIVE_TIMEOUT = .;
     private int receiveTimeoutSec = ;
 
     
     // protocol handling
     private final IMultimodeExecutor executor;
     
     private final List<WebSocketMessageinQueue = new ArrayList<WebSocketMessage>();
     private int inQueueVersion = 0;
     
     private final String protocol;
     
     // interceptor support
     private final IPostWriteInterceptor interceptor;
     
     // web socket handler
     
     // attachment management
     private AtomicReference<ObjectattachmentRef = new AtomicReference<Object>(null);
 
     
  
     
     public WebSocketConnection(String uriStringthrows IOException {
         this(uriString, (Stringnull);
    }
    
    public WebSocketConnection(String uriStringString protocolthrows IOException {
        this(uriStringprotocolnull);
    }
    
    public WebSocketConnection(String uriStringIWebSocketHandler webSocketHandlerthrows IOException {
        this(uriStringnullwebSocketHandler);
    }
    
    public WebSocketConnection(String uriStringString protocolIWebSocketHandler webSocketHandlerthrows IOException {
        this(URI.create(uriString), protocolwebSocketHandler);
    }
    
    private WebSocketConnection(URI uriString protocolIWebSocketHandler webSocketHandlerthrows IOException {
        this(connect(uri), uriprotocolwebSocketHandler);
    }
    
    private static INonBlockingConnection connect(URI urithrows IOException {
        
        int port = uri.getPort();
        if (port == -1) {
            if (uri.getScheme().toLowerCase().equals("wss")) {
                port = 443;
            } else {
                port = 80;
            }
        }
        
        return new NonBlockingConnection(uri.getHost(), port);
    }
    
    
    
    public WebSocketConnection(INonBlockingConnection tcpConnectionURI uriString protocolIWebSocketHandler webSocketHandlerthrows IOException {
        this(performHandshake(tcpConnectionuriprotocol), nullwebSocketHandlernullprotocolnull);
    }
    
    private static INonBlockingConnection performHandshake(INonBlockingConnection tcpConnectionURI uriString protocolthrows IOException {
        
        HttpClientConnection httpCon = new HttpClientConnection(tcpConnection);
  
        GetRequest request = new GetRequest(uri.toString());
        request.setHeader("Upgrade""WebSocket");
        request.setHeader("Connection""Upgrade");
        request.setHeader("Origin"request.getRequestUrl().toString());
        
        if (protocol != null) {
            request.setHeader("WebSocket-Protocol"protocol);
        }
  
        IHttpResponse response = httpCon.call(request);
        if (response.getStatus() != 101) {
            if (response.getStatus() == 501) {
                if (response.hasBody()) {
                    throw new UnsupportedProtocolException(response.getBody().toString());
                } else {
                    throw new UnsupportedProtocolException();
                }
            } else {
                throw new IOException(response.getStatus() + " " + response.getReason());
            }
        }
        
        return tcpConnection;
    }
    
    
    public WebSocketConnection(HttpServerConnection httpConnectionIWebSocketHandler webSocketHandler, IHttpExchange exchangethrows IOException {
        this(httpConnectionwebSocketHandlerexchange.getRequest().getHeader("WebSocket-Protocol"), new UpgradeResponseSender(exchange));
    }
    
    
    private static ByteBuffer[] drainHttpParser(AbstractHttpConnection httpConnection) {
        IHandler hdl = httpConnection.getUnderlyingTcpConnection().getHandler();
        return ((AbstractHttpConnection.DataHandler) hdl).drainBuffer();
    }
    
    
    private WebSocketConnection(HttpServerConnection httpConnectionIWebSocketHandler webSocketHandlerString protocolUpgradeResponseSender upgradeResponseSenderthrows IOException {
        this(httpConnection.getUnderlyingTcpConnection(), upgradeResponseSenderwebSocketHandlerupgradeResponseSenderprotocoldrainHttpParser(httpConnection));
    }
    
    private static final class UpgradeResponseSender implements IPostConnectInterceptorIPostWriteInterceptor {
        private final IHttpExchange exchange;
        private final AtomicBoolean isUpgradeSent = new AtomicBoolean(false);
        public UpgradeResponseSender(IHttpExchange exchange) {
            this. = exchange;
        }
        
        
        public void onConnectException(IOException ioe) {
            if (ioe instanceof UnsupportedProtocolException) {
                .sendError(501, ioe.getMessage());
            } else {
                .sendError(501);
            }
        }
        
             
        public void onPostConnect() throws IOException {
            sentUpgradeIfNecessary();
        }
        
        public void onPreWrite() throws IOException {
            sentUpgradeIfNecessary() ;
        }
        
        private void sentUpgradeIfNecessary() throws IOException {
            if (!.getAndSet(true)) {
                IHttpRequest request = .getRequest();
                
                HttpResponse response = new HttpResponse(101);
                response.setHeader("WebSocket-Location""ws://" + request.getHost() + request.getRequestURI());
                response.setReason("Web Socket Protocol Handshake");
                response.setHeader("Upgrade""WebSocket");
                response.setHeader("Connection""Upgrade");
                response.setHeader("WebSocket-Origin"request.getHeader("Origin"));
                        
                String protocol = request.getHeader("WebSocket-Protocol");
                if (protocol != null) {
                    response.setHeader("WebSocket-Protocol"protocol);
                }
                
                .send(response);
            }
        }
    }
    
   
    private WebSocketConnection(INonBlockingConnection tcpConnectionIPostWriteInterceptor connectInterceptorIWebSocketHandler webSocketHandlerIPostConnectInterceptor postConnectInterceptorString protocolByteBuffer[] buffersthrows IOException {
        this. = connectInterceptor;
        this. = tcpConnection;
        this. = protocol;        
        
         = HttpUtils.newMultimodeExecutor(tcpConnection.getWorkerpool());
        setMessageHandler(webSocketHandlerpostConnectInterceptor);
        .onConnect(tcpConnection);
        
        if (buffers != null) {
            .onData(buffers);
        }
        
        tcpConnection.setHandler();
	}
    
    public String getProtocol() {
        return ;
    }
    
    
    public void destroy() {
        // destroy (underlying tcp) connection by using connection pool. The underlying connection could be a pooled one)
        // The connection pool detects automatically if the connection is pooled or not. The connection will be 
        // closed (logically) anyway
        try {
            NonBlockingConnectionPool.destroy();
        } catch (IOException ioe) {
            if (.isLoggable(.)) {
                .fine("[" + getId() + "] error occured by destroying htttp connection " + getId() + " " + ioe.toString());
            }
        }       
    }
    
    
    public INonBlockingConnection getUnderlyingTcpConnection() {
        return ;
    }
   
    
    public int availableMessages() {
        synchronized () {
            return .size();
        }
    }
    
    int getInQueueVersion() {
        synchronized () {
            return ;
        }
    }
    
    
        
        long start = System.currentTimeMillis();
        long remainingTime = ;
        do {
            synchronized () {
                if (.isEmpty()) {
                    try {
                        .wait(remainingTime);
                    } catch (InterruptedException ie) { 
                        // Restore the interrupted status
                        Thread.currentThread().interrupt();
                    }
                } else {
                    ++;
                    return .remove(0);
                }
            }
            remainingTime = computeRemainingTime(start);
        } while (remainingTime > 0);
        
        if (.isLoggable(.)) {
            .fine("receive timeout " +  + " sec reached. throwing timeout exception");
        }
        throw new SocketTimeoutException("timeout " +  + " sec reached");
    }
    private long computeRemainingTime(long startint receiveTimeoutSec) {
        return (start + ((longreceiveTimeoutSec * 1000)) - System.currentTimeMillis();
    }
    
    public int writeMessage(WebSocketMessage msgthrows IOException {
        if ( != null) {
            .onPreWrite();
        }
        return msg.writeTo(thisnull);
    }
    
    public void writeMessage(WebSocketMessage msgIWebSocketMessageWriteCompleteHandler completeHandlerthrows IOException {
        if ( != null) {
            .onPreWrite();
        }
        msg.writeTo(thiscompleteHandler);
    }
    
    
    public void closeQuitly() {
        try {
            close();
        } catch (IOException ioe) {
            if (.isLoggable(.)) {
                .fine("[" + getId() + "] error occured by closing connection " + getId() + " " + ioe.toString());
            }
            
            try {
                NonBlockingConnectionPool.destroy();
            } catch (IOException e) {
                if (.isLoggable(.)) {
                    .fine("[" + getId() + "] error occured by closing connection " + getId() + " " + e.toString());
                }
            }
        }
    }
    
    
    public void close() throws IOException {
        destroy();
    }
    
    
    void processNonthreaded(Runnable task) {
        .processNonthreaded(task);
    }
    
    void processMultithreaded(Runnable task) {
        .processMultithreaded(task);
    }
    
    
    void setMessageHandler(IWebSocketHandler webSocketHandlerIPostConnectInterceptor postConnectInterceptorthrows IOException {
        
        WebSocketHandlerAdapter oldAdapter = .getAndSet(null);
        if (oldAdapter != null) {
            oldAdapter.onDisconnect(this);
        }
        
        WebSocketHandlerAdapter adapter = new WebSocketHandlerAdapter(webSocketHandlerpostConnectInterceptor);
        adapter.onConnect(this);
        .set(adapter);
    }
    
  
    public boolean isOpen() {
        return .isOpen();
    }
    
    public boolean isServerSide() {
        return .isServerSide();
    }
    
    public INonBlockingConnection getTcpConnection() {
        return ;
    }
    
    
    
    public void setAttachment(Object obj) {
        .set(obj);
    }
    
    public Object getAttachment() {
        return .get();
    }
    
    public long getConnectionTimeoutMillis() {
        return .getConnectionTimeoutMillis();
    }
    
    public void setConnectionTimeoutMillis(long timeoutMillis) {
        .setConnectionTimeoutMillis(timeoutMillis);
    }
    
    public long getRemainingMillisToConnectionTimeout() {
        return .getRemainingMillisToConnectionTimeout();
    }
    
    public long getIdleTimeoutMillis() {
        return .getIdleTimeoutMillis();
    }
    
    public void setIdleTimeoutMillis(long timeoutInMillis) {
        .setIdleTimeoutMillis(timeoutInMillis);
    }
    
    public long getRemainingMillisToIdleTimeout() {
        return .getRemainingMillisToIdleTimeout();
    }
    
    public String getId() {
        return .getId();
    }
    
    public int getLocalPort() {
        return .getLocalPort();
    }
    
    public InetAddress getLocalAddress() {
        return .getLocalAddress();
    }
    
    public InetAddress getRemoteAddress() {
        return .getRemoteAddress();
    }
    
    public int getRemotePort() {
        return .getRemotePort();
    }
    
    public Object getOption(String namethrows IOException {
        return .getOption(name);
    }
    
    @SuppressWarnings("unchecked")
    public Map<StringClassgetOptions() {
        return .getOptions();
    }
    
    public void setOption(String nameObject valuethrows IOException {
        .setOption(namevalue);
    }
    
    
    
 
    
    @Execution(Execution.NONTHREADED)
    private final class WebSocketProtocolHandler implements IConnectHandler, IDataHandler, IDisconnectHandler {
        // network data
        private ByteBuffer rawBuffer = null;
        
        boolean isReading() {
            return !HttpUtils.isEmpty();
        }
        
        public boolean onConnect(INonBlockingConnection connectionthrows IOExceptionBufferUnderflowException, MaxReadSizeExceededException {
            WebSocketHandlerAdapter adapter = .get();
            if (adapter != null) {
                adapter.onConnect(WebSocketConnection.this);
            }
            
            return true;
        }
        
        
        public boolean onData(INonBlockingConnection connectionthrows IOExceptionBufferUnderflowExceptionClosedChannelException, MaxReadSizeExceededException {
            
            if (connection.isOpen()) {
                // copying available network data into raw data buffer
                int available = connection.available();
                
                if (available > 0) {
                    ByteBuffer[] data = connection.readByteBufferByLength(available);
                    onData(data);
                }   
            }      
            
            return true;
        }
        
        
        void onData(ByteBuffer[] datathrows IOException {
            if ( == null) {
                 = HttpUtils.merge(data);
            } else {
                 = HttpUtils.merge(data);
            }
            
            parse();
            
            if (!.hasRemaining()) {
                 = null;
            }
        }
        
        
        void parse(ByteBuffer bufferthrows IOException {
            while (buffer.hasRemaining()) {
                
                WebSocketMessage msg = WebSocketMessage.parse(buffer);
                
                if (msg == null) {
                    return;
                    
                } else {
                    synchronized () {
                        ++;
                        .add(msg);
                        .notifyAll();
                    }
                    
                    WebSocketHandlerAdapter adapter = .get();
                    if (adapter != null) {
                        adapter.onMessage(WebSocketConnection.this);
                    }
                }
            }
        }
        
        
        public boolean onDisconnect(INonBlockingConnection connectionthrows IOException {
            WebSocketHandlerAdapter adapter = .get();
            if (adapter != null) {
                adapter.onDisconnect(WebSocketConnection.this);
            }
            
            return true;
        }   
    }
    
    
    private static interface IPostWriteInterceptor {
        
        void onPreWrite() throws IOException ;
    }
}
// New to GrepCode? Check out our FAQ X