Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
/*
 * Copyright (c) 2007-2014 Kaazing Corporation. All rights reserved.
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to you under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
 
 
 package org.kaazing.gateway.transport.wsr;
 
 import static java.lang.String.format;
 import static java.lang.Thread.currentThread;
 import static java.util.Arrays.asList;
 import static org.kaazing.gateway.resource.address.ResourceAddress.NEXT_PROTOCOL;
 import static org.kaazing.gateway.resource.address.ResourceAddress.TRANSPORT;
 import static org.kaazing.gateway.resource.address.URLUtils.appendURI;
 import static org.kaazing.gateway.resource.address.URLUtils.ensureTrailingSlash;
 import static org.kaazing.gateway.resource.address.URLUtils.truncateURI;
 import static org.kaazing.gateway.resource.address.ws.WsResourceAddress.EXTENSIONS;
 import static org.kaazing.gateway.transport.ws.extension.WsExtensionUtils.negotiateWebSocketExtensions;
 import static org.kaazing.mina.core.future.DefaultUnbindFuture.combineFutures;
 
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
 
 
    // Constants and per-instance state for the WSR acceptor.
    // NOTE(review): several initializers below have lost tokens in this copy of the
    // source (time unit, argument separators, String.format arguments, logger name).

    // TODO: make this setting available via configuration, with a reasonable default
    // Value 60 converted to milliseconds; the time unit itself is not visible here.
    static final long TIME_TO_TIMEOUT_CONNECT_MILLIS = ..toMillis(60L);
    // Session attribute key declared with the name "timeoutFuture".
    static final AttributeKey TIMEOUT_FUTURE_KEY = new AttributeKey(WsrAcceptor.class"timeoutFuture");
    private static final Charset UTF_8 = Charset.forName("UTF-8");
    // Path suffix; the create handler strips a suffix of this kind from the request path
    // (presumably this constant - the reference there is stripped; confirm).
    private static final String CREATE_SUFFIX = "/;e/cr";
    // RTMP chunk codec filter, instantiated once in the constructor.
    private final RtmpChunkCodecFilter codec;
    // Typed attribute key declared with the name "session".
    private static final TypedAttributeKey<WsrSessionSESSION_KEY = new TypedAttributeKey<>(WsrAcceptor.class"session");
    // Attribute key declared with the name "httpRequestURI".
    private static final AttributeKey HTTP_REQUEST_URI_KEY = new AttributeKey(WsrAcceptor.class"httpRequestURI");
    private static final int COMMAND_STREAM_ID = 3;
    // Filter-name constants for the fault and trace logging filters.
    private static final String FAULT_LOGGING_FILTER = "wsn#fault";
    private static final String TRACE_LOGGING_FILTER = "wsn#logging";
    // Logger name built from the pattern "transport.%s.accept"; the format argument is missing in this copy.
    private static final String LOGGER_NAME = String.format("transport.%s.accept".);
    private final Logger logger = LoggerFactory.getLogger();

    
    /** This map is used to find WsrSessions from an RTMP connection. */
    private final Map<URIWsrSessionsessionMap;
    /**
     * Creates the acceptor with a {@code DefaultIoSessionConfigEx} session configuration,
     * then instantiates a {@code ConcurrentHashMap} and a {@code RtmpChunkCodecFilter}.
     * NOTE(review): the assignment targets are missing in this copy of the source;
     * presumably the session map and the codec field - confirm against the original.
     */
    public WsrAcceptor() {
        super(new DefaultIoSessionConfigEx());
         = new ConcurrentHashMap<>();
         = new RtmpChunkCodecFilter();
    }
    /**
     * Injection point for the resource named "bridgeServiceFactory".
     * NOTE(review): the field being assigned is missing in this copy of the source.
     *
     * @param bridgeServiceFactory the factory to store on this acceptor
     */
    @Resource(name = "bridgeServiceFactory")
    public void setBridgeServiceFactory(BridgeServiceFactory bridgeServiceFactory) {
        this. = bridgeServiceFactory;
    }
    /**
     * Injection point for the resource named "resourceAddressFactory".
     * NOTE(review): the field being assigned is missing in this copy of the source.
     *
     * @param resourceAddressFactory the factory to store on this acceptor
     */
    @Resource(name = "resourceAddressFactory")
    public void setResourceAddressFactory(ResourceAddressFactory resourceAddressFactory) {
        this. = resourceAddressFactory;
    }
    /**
     * Injection point for the resource named "schedulerProvider". Asks the provider for
     * the scheduler named "Timeout-wsr" and stores it on this acceptor.
     * NOTE(review): the target field and the separator before {@code false} are missing
     * in this copy of the source; the meaning of the boolean flag is not visible here.
     *
     * @param provider source of the scheduler
     */
    @Resource(name = "schedulerProvider")
    public void setSchedulerProvider(SchedulerProvider provider) {
        this. = provider.getScheduler("Timeout-wsr"false);
    }
    @Override
    protected WsrBindings initBindings() {
        return new WsrBindings();
    }
    /* for test observability only */
    // Returns a value obtained from the superclass, cast to WsrBindings.
    // NOTE(review): the superclass member being read is missing in this copy of the source.
    WsrBindings bindings() {
        return (WsrBindingssuper.;
    }
    /**
     * Supplies the I/O processor for WSR sessions.
     *
     * @return a newly constructed {@code WsrAcceptProcessor}
     */
    @Override
    protected IoProcessorEx<WsrSessioninitProcessor() {
        return new WsrAcceptProcessor();
    }
    @Override
    protected boolean canBind(String transportName) {
        return transportName.equals("wsr") || transportName.equals("ws");
    }
    /**
     * Computes a path by passing {@code httpUri} through {@code ensureTrailingSlash},
     * appending {@code suffixWithLeadingSlash} with {@code appendURI}, and taking the
     * path component of the resulting URI.
     *
     * @param httpUri base URI
     * @param suffixWithLeadingSlash suffix to append (the name indicates it starts with '/')
     * @return the path of the combined URI
     */
    String createResolvePath(URI httpUrifinal String suffixWithLeadingSlash) {
        return appendURI(ensureTrailingSlash(httpUri),suffixWithLeadingSlash).getPath();
    }
    /**
     * Binds an address in two steps: first the create address (bindCreateAddress),
     * then the RTMP address (bindRtmpAddress).
     *
     * @param address the address to bind
     * @param handler handler passed on to the RTMP binding
     * @param initializer session initializer passed on to both bindings
     * @throws RuntimeException wrapping a {@code NioBindException} raised by either step;
     *         the message includes the address and the original message, and the cause is kept
     */
    @Override
    protected <T extends IoFuturevoid bindInternal(final ResourceAddress address,
                                                     IoHandler handlerBridgeSessionInitializer<T> initializer) {
        try {
            bindCreateAddress(addressinitializer);
            bindRtmpAddress(addresshandlerinitializer);
        } catch (NioBindException e) {
            throw new RuntimeException("Unable to bind address "+address+": "+e.getMessage(),e);
        }
    }
    /**
     * Unbinds the create address and the RTMP address for {@code address} and returns
     * the two unbind futures merged through {@code combineFutures}.
     * The {@code handler} and {@code initializer} parameters are not used by this body.
     *
     * @param address the address to unbind
     * @return the combined unbind future
     */
    @Override
    protected UnbindFuture unbindInternal(ResourceAddress addressIoHandler handler,
                                          BridgeSessionInitializer<? extends IoFutureinitializer) {
        UnbindFuture future = unbindCreateAddress(address);
        return combineFutures(futureunbindRtmpAddress(address));
    }
    /**
     * Binds the create address derived from {@code address} on a bridge acceptor, using
     * the handler chosen by selectCreateHandler and a wrapping session initializer.
     * NOTE(review): the receiver of {@code newBridgeAcceptor}, the argument of
     * {@code getParentInitializer}, and the key/option names in the final {@code set}
     * call are missing in this copy of the source.
     *
     * @param address the address whose create URL is bound
     * @param initializer optional initializer; when non-null its parent initializer is
     *        invoked first for each new session
     */
    private <T extends IoFuturevoid bindCreateAddress(final ResourceAddress address,
                                                        BridgeSessionInitializer<T> initializer) {
        // bind the create URL accessed using HTTP transport
        ResourceAddress wsrCreateAddress = createWsrCreateAddress(address);
        BridgeAcceptor createAcceptor = .newBridgeAcceptor(wsrCreateAddress);
        // null initializer means there is nothing to delegate to
        final BridgeSessionInitializer<T> theCreateInitializer = (initializer != null)
                ? initializer.getParentInitializer(.)
                : null;
        BridgeSessionInitializer<T> wrapperInitializer = new BridgeSessionInitializerAdapter<T>() {
            @Override
            public void initializeSession(IoSession session, T future) {
                // delegate first, then record the address option on the session
                if ( theCreateInitializer != null ) {
                    theCreateInitializer.initializeSession(sessionfuture);
                }
                // Store the next over-the-top of-websocket protocols the server supports for this address on session.
                .set(sessionaddress.getOption(.));
            }
        };
        createAcceptor.bind(wsrCreateAddress,
                            selectCreateHandler(wsrCreateAddress),
                            wrapperInitializer);
    }
    /**
     * Registers a binding for the RTMP address derived from {@code address}, then binds
     * that address's transport on a bridge acceptor.
     * NOTE(review): the receiver of {@code newBridgeAcceptor}, the argument of
     * {@code getParentInitializer}, and at least one argument of {@code acceptor.bind}
     * appear to be missing in this copy of the source.
     *
     * @param address the address to bind
     * @param handler handler recorded in the binding
     * @param initializer optional initializer; when non-null its parent initializer is
     *        passed to the transport bind
     */
    private <T extends IoFuturevoid bindRtmpAddress(ResourceAddress address,
                                                      IoHandler handler,
                                                      BridgeSessionInitializer<T> initializer) {
        // TODO: avoid the global bind for all RTMP streams
        //       instead, dynamically bind RTMP streams, like WSE upstream & downstream
        //       note: may require an RTMP transport
        ResourceAddress wsrRtmpAddress = createWsrRtmpAddress(address);
        // add bind mapping for the wsr rtmp address
        addWsrRtmpBinding(wsrRtmpAddresshandlerinitializer);
        // bind the wsr rtmp address transport
        BridgeAcceptor acceptor = .newBridgeAcceptor(wsrRtmpAddress.getTransport());
        BridgeSessionInitializer<T> theInitializer = (initializer != null)
                ? initializer.getParentInitializer(.)
                : null;
        acceptor.bind(wsrRtmpAddress.getTransport(), theInitializer);
    }
    /**
     * Chooses the handler for a create address by looking up the protocol of the
     * address's resource; only an {@code HttpProtocol} is accepted.
     * NOTE(review): the receiver of {@code getTransportFactory()} and the returned
     * handler expression are missing in this copy of the source.
     *
     * @param transport the create address
     * @return the handler used for HTTP create requests
     * @throws RuntimeException if the protocol is not an {@code HttpProtocol}
     */
    private IoHandler selectCreateHandler(ResourceAddress transport) {
        Protocol protocol = .getTransportFactory().getProtocol(transport.getResource());
        if ( protocol instanceof HttpProtocol) {
            return ;
        }
        throw new RuntimeException("Unable to locate a WSR create handler for address "+transport);
    }
    /**
     * Calls {@code shutdownNow()} and then delegates to the superclass disposal.
     * NOTE(review): the receiver of {@code shutdownNow()} is missing in this copy of
     * the source; presumably the scheduler obtained in setSchedulerProvider - confirm.
     *
     * @return the future returned by {@code super.dispose0()}
     */
    @Override
    protected IoFuture dispose0() throws Exception {
        .shutdownNow();
        return super.dispose0();
    }
    /**
     * Unbinds the create address derived from {@code address}.
     * NOTE(review): the receiver of {@code newBridgeAcceptor} is missing in this copy
     * of the source.
     *
     * @param address the address whose create URL is unbound
     * @return the unbind future returned by the bridge acceptor
     */
    private UnbindFuture unbindCreateAddress(final ResourceAddress address) {
        // unbind the create URL accessed using HTTP transport
        ResourceAddress wsrCreateAddress = createWsrCreateAddress(address);
        BridgeAcceptor createAcceptor = .newBridgeAcceptor(wsrCreateAddress);
        return createAcceptor.unbind(wsrCreateAddress);
    }
    /**
     * Removes the binding for the RTMP address derived from {@code address}, then
     * unbinds that address's transport on a bridge acceptor.
     * NOTE(review): the receiver of {@code newBridgeAcceptor} is missing in this copy
     * of the source.
     *
     * @param address the address to unbind
     * @return the unbind future returned by the bridge acceptor
     */
    private UnbindFuture unbindRtmpAddress(ResourceAddress address) {
        ResourceAddress wsrRtmpAddress = createWsrRtmpAddress(address);
        removeWsrRtmpBinding(wsrRtmpAddress);
        BridgeAcceptor acceptor = .newBridgeAcceptor(wsrRtmpAddress.getTransport());
        return acceptor.unbind(wsrRtmpAddress.getTransport());
    }
        // NOTE(review): the method signature for this body is missing in this copy of the
        // source; presumably createWsrCreateAddress(ResourceAddress address), which the
        // bind/unbind code calls - confirm. createResolvePath is declared with two
        // parameters, so a suffix argument also appears to be missing below.
        // Resolves the create path against the transport of the given address.
        ResourceAddress wsrTransportAddress = address.getTransport();
        URI wsrTransportBindURI = wsrTransportAddress.getResource();
        return wsrTransportAddress.resolve(createResolvePath(wsrTransportBindURI));
    }
        // NOTE(review): the method signature for this body is missing in this copy of the
        // source; presumably createWsrRtmpAddress(ResourceAddress address), which the
        // bind/unbind code calls - confirm. Factory receivers and option keys in the
        // calls below are also missing.
        //
        // Builds a new address for the same external URI as 'address' whose options are
        // copied from 'address' and then updated with a rebuilt transport address (the
        // transport found beneath the http[http/1.1] layer, with "rtmp/1.0" set as an option).
        // wsr[wsr/1.0] | tcp  or wsr[wsr/1.0] | ssl
        // so find http[http/1.1] layer's transport either way and you'll get ssl or tcp.
        ResourceAddress wsrRtmpTransportAddress;
        wsrRtmpTransportAddress = address.findTransport("http[http/1.1]").getTransport();
        ResourceOptions wsrRtmpTransportAddressOptions =
                ..newResourceOptions(wsrRtmpTransportAddress);
        wsrRtmpTransportAddressOptions.setOption("rtmp/1.0");
        wsrRtmpTransportAddressOptions.setOption(.null);
        wsrRtmpTransportAddress = .newResourceAddress(wsrRtmpTransportAddress.getExternalURI(),
                                                                            wsrRtmpTransportAddressOptions);
        ResourceOptions wsrRtmpAddressOptions = ..newResourceOptions(address);
        wsrRtmpAddressOptions.setOption(wsrRtmpTransportAddress);
        return .newResourceAddress(address.getExternalURI(), wsrRtmpAddressOptions);
    }
    /**
     * Registers a binding made of the RTMP address, handler and initializer.
     * NOTE(review): the receiver of {@code addBinding} is missing in this copy of the
     * source.
     *
     * @param wsrRtmpAddress address to bind
     * @param handler handler for the binding
     * @param initializer initializer for the binding
     * @throws RuntimeException if {@code addBinding} returns a non-null existing binding,
     *         i.e. the address collides with one already bound
     */
    private <T extends IoFuturevoid addWsrRtmpBinding(ResourceAddress wsrRtmpAddressIoHandler handler,
                                                        BridgeSessionInitializer<T> initializer) {
        Bindings.Binding newBinding = new Bindings.Binding(wsrRtmpAddresshandlerinitializer);
        Bindings.Binding oldBinding = .addBinding(newBinding);
        if (oldBinding != null) {
            throw new RuntimeException("Unable to bind address " + wsrRtmpAddress
                                               + " because it collides with an already bound address " + oldBinding.bindAddress());
        }
    }
    /**
     * Looks up the binding for the given RTMP address and removes it.
     * NOTE(review): the receivers of {@code getBinding} and {@code removeBinding} are
     * missing in this copy of the source; no null check is made on the lookup result.
     *
     * @param wsrRtmpAddress address whose binding is removed
     */
    private void removeWsrRtmpBinding(ResourceAddress wsrRtmpAddress) {
        Bindings.Binding binding = .getBinding(wsrRtmpAddress);
        .removeBinding(wsrRtmpAddressbinding);
    }
    // Typed attribute key (String[] value) declared with the name "supportedProtocols".
    private static final TypedAttributeKey<String[]> SUPPORTED_PROTOCOLS
            = new TypedAttributeKey<>(WsrAcceptor.class"supportedProtocols");
    // Single handler instance for HTTP create requests.
    private final WsrCreateHandler wsrCreateHandler = new WsrCreateHandler();
    private class WsrCreateHandler extends IoHandlerAdapter<HttpAcceptSession> {
        /**
         * Handles an HTTP create request for a WSR session. In order, the visible code:
         * checks that the "X-WebSocket-Version" header is "wsr-1.0"; negotiates the
         * WebSocket protocol and extensions; builds an rtmp/rtmps URI ending in a new
         * session id; creates a write-suspended {@code WsrSession} and stores it under
         * that URI; writes the URI back as a text/plain response and closes the HTTP
         * session; schedules a timeout command; and registers a close listener that
         * removes the map entry, shuts down scheduled commands and logs the session out.
         *
         * NOTE(review): many receivers, constants and argument separators are missing in
         * this copy of the source (status codes, attribute keys, logger, map, scheduler).
         *
         * @param session the HTTP session carrying the create request
         */
        @Override
        protected void doSessionOpened(final HttpAcceptSession session)
                throws Exception {
            // validate WebSocket version
            String wsrVersion = session.getReadHeader("X-WebSocket-Version");
            if (!"wsr-1.0".equals(wsrVersion)) {
                session.setStatus(.);
                session.setReason("WebSocket-Version not supported");
                session.close(false);
                return;
            }
            // negotiate WebSocket protocol
            List<StringwsProtocols = session.getReadHeaders("X-WebSocket-Protocol");
            String wsProtocol = null;
            try {
                wsProtocol = WsUtils.negotiateWebSocketProtocol(session,
                        "X-WebSocket-Protocol",
                        wsProtocols,
                        asList(.remove(session)));
            } catch (WsHandshakeNegotiationException e) {
                // NOTE(review): returns without writing a response here; presumably
                // negotiateWebSocketProtocol has already failed the session - confirm.
                return;
            }
            // find (based on this http session) the local address for the WS session
            // we are about to upgrade to.
            ResourceAddress resourceAddress = getWsrLocalAddress(session.wsProtocol);
            // fallback to null protocol as a workaround until we properly inject next protocol from service during bind
            // This is safe as we guard this logic via negotiateWebSocketProtocol function
            // If the client send any bogus protocol that is not in the list of supported protocols,
            // we will fail fast before getting here
            if (resourceAddress == null) {
                wsProtocol = null;
                resourceAddress = getWsrLocalAddress(session.wsProtocol);
            }
            // final copies for use inside the anonymous classes below
            final String wsProtocol0 = wsProtocol;
            final ResourceAddress wsrLocalAddress = resourceAddress;
            // negotiate WebSocket extensions
            List<StringclientRequestedWsExtensions =  session.getReadHeaders(.);
            // null next-protocol from client gives null local address when we only have explicitly named next-protocol binds
            List<StringwsExtensions = (wsrLocalAddress != null) ? wsrLocalAddress.getOption() : .defaultValue();
            WsExtensionNegotiationResult extNegotiationResult =
                negotiateWebSocketExtensions(
                wsrLocalAddresssession,
                .,
                clientRequestedWsExtensionswsExtensions);
            if (extNegotiationResult.isFailure()) {
                // This happens when the extension negotiation leads to
                // a fatal failure; the session should be closed because
                // the service REQUIRED some extension that the client
                // did not request.
                // NOTE(review): the two nested isDebugEnabled() checks look redundant.
                if (.isDebugEnabled()) {
                    if (.isDebugEnabled()) {
                        URI requestURI = session.getRequestURL();
                        .debug(String.format(
                                "Rejected %s request for URI \"%s\" on session '%s': failed to negotiate client requested extensions '%s'",
                                session.getMethod(), requestURIsessionclientRequestedWsExtensions));
                    }
                }
                session.setStatus(.);
                session.setReason("WebSocket Extensions not found");
                session.close(false);
                return;
            }
            final ActiveWsExtensions wsExtensions0 = extNegotiationResult.getExtensions();
            URI request = session.getRequestURL();
            //URI pathInfo = session.getPathInfo();
            String sessionId = HttpUtils.newSessionId();
            String scheme = request.getScheme();
            String path = request.getPath();
            // strip a trailing suffix from the request path (suffix reference missing in this copy)
            String acceptPath = path.substring(0, path.length() - .length());
            Protocol protocol = .getTransportFactory().getProtocol(scheme);
            // secure request scheme maps to rtmps, anything else to rtmp
            if (protocol.isSecure()) {
                scheme = "rtmps";
            } else {
                scheme = "rtmp";
            }
            final String sessionIdSuffix = '/' + sessionId;
            // Use explicit RTMP port for HTTP default ports
            String authority = HttpUtils.getHostAndPort(request.getAuthority(), protocol.isSecure());
            final URI rtmpAddress = new URI(schemeauthorityacceptPath + sessionIdSuffixrequest.getQuery(), request.getFragment());
            // create new address to use as key and session remote address
            // NOTE(review): wsrLocalAddress is dereferenced unconditionally here although
            // the extension lookup above allows for it being null - confirm it cannot be.
            final ResourceAddress remoteAddress =
                    .newResourceAddress(
                        wsrLocalAddress.getExternalURI());
            final ResourceAddress localAddress =
                    .newResourceAddress(
                            wsrLocalAddress.getResource());
            // NOTE(review): httpCreateAddress / httpCreateURI are not used again in the visible code.
            ResourceAddress httpCreateAddress = session.getLocalAddress();
            URI httpCreateURI = httpCreateAddress.getResource();
            final URI httpUri = session.getRequestURL();
            if ( !httpUri.getPath().contains()) {
                throw new IllegalStateException("Session created with unexpected URL: "+httpUri.toASCIIString());
            }
            final WsrSession wsrSession = newSession(
                    new IoSessionInitializer<IoFuture>() {
                        // copies attributes, request URL, subject, protocol and extensions
                        // from the HTTP session onto the new WS session
                        @Override
                        public void initializeSession(IoSession wsSession,
                                IoFuture future) {
                            wsSession.setAttribute(.session.getAttribute(.));
                            wsSession.setAttribute(session.getRequestURL());
                            ((AbstractWsBridgeSession)wsSession).setSubject(session.getSubject());
                            wsSession.setAttribute(.wsProtocol0);
                            wsExtensions0.set(wsSession);
                        }
                    }, new Callable<WsrSession>() {
                        // constructs the WsrSession on the HTTP session's I/O layer,
                        // thread and executor
                        @Override
                        public WsrSession call() {
                            IoBufferAllocatorEx<?> parentAllocator = session.getBufferAllocator();
                            WsrBufferAllocator wsrAllocator = new WsrBufferAllocator(parentAllocator);
                            ResultAwareLoginContext loginContext = (ResultAwareLoginContextsession.getAttribute(.);
                            WsrSession wsrSession = new WsrSession(
                                    session.getIoLayer(), session.getIoThread(), session.getIoExecutor(), WsrAcceptor.thisgetProcessor(),
                                    localAddressremoteAddresswsrAllocatorloginContext.getLoginResult(), wsExtensions0);
                            wsrSession.setBridgeServiceFactory();
                            wsrSession.setResourceAddressFactory();
                            wsrSession.setScheduler();
                            IoHandler handler = getHandler(localAddress);
                            wsrSession.setHandler(handler);
                            // writes stay suspended until the RTMP side resumes them
                            wsrSession.suspendWrite();
                            return wsrSession;
                        }
                    });
            // remember the session under its RTMP URI
            .put(rtmpAddresswsrSession);
            // write response that session was created and pass redirect urls
            session.setWriteHeader("Content-Type""text/plain");
            session.setStatus(.);
            IoBufferAllocatorEx<?> allocator = session.getBufferAllocator();
            ByteBuffer nioBuf = allocator.allocate(256);
            IoBufferEx buf = allocator.wrap(nioBuf).setAutoExpander(allocator);
            CharsetEncoder utf8Encoder = .newEncoder();
            buf.putString(rtmpAddress.toASCIIString(), utf8Encoder);
            buf.flip();
            session.setWriteHeader("Content-Length", Integer.toString(buf.remaining()));
            session.write(buf);
            // TODO: use session.writeComplete() instead
            session.close(false);
            // timeout session if rtmp(s) connection is never established
            ScheduledFuture<?> timeoutFuture = .schedule(wsrSession.getTimeoutCommand(), .);
            wsrSession.setAttribute(timeoutFuture);
            // Cancel commands and clear session maps when a session is closed - avoid session reference leak.
            CloseFuture closeFuture = wsrSession.getCloseFuture();
            closeFuture.addListener(new IoFutureListener<CloseFuture>() {
                @Override
                public void operationComplete(CloseFuture future) {
                    if (.isTraceEnabled()) {
                        .trace(WsrAcceptor.class.getSimpleName() + " removing enforcement of lifetime for closed session (" + wsrSession.getId() + ").");
                    }
                    .remove(rtmpAddress);
                    wsrSession.shutdownScheduledCommands();
                    wsrSession.logout();
                }
            });
        }
//        private AcceptOptionsContext getAcceptOptionsContext(IoSession session) {
//            ServiceRegistration sr = (ServiceRegistration) session
//                                .getAttribute(HttpAcceptor.SERVICE_REGISTRATION_KEY);
//            return sr.getServiceContext().getAcceptOptionsContext();
//        }
        /**
         * Finds the bound local address matching an HTTP create session. Builds a
         * candidate address from the session's local resource (truncated when its path
         * ends with a suffix), the given scheme and next protocol, then looks the
         * candidate up in the bindings.
         *
         * NOTE(review): factory receivers, option keys, the suffix test argument, the
         * logger and the bindings reference are missing in this copy of the source.
         * The visible call sites also do not match this three-parameter signature.
         *
         * @param session the HTTP create session
         * @param schemeName scheme applied to the candidate URI
         * @param nextProtocol next-protocol option set on the candidate
         * @return the bind address of the matching binding, or {@code null} if none matches
         */
        protected ResourceAddress getWsrLocalAddress(HttpAcceptSession session,
                                                    final String schemeName,
                                                    String nextProtocol) {
            URI resource = session.getLocalAddress().getResource();
            if (resource.getPath().endsWith()) {
                resource = truncateURI(resource);
            }
            ResourceOptions options = ..newResourceOptions();
            options.setOption(session.getLocalAddress().resolve(resource.getPath()));
            options.setOption(nextProtocol);
            URI wsLocalAddressLocation = URLUtils.modifyURIScheme(resource,
                                                                  schemeName);
            ResourceAddress candidate = .newResourceAddress(
                    wsLocalAddressLocationoptions);
            Bindings.Binding binding = .getBinding(candidate);
            // no binding: log at debug level and report "not found" with null
            if (binding == null) {
                if (.isDebugEnabled()) {
                    .debug("\n***Did NOT find local address for WSR session:" +
                                 "\n***using candidate:\n" +
                                 candidate +
                                 "\n***with bindings " +
                                 );
                }
                return null;
            }
            if (.isTraceEnabled()) {
                .trace("\n***Found local address for WSR session:\n" +
                             binding.bindAddress() +
                             "\n***via candidate:\n" +
                             candidate +
                             "\n***with bindings " +
                             );
            }
            return binding.bindAddress();
        }
    }
    private IoHandler ioBridgeHandler = new IoHandlerAdapter<IoSessionEx>() {
        /**
         * Handles an exception on the underlying bridge session. If a WSR session is
         * associated and not already closing, it is reset with the cause; otherwise the
         * error is logged at debug level (with the exception attached when trace is
         * enabled) and the bridge session is closed immediately.
         * NOTE(review): the session-key and logger references are missing in this copy
         * of the source.
         */
        @Override
        protected void doExceptionCaught(IoSessionEx sessionThrowable causethrows Exception {
            WsrSession wsrSession = .get(session);
            if (wsrSession != null && !wsrSession.isClosing()) {
                wsrSession.reset(cause);
            }
            else {
                if (.isDebugEnabled()) {
                    String message = format("Error on WebSocket (WSR) connection, closing connection: %s"cause);
                    if (.isTraceEnabled()) {
                        // note: still debug level, but with extra detail about the exception
                        .debug(messagecause);
                    }
                    else {
                        .debug(message);
                    }
                }
                session.close(true);
            }
        }
        /**
         * Dispatches a decoded RTMP message received on the bridge session, switching on
         * the message kind, then (for stream messages) the stream kind, then (for
         * command messages) the command kind.
         *
         * NOTE(review): every {@code case} label has lost its enum constant in this copy
         * of the source, so the branch descriptions below are inferred from the branch
         * bodies only. Several branches dereference {@code wsrSession} without a null
         * check, while the binary-data branch does check for null - confirm.
         *
         * @throws IllegalArgumentException for an unrecognized message or stream kind
         */
        @Override
        protected void doMessageReceived(IoSessionEx sessionObject message)
                throws Exception {
            // KG-3160: avoid causing an exception and an ugly log message if the message was not decoded
            // because the filter chain is being torn down due to a race with session close
            if (session.isClosing() && !(message instanceof RtmpMessage)) {
                if (.isDebugEnabled()) {
                    .debug("WsrAcceptor: ignoring wrongly typed message {} since session is closing"message);
                }
                return;
            }
            final WsrSession wsrSession = .get(session);
            RtmpMessage rtmpMessage = (RtmpMessagemessage;
            switch (rtmpMessage.getKind()) {
            // replies with a version message followed by a handshake request
            case :
                // TODO assert version is 3
                session.write(new RtmpVersionMessage());
                session.write(new RtmpHandshakeRequestMessage());
                break;
            // echoes the handshake back with timestamp2 = timestamp1 + 1
            case :
                RtmpHandshakeMessage handshake = (RtmpHandshakeMessagertmpMessage;
                // For now, just set the returning timestamp to t+1
                handshake.setTimestamp2(handshake.getTimestamp1() + 1);
                session.write(handshake);
                break;
            // sends a set-chunk-size message on chunk stream 2, message stream 0
            case :
                // avoid unnecessary fragmentation
                RtmpSetChunkSizeMessage setChunkSize = new RtmpSetChunkSizeMessage();
                setChunkSize.setChunkStreamId(2);
                setChunkSize.setMessageStreamId(0);
                setChunkSize.setChunkSize(.);
                session.write(setChunkSize);
                break;
            // stream messages: dispatch on stream kind
            case :
                RtmpStreamMessage streamMessage = (RtmpStreamMessagemessage;
                switch (streamMessage.getStreamKind()) {
                // command messages: dispatch on command kind
                case :
                    RtmpCommandMessage commandMessage = (RtmpCommandMessagertmpMessage;
                    int messageStreamId = commandMessage.getMessageStreamId();
                    switch (commandMessage.getCommandKind()) {
                    case :
                        doHandleConnect(session, (RtmpConnectCommandMessagecommandMessage);
                        break;
                    case :
                        doHandleCreateStream(session, (RtmpCreateStreamCommandMessagecommandMessage);
                        break;
                    case :
                        // TODO close session
                        break;
                    // play: record the downstream id, answer, then resume and flush writes
                    case :
                        wsrSession.setDownstreamId(messageStreamId);
                        doHandlePlayStream(session, (RtmpPlayCommandMessagecommandMessage);
                        //TODO: If we deferred session created until after the RTMP handshake has completed,
                        //TODO: then there is no need for suspendWrite/resumeWrite (no early writes would be possible)
                        wsrSession.resumeWrite();
                        // flush directly when already on the session's I/O thread,
                        // otherwise hand the flush to the session's I/O executor
                        if (currentThread() == wsrSession.getIoThread()) {
                            // We are always aligned now. if (session.isIoAligned()) {
                            wsrSession.getProcessor().flush(wsrSession);
                        } else {
                            wsrSession.getIoExecutor().execute(new Runnable() {
                                @Override
                                public void run() {
                                    // We are always aligned now. if (session.isIoAligned()) {
                                    wsrSession.getProcessor().flush(wsrSession);
                                }
                            });
                        }
                        break;
                    // records the upstream id
                    case :
                        wsrSession.setUpstreamId(messageStreamId);
                        break;
                    }
                    break;
                case :
                    break;
                // binary data: pass the payload up the WSR session's filter chain
                case :
                    RtmpBinaryDataMessage amfData = (RtmpBinaryDataMessagertmpMessage;
                    IoBufferEx buf = amfData.getBytes();
                    if (wsrSession != null) {
                        IoFilterChain filterChain = wsrSession.getFilterChain();
                        filterChain.fireMessageReceived(buf);
                    }
                    break;
                case :
                    // ignore client acks
                    break;
                default:
                    throw new IllegalArgumentException(
                            "Unrecognized stream message kind: "
                                    + streamMessage.getStreamKind());
                }
                break;
            default:
                throw new IllegalArgumentException(
                        "Unrecognized message kind: " + rtmpMessage.getKind());
            }
        }
        // Counter initialised to 1; presumably the source of new stream ids in
        // doHandleCreateStream (the incremented operand there is missing - confirm).
        private int streamCounter = 1;
        /**
         * Answers a create-stream command by writing a response that carries the
         * request's transaction id, a stream id and a chunk stream id.
         * NOTE(review): the declaration of {@code response} and the arguments of
         * {@code setStreamId} / {@code setChunkStreamId} are missing in this copy of the
         * source.
         */
        private void doHandleCreateStream(IoSession session,
                RtmpCreateStreamCommandMessage request) {
            response.setTransactionId(request.getTransactionId());
            response.setStreamId(++);
            response.setChunkStreamId();
            session.write(response);
        }
        /**
         * Answers a play command: writes a play response (same transaction id, message
         * stream id 0), then sends the sample-access, stream-start and stream-metadata
         * messages in that order.
         * NOTE(review): the argument of {@code setChunkStreamId} is missing in this copy
         * of the source.
         */
        private void doHandlePlayStream(IoSession session,
                RtmpPlayCommandMessage request) {
            RtmpPlayResponseCommandMessage response = new RtmpPlayResponseCommandMessage();
            response.setTransactionId(request.getTransactionId());
            response.setChunkStreamId();
            response.setMessageStreamId(0);
            session.write(response);
            // start data stream
            doSendRtmpSampleAccess(sessionrequest);
            doSendStreamStart(sessionrequest);
            doSendStreamMetaData(sessionrequest);
        }
        private void doSendRtmpSampleAccess(IoSession session,
                RtmpCommandMessage command) {
            RtmpSampleAccessMessage msg = new RtmpSampleAccessMessage();
            msg.setMessageStreamId(command.getMessageStreamId());
            msg.setChunkStreamId(5);
            session.write(msg);
        }
        private void doSendStreamMetaData(IoSession session,
                RtmpCommandMessage command) {
            RtmpStreamMetaDataMessage msg = new RtmpStreamMetaDataMessage();
            msg.setMessageStreamId(command.getMessageStreamId());
            msg.setChunkStreamId(5);
            session.write(msg);
        }
        private void doSendStreamStart(IoSession session,
                RtmpCommandMessage command) {
            RtmpDataStartDataMessage msg = new RtmpDataStartDataMessage();
            msg.setMessageStreamId(command.getMessageStreamId());
            msg.setChunkStreamId(5);
            session.write(msg);
        }
        /**
         * Handles an RTMP connect command: looks up the WSR session by the URI in the
         * request's tcUrl, makes the bridge session its parent, hands the session's
         * active extensions to a component, starts the session timeout command, cancels
         * any pending connect-timeout future, and writes the connect response.
         *
         * NOTE(review): the map, key and codec receivers, the declaration of
         * {@code result}, the body of the attribute-copy {@code if}, and several
         * arguments are missing in this copy of the source. The looked-up
         * {@code wsrSession} is dereferenced without a null check - confirm an unknown
         * tcUrl cannot reach this point.
         */
        private void doHandleConnect(final IoSessionEx session,
                RtmpConnectCommandMessage requestthrows Exception {
            RtmpConnectResponseCommandMessage response = new RtmpConnectResponseCommandMessage();
            response.setTransactionId(request.getTransactionId());
            response.setChunkStreamId();
            URI rtmpAddress = new URI(request.getTcUrl());
            WsrSession wsrSession = .get(rtmpAddress);
            wsrSession.setParent(session);
            .set(sessionwsrSession);
            // At this point we know the wsrSession, and we can add the escaping filter
            // find the encoder from the session and give it the escape sequencer.
            final ActiveWsExtensions extensions = ActiveWsExtensions.get(wsrSession);
            .setExtensions(sessionextensions);
            // In order to address KG-2167 and related issues, we need to
            // start the scheduled commands at a point in the session when we have a
            // valid WSR session.
            //
            // Due to the code in AbstractWsBridgeSession, which handles the
            // scheduling of those commands, we need to make sure that the
            // parent session for our WSR session object contains the
            // ServiceRegistration object as an attribute (otherwise an ugly
            // NPE ensues).  Hence the conditional check/copy that happens
            // next...
            if (session.getAttribute(.) == null) {
            }
            // ...and now we can officially start our lifetime commands.
            wsrSession.startupSessionTimeoutCommand();
            // TODO: cross origin filter paranoia check
            //IoFilterChain filterChain = wsrSession.getFilterChain();
            //filterChain.remove("rtmp.crossOrigin");
            result.setTransactionId(request.getTransactionId());
            result.setMessageStreamId(request.getMessageStreamId());
            // cancel the timeout future if it is found
            ScheduledFuture<?> timeoutFuture = (ScheduledFuture<?>) wsrSession.removeAttribute();
            if (timeoutFuture != null && !timeoutFuture.isDone()) {
                timeoutFuture.cancel(false);
            }
            // avoid temp memory leak while the scheduler performs cleanup of canceled tasks
            wsrSession.clearTimeoutCommand();
            session.write(response);
        }
        /**
         * Handles closure of the underlying bridge session. If a WSR session is
         * associated and is not already closing, it is reset with an
         * "Early termination of IO session" exception.
         * NOTE(review): the session-key reference is missing in this copy of the source.
         */
        @Override
        protected void doSessionClosed(final IoSessionEx session)
                throws Exception {
            WsrSession wsrSession = .get(session);
            if (wsrSession != null && !wsrSession.isClosing()) {
                // lifetime of WSR session ends normally with underlying transport session
                wsrSession.reset(new Exception("Early termination of IO session").fillInStackTrace());
            }
        }
        @Override
        protected void doSessionOpened(final IoSessionEx session)
                throws Exception {
            IoFilterChain filterChain = session.getFilterChain();
            addBridgeFilters(filterChain);
        }
    };
    /**
     * Adds this transport's filters to a bridge session's filter chain: a logging
     * filter at the front (object logging when trace is enabled, exception logging
     * when only debug is enabled) and the "rtmp" filter at the end.
     * NOTE(review): the logger, the filter-name constants, other constructor arguments
     * and the filter instance passed to {@code addLast} are missing in this copy of the
     * source.
     *
     * @param filterChain the chain to extend
     */
    @Override
    public void addBridgeFilters(IoFilterChain filterChain) {
        // setup logging filters for bridge session
        if (.isTraceEnabled()) {
            filterChain.addFirst(new ObjectLoggingFilter(. + "#%s"));
        } else if (.isDebugEnabled()) {
            filterChain.addFirst(new ExceptionLoggingFilter(. + "#%s"));
        }
        filterChain.addLast("rtmp");
        // filterChain.addLast("rtmp.window", windowFilter);
        //filterChain.addLast("rtmp.crossOrigin", crossOriginFilter);
    }
    /**
     * Removes the "rtmp" and "log" filters from the chain, then lets the superclass
     * remove its own filters.
     * NOTE(review): the argument separators in the {@code removeFilter} calls are
     * missing in this copy of the source.
     *
     * @param filterChain the chain to clean up
     */
    @Override
    public void removeBridgeFilters(IoFilterChain filterChain) {
        removeFilter(filterChain"rtmp");
        removeFilter(filterChain"log");
        super.removeBridgeFilters(filterChain);
    }
    // NOTE(review): the method signature and the constructor arguments are missing in
    // this copy of the source; presumably the transport-metadata getter - confirm.
    // Returns a newly constructed DefaultTransportMetadata.
    @Override
        return new DefaultTransportMetadata(..name());
    }
New to GrepCode? Check out our FAQ X