Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
Copyright (c) 2007-2014 Kaazing Corporation. All rights reserved. Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
 
 
 package org.kaazing.gateway.transport.sse;
 
 import static java.lang.String.format;
 
 import java.net.URI;
 
 
 
 public class SseConnector extends AbstractBridgeConnector<SseSession> {
 
     private static final TypedAttributeKey<Callable<SseSession>> SSE_SESSION_FACTORY_KEY = new TypedAttributeKey<Callable<SseSession>>(SseConnector.class"sseSessionFactory");
     private static final TypedAttributeKey<ConnectFutureSSE_CONNECT_FUTURE_KEY = new TypedAttributeKey<ConnectFuture>(SseConnector.class"sseConnectFuture");
     private static final TypedAttributeKey<SseSessionSSE_SESSION_KEY = new TypedAttributeKey<SseSession>(SseConnector.class"sseSession");
 
     private static final String CODEC_FILTER = . + "#codec";
     private static final String FAULT_LOGGING_FILTER = . + "#fault";
     private static final String TRACE_LOGGING_FILTER = . + "#logging";
 
     private static final String LOGGER_NAME = String.format("transport.%s.connect".);
 
 	private final Logger logger = LoggerFactory.getLogger();
 
 
 
     @Resource(name = "bridgeServiceFactory")
     public void setBridgeServiceFactory(BridgeServiceFactory bridgeServiceFactory) {
         this. = bridgeServiceFactory;
     }
 
     private IoFilter sseCodec;
 
     public SseConnector() {
         super(new DefaultSseSessionConfig());
     }
 
     @Resource(name = "schedulerProvider")
     public void setSchedulerProvider(SchedulerProvider provider) {
         this. = provider.getScheduler("SseConnector_reconnect"false);
     }
    @Resource(name = "resourceAddressFactory")
    public void setResourceAddressFactory(ResourceAddressFactory factory) {
        this. = factory;
    }
    @Override
        return new DefaultTransportMetadata(.SseSessionConfig.class);
    }
    @Override
    public void init() {
        super.init();
         = new SseConnectCodecFilter();
    }
    @Override
    public void addBridgeFilters(IoFilterChain filterChain) {
        // setup default filters for bridge session
        if (.isTraceEnabled()) {
            filterChain.addFirst(new ObjectLoggingFilter(. + "#%s"));
        } else if (.isDebugEnabled()) {
            filterChain.addFirst(new ExceptionLoggingFilter(. + "#%s"));
        }
        filterChain.addLast();
    }
    @Override
    public void removeBridgeFilters(IoFilterChain filterChain) {
        removeFilter(filterChain);
        if (filterChain.contains()) {
            filterChain.remove();
        } else if (filterChain.contains()) {
            filterChain.remove();
        }
    }
    @Override
    protected boolean canConnect(String transportName) {
        return transportName.equals("sse");
    }
    @Override
    protected <T extends ConnectFutureConnectFuture connectInternal(ResourceAddress connectAddressIoHandler handler,
                                                                      final IoSessionInitializer<T> initializer) {
        final DefaultConnectFuture sseConnectFuture = new DefaultConnectFuture();
        // propagate connection failure, if necessary
        IoFutureListener<ConnectFutureparentConnectListener = new IoFutureListener<ConnectFuture>() {
            @Override
            public void operationComplete(ConnectFuture future) {
                // fail bridge connect future if parent connect fails
                if (!future.isConnected()) {
                    sseConnectFuture.setException(future.getException());
                }
            }
        };
        IoSessionInitializer<ConnectFutureparentInitializer = createParentInitializer(connectAddresshandlerinitializer,
                sseConnectFuture);
        final ResourceAddress transportAddress = connectAddress.getTransport();
        if (transportAddress == null) {
            throw new RuntimeException("Cannot find transport for resource address "+connectAddress);
        }
        BridgeConnector connector = .newBridgeConnector(transportAddress);
        connector.connect(transportAddress,
                          selectConnectHandler(transportAddress),
                          parentInitializer).addListener(parentConnectListener);
        return sseConnectFuture;
    }
    private IoHandler selectConnectHandler(ResourceAddress address) {
        Protocol protocol = .getTransportFactory().getProtocol(address.getResource());
        if ( protocol instanceof HttpProtocol ) {
            return ;
        }
        throw new RuntimeException(getClass()+
                ": Cannot select a connect handler for address "+address);
    }
    @Override
    protected IoFuture dispose0() throws Exception {
        .shutdownNow();
        return super.dispose0();
    }
    private <T extends ConnectFutureIoSessionInitializer<ConnectFuturecreateParentInitializer(final ResourceAddress connectAddress,
            final IoHandler handlerfinal IoSessionInitializer<T> initializerfinal DefaultConnectFuture sseConnectFuture) {
        // initialize parent session before connection attempt
        return new IoSessionInitializer<ConnectFuture>() {
            @Override
            public void initializeSession(final IoSession parentConnectFuture future) {
                // initializer for bridge session to specify bridge handler,
                // and call user-defined bridge session initializer if present
                final IoSessionInitializer<T> sseSessionInitializer = new IoSessionInitializer<T>() {
                    @Override
                    public void initializeSession(IoSession session, T future) {
                        SseSession sseSession = (SseSessionsession;
                        sseSession.setHandler(handler);
                        if (initializer != null) {
                            initializer.initializeSession(sessionfuture);
                        }
                    }
                };
                final HttpSession httpSession = (HttpSessionparent;
                final IoBufferAllocatorEx<SseBufferallocator = new SseBufferAllocator(httpSession.getBufferAllocator());
                // factory to create a new bridge session
                Callable<SseSessioncreateSession = new Callable<SseSession>() {
                    @Override
                    public SseSession call() throws Exception {
                        Callable<SseSessionsseSessionFactory = new Callable<SseSession>() {
                            @Override
                            public SseSession call() throws Exception {
								return new SseSession(SseConnector.thisgetProcessor(), connectAddressconnectAddresshttpSessionallocator);
                            }
                        };
                        return newSession(sseSessionInitializersseConnectFuturesseSessionFactory);
                    }
                };
                .set(httpSessioncreateSession);
                .set(httpSessionsseConnectFuture);
            }
        };
    }
    private void reconnectOrClose(final SseSession sseSession) {
        SseSessionConfig config = sseSession.getConfig();
        int retry = config.getRetry();
        if (retry > 0 || config.isReconnecting()) {
            .debug("Reconnecting: {}"sseSession.getRemoteAddress());
            config.setReconnecting(false);
            if (retry <= 0) {
                // reconnect immediately
                ResourceAddress connectAddress = sseSession.getRemoteAddress();
                ReconnectListener connectListener = new ReconnectListener(sseSession);
                final ResourceAddress transportAddress = connectAddress.getTransport();
                BridgeConnector connector = .newBridgeConnector(transportAddress);
                connector.connect(connectAddressnull).addListener(connectListener);
            } else {
                // reconnect after "retry" milliseconds
                .schedule(new ReconnectCommand(sseSession), retry.);
            }
        }
        else {
            sseSession.reset(new Exception("Early termination of IO session").fillInStackTrace());
        }
    }
    private IoHandler httpHandler = new IoHandlerAdapter<HttpSession>() {
        @Override
        protected void doSessionOpened(HttpSession sessionthrows Exception {
            // TODO session.get[Ready]Future().addListener(...) to check
            // response status / headers
            IoFilterChain filterChain = session.getFilterChain();
            addBridgeFilters(filterChain);
            SseSession sseSession = .get(session);
            if (sseSession == null) {
                Callable<SseSessionsessionFactory = .remove(session);
                SseSession newSseSession = sessionFactory.call();
                .set(sessionnewSseSession);
            }
        }
        @Override
        protected void doMessageReceived(final HttpSession sessionObject messagethrows Exception {
            SseMessage sseMessage = (SseMessagemessage;
            String type = sseMessage.getType();
            IoBufferEx data = sseMessage.getData();
            String id = sseMessage.getId();
            boolean reconnect = sseMessage.isReconnect();
            int retry = sseMessage.getRetry();
            SseSession sseSession = .get(session);
            SseSessionConfig config = sseSession.getConfig();
            config.setReconnecting(reconnect);
            if (retry >= 0) {
                config.setRetry(retry);
            }
            if (id != null) {
                config.setLastId(id);
            }
            if (data != null && data.hasRemaining() && (type == null || "message".equals(type))) {
                IoFilterChain filterChain = sseSession.getFilterChain();
                filterChain.fireMessageReceived(data);
            }
        }
        @Override
        protected void doSessionClosed(HttpSession sessionthrows Exception {
            final SseSession sseSession = .get(session);
            assert (sseSession != null);
            // TODO: move redirect handling to HttpConnector (optionally)
            switch (session.getStatus()) {
            case :
                String location = session.getReadHeader("Location");
                if (location == null) {
                    sseSession.reset(new Exception("Redirect attempted without Location header").fillInStackTrace());
                } else {
                    URI locationURI = URI.create(location);
                    ResourceAddress newConnectAddress = .newResourceAddress(locationURI);
                    BridgeConnector connector = .newBridgeConnector(newConnectAddress);
                    connector.connect(newConnectAddressnew IoSessionInitializer<ConnectFuture>() {
                        @Override
                        public void initializeSession(IoSession sessionConnectFuture future) {
                            .set(sessionnew Callable<SseSession>() {
                                @Override
                                public SseSession call() throws Exception {
                                    return sseSession;
                                }
                            });
                        }
                    }).addListener(new ReconnectListener(sseSession));
                }
                break;
            default:
                reconnectOrClose(sseSession);
                break;
            }
        }
        @Override
        protected void doExceptionCaught(HttpSession sessionThrowable causethrows Exception {
            SseSession sseSession = .get(session);
            if (sseSession != null && !sseSession.isClosing()) {
                // behave similarly to connection reset by peer at NIO layer
                sseSession.reset(cause);
            }
            else {
                ConnectFuture sseConnectFuture = .remove(session);
                if (sseConnectFuture != null) {
                    sseConnectFuture.setException(cause);
                }
                else {
                    if (.isDebugEnabled()) {
                        String message = format("Error on WebSocket connection attempt: %s"cause);
                        if (.isTraceEnabled()) {
                            // note: still debug level, but with extra detail about the exception
                            .debug(messagecause);
                        }
                        else {
                            .debug(message);
                        }
                    }
                }
                session.close(true);
            }
        }
    };
    private class ReconnectCommand implements Runnable {
        private final SseSession sseSession;
        public ReconnectCommand(SseSession sseSession) {
            this. = sseSession;
        }
        @Override
        public void run() {
            ResourceAddress connectAddress = .getRemoteAddress();
            ReconnectListener connectListener = new ReconnectListener();
            BridgeConnector connector = .newBridgeConnector(connectAddress);
            connector.connect(connectAddressnull).addListener(connectListener);
        }
    }
    private final class ReconnectListener implements IoFutureListener<ConnectFuture> {
        private final SseSession sseSession;
        private ReconnectListener(SseSession sseSession) {
            this. = sseSession;
        }
        @Override
        public void operationComplete(ConnectFuture future) {
            if (future.isConnected()) {
                IoSession session = future.getSession();
                session.setAttribute();
                .debug("Reconnected: {}".getRemoteAddress());
            } else {
                reconnectOrClose();
            }
        }
    }
New to GrepCode? Check out our FAQ X