Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
/**
 * Copyright (c) 2007-2014 Kaazing Corporation. All rights reserved.
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor
 * license agreements. See the NOTICE file distributed with this work for additional
 * information regarding copyright ownership. The ASF licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed
 * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
 
 
 package org.kaazing.gateway.transport.wseb;
 
 
 
 
     private static final boolean ALIGN_DOWNSTREAM = Boolean.parseBoolean(System.getProperty("org.kaazing.gateway.transport.wseb.ALIGN_DOWNSTREAM""true"));
     private static final boolean ALIGN_UPSTREAM = Boolean.parseBoolean(System.getProperty("org.kaazing.gateway.transport.wseb.ALIGN_UPSTREAM""true"));
     
 
         @Override
         public <T extends MessageIoBufferEx encode(MessageEncoder<T> encoder, T messageIoBufferAllocatorEx<?> allocatorint flags) {
             return encode("wseb"encodermessageallocatorflags);
         }
 
     };
 
 
         @Override
         public <T extends MessageIoBufferEx encode(MessageEncoder<T> encoder, T messageIoBufferAllocatorEx<?> allocatorint flags) {
             return encode("wseb-escape0"encodermessageallocatorflags);
         }
 
     };
     
 
         @Override
         public <T extends MessageIoBufferEx encode(MessageEncoder<T> encoder, T messageIoBufferAllocatorEx<?> allocatorint flags) {
             return encode("wseb-escape"encodermessageallocatorflags);
         }
 
     };
     
     // Logger for this class.
     private static final Logger LOGGER = LoggerFactory.getLogger(WsebSession.class);
     // Three distinct write-request instances, each constructed around its own fresh Object.
     // Presumably used as identity-compared markers by the issue*/enqueue*Request and is*Request
     // methods below; confirm against those methods.
     private static final WriteRequest PING_REQUEST = new DefaultWriteRequestEx(new Object());
     private static final WriteRequest PONG_REQUEST = new DefaultWriteRequestEx(new Object());
     private static final WriteRequest RECONNECT_REQUEST = new DefaultWriteRequestEx(new Object());
 
     private final AtomicBoolean attachingWrite;
     private final AtomicReference<IoSessionExreadSession;
     private final AtomicReference<IoSessionExpendingNewWriter;
     private final TimeoutCommand timeout;
     private final int clientIdleTimeout;
     private final long inactivityTimeout;
 
     private ResourceAddress readAddress;
     private ResourceAddress writeAddress;
    private final AtomicBoolean reconnecting = new AtomicBoolean(false);
    
    private final Runnable enqueueReconnectAndFlushTask = new Runnable() {
        @Override public void run() {
            enqueueReconnectAndFlush0();
        }
    };
    public WsebSession(int ioLayer,
                       Thread ioThread
                       Executor ioExecutor
                       IoServiceEx service,
                       IoProcessorEx<WsebSessionprocessor
                       ResourceAddress localAddress
                       ResourceAddress remoteAddress
                       IoBufferAllocatorEx<WsBufferallocator
                       DefaultLoginResult loginResult,
                       ActiveWsExtensions wsExtensions
                       int clientIdleTimeout,
                       long inactivityTimeout) {
        super(ioLayer,
              ioThread,
              ioExecutor,
              service,
              processor,
              localAddress,
              remoteAddress,
              allocator,
              .,
              loginResult,
              wsExtensions);
        this. = new AtomicBoolean(false);
        this. = new AtomicReference<IoSessionEx>();
        this. = new AtomicReference<IoSessionEx>();
        this. = new TimeoutCommand(this);
        this. = clientIdleTimeout;
        this. = inactivityTimeout;
    }
    // NOTE(review): this method is incomplete in this copy. The declaration line that should follow
    // @Override is missing, and so are the field selected by the switch, both case labels and all three
    // returned values. Presumably it picks one of the three encoders defined near the top of the class;
    // restore it from the upstream source rather than reconstructing it by hand.
    @Override
        switch(this.) {
        case :
            return ;
        case :
            return ;
        default:
            return ;
        }
    }
    public Runnable getTimeoutCommand() {
        return ;
    }
    public void clearTimeoutCommand() {
        .clear();
    }
    public void setReadAddress(ResourceAddress readAddress) {
        this. = readAddress;
    }
    public ResourceAddress getReadAddress() {
        return ;
    }
    public void setWriteAddress(ResourceAddress writeAddress) {
        this. = writeAddress;
    }
    public ResourceAddress getWriteAddress() {
        return ;
    }

    
    /**
     * Attach new writer immediately if there is none. Or, if there already is one, enqueue a request to switch
     * to the new writer, which will be done by WsebAcceptProcessor.flushInternal (this avoids races between
     * that method and this one, see KG-2756).
     *
     * @param newWriter the new writer session to attach
     */
    public void attachWriter(final IoSessionEx newWriter) {
        // The attachWriter processing must be done in this WsebSession's IO thread so we can do
        // getProcessor().flush(). We may need to do "thread hopping" for this since attachWriter gets called by
        // WsebDownstreamHandler.reconnectSession during sessionOpened on the downstream, which may be running
        // in another I/O thread.
        if (Thread.currentThread() == getIoThread()) {
            attachWriter0(newWriter);
        }
        else {
            if () {
                final Thread ioThread = getIoThread();
                final Executor ioExecutor = getIoExecutor();
                newWriter.setIoAlignment();
                ioExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        newWriter.setIoAlignment(ioThreadioExecutor);
                        attachWriter0(newWriter);
                    }
                });
            }
            else {
                getIoExecutor().execute(new Runnable() {
                    @Override
                    public void run() {
                        attachWriter0(newWriter);
                    }
                });
            }
        }
    }
    private void attachWriter0(final IoSessionEx newWriter) {
        if (.isDebugEnabled()) {
            .debug(String.format("attachWriter on WsebSession wseb#%d, newWriter=%s"this.getId(), newWriter));
        }
        .set(false);
        if (!isClosing()) {
            if (!compareAndSetParent(nullnewWriter)) {
                // There's an existing parent (writer). Enqueue a request to switch to new writer.
                IoSessionEx oldPending = .getAndSet(newWriter);
                if (oldPending != null) {
                    // Unlikely (means client established two new downstreams in rapid succession without
                    // receiving any data on the first one) but better safe than sorry.
                    oldPending.close(false);
                }
                enqueueReconnectRequest();
                // Do not return here, need to flush (below) to make sure the old downstream gets closed
                // even if there is no more downstream data being sent (KG-4384)
            } else {
                if (Long.valueOf(0L).equals(newWriter.getAttribute(.))) {
                    // long-polling case, need to buffer so that Content-Length is written
                    newWriter.suspendWrite();
                }
            }
            if (!isWriteSuspended()) {
                getProcessor().flush(this);
            }
        }
        else {
            if (newWriter != null) {
                newWriter.close(false);
            }
        }
        .set(false);
        //
        // Now that we have set up a parent for the session,
        // is the time to start scheduled commands which may need
        // to add their own filters to the parent's filter chain.
        //
        if ( !isClosing() ) {
            try {
                this.startupScheduledCommands();
            } catch (Exception e) {
                .error("Failed to start background commands for session"e);
                throw new RuntimeException(e);
            }
        }
    }
    boolean detachWriter(HttpSession oldWriter) {
        boolean detached = compareAndSetParent(oldWriternull);
        if (detached && Long.valueOf(0L).equals(oldWriter.getAttribute(.))) {
            // long-polling case, writes are done (so end of buffering)
            oldWriter.shutdownWrite();
            oldWriter.resumeWrite();
        }
        oldWriter.close(false);
        return detached;
    }
    public void attachPendingWriter() {
        IoSessionEx pendingWriter = .getAndSet(null);
        if (pendingWriter != null) {
            attachWriter(pendingWriter);
        }
    }
    public void attachReader(final IoSessionEx newReader) {
        // The attachReader processing should be done in this WsebSession's IO thread so we can do
        // fireMessageReceived(). We may need to do "thread hopping" for this since attachReader gets called by
        // WsebUpstreamHandler during sessionOpened on the upstream, which may be running
        // in another I/O thread.
        if (Thread.currentThread() == getIoThread()) {
            attachReader0(newReader);
        }
        else {
            if () {
                final Thread ioThread = getIoThread();
                final Executor ioExecutor = getIoExecutor();
                newReader.setIoAlignment();
                ioExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        newReader.setIoAlignment(ioThreadioExecutor);
                        attachReader0(newReader);
                    }
                });
            }
            else {
                getIoExecutor().execute(new Runnable() {
                    @Override
                    public void run() {
                        attachReader0(newReader);
                    }
                });
            }
        }
        
    }
    private void attachReader0(final IoSessionEx newReader) {
        // TODO: needs improved handling of old value for overlapping downstream
        //       from client perspective to detect buffering proxies
        // TODO: needs re-alignment similar to attachWriter
        .set(newReader);
        if (this.isReadSuspended()) {
            newReader.suspendRead();
        }
    }
    
    public void enqueueReconnectAndFlush() {
        // The processing must be done in this WsebSession's IO thread so we can do getProcessor().flush().
        if (Thread.currentThread() == getIoThread()) {
            enqueueReconnectAndFlush0();
        }
        else {
        }
    }
    private void enqueueReconnectAndFlush0() {
        enqueueReconnectRequest();
        // KG-5615: flush pending reconnect request for long-polling, where
        //          there may be no additional write to implicitly flush for us
        if (!isWriteSuspended()) {
            getProcessor().flush(WsebSession.this);
        }
    }
    public boolean detachReader(IoSessionEx oldReader) {
        return .compareAndSet(oldReadernull);
    }
    public IoSessionEx getReader() {
        return .get();
    }
    public IoSessionEx getWriter() {
        return getParent();
    }
    @Override
    public WriteFuture write(Object message) {
        return super.write(message);
    }
    public boolean compareAndSetAttachingWrite(boolean expectedboolean newValue) {
        return .compareAndSet(expectednewValue);
    }
    @Override
    protected void suspendRead1() {
        super.suspendRead2();
        IoSession readSession = this..get();
        if (readSession != null) {
            readSession.suspendRead();
        }
    }
    
    @Override
    protected void resumeRead1() {
        // call super first to trigger processor.consume()
        super.resumeRead2();
        IoSession readSession = this..get();
        if (readSession != null) {
            readSession.resumeRead();
        }
    }
    boolean compareAndSetReconnecting(boolean expectedboolean newValue) {
        return .compareAndSet(expectednewValue);
    }
    void issuePingRequest() {
        if (.isDebugEnabled()) {
            .debug(String.format("enqueuePingRequest on WsebSession %s"this));
        }
        WriteRequestQueue writeRequestQueue = getWriteRequestQueue();
        writeRequestQueue.offer(this);
        if (!isWriteSuspended()) {
            getProcessor().flush(this);
        }
    }
    static boolean isPingRequest(WriteRequest request) {
        return request == ;
    }
    void issuePongRequest() {
        if (.isDebugEnabled()) {
            .debug(String.format("enqueuePongRequest on WsebSession %s"this));
        }
        WriteRequestQueue writeRequestQueue = getWriteRequestQueue();
        writeRequestQueue.offer(this);
        if (!isWriteSuspended()) {
            getProcessor().flush(this);
        }
    }
    static boolean isPongRequest(WriteRequest request) {
        return request == ;
    }
    void enqueueReconnectRequest() {
        if (.isDebugEnabled()) {
            .debug(String.format("enqueueReconnectRequest on WsebSession %s"this));
        }
        WriteRequestQueue writeRequestQueue = getWriteRequestQueue();
        writeRequestQueue.offer(this);
    }
    static boolean isReconnectRequest(WriteRequest request) {
        return request == ;
    }
    boolean isReconnecting() {
        return .get();
    }
    public int getClientIdleTimeout() {
        return ;
    }
    public long getInactivityTimeout() {
        return ;
    }
    
    // close session if reconnect timer elapses and no parent has been attached
    private static class TimeoutCommand implements Runnable {
        private volatile WsebSession session;
        public TimeoutCommand(WsebSession session) {
            this. = session;
        }
        @Override
        public void run() {
            WsebSession session = this.;
            if (session != null) {
                // technically if this is being called then we have passed the timeout and no reconnect
                // has happened because it would have canceled this task, but doing a check just in case of a race condition
                if (!session.isClosing()) {
                    IoSession parent = session.getParent();
                    if (parent == null) {
                        session.close(true);
                    }
                }
            }
        }
        public void clear() {
             = null;
        }
    }
        // NOTE(review): orphaned tail of a method whose declaration is missing from this copy. The name
        // of the field being assigned was lost too; only the source value `escape` survives. The closing
        // brace of the enclosing class is not visible either. Restore this member from the upstream source.
        this.  = escape;
        
    }
New to GrepCode? Check out our FAQ X