Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
/*
 * Copyright (C) FuseSource, Inc.
 * http://fusesource.com
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
 
 
 
 package org.fusesource.fabric.dosgi.tcp;
 
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 import java.nio.channels.WritableByteChannel;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 
 public class TcpTransport implements Transport {
 
     private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class);
 
     protected State _serviceState = ;
 
     protected Map<StringObjectsocketOptions;
 
     public static class State {
         public String toString() {
             return getClass().getSimpleName();
         }
         public boolean isStarted() {
             return false;
         }
     }
 
     static class CallbackSupport extends State {
         LinkedList<Runnablecallbacks = new LinkedList<Runnable>();
 
         void add(Runnable r) {
             if (r != null) {
                 .add(r);
             }
         }
 
         void done() {
             for (Runnable callback : ) {
                 callback.run();
             }
         }
     }
 
     abstract static class SocketState {
         void onStop(Runnable onCompleted) {
         }
         void onCanceled() {
         }
         boolean is(Class<? extends SocketStateclazz) {
             return getClass()==clazz;
         }
     }
 
     /** Shared state instance; {@code start(Runnable)} accepts a start request from this state. */
     public static final State CREATED = new State();
 
     /** Transitional state created per start request; it collects the start completion callbacks. */
     public static class STARTING extends CallbackSupport {
     }
 
     /** Shared state instance whose {@code isStarted()} returns {@code true}. */
     public static final State STARTED = new State() {
         public boolean isStarted() {
             return true;
         }
     };
     /** Transitional state created per stop request; it collects the stop completion callbacks. */
     public static class STOPPING extends CallbackSupport {
     }
    /** Shared state instance; it reports {@code isStarted() == false} like the base state. */
    public static final State STOPPED = new State();
    final public void start() {
        start(null);
    }
    final public void stop() {
        stop(null);
    }
    final public void start(final Runnable onCompleted) {
        queue().execute(new Runnable() {
            public void run() {
                if ( ==  ||
                         == ) {
                    final STARTING state = new STARTING();
                    state.add(onCompleted);
                     = state;
                    _start(new Runnable() {
                        public void run() {
                             = ;
                            state.done();
                        }
                    });
                } else if ( instanceof STARTING) {
                    ((STARTING).add(onCompleted);
                } else if ( == ) {
                    if (onCompleted != null) {
                        onCompleted.run();
                    }
                } else {
                    if (onCompleted != null) {
                        onCompleted.run();
                    }
                    .error("start should not be called from state: " + );
                }
            }
        });
    }
    final public void stop(final Runnable onCompleted) {
        queue().execute(new Runnable() {
            public void run() {
                if ( == ) {
                    final STOPPING state = new STOPPING();
                    state.add(onCompleted);
                     = state;
                    _stop(new Runnable() {
                        public void run() {
                             = ;
                            state.done();
                        }
                    });
                } else if ( instanceof STOPPING) {
                    ((STOPPING).add(onCompleted);
                } else if ( == ) {
                    if (onCompleted != null) {
                        onCompleted.run();
                    }
                } else {
                    if (onCompleted != null) {
                        onCompleted.run();
                    }
                    .error("stop should not be called from state: " + );
                }
            }
        });
    }
    protected State getServiceState() {
        return ;
    }
    /** Initial socket state; it inherits the no-op event hooks of {@code SocketState}. */
    static class DISCONNECTED extends SocketState{}
    class CONNECTING extends SocketState{
        void onStop(Runnable onCompleted) {
            trace("CONNECTING.onStop");
            CANCELING state = new CANCELING();
             = state;
            state.onStop(onCompleted);
        }
        void onCanceled() {
            trace("CONNECTING.onCanceled");
            CANCELING state = new CANCELING();
             = state;
            state.onCanceled();
        }
    }
    class CONNECTED extends SocketState {
        void onStop(Runnable onCompleted) {
            trace("CONNECTED.onStop");
            CANCELING state = new CANCELING();
             = state;
            state.add(createDisconnectTask());
            state.onStop(onCompleted);
        }
        void onCanceled() {
            trace("CONNECTED.onCanceled");
            CANCELING state = new CANCELING();
             = state;
            state.add(createDisconnectTask());
            state.onCanceled();
        }
        Runnable createDisconnectTask() {
            return new Runnable(){
                public void run() {
                    .onTransportDisconnected(TcpTransport.this);
                }
            };
        }
    }
    class CANCELING extends SocketState {
        private LinkedList<Runnablerunnables =  new LinkedList<Runnable>();
        private int remaining;
        private boolean dispose;
        public CANCELING() {
            if!=null ) {
                ++;
                .cancel();
            }
            if!=null ) {
                ++;
                .cancel();
            }
        }
        void onStop(Runnable onCompleted) {
            trace("CANCELING.onCompleted");
            add(onCompleted);
             = true;
        }
        void add(Runnable onCompleted) {
            ifonCompleted!=null ) {
                .add(onCompleted);
            }
        }
        void onCanceled() {
            trace("CANCELING.onCanceled");
            --;
            if!=0 ) {
                return;
            }
            try {
                .close();
            } catch (IOException ignore) {
            }
             = new CANCELED();
            for (Runnable runnable : ) {
                runnable.run();
            }
            if () {
                dispose();
            }
        }
    }
    class CANCELED extends SocketState {
        private boolean disposed;
        public CANCELED(boolean disposed) {
            this.=disposed;
        }
        void onStop(Runnable onCompleted) {
            trace("CANCELED.onStop");
            if( ! ) {
                 = true;
                dispose();
            }
            onCompleted.run();
        }
    }
    // Connection endpoints.
    protected URI remoteLocation;
    protected URI localLocation;
    // Receiver of the transport callbacks.
    protected TransportListener listener;
    protected String remoteAddress;
    protected ProtocolCodec codec;
    protected SocketChannel channel;
    // Socket-level state machine; starts out disconnected.
    protected SocketState socketState = new DISCONNECTED();
    protected DispatchQueue dispatchQueue;
    private DispatchSource readSource;
    private DispatchSource writeSource;
    // Read by resolveHostName() through isUseLocalHost().
    protected boolean useLocalHost = true;
    int max_read_rate;
    int max_write_rate;
        int read_allowance = ;
        boolean read_suspended = false;
        int read_resume_counter = 0;
        int write_allowance = ;
        boolean write_suspended = false;
        public void resetAllowance() {
            if !=  ||  != ) {
                 = ;
                 = ;
                if ) {
                     = false;
                    resumeWrite();
                }
                if ) {
                     = false;
                    resumeRead();
                    forint i=0; i <  ; i++ ) {
                        resumeRead();
                    }
                }
            }
        }
        public int read(ByteBuffer dstthrows IOException {
            if==0 ) {
                return .read(dst);
            } else {
                int remaining = dst.remaining();
                if ==0 || remaining ==0 ) {
                    return 0;
                }
                int reduction = 0;
                ifremaining > ) {
                    reduction = remaining - ;
                    dst.limit(dst.limit() - reduction);
                }
                int rc=0;
                try {
                    rc = .read(dst);
                     -= rc;
                } finally {
                    ifreduction!=0 ) {
                        ifdst.remaining() == 0 ) {
                            // we need to suspend the read now until we get
                            // a new allowance..
                            .suspend();
                             = true;
                        }
                        dst.limit(dst.limit() + reduction);
                    }
                }
                return rc;
            }
        }
        public int write(ByteBuffer srcthrows IOException {
            if==0 ) {
                return .write(src);
            } else {
                int remaining = src.remaining();
                if ==0 || remaining ==0 ) {
                    return 0;
                }
                int reduction = 0;
                ifremaining > ) {
                    reduction = remaining - ;
                    src.limit(src.limit() - reduction);
                }
                int rc = 0;
                try {
                    rc = .write(src);
                     -= rc;
                } finally {
                    ifreduction!=0 ) {
                        ifsrc.remaining() == 0 ) {
                            // we need to suspend the read now until we get
                            // a new allowance..
                             = true;
                            suspendWrite();
                        }
                        src.limit(src.limit() + reduction);
                    }
                }
                return rc;
            }
        }
        public boolean isOpen() {
            return .isOpen();
        }
        public void close() throws IOException {
            .close();
        }
        public void resumeRead() {
            if ) {
                 += 1;
            } else {
                _resumeRead();
            }
        }
    }
    private final Runnable CANCEL_HANDLER = new Runnable() {
        public void run() {
            .onCanceled();
        }
    };
    static final class OneWay {
        final Object command;
        final Retained retained;
        public OneWay(Object commandRetained retained) {
            this. = command;
            this. = retained;
        }
    }
    public void connected(SocketChannel channelthrows IOExceptionException {
        this. = channel;
        if !=null ) {
            initializeCodec();
        }
        this..configureBlocking(false);
        this. = channel.socket().getRemoteSocketAddress().toString();
        channel.socket().setSoLinger(true, 0);
        channel.socket().setTcpNoDelay(true);
        this. = new CONNECTED();
    }
    /**
     * Hook invoked from {@code connected(...)} and {@code setProtocolCodec(...)}.
     * This implementation does nothing.
     */
    protected void initializeCodec() {
    }
    public void connecting(URI remoteLocationURI localLocationthrows IOExceptionException {
        this. = SocketChannel.open();
        this..configureBlocking(false);
        this. = remoteLocation;
        this. = localLocation;
        if (localLocation != null) {
            InetSocketAddress localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
            .socket().bind(localAddress);
        }
        String host = resolveHostName(remoteLocation.getHost());
        InetSocketAddress remoteAddress = new InetSocketAddress(hostremoteLocation.getPort());
        .connect(remoteAddress);
        this. = new CONNECTING();
    }
    public DispatchQueue queue() {
        return ;
    }
    public void setDispatchQueue(DispatchQueue queue) {
        this. = queue;
    }
    public void _start(Runnable onCompleted) {
        try {
            if (.is(CONNECTING.class) ) {
                trace("connecting...");
                // this allows the connect to complete..
                 = Dispatch.createSource(.);
                .setEventHandler(new Runnable() {
                    public void run() {
                        if (getServiceState() != ) {
                            return;
                        }
                        try {
                            trace("connected.");
                            .finishConnect();
                            .setCancelHandler(null);
                            .cancel();
                            =null;
                             = new CONNECTED();
                            onConnected();
                        } catch (IOException e) {
                            onTransportFailure(e);
                        }
                    }
                });
                .setCancelHandler();
                .resume();
            } else if (.is(CONNECTED.class) ) {
                .execute(new Runnable() {
                    public void run() {
                        try {
                            trace("was connected.");
                            onConnected();
                        } catch (IOException e) {
                             onTransportFailure(e);
                        }
                    }
                });
            } else {
                ..println("cannot be started.  socket state is: "+);
            }
        } finally {
            ifonCompleted!=null ) {
                onCompleted.run();
            }
        }
    }
    public void _stop(final Runnable onCompleted) {
        trace("stopping.. at state: "+);
        .onStop(onCompleted);
    }
    protected String resolveHostName(String hostthrows UnknownHostException {
        String localName = InetAddress.getLocalHost().getHostName();
        if (localName != null && isUseLocalHost()) {
            if (localName.equals(host)) {
                return "localhost";
            }
        }
        return host;
    }
    protected void onConnected() throws IOException {
        .setEventHandler(new Runnable() {
            public void run() {
                drainInbound();
            }
        });
        .setEventHandler(new Runnable() {
            public void run() {
                drainOutbound();
            }
        });
        if!=0 || !=0 ) {
             = new RateLimitingChannel();
            schedualRateAllowanceReset();
        }
        .onTransportConnected(this);
    }
    private void schedualRateAllowanceReset() {
        .executeAfter(1, .new Runnable(){
            public void run() {
                if( !.is(CONNECTED.class) ) {
                    return;
                }
                .resetAllowance();
                schedualRateAllowanceReset();
            }
        });
    }
    private void dispose() {
        if!=null ) {
            .cancel();
            =null;
        }
        if!=null ) {
            .cancel();
            =null;
        }
        this. = null;
    }
    public void onTransportFailure(IOException error) {
        .onTransportFailure(thiserror);
        .onCanceled();
    }
    public boolean full() {
        return .full();
    }
    public boolean offer(Object command) {
        assert Dispatch.getCurrentQueue() == ;
        try {
            if (!.is(CONNECTED.class)) {
                throw new IOException("Not connected.");
            }
            if (getServiceState() != ) {
                throw new IOException("Not running.");
            }
            ProtocolCodec.BufferState rc = .write(command);
            switch (rc ) {
                case :
                    return false;
                default:
                    if ) {
                         = false;
                        resumeWrite();
                    }
                    return true;
            }
        } catch (IOException e) {
            onTransportFailure(e);
            return false;
        }
    }
    // Flag shared by offer() and drainOutbound(); starts out true.
    boolean drained = true;
    
    protected void drainOutbound() {
        assert Dispatch.getCurrentQueue() == ;
        if (getServiceState() !=  || !.is(CONNECTED.class)) {
            return;
        }
        try {
            if.flush() == .. && flush() ) {
                if( ! ) {
                     = true;
                    suspendWrite();
                    .onRefill(this);
                }
            }
        } catch (IOException e) {
            onTransportFailure(e);
        }
    }
    /**
     * Consulted by {@code drainOutbound()} in addition to the codec flush.
     * This implementation always returns {@code true}.
     */
    protected boolean flush() throws IOException {
        return true;
    }
    protected void drainInbound() {
        if (!getServiceState().isStarted() || .isSuspended()) {
            return;
        }
        try {
            long initial = .getReadCounter();
            // Only process upto 64k worth of data at a time so we can give
            // other connections a chance to process their requests.
            while.getReadCounter()-initial < 1024*64 ) {
                Object command = .read();
                if ( command!=null ) {
                    try {
                        .onTransportCommand(thiscommand);
                    } catch (Throwable e) {
                        onTransportFailure(new IOException("Transport listener failure."));
                    }
                    // the transport may be suspended after processing a command.
                    if (getServiceState() ==  || .isSuspended()) {
                        return;
                    }
                } else {
                    return;
                }
            }
        } catch (IOException e) {
            onTransportFailure(e);
        }
    }
    public String getRemoteAddress() {
        return ;
    }
    private boolean assertConnected() {
        try {
            if ( !isConnected() ) {
                throw new IOException("Not connected.");
            }
            return true;
        } catch (IOException e) {
            onTransportFailure(e);
        }
        return false;
    }
    public void suspendRead() {
        ifisConnected() && !=null ) {
            .suspend();
        }
    }
    public void resumeRead() {
        ifisConnected() && !=null ) {
            if!=null ) {
                .resumeRead();
            } else {
                _resumeRead();
            }
        }
    }
    private void _resumeRead() {
        .resume();
        .execute(new Runnable(){
            public void run() {
                drainInbound();
            }
        });
    }
    protected void suspendWrite() {
        ifisConnected() && !=null ) {
            .suspend();
        }
    }
    protected void resumeWrite() {
        ifisConnected() && !=null ) {
            .resume();
            .execute(new Runnable(){
                public void run() {
                    drainOutbound();
                }
            });
        }
    }
        return ;
    }
    public void setTransportListener(TransportListener listener) {
        this. = listener;
    }
    public ProtocolCodec getProtocolCodec() {
        return ;
    }
    public void setProtocolCodec(ProtocolCodec protocolCodec) {
        this. = protocolCodec;
        if!=null && !=null ) {
            initializeCodec();
        }
    }
    public boolean isConnected() {
        return .is(CONNECTED.class);
    }
    public boolean isDisposed() {
        return getServiceState() == ;
    }
    public void setSocketOptions(Map<StringObjectsocketOptions) {
        this. = socketOptions;
    }
    public boolean isUseLocalHost() {
        return ;
    }

    
    /**
     * Sets whether 'localhost' or the actual local host name should be used to
     * make local connections. On some operating systems, such as macOS, it is
     * not possible to connect using the local host name, so 'localhost' is the
     * better choice.
     */
    public void setUseLocalHost(boolean useLocalHost) {
        this. = useLocalHost;
    }
    private void trace(String message) {
        if.isTraceEnabled() ) {
            final String label = .getLabel();
            iflabel !=null ) {
                .trace(label +" | "+message);
            } else {
                .trace(message);
            }
        }
    }
    public SocketChannel getSocketChannel() {
        return ;
    }
    public ReadableByteChannel readChannel() {
        if(!=null) {
            return ;
        } else {
            return ;
        }
    }
    public WritableByteChannel writeChannel() {
        if(!=null) {
            return ;
        } else {
            return ;
        }
    }
    public int getMax_read_rate() {
        return ;
    }
    public void setMax_read_rate(int max_read_rate) {
        this. = max_read_rate;
    }
    public int getMax_write_rate() {
        return ;
    }
    public void setMax_write_rate(int max_write_rate) {
        this. = max_write_rate;
    }
New to GrepCode? Check out our FAQ X