Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
  Kontraktor-Http Copyright (c) Ruediger Moeller, All rights reserved.
  
  This library is free software; you can redistribute it and/or
  modify it under the terms of the GNU Lesser General Public
  License as published by the Free Software Foundation; either
  version 3.0 of the License, or (at your option) any later version.
  
  This library is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 Lesser General Public License for more details.
 
 See https://www.gnu.org/licenses/lgpl.txt
 */
 
 package org.nustaq.kontraktor.remoting.websockets;
 
Created by ruedi on 10/05/15. Publishes an actor as a websocket server using Undertow.
 
 
     String host;
     String path;
     int port;
 
     public UndertowWebsocketServerConnector(String pathint portString host) {
         this. = path;
         this. = port;
         this. = host;
     }
 
     @Override
     public void connect(Actor facadeFunction<ObjectSocketObjectSinkfactorythrows Exception {
         PathHandler server = getServer().getFirst();
         server.addExactPath(
             ,
             Handlers.websocket((exchangechannel) -> { // connection callback
                Runnable runnable = () -> {
                    UTWebObjectSocket objectSocket = new UTWebObjectSocket(exchangechannel);
                    ObjectSink sink = factory.apply(objectSocket);
                    objectSocket.setSink(sink);
 
                    channel.getReceiveSetter().set(new AbstractReceiveListener() {
                        @Override
                        protected void onCloseMessage(CloseMessage cmWebSocketChannel channel) {
                            try {
                                channel.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                            sink.sinkClosed();
                            try {
                                objectSocket.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                            channel.getReceiveSetter().set(null);
                        }
 
                        @Override
                        protected void onError(WebSocketChannel channelThrowable error) {
                            sink.sinkClosed();
                        }
 
                        @Override
                        protected void onFullTextMessage(WebSocketChannel channelBufferedTextMessage messagethrows IOException {
                            String data = message.getData();
                            byte[] bytez = data.getBytes("UTF-8");
                            sink.receiveObject(objectSocket.getConf().asObject(bytez), null);
                        }
 
                        @Override
                        protected void onFullBinaryMessage(WebSocketChannel channelBufferedBinaryMessage messagethrows IOException {
                            ByteBuffer[] data = message.getData().getResource();
                            byte[] bytez = Buffers.take(data, 0, data.length);
                            sink.receiveObject(objectSocket.getConf().asObject(bytez), null);
                       }
                   });
               };
            //                runnable.run();
               facade.execute(runnable);
               channel.resumeReceives();
            })
        );
    }
    protected Pair<PathHandlerUndertowgetServer(int port) {
        String hostName = this.;
        return Http4K.get().getServer(porthostName);
    }
    @Override
    public IPromise closeServer() {
        getServer().getSecond().stop();
        return new Promise(null);
    }
    static class UTWebObjectSocket extends WebObjectSocket {
        protected WebSocketChannel channel;
        protected WebSocketHttpExchange ex;
        protected WeakReference<ObjectSinksink;
        public UTWebObjectSocket(WebSocketHttpExchange exWebSocketChannel channel) {
            this. = ex;
            this. = channel;
        }
        @Override
        public void sendBinary(byte[] message) {
            WebSockets.sendBinary(ByteBuffer.wrap(message), new WebSocketCallback() {
                @Override
                public void complete(WebSocketChannel channelObject context) {
                }
                @Override
                public void onError(WebSocketChannel channelObject contextThrowable throwable) {
                    setLastError(throwable);
                    try {
                         = true;
                        UTWebObjectSocket.this.close();
                    } catch (IOException e) {
                        FSTUtil.<RuntimeException>rethrow(e);
                    }
                }
            });
        }
        @Override
        public void close() throws IOException {
            .getReceiveSetter().set(null);
            .close();
            ObjectSink objectSink = .get();
            if (objectSink != null )
                objectSink.sinkClosed();
             = null;
             = null;
             = null;
        }
        public void setSink(ObjectSink sink) {
            this. = new WeakReference<ObjectSink>(sink);
        }
        public ObjectSink getSink() {
            return .get();
        }
    }
New to GrepCode? Check out our FAQ X