Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * JBoss, Home of Professional Open Source
   * Copyright 2011, JBoss Inc., and individual contributors as indicated
   * by the @authors tag. See the copyright.txt in the distribution for a
   * full listing of individual contributors.
   *
   * This is free software; you can redistribute it and/or modify it
   * under the terms of the GNU Lesser General Public License as
   * published by the Free Software Foundation; either version 2.1 of
  * the License, or (at your option) any later version.
  *
  * This software is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  * Lesser General Public License for more details.
  *
  * You should have received a copy of the GNU Lesser General Public
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
  */
 
 package org.jboss.remoting3.remote;
 
 import static org.jboss.remoting3.remote.RemoteLogger.log;
 
 
 import java.util.Set;
 import org.xnio.Bits;

Author(s):
David M. Lloyd
 
 final class RemoteConnectionChannel extends AbstractHandleableCloseable<Channelimplements Channel {
 
         public int getKey(final RemoteConnectionChannel argument) {
             return argument.channelId;
         }
 
         public boolean equals(final RemoteConnectionChannel argumentfinal int index) {
             return argument.channelId == index;
         }
     };
 
     private final RemoteConnectionHandler connectionHandler;
     private final RemoteConnection connection;
     private final int channelId;
     private final int outboundWindow;
     private final int inboundWindow;
     private final Attachments attachments = new Attachments();
     private final Queue<InboundMessageinboundMessageQueue = new ArrayDeque<InboundMessage>();
     private final int maxOutboundMessages;
     private final int maxInboundMessages;
     private final long maxOutboundMessageSize;
     private final long maxInboundMessageSize;
     private volatile int channelState = 0;
 
     private static final AtomicIntegerFieldUpdater<RemoteConnectionChannelchannelStateUpdater = AtomicIntegerFieldUpdater.newUpdater(RemoteConnectionChannel.class"channelState");
 
     private Receiver nextReceiver;
 
     private static final int WRITE_CLOSED = (1 << 31);
     private static final int READ_CLOSED = (1 << 30);
     private static final int OUTBOUND_MESSAGES_MASK = (1 << 15) - 1;
     private static final int ONE_OUTBOUND_MESSAGE = 1;
     private static final int INBOUND_MESSAGES_MASK = ((1 << 30) - 1) & ~;
     private static final int ONE_INBOUND_MESSAGE = (1 << 15);
 
     RemoteConnectionChannel(final RemoteConnectionHandler connectionHandlerfinal RemoteConnection connectionfinal int channelIdfinal int outboundWindowfinal int inboundWindowfinal int maxOutboundMessagesfinal int maxInboundMessagesfinal long maxOutboundMessageSizefinal long maxInboundMessageSize) {
         super(connectionHandler.getConnectionContext().getConnectionProviderContext().getExecutor(), true);
         this. = maxOutboundMessageSize;
         this. = maxInboundMessageSize;
          = connectionHandler.getConnectionContext();
         this. = connectionHandler;
         this. = connection;
        this. = channelId;
        this. = outboundWindow;
        this. = inboundWindow;
        this. = maxOutboundMessages;
        this. = maxInboundMessages;
    }
    void openOutboundMessage() throws IOException {
        int oldStatenewState;
        do {
            oldState = ;
            if ((oldState & ) != 0) {
                throw new NotOpenException("Writes closed");
            }
            final int outboundCount = oldState & ;
            if (outboundCount == ) {
                throw new ChannelBusyException("Too many open outbound writes");
            }
            newState = oldState + ;
        } while (!casState(oldStatenewState));
        .tracef("Opened outbound message on %s"this);
    }
    private int incrementState(final int count) {
        final int oldState = .getAndAdd(thiscount);
        if (.isTraceEnabled()) {
            final int newState = oldState + count;
            .tracef("CAS %s\n\told: RS=%s WS=%s IM=%d OM=%d\n\tnew: RS=%s WS=%s IM=%d OM=%d"this,
                    Boolean.valueOf((oldState & ) != 0),
                    Boolean.valueOf((oldState & ) != 0),
                    Integer.valueOf((oldState & ) >> Integer.numberOfTrailingZeros()),
                    Integer.valueOf((oldState & ) >> Integer.numberOfTrailingZeros()),
                    Boolean.valueOf((newState & ) != 0),
                    Boolean.valueOf((newState & ) != 0),
                    Integer.valueOf((newState & ) >> Integer.numberOfTrailingZeros()),
                    Integer.valueOf((newState & ) >> Integer.numberOfTrailingZeros())
                    );
        }
        return oldState;
    }
    private boolean casState(final int oldStatefinal int newState) {
        final boolean result = .compareAndSet(thisoldStatenewState);
        if (result && .isTraceEnabled()) {
            .tracef("CAS %s\n\told: RS=%s WS=%s IM=%d OM=%d\n\tnew: RS=%s WS=%s IM=%d OM=%d"this,
                    Boolean.valueOf((oldState & ) != 0),
                    Boolean.valueOf((oldState & ) != 0),
                    Integer.valueOf((oldState & ) >> Integer.numberOfTrailingZeros()),
                    Integer.valueOf((oldState & ) >> Integer.numberOfTrailingZeros()),
                    Boolean.valueOf((newState & ) != 0),
                    Boolean.valueOf((newState & ) != 0),
                    Integer.valueOf((newState & ) >> Integer.numberOfTrailingZeros()),
                    Integer.valueOf((newState & ) >> Integer.numberOfTrailingZeros())
                    );
        }
        return result;
    }
    void closeOutboundMessage() {
        int oldState = incrementState(-);
        if (oldState == ( | )) {
            // no messages left and read & write closed
            .tracef("Closed outbound message on %s (unregistering)"this);
            unregister();
        } else {
            .tracef("Closed outbound message on %s"this);
        }
    }
    boolean openInboundMessage() {
        int oldStatenewState;
        do {
            oldState = ;
            if ((oldState & ) != 0) {
                .tracef("Refusing inbound message on %s (reads closed)"this);
                return false;
            }
            final int inboundCount = oldState & ;
            if (inboundCount == ) {
                .tracef("Refusing inbound message on %s (too many concurrent reads)"this);
                return false;
            }
            newState = oldState + ;
        } while (!casState(oldStatenewState));
        .tracef("Opened inbound message on %s"this);
        return true;
    }
    void closeInboundMessage() {
        int oldState = incrementState(-);
        if (oldState == ( | )) {
            // no messages left and read & write closed
            .tracef("Closed inbound message on %s (unregistering)"this);
            unregister();
        } else {
            .tracef("Closed inbound message on %s"this);
        }
    }
    void closeReads() {
        int oldStatenewState;
        do {
            oldState = ;
            if ((oldState & ) != 0) {
                return;
            }
            newState = oldState | ;
        } while (!casState(oldStatenewState));
        if (oldState == ) {
            // no channels
            .tracef("Closed channel reads on %s (unregistering)"this);
            unregister();
        } else {
            .tracef("Closed channel reads on %s"this);
        }
        notifyEnd();
    }
    boolean closeWrites() {
        int oldStatenewState;
        do {
            oldState = ;
            if ((oldState & ) != 0) {
                return false;
            }
            newState = oldState | ;
        } while (!casState(oldStatenewState));
        if (oldState == ) {
            // no channels and read was closed
            .tracef("Closed channel writes on %s (unregistering)"this);
            unregister();
        } else {
            .tracef("Closed channel writes on %s"this);
        }
        return true;
    }
    boolean closeReadsAndWrites() {
        int oldStatenewState;
        do {
            oldState = ;
            if ((oldState & ( | )) == ( | )) {
                return false;
            }
            newState = oldState |  | ;
        } while (!casState(oldStatenewState));
        if ((oldState & ) == 0) {
            // we're sending the write close request asynchronously
            Pooled<ByteBufferpooled = .allocate();
            boolean ok = false;
            try {
                ByteBuffer byteBuffer = pooled.getResource();
                byteBuffer.put(.);
                byteBuffer.putInt();
                byteBuffer.flip();
                ok = true;
                .send(pooled);
            } finally {
                if (! okpooled.free();
            }
            .tracef("Closed channel reads on %s"this);
        }
        if ((oldState & ( | )) == 0) {
            // there were no channels open
            .tracef("Closed channel reads and writes on %s (unregistering)"this);
            unregister();
        } else {
            .tracef("Closed channel reads and writes on %s"this);
        }
        notifyEnd();
        return true;
    }
    private void notifyEnd() {
        synchronized () {
            if ( != null) {
                final Receiver receiver = ;
                 = null;
                try {
                    getExecutor().execute(new Runnable() {
                        public void run() {
                            receiver.handleEnd(RemoteConnectionChannel.this);
                        }
                    });
                } catch (Throwable t) {
                    .handleException(new IOException("Fatal connection error"t));
                    return;
                }
            }
        }
    }
    private void unregister() {
        .tracef("Unregistering %s"this);
        closeAsync();
    }
    public MessageOutputStream writeMessage() throws IOException {
        int tries = 50;
        IntIndexMap<OutboundMessageoutboundMessages = this.;
        openOutboundMessage();
        boolean ok = false;
        try {
            final Random random = ..get();
            while (tries > 0) {
                final int id = random.nextInt() & 0xfffe;
                if (! outboundMessages.containsKey(id)) {
                    OutboundMessage message = new OutboundMessage((shortidthis);
                    OutboundMessage existing = outboundMessages.putIfAbsent(message);
                    if (existing == null) {
                        ok = true;
                        return message;
                    }
                }
                tries --;
            }
            throw .channelBusy();
        } finally {
            if (! ok) {
                closeOutboundMessage();
            }
        }
    }
    void free(OutboundMessage outboundMessage) {
        if (.remove(outboundMessage)) {
            .tracef("Removed %s"outboundMessage);
        } else {
            .tracef("Got redundant free for %s"outboundMessage);
        }
    }
    public void writeShutdown() throws IOException {
        if (closeWrites()) {
            Pooled<ByteBufferpooled = .allocate();
            boolean ok = false;
            try {
                ByteBuffer byteBuffer = pooled.getResource();
                byteBuffer.put(.);
                byteBuffer.putInt();
                byteBuffer.flip();
                .send(pooled);
                ok = true;
            } finally {
                if (! okpooled.free();
            }
        }
    }
    void handleRemoteClose() {
        closeReadsAndWrites();
    }
        closeReads();
    }
    public void receiveMessage(final Receiver handler) {
        synchronized () {
            if (.isEmpty()) {
                if ( != null) {
                    throw new IllegalStateException("Message handler already queued");
                }
                 = handler;
            } else {
                if (( & ) != 0) {
                    getExecutor().execute(new Runnable() {
                        public void run() {
                            handler.handleEnd(RemoteConnectionChannel.this);
                        }
                    });
                } else {
                    final InboundMessage message = .remove();
                    try {
                        getExecutor().execute(new Runnable() {
                            public void run() {
                                handler.handleMessage(RemoteConnectionChannel.thismessage.messageInputStream);
                            }
                        });
                    } catch (Throwable t) {
                        .handleException(new IOException("Fatal connection error"t));
                        return;
                    }
                }
            }
            .notify();
        }
    }
    private static Set<Option<?>> SUPPORTED_OPTIONS = Option.setBuilder()
            .add(.)
            .add(.)
            .add(.)
            .add(.)
            .create();
    public boolean supportsOption(final Option<?> option) {
        return .contains(option);
    }
    public <T> T getOption(final Option<T> option) {
        if (option == .) {
            return option.cast();
        } else if (option == .) {
            return option.cast();
        } else if (option == .) {
            return option.cast();
        } else if (option == .) {
            return option.cast();
        } else if (option == .) {
            return option.cast();
        } else if (option == .) {
            return option.cast();
        } else {
            return null;
        }
    }
    public <T> T setOption(final Option<T> optionfinal T valuethrows IllegalArgumentException {
        return null;
    }
    void handleMessageData(final Pooled<ByteBuffermessage) {
        boolean ok1 = false;
        try {
            ByteBuffer buffer = message.getResource();
            int id = buffer.getShort() & 0xffff;
            int flags = buffer.get() & 0xff;
            final InboundMessage inboundMessage;
            if ((flags & .) != 0) {
                if (! openInboundMessage()) {
                    asyncCloseMessage(id);
                    return;
                }
                boolean ok2 = false;
                try {
                    inboundMessage = new InboundMessage((shortidthis);
                    final InboundMessage existing = .putIfAbsent(inboundMessage);
                    if (existing != null) {
                        existing.handleDuplicate();
                    }
                    synchronized() {
                        if ( != null) {
                            final Receiver receiver = ;
                             = null;
                            try {
                                getExecutor().execute(new Runnable() {
                                    public void run() {
                                        receiver.handleMessage(RemoteConnectionChannel.thisinboundMessage.messageInputStream);
                                    }
                                });
                                ok2 = true;
                            } catch (Throwable t) {
                                .handleException(new IOException("Fatal connection error"t));
                                return;
                            }
                        } else {
                            .add(inboundMessage);
                            ok2 = true;
                        }
                    }
                } finally {
                    if (! ok2freeInboundMessage((shortid);
                }
            } else {
                inboundMessage = .get(id);
                if (inboundMessage == null) {
                    .tracef("Ignoring message on channel %s for unknown message ID %04x"this, Integer.valueOf(id));
                    return;
                }
            }
            inboundMessage.handleIncoming(message);
            ok1 = true;
        } finally {
            if (! ok1message.free();
        }
    }
    private void asyncCloseMessage(final int id) {
        Pooled<ByteBufferpooled = .allocate();
        boolean ok = false;
        try {
            ByteBuffer byteBuffer = pooled.getResource();
            byteBuffer.put(.);
            byteBuffer.putInt();
            byteBuffer.putShort((shortid);
            byteBuffer.flip();
            ok = true;
            .send(pooled);
        } finally {
            if (! okpooled.free();
        }
    }
    void handleWindowOpen(final Pooled<ByteBufferpooled) {
        ByteBuffer buffer = pooled.getResource();
        int id = buffer.getShort() & 0xffff;
        final OutboundMessage outboundMessage = .get(id);
        if (outboundMessage == null) {
            // ignore; probably harmless...?
            return;
        }
        outboundMessage.acknowledge(buffer.getInt() & 0x7FFFFFFF);
    }
    void handleAsyncClose(final Pooled<ByteBufferpooled) {
        ByteBuffer buffer = pooled.getResource();
        int id = buffer.getShort() & 0xffff;
        final OutboundMessage outboundMessage = .get(id);
        if (outboundMessage == null) {
            // ignore; probably harmless...?
            return;
        }
        outboundMessage.remoteClosed();
    }
    public Attachments getAttachments() {
        return ;
    }
    public Connection getConnection() {
        return .getConnection();
    }
    @Override
    protected void closeAction() throws IOException {
        closeReadsAndWrites();
        closeMessages();
        closeComplete();
    }
    private void closeMessages() {
        for (InboundMessage message : ) {
            message.inputStream.pushException(new MessageCancelledException());
        }
        for (OutboundMessage message : ) {
            message.cancel();
        }
    }
        return ;
    }
        return ;
    }
    int getChannelId() {
        return ;
    }
    void freeInboundMessage(final short id) {
        if (.removeKey(id & 0xffff) != null) {
            closeInboundMessage();
        }
    }
    Pooled<ByteBufferallocate(final byte protoId) {
        final Pooled<ByteBufferpooled = .allocate();
        final ByteBuffer buffer = pooled.getResource();
        buffer.put(protoId);
        buffer.putInt();
        return pooled;
    }
    public String toString() {
        return String.format("Channel ID %08x (%s) of %s", Integer.valueOf(), ( & 0x80000000) == 0 ? "inbound" : "outbound");
    }
    void dumpState(final StringBuilder b) {
        final int state = ;
        final int inboundMessageCnt = (state & ) >>> (Integer.numberOfTrailingZeros());
        final int outboundMessageCnt = (state & ) >>> (Integer.numberOfTrailingZeros());
        b.append("        ").append(String.format("%s channel ID %08x summary:\n", ( & 0x80000000) == 0 ? "Inbound" : "Outbound"));
        b.append("        ").append("* Flags: ");
        if (Bits.allAreSet(state)) b.append("read-closed ");
        if (Bits.allAreSet(state)) b.append("write-closed ");
        b.append('\n');
        b.append("        ").append("* ").append(.size()).append(" pending inbound messages\n");
        b.append("        ").append("* ").append(inboundMessageCnt).append(" (max ").append().append(") inbound messages\n");
        b.append("        ").append("* ").append(outboundMessageCnt).append(" (max ").append().append(") outbound messages\n");
        b.append("        ").append("* Pending inbound messages:\n");
        for (InboundMessage inboundMessage : ) {
            inboundMessage.dumpState(b);
        }
        b.append("        ").append("* Inbound messages:\n");
        for (InboundMessage inboundMessage : ) {
            inboundMessage.dumpState(b);
        }
        b.append("        ").append("* Outbound messages:\n");
        for (OutboundMessage outboundMessage : ) {
            outboundMessage.dumpState(b);
        }
    }
New to GrepCode? Check out our FAQ X