Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
/**
 * Copyright (c) 2007-2014 Kaazing Corporation. All rights reserved.
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
 
 
 package org.kaazing.gateway.transport.wsr;
 
 import static java.lang.String.format;
 
 import java.net.URI;
 
 
 
 public class WsrConnector extends AbstractBridgeConnector<WsrSession> {
 
     // NOTE(review): this copy of the file has lost tokens throughout (closing '>' of type
     // arguments, commas between arguments, and several identifier references). The declarations
     // below are kept exactly as found; restore them from the upstream source before compiling.

     // Charset instance for "UTF-8".
     private static final Charset UTF_8 = Charset.forName("UTF-8");
     // Attribute key named "createResponse"; the create handler accumulates the response body
     // under a key of this type. NOTE(review): the '>' after IoBufferEx and the ',' after
     // WsrConnector.class are missing.
     private static final TypedAttributeKey<IoBufferExCREATE_RESPONSE_KEY = new TypedAttributeKey<>(
             WsrConnector.class"createResponse");
     // Path suffix literal. NOTE(review): presumably the operand missing from the path
     // concatenation in connectInternal — confirm.
     private static final String CREATE_SUFFIX = "/;e/cr";
     // Attribute key named "rtmp.session". NOTE(review): the '>' after WsrSession and the ','
     // after WsrConnector.class are missing.
     private static final TypedAttributeKey<WsrSessionWSR_SESSION_KEY = new TypedAttributeKey<>(WsrConnector.class"rtmp.session");
     // Attribute key named "createSession" whose value type is ConnectRequest.
     // NOTE(review): the ',' after WsrConnector.class is missing.
     private static final TypedAttributeKey<ConnectRequest<?>> CONNECT_REQUEST_KEY = new TypedAttributeKey<>(WsrConnector.class"createSession");
 
 
     // Final field with no initializer here, so it must be assigned in the constructor.
     private final RtmpChunkCodecFilter codec;
 
     // NOTE(review): both filter names start with "wsn" although this is the wsr connector —
     // possibly copied from another transport; confirm this is intended.
     private static final String FAULT_LOGGING_FILTER = "wsn#fault";
     private static final String TRACE_LOGGING_FILTER = "wsn#logging";
     // NOTE(review): the argument that fills the "%s" placeholder is missing after the comma.
     private static final String LOGGER_NAME = String.format("transport.%s.connect".);
     // NOTE(review): the getLogger argument is missing; presumably LOGGER_NAME — confirm.
     private final Logger logger = LoggerFactory.getLogger();
 
 
 
     public WsrConnector() {
         super(new DefaultIoSessionConfigEx());
          = new RtmpChunkCodecFilter();
     }
 
     /**
      * Setter annotated as the injection point for the resource named "bridgeServiceFactory".
      *
      * @param bridgeServiceFactory the factory to store on this connector
      */
     @Resource(name = "bridgeServiceFactory")
     public void setBridgeServiceFactory(BridgeServiceFactory bridgeServiceFactory) {
         // NOTE(review): the field name after "this." is missing in this copy, and no matching
         // field declaration is visible; presumably a field named like the parameter — confirm.
         this. = bridgeServiceFactory;
     }
    /**
     * Setter annotated as the injection point for the resource named "resourceAddressFactory".
     *
     * @param factory the resource address factory to store on this connector
     */
    @Resource(name = "resourceAddressFactory")
    public void setResourceAddressFactory(ResourceAddressFactory factory) {
        // NOTE(review): the field name after "this." is missing in this copy, and no matching
        // field declaration is visible — restore from the upstream source.
        this. = factory;
    }
    /**
     * Supplies the processor for this connector's sessions.
     *
     * @return a new {@code WsrConnectProcessor}
     */
    // NOTE(review): the '>' closing the type argument and the space before the method name are
    // missing in the signature below.
    @Override
    protected IoProcessorEx<WsrSessioninitProcessor() {
        return new WsrConnectProcessor();
    }
    /**
     * Initialization hook; adds nothing of its own and only delegates to the superclass.
     */
    @Override
    protected void init() {
        super.init();
    }
    @Override
    protected boolean canConnect(String transportName) {
        return transportName.equals("wsr") || transportName.equals("ws");
    }
    /**
     * Starts a connect over the transport of a "create" address and returns the future for the
     * bridge (WSR) session.
     *
     * The returned future is failed here only when the transport connect does not succeed; on
     * the success path it is handed to createParentInitializer and completed elsewhere.
     *
     * NOTE(review): this copy has lost tokens in this method — the '>' and return-type
     * separator in the signature, commas between parameters and arguments, the operand after
     * "getPath()+", and the receiver of ".newBridgeConnector". Code is left as found.
     *
     * @param connectAddress the address to connect to
     * @param handler the handler for the bridge session
     * @param initializer optional user initializer for the bridge session
     * @return the future for the bridge connect
     */
    @Override
    protected <T extends ConnectFutureConnectFuture connectInternal(
            ResourceAddress connectAddressIoHandler handler,
            final IoSessionInitializer<T> initializer) {
        final DefaultConnectFuture bridgeConnectFuture = new DefaultConnectFuture();
        // propagate connection failure, if necessary
        IoFutureListener<ConnectFutureparentConnectListener = new IoFutureListener<ConnectFuture>() {
            @Override
            public void operationComplete(ConnectFuture future) {
                // fail bridge connect future if parent connect fails
                if (!future.isConnected()) {
                    bridgeConnectFuture.setException(future.getException());
                }
            }
        };
        IoSessionInitializer<ConnectFutureparentInitializer = createParentInitializer(
                connectAddresshandlerinitializerbridgeConnectFuture);
        URI connectURI = connectAddress.getResource();
        // NOTE(review): right-hand operand of '+' is missing; presumably CREATE_SUFFIX — confirm.
        ResourceAddress createAddress = connectAddress.resolve(connectURI.getPath()+);
        final ResourceAddress transportAddress = createAddress.getTransport();
        // NOTE(review): receiver is missing; presumably the injected bridge service factory.
        BridgeConnector connector = .newBridgeConnector(transportAddress);
        // connect the transport with the handler chosen for it, then watch for failure
        connector.connect(transportAddress,
                          selectConnectHandler(transportAddress),
                          parentInitializer)
                .addListener(parentConnectListener);
        return bridgeConnectFuture;
    }
    /**
     * Chooses the handler for the transport session based on the protocol resolved for the
     * address.
     *
     * @param address the transport address being connected
     * @return the handler used when the protocol is an {@code HttpProtocol}
     * @throws RuntimeException when the protocol is not an {@code HttpProtocol}
     */
    private IoHandler selectConnectHandler(ResourceAddress address) {
          // NOTE(review): the receiver of getTransportFactory() is missing in this copy.
          Protocol protocol = .getTransportFactory().getProtocol(address.getResource());
          if ( protocol instanceof HttpProtocol) {
              // NOTE(review): the returned expression is missing; presumably the createHandler
              // field, the only HttpSession handler in this class — confirm.
              return ;
          }
          throw new RuntimeException(getClass()+
                  ": Cannot select a connect handler for address "+address);
    }
            // NOTE(review): the method header line (modifiers, type parameter, return type and
            // name) is missing in this copy. The call in connectInternal shows the name is
            // createParentInitializer and the result is assigned to an IoSessionInitializer
            // for ConnectFuture. Commas between some parameters and arguments are also missing.
            //
            // Purpose: builds the initializer run on the parent session before the connection
            // attempt. It stores a ConnectRequest (connect address, bridge connect future and a
            // bridge-session initializer) on the parent and adds the X-WebSocket-Version header.
            final ResourceAddress connectAddressfinal IoHandler handler,
            final IoSessionInitializer<T> initializer,
            final DefaultConnectFuture bridgeConnectFuture) {
        // initialize parent session before connection attempt
        return new IoSessionInitializer<ConnectFuture>() {
            @Override
            public void initializeSession(final IoSession parent,
                    ConnectFuture future) {
                // initializer for bridge session to specify bridge handler,
                // and call user-defined bridge session initializer if present
                final IoSessionInitializer<T> bridgeSessionInitializer = new IoSessionInitializer<T>() {
                    @Override
                    public void initializeSession(IoSession bridgeSession,
                            T future) {
                        // NOTE(review): the attribute key and the ',' before "handler" are
                        // missing in this copy.
                        bridgeSession.setAttribute(
                                .handler);
                        if (initializer != null) {
                            initializer
                                    .initializeSession(bridgeSessionfuture);
                        }
                    }
                };
                // NOTE(review): the attribute key argument is missing; presumably
                // CONNECT_REQUEST_KEY, whose value type is ConnectRequest — confirm.
                parent.setAttribute(new ConnectRequest<>(connectAddressbridgeConnectFuturebridgeSessionInitializer));
                // WebSocket-Version required by WsrAcceptor
                // the parent is cast unconditionally, so it must be an HttpSession here
                HttpSession httpParent = (HttpSession)parent;
                httpParent.addWriteHeader("X-WebSocket-Version""wsr-1.0");
            }
        };
    }
    private static class ConnectRequest<T extends ConnectFuture> {
    	public final ResourceAddress connectAddress;
    	public final ConnectFuture connectFuture;
    	public final IoSessionInitializer<T> initializer;
    	public ConnectRequest(ResourceAddress connectAddressConnectFuture connectFutureIoSessionInitializer<T> initializer) {
    		this. = connectAddress;
    		this. = connectFuture;
    		this. = initializer;
    	}
    }
    private IoHandler createHandler = new IoHandlerAdapter<HttpSession>() {
        /**
         * Accumulates response body fragments for the create session into one buffer kept on
         * the session.
         *
         * NOTE(review): this copy has lost the ')' before "throws", the ')' closing the cast,
         * the receiver of ".get"/".set" (presumably CREATE_RESPONSE_KEY, whose value type is
         * IoBufferEx — confirm) and the ',' between the ".set" arguments.
         */
        @Override
        protected void doMessageReceived(HttpSession createSession,
                Object messagethrows Exception {
            // Handle fragmentation of response body
            IoBufferEx in = (IoBufferExmessage;
            IoBufferEx buf = .get(createSession);
            if (buf == null) {
                // first fragment: allocate a buffer sized to this fragment and give it an
                // auto-expander (presumably so later fragments fit — confirm), then store it
                IoBufferAllocatorEx<?> allocator = createSession.getBufferAllocator();
                ByteBuffer nioBuf = allocator.allocate(in.remaining());
                buf = allocator.wrap(nioBuf).setAutoExpander(allocator);
                .set(createSessionbuf);
            }
            buf.put(in);
        }
        /**
         * Runs when the create session closes. It fails the pending connect if no acceptable
         * response was received. Otherwise it decodes the response text as the RTMP address,
         * starts a second connect, and creates the WsrSession when that connect completes.
         *
         * NOTE(review): this copy has lost many tokens in this method — receivers of
         * ".remove"/".get"/".newDecoder"/".newResourceAddress"/".newBridgeConnector", the
         * status constant compared against, closing '>' and ')' of generics and casts, and
         * commas between arguments. Code is left exactly as found.
         */
        @Override
        protected void doSessionClosed(final HttpSession createSession)
                throws Exception {
            // take the accumulated response body off the session
            IoBufferEx buf = .remove(createSession);
            // NOTE(review): the expected status constant after "!=" is missing.
            if (buf == null || createSession.getStatus() != .) {
                ConnectRequest<?> connectRequest = .get(createSession);
                ConnectFuture connectFuture = connectRequest.connectFuture;
               	connectFuture.setException(new IllegalStateException(
                "Create handshake failed: invalid response").fillInStackTrace());
               	return;
            }
            buf.flip();
            // NOTE(review): decoder receiver is missing; presumably the UTF_8 constant.
            String responseText = buf.getString(.newDecoder());
            // the response body text is parsed as a URI and turned into the RTMP address
            final ResourceAddress rtmpAddress = .newResourceAddress(URI.create(responseText));
            IoFutureListener<ConnectFutureparentConnectListener = new IoFutureListener<ConnectFuture>() {
            	@Override
            	public void operationComplete(ConnectFuture future) {
            		// the connect request is removed from the create session here
            		ConnectRequest<?> connectRequest = .remove(createSession);
            		try {
            			final ResourceAddress connectAddress = connectRequest.connectAddress;
            			final IoSessionEx session = (IoSessionExfuture.getSession();
            			Callable<WsrSessionbridgeSessionFactory = new Callable<WsrSession>() {
            				@Override
            				public WsrSession call() throws Exception {
            				    IoBufferAllocatorEx<?> parentAllocator = session.getBufferAllocator();
            				    WsrBufferAllocator wsrAllocator = new WsrBufferAllocator(parentAllocator);
            					// NOTE(review): separators between the constructor arguments
            					// are missing on the next lines.
            					WsrSession wsrSession = new WsrSession(
            							WsrConnector.thisgetProcessor(),
            							connectAddressconnectAddresssession,
            							wsrAllocatornullnull);
            					wsrSession.setRtmpAddress(rtmpAddress);
            					// ability to write will be reactivated when
            					// the rtmp publish stream has been created
            					wsrSession.suspendWrite();
            					return wsrSession;
            				}
            			};
            			WsrSession wsrSession = newSession(connectRequest.initializerconnectRequest.connectFuturebridgeSessionFactory);
            			// NOTE(review): the attribute key argument is missing; presumably
            			// WSR_SESSION_KEY — confirm.
            			session.setAttribute(wsrSession);
            		} catch (Exception e) {
            			// any failure while building the session fails the pending connect
            			connectRequest.connectFuture.setException(e);
            		}
            	}
            };
            // connect again over the transport of the create session's remote address
            ResourceAddress remoteAddress = createSession.getRemoteAddress();
            ResourceAddress connectAddress = remoteAddress.getTransport();
            BridgeConnector connector = .newBridgeConnector(connectAddress);
            // NOTE(review): the handler argument between connectAddress and null is missing.
            ConnectFuture connectFuture = connector.connect(connectAddressnull);
            connectFuture.addListener(parentConnectListener);
        }
        /**
         * Fails the pending connect future with the caught exception, unless that future is
         * already done.
         *
         * NOTE(review): this copy has lost the ')' before "throws" and the receiver of ".get"
         * (presumably CONNECT_REQUEST_KEY — confirm). connectRequest is dereferenced without a
         * null check; confirm it cannot be absent when this callback runs.
         */
        @Override
        protected void doExceptionCaught(HttpSession createSession,
                Throwable causethrows Exception {
            ConnectRequest<?> connectRequest = .get(createSession);
            ConnectFuture connectFuture = connectRequest.connectFuture;
            if (!connectFuture.isDone()) {
            	connectFuture.setException(cause);
            }
        }
    };
    /**
     * Adds the bridge filters to the given chain: a logging filter first when trace or debug
     * is enabled, then the "rtmp" and "log" filters at the end.
     *
     * NOTE(review): this copy has lost the receiver of isTraceEnabled()/isDebugEnabled()
     * (presumably the logger field), the operand before + "#%s" in both filter constructors,
     * the filter argument of addLast("rtmp") (presumably the codec field) and the ',' after
     * "log" — confirm against the upstream source.
     *
     * @param filterChain the filter chain to extend
     */
    public void addBridgeFilters(
            org.apache.mina.core.filterchain.IoFilterChain filterChain) {
        // setup logging filters for bridge session
        if (.isTraceEnabled()) {
            filterChain.addFirst(new ObjectLoggingFilter(. + "#%s"));
        } else if (.isDebugEnabled()) {
            filterChain.addFirst(new ExceptionLoggingFilter(. + "#%s"));
        }
        filterChain.addLast("rtmp");
        filterChain.addLast("log"new LoggingFilter("transport.rtmp"));
    }
    // NOTE(review): the method signature line is missing in this copy; only the annotation,
    // the return statement and the closing brace remain. The body returns a new
    // DefaultTransportMetadata built from a name() call whose receiver is also missing.
    @Override
        return new DefaultTransportMetadata(..name());
    }
    private IoHandler ioBridgeHandler = new IoHandlerAdapter<IoSession>() {
        @Override
        protected void doSessionOpened(IoSession sessionthrows Exception {
            IoFilterChain filterChain = session.getFilterChain();
            addBridgeFilters(filterChain);
            RtmpVersionMessage version = new RtmpVersionMessage();
            session.write(version);
            RtmpHandshakeRequestMessage handshakeRequest = new RtmpHandshakeRequestMessage();
            session.write(handshakeRequest);
        }
        /**
         * Resets the WSR session when the underlying session closes while the WSR session is
         * present and not already closing.
         *
         * NOTE(review): this copy has lost the ')' before "throws" and the receiver of ".get"
         * (presumably WSR_SESSION_KEY, whose value type is WsrSession — confirm).
         */
        @Override
        protected void doSessionClosed(IoSession sessionthrows Exception {
            WsrSession wsrSession = .get(session);
            if (wsrSession != null && !wsrSession.isClosing()) {
                // TODO: require RTMP controlled close handshake
                wsrSession.reset(new Exception("Early termination of IO session").fillInStackTrace());
            }
        }
        /**
         * Handles an exception on the underlying session. It resets the WSR session with the
         * cause when that session is present and not closing. Otherwise it logs at debug level
         * (if enabled) and closes the underlying session.
         *
         * NOTE(review): this copy has lost the ',' between the parameters, the receiver of
         * ".get" (presumably WSR_SESSION_KEY), the receiver of the isDebugEnabled/
         * isTraceEnabled/debug calls (presumably the logger field) and the commas inside the
         * format(...) and debug(...) argument lists — confirm against the upstream source.
         */
        @Override
        protected void doExceptionCaught(IoSession sessionThrowable cause)
                throws Exception {
            WsrSession wsrSession = .get(session);
            if (wsrSession != null && !wsrSession.isClosing()) {
                wsrSession.reset(cause);
            }
            else {
                if (.isDebugEnabled()) {
                    String message = format("Error on WebSocket WSR connection attempt: %s"cause);
                    if (.isTraceEnabled()) {
                        // note: still debug level, but with extra detail about the exception
                        .debug(messagecause);
                    }
                    else {
                        .debug(message);
                    }
                }
                // the underlying session is closed with the flag set to true
                session.close(true);
            }
        }
        @Override
        protected void doSessionIdle(IoSession sessionIdleStatus status)
                throws Exception {
            WsrSession wsrSession = .get(session);
            IoFilterChain filterChain = wsrSession.getFilterChain();
            filterChain.fireSessionIdle(status);
        }
        /**
         * Dispatches an incoming RTMP message by kind. Depending on the kind it answers the
         * handshake and sends set-chunk-size plus connect, creates the streams, starts play or
         * publish, or passes binary data up to the WSR session's filter chain.
         *
         * NOTE(review): every case label in this method has lost its constant in this copy
         * ("case :"). Also lost are the ',' between the parameters, the receiver of ".get"
         * (presumably WSR_SESSION_KEY), the ')' of each cast and the argument of
         * setChunkSize(...). The branch descriptions below come from the statements in each
         * branch, not from the lost labels. Code is left exactly as found.
         */
        @Override
        protected void doMessageReceived(IoSession sessionObject message)
                throws Exception {
            WsrSession wsrSession = .get(session);
            RtmpMessage rtmpMessage = (RtmpMessagemessage;
            switch (rtmpMessage.getKind()) {
            case :
                // ignore
                break;
            case :
                // send the handshake back with timestamp2 = timestamp1 + 1, then send
                // set-chunk-size and the connect command
                RtmpHandshakeMessage handshake = (RtmpHandshakeMessagemessage;
                handshake.setTimestamp2(handshake.getTimestamp1() + 1);
                session.write(handshake);
                RtmpSetChunkSizeMessage setChunkSize = new RtmpSetChunkSizeMessage();
                setChunkSize.setChunkStreamId(2);
                setChunkSize.setMessageStreamId(0);
                setChunkSize.setChunkSize(.);
                session.write(setChunkSize);
                RtmpConnectCommandMessage connectCommand = new RtmpConnectCommandMessage();
                // note: additional properties are probably necessary to connect to other RTMP Servers
                String originUrl = "";
                // NOTE(review): wsrSession is dereferenced here without a null check, while the
                // data branch below checks for null — confirm it is always attached by now.
                String tcUrl = wsrSession.getRtmpAddress().getResource().toString();
                connectCommand.setSwfUrl(originUrl);
                connectCommand.setTcUrl(tcUrl);
                session.write(connectCommand);
                break;
            case :
                // ignore
                break;
            case :
                RtmpStreamMessage streamMessage = (RtmpStreamMessagertmpMessage;
                switch (streamMessage.getStreamKind()) {
                case :
                case :
                    RtmpCommandMessage commandMessage = (RtmpCommandMessagestreamMessage;
                    switch (commandMessage.getCommandKind()) {
                    case :
                        doCreateStreams(session);
                        break;
                    case :
                    	RtmpCreateStreamResultCommandMessage createResult = (RtmpCreateStreamResultCommandMessagecommandMessage;
                    	// which stream?
                    	// stream id 1 starts play; any other id starts publish and re-enables writes
                    	double streamId = createResult.getStreamId();
                    	if ((intstreamId == 1) {
                    		doPlay(session);
                    	} else {
                    		doPublish(session);
                            //TODO: If we deferred opening the client until after both the HTTP and RTMP connections
                            //TODO: are established - that way there is no longer any need to protect against early writes
                            //TODO: and we can eliminate this usage of suspendWrite/resumeWrite.
                    		wsrSession.resumeWrite();
                            // We are always aligned now. if (session.isIoAligned()) {
                            wsrSession.getProcessor().flush(wsrSession);
                    	}
                        break;
                    case :
                    	break;
                    case :
                        // activate for writes
                        break;
                    default:
                        throw new Exception("Unexpected command");
                    }
                    break;
                case :
                    // binary payload: hand the bytes to the WSR session's filter chain, if any
                    RtmpBinaryDataMessage amfData = (RtmpBinaryDataMessagertmpMessage;
                    IoBufferEx buf = amfData.getBytes();
                    if (wsrSession != null) {
                        IoFilterChain filterChain = wsrSession.getFilterChain();
                        filterChain.fireMessageReceived(buf);
                    }
                    break;
                default:
                    throw new IllegalArgumentException(
                            "Unrecognized stream message kind: "
                                    + streamMessage.getStreamKind());
                }
                break;
            }
        }
        private void doPlay(IoSession session) {
        	RtmpPlayCommandMessage play = new RtmpPlayCommandMessage();
        	play.setChunkStreamId(5);
        	play.setTransactionId(3);
        	play.setMessageStreamId(1);
        	session.write(play);
        }
        private void doPublish(IoSession session) {
        	RtmpPublishCommandMessage publish = new RtmpPublishCommandMessage();
        	publish.setChunkStreamId(5);
        	publish.setTransactionId(4);
        	publish.setMessageStreamId(2);
        	session.write(publish);
        }
        private void doCreateStreams(IoSession session) {
            RtmpCreateStreamCommandMessage createUpstream = new RtmpCreateStreamCommandMessage();
            createUpstream.setChunkStreamId(3);
            createUpstream.setTransactionId(1);
            session.write(createUpstream);
            RtmpCreateStreamCommandMessage createDownstream = new RtmpCreateStreamCommandMessage();
            createDownstream.setChunkStreamId(3);
            createDownstream.setTransactionId(2);
            session.write(createDownstream);
        }
    };
New to GrepCode? Check out our FAQ X