Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
/**
 * Copyright (c) 2007-2014 Kaazing Corporation. All rights reserved.
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
 
 
 package org.kaazing.gateway.transport.wseb;
 
 
     /** Logger obtained from {@code LoggerFactory} for {@code WsebAcceptProcessor}. */
     private static final Logger LOGGER = LoggerFactory.getLogger(WsebAcceptProcessor.class);
     /**
      * Single shared instance of the {@code CheckInitialPadding} write-future listener.
      * NOTE(review): no intact reference to this constant survives in this copy of the file;
      * the argument-less {@code lastWrite.addListener()} call in {@code flushInternal}
      * presumably passed it - confirm against the upstream source.
      */
     private static final CheckInitialPadding CHECK_INITIAL_PADDING = new CheckInitialPadding();
 
     @Override
     protected void removeInternal(WsebSession session) {
         IoSession parent = session.getParent();
         if (parent == null || parent.isClosing()) {
             // TODO: throw write to close session exception
             return;
         }
 
         super.removeInternal(session);
     }
 
     /**
      * Flushes the queued write requests of the given WSEB session to the HTTP writer
      * session currently attached to it.
      * <p>
      * Returns immediately when the session has no writer or the writer is closing.
      * Otherwise it loops, taking first any write request that was still marked as
      * current on the session and then polling the session's write request queue:
      * <ul>
      * <li>Queue empty: stops. Unless the session or writer is closing, it either adds a
      *     listener to the last write future of this pass or, when nothing was written,
      *     calls {@code checkInitialPadding(writer)}.</li>
      * <li>Reconnect request: closes the writer, detaches it from the session, attaches the
      *     pending writer and stops.</li>
      * <li>{@code IoBufferEx} message: wraps it in a {@code WsMessage} (reusing the message
      *     cached on a {@code WsBuffer} when present), writes it via
      *     {@code flushNowInternal}, increases the session's written-byte count and, when
      *     the session is not reconnecting, attaches a {@code CheckBufferAndBlockPadding}
      *     or {@code CheckBuffer} listener. Any exception raised in this branch is set on
      *     the request's future and the loop continues with the next request.</li>
      * <li>Ping / pong request: writes an empty {@code WsPingMessage} or
      *     {@code WsPongMessage} directly to the writer and stops.</li>
      * </ul>
      * <p>
      * NOTE(review): this copy of the file has lost several tokens (method receivers,
      * attribute-key constants, argument separators, a cast parenthesis). The affected
      * lines are flagged below; restore them from the upstream source before compiling.
      *
      * @param session the WSEB session whose pending writes are flushed
      * @throws IllegalStateException if a request's message is neither an {@code IoBufferEx}
      *         nor a reconnect, ping or pong request
      */
     @Override
     protected void flushInternal(final WsebSession session) {
         // get parent and check if null (no attached http session)
         final HttpAcceptSession writer = (HttpAcceptSession)session.getWriter();
         if (writer == null || writer.isClosing()) {
             // NOTE(review): receiver of isTraceEnabled()/trace() is missing - presumably LOGGER;
             // the format arguments below also look garbled ("writerwriter==null").
             if (.isTraceEnabled()) {
                 .trace(String.format("WsebAcceptProcessor.flushInternal: returning because writer (%s) " +
                                                    "is null or writer is closing(%s)",
                         writerwriter==null ? "n/a" : Boolean.valueOf(writer.isClosing()) ));
             }
             return;
         }
 
         // store last write so we can observe it
         WriteFuture lastWrite = null;
 
         IoFilterChain filterChain = session.getFilterChain();
 
         // we can still have a current write request during the transition between writers
         // (it is kept in the local variable and cleared on the session before the loop)
         WriteRequest currentWriteRequest = session.getCurrentWriteRequest();
         if (currentWriteRequest != null) {
             session.setCurrentWriteRequest(null);
         }
         
         // get write request queue and process it
         final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
         // NOTE(review): cast parenthesis and attribute key are missing on the next line;
         // the value is only used below as a null check before adding a CheckBuffer listener.
         Long clientBuffer = (Longwriter.getAttribute(.);
         do {
             // get current request in the event that it was not complete last
             // iteration
             WriteRequest request = currentWriteRequest;
 
             // if we have no more requests then we are done flushing the queue
             if (request == null) {
                 // if request is null get next one off the queue
                 request = writeRequestQueue.poll(session);
                 if (request == null) {
                     // closing so no need to calculate padding
                     if (session.isClosing() || writer.isClosing()) {
                        break;
                    }
                    // check if we wrote something this flush, if so add padding
                    // check
                    if (lastWrite != null) {
                        // NOTE(review): listener argument is missing - presumably CHECK_INITIAL_PADDING
                        lastWrite.addListener();
                    }
                    else {
                        // nothing was in the queue to write at all so check if we
                        // should send padding preemptively
                        checkInitialPadding(writer);
                    }
                    break;
                }
            }
            else {
                // the carried-over request is consumed exactly once; later iterations poll the queue
                currentWriteRequest = null;
            }
            // identity compare for our marker as a command to reconnect the
            // stream
            if (WsebSession.isReconnectRequest(request)) {
                // NOTE(review): logger receiver and the comma before writer.getId() are missing
                if (.isDebugEnabled()) {
                    .debug(String.format("RECONNECT_REQUEST detected: closing writer %d"writer.getId()));
                }
                try {
                    // WsebReconnectFilter will write out the WsCommandMessage.RECONNECT message
                    writer.close(false);
                    // thread safety
                    // this code assumes single-threaded access (flushInternal)
                    // otherwise there is a race for code already past
                    // the parent != null check above
                }
                finally {
                    // explicitly null the parent reference because
                    // parent not closing until flush completes
                    session.detachWriter(writer);
                    session.attachPendingWriter();
                }
                break;
            }
            
            // get message and compare to types we can process
            Object message = request.getMessage();
            if (message instanceof IoBufferEx) {
                // NOTE(review): closing parenthesis of the cast is missing on the next line
                IoBufferEx buf = (IoBufferExmessage;
                try {
                    // stop if parent already closing
                    // (the request is stored back on the session as its current write request first)
                    if (writer.isClosing()) {
                        session.setCurrentWriteRequest(request);
                        break;
                    }
                    // hold current remaining bytes so we know how much was
                    // written
                    int remaining = buf.remaining();
                    if (remaining == 0) {
                        // caught by the catch block below and reported on the request's future
                        throw new IllegalStateException("Unexpected empty buffer");
                    }
                    
                    // TODO: thread safety
                    // reconnect parent.close(false) above triggers flush of pending
                    // writes before closing the HTTP session, and in the interim
                    // parent.isClosing() returns false until the close begins
                    // since this flush method is gated by parent being null
                    // or closing, there is a race condition that would permit
                    // writing data to the parent during this interim state
                    // resulting in a WriteToClosedSessionException and losing data
                    // convert from session+buffer to message
                    if (buf instanceof WsBuffer) {
                        // reuse previously constructed message if available
                        WsBuffer wsBuffer = (WsBuffer)buf;
                        WsMessage wsebMessage = wsBuffer.getMessage();
                        if (wsebMessage == null) {
                            WsMessage newWsebMessage;
                            // NOTE(review): the constant compared against getKind() is missing;
                            // the branch taken builds a WsTextMessage, so presumably a "text" kind - confirm
                            if (wsBuffer.getKind() == ..) {
                                //if the connection is mixed transport, send textmessage
                                newWsebMessage = new WsTextMessage(buf);
                            }
                            else {
                                newWsebMessage = new WsBinaryMessage(buf);
                            }
                            
                            if (wsBuffer.isAutoCache()) {
                                // buffer is cached on parent, continue with derived caching
                                newWsebMessage.initCache();
                            }
                            // if setMessage reports no update, use whatever message the buffer now holds
                            boolean wasUpdated = wsBuffer.setMessage(newWsebMessage);
                            wsebMessage = wasUpdated ? newWsebMessage : wsBuffer.getMessage();
                        }
                        // flush the buffer out to the session
                        // NOTE(review): argument separators are missing in the call below
                        lastWrite = flushNowInternal(writerwsebMessagewsBufferfilterChainrequest);
                    }
                    else {
                        // flush the buffer out to the session
                        // NOTE(review): argument separators are missing in the call below
                        lastWrite = flushNowInternal(writernew WsBinaryMessage(buf), buffilterChainrequest);
                    }
                    // increment session written bytes
                    // (uses the remaining() value captured before the write)
                    int written = remaining;
                    session.increaseWrittenBytes(written, System.currentTimeMillis());
                    // if we are not already reconnecting then add a listener to
                    // the last write future
                    // so it can check the written bytes and compare to client
                    // buffer
                    if (!session.isReconnecting()) {
                        // Check whether we require block padding
                        // NOTE(review): attribute key is missing on the next line
                        boolean checkBlockPadding = (writer.getAttribute(.) != null);
                        if (checkBlockPadding) {
                            lastWrite.addListener(new CheckBufferAndBlockPadding(session));
                        }
                        else {
                            // Don't incur overhead of the write future if there was no .kb parameter
                            if (clientBuffer != null) {
                                lastWrite.addListener(new CheckBuffer(session));
                            }
                        }
                    }
                    // wrote the last bytes of the message to link the last
                    // write close to message close
                }
                catch (Exception e) {
                    // failure is reported on the request's future; the loop then moves on to the next request
                    request.getFuture().setException(e);
                }
            }
            else if (WsebSession.isPingRequest(request) || WsebSession.isPongRequest(request)) {
                boolean ping = WsebSession.isPingRequest(request);
                // NOTE(review): logger receiver and argument separators are missing below
                if (.isDebugEnabled()) {
                    String poing = ping ? "PING" : "PONG";
                    .debug(String.format("%s_REQUEST detected on wsebSession %s: sending %s",
                            poingsessionpoing));
                }
                try {
                    // stop if parent already closing
                    // NOTE(review): in that case the ping/pong request is neither written nor
                    // stored back on the session (the finally block clears it) - confirm intended
                    if (writer.isClosing()) {
                        break;
                    }
                    IoBufferAllocatorEx<?> allocator = session.getBufferAllocator();
                    IoBufferEx emptyBuf = allocator.wrap(allocator.allocate(0));
                    emptyBuf.mark();
                    WsMessage emptyPoing = ping ? new WsPingMessage(emptyBuf) : new WsPongMessage(emptyBuf);
                    // The following causes ClassCastException in stomp decoder in messageSent in JMS edition (see KG-9329)
                    // flushNowInternal(writer, emptyPoing, emptyBuf, filterChain, request);
                    writer.write(emptyPoing);
                }
                finally {
                    session.setCurrentWriteRequest(null);
                }
                // flushing stops after a ping/pong; remaining queued requests wait for a later flush
                break;
            }
            else {
                throw new IllegalStateException("Don't know how to handle message of type '" + message.getClass().getName() + "'.  Are you missing a protocol encoder?");
            }
        }
        while (true);
    }
    /**
     * Writes a padding message made of no-op commands to the given HTTP session when the
     * session has written fewer bytes than the value stored in a session attribute.
     * <p>
     * Does nothing when that attribute is absent. When the written byte count already
     * reaches the attribute value, an attribute is removed from the session instead.
     * <p>
     * NOTE(review): the attribute keys passed to {@code getAttribute} and
     * {@code removeAttribute}, and the separator in the {@code for} header, are missing in
     * this copy; presumably both calls use the same client-padding key - confirm upstream.
     *
     * @param session the HTTP accept session to inspect and, if required, write padding to
     */
    private static final void checkInitialPadding(HttpAcceptSession session) {
        // check to see if we need to add a padding message to the end of
        // the sent messages
        Integer clientPadding = (Integer)session.getAttribute(.);
        if (clientPadding != null) {
            long writtenBytes = session.getWrittenBytes();
            // number of bytes still missing to reach the requested padding
            int padding = (int)(clientPadding - writtenBytes);
            if (padding > 0) {
                // each command uses 2 bytes on the wire so we need half as many commands as padding bytes
                // NOTE(review): integer division - for padding == 1 this yields 0 commands,
                // so a command message with an empty command array is written
                int len = padding / 2;
                Command[] commands = new Command[len];
                // NOTE(review): presumably "i < len; i++" - separator lost in extraction
                for (int i = 0; i < leni++) {
                    commands[i] = Command.noop();
                }
                WsMessage msg = new WsCommandMessage(commands);
                session.write(msg);
            }
            else {
                // enough bytes already written: drop the attribute so later calls return early
                session.removeAttribute(.);
            }
        }
    }
    private static final class CheckInitialPadding implements IoFutureListener<WriteFuture> {
        @Override
        public void operationComplete(WriteFuture future) {
            HttpAcceptSession session = (HttpAcceptSession)future.getSession();
            checkInitialPadding(session);
        }
    };
    /**
     * Writes a message to the given HTTP session and records a byte-count marker on it,
     * unless the session's written byte count equals the marker stored by a previous call.
     * <p>
     * The marker is read from a session attribute; when it is absent or differs from the
     * current written byte count, a message is written and the attribute is set to the
     * current written byte count plus 4096.
     * <p>
     * NOTE(review): the attribute keys and the argument of {@code session.write} are missing
     * in this copy, as is the separator between the two {@code setAttribute} arguments.
     * Presumably the written message is a padding/no-op command and 4096 is its size on
     * the wire - confirm against the upstream source.
     *
     * @param session the HTTP accept session to inspect and, if required, write padding to
     */
    private static final void checkBlockPadding(HttpAcceptSession session) {
        // TODO: Verify if counting bytes is really necessary
        // check to see if we need to add a padding message to the end of sent messages
        long writtenBytes = session.getWrittenBytes();
        Long bytesWrittenOnLastFlush = (Long)session.getAttribute(.);
        if (bytesWrittenOnLastFlush == null || writtenBytes != bytesWrittenOnLastFlush.longValue()) {
            // Block Padding is required
            session.write(.);
            // NOTE(review): the Long(long) constructor is deprecated since Java 9; Long.valueOf is the replacement
            session.setAttribute(.new Long(writtenBytes+4096));
        }
    }
    private static final class CheckBufferAndBlockPadding extends CheckBuffer {
        public CheckBufferAndBlockPadding(WsebSession wsebSession) {
            super(wsebSession);
        }
        @Override
        public void operationComplete(WriteFuture future) {
            HttpAcceptSession session = (HttpAcceptSession)future.getSession();
            checkBlockPadding(session);
            super.operationComplete(future);
        }
    };
    /**
     * Write-future listener that, after a write completes, compares the bytes written on
     * the HTTP session with a {@code Long} session attribute and, once the written bytes
     * reach that value, attempts to flip a reconnecting flag and enqueue a reconnect.
     * <p>
     * NOTE(review): the receivers of {@code isReconnecting()},
     * {@code compareAndSetReconnecting(...)} and {@code enqueueReconnectAndFlush()}, the
     * field name in the constructor assignment, the attribute key and an argument
     * separator are missing in this copy. The receivers are presumably the
     * {@code wsebSession} field - confirm against the upstream source.
     */
    private static class CheckBuffer implements IoFutureListener<WriteFuture> {
        // WSEB session supplied at construction time
        private final WsebSession wsebSession;
        /**
         * @param wsebSession the WSEB session associated with the observed writes
         */
        public CheckBuffer(WsebSession wsebSession) {
            this. = wsebSession;
        }
        /**
         * Returns early when the HTTP session is closing or a reconnect is already in
         * progress; otherwise performs the buffer check described on the class.
         *
         * @param future the completed write; its session is cast to {@code HttpAcceptSession}
         */
        @Override
        public void operationComplete(WriteFuture future) {
            HttpAcceptSession parent = (HttpAcceptSession)future.getSession();
            if (parent.isClosing() || .isReconnecting()) {
                return;
            }
            // check to see if we have written out at least enough bytes to be
            // over the client buffer
            Long clientBuffer = (Long)parent.getAttribute(.);
            if (clientBuffer != null) {
                long bytesWritten = parent.getWrittenBytes();
                if (bytesWritten >= clientBuffer) {
                    // TODO: thread safety
                    // multiple threads can trigger a reconnect on the same WsfSession
                    // compare-and-set lets only the caller that flips the flag enqueue the reconnect
                    if (.compareAndSetReconnecting(falsetrue)) {
                        .enqueueReconnectAndFlush();
                    }
                }
            }
        }
    };
     
   
New to GrepCode? Check out our FAQ X