Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Conditions Of Use
   *
   * This software was developed by employees of the National Institute of
   * Standards and Technology (NIST), an agency of the Federal Government.
   * Pursuant to title 15 United States Code Section 105, works of NIST
   * employees are not subject to copyright protection in the United States
   * and are considered to be in the public domain.  As a result, a formal
   * license is not needed to use the software.
  *
  * This software is provided by NIST as a service and is expressly
  * provided "AS IS."  NIST MAKES NO WARRANTY OF ANY KIND, EXPRESS, IMPLIED
  * OR STATUTORY, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF
  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT
  * AND DATA ACCURACY.  NIST does not warrant or make any representations
  * regarding the use of the software or the results thereof, including but
  * not limited to the correctness, accuracy, reliability or usefulness of
  * the software.
  *
  * Permission to use this software is contingent upon your acceptance
  * of the terms of this agreement
  *
  * .
  *
  */
Product of NIST/ITL Advanced Networking Technologies Division (ANTD). * /
 
 package gov.nist.javax.sip.stack;
 
 
 
 
 /*
  * TLS support Added by Daniel J.Martinez Manzano <dani@dif.um.es>
  *
  */

Low level Input output to a socket. Caches TCP connections and takes care of re-connecting to the remote party if the other end drops the connection

Author(s):
M. Ranganathan
Version:
1.2
 
 
 public class IOHandler {
 	
 	private static StackLogger logger = CommonLogger.getLogger(IOHandler.class);
 
     private SipStackImpl sipStack;
 
     private static final String TCP = "tcp";
 
     // Added by Daniel J. Martinez Manzano <dani@dif.um.es>
     private static final String TLS = "tls";
 
     // A cache of client sockets that can be re-used for
     // sending tcp messages.
     private final ConcurrentHashMap<StringSocketsocketTable = new ConcurrentHashMap<StringSocket>();
 
 
     // private Semaphore ioSemaphore = new Semaphore(1);
 
     protected static String makeKey(InetAddress addrint port) {
         return addr.getHostAddress() + ":" + port;
 
     }
 
     protected static String makeKey(String addrint port) {
         return addr + ":" + port;
     }
 
     protected IOHandler(SIPTransactionStack sipStack) {
         this. = (SipStackImplsipStack;
    }
    protected void putSocket(String keySocket sock) {
            .logDebug("adding socket for key " + key);
        }
        .put(keysock);
    }
    protected Socket getSocket(String key) {
        return (Socket.get(key);
    }
    protected void removeSocket(String key) {
        .remove(key);
        .remove(key);
            .logDebug("removed Socket and Semaphore for key " + key);
        }
    }

    
A private function to write things out. This needs to be synchronized as writes can occur from multiple threads. We write in chunks to allow the other side to synchronize for large sized writes.
    private void writeChunks(OutputStream outputStreambyte[] bytesint length)
            throws IOException {
        // Chunk size is 16K - this hack is for large
        // writes over slow connections.
        synchronized (outputStream) {
            // outputStream.write(bytes,0,length);
            int chunksize = 8 * 1024;
            for (int p = 0; p < lengthp += chunksize) {
                int chunk = p + chunksize < length ? chunksize : length - p;
                outputStream.write(bytespchunk);
            }
        }
        outputStream.flush();
    }

    
Creates and binds, if necessary, a socket connected to the specified destination address and port and then returns its local address.

Parameters:
dst the destination address that the socket would need to connect to.
dstPort the port number that the connection would be established with.
localAddress the address that we would like to bind on (null for the "any" address).
localPort the port that we'd like our socket to bind to (0 for a random port).
Returns:
the SocketAddress that this handler would use when connecting to the specified destination address and port.
Throws:
java.io.IOException if we fail binding the socket
    public SocketAddress getLocalAddressForTcpDst(InetAddress dstint dstPort,
            InetAddress localAddressint localPortthrows IOException {
        String key = makeKey(dstdstPort);
        Socket clientSock = getSocket(key);
        if (clientSock == null) {
            clientSock = .getNetworkLayer().createSocket(dstdstPort,
                    localAddresslocalPort);
            putSocket(keyclientSock);
        }
        return clientSock.getLocalSocketAddress();
    }

    
Creates and binds, if necessary, a socket connected to the specified destination address and port and then returns its local address.

Parameters:
dst the destination address that the socket would need to connect to.
dstPort the port number that the connection would be established with.
localAddress the address that we would like to bind on (null for the "any" address).
channel the message channel that will be servicing the socket
Returns:
the SocketAddress that this handler would use when connecting to the specified destination address and port.
Throws:
java.io.IOException if we fail binding the socket
    public SocketAddress getLocalAddressForTlsDst(InetAddress dstint dstPort,
             InetAddress localAddressTLSMessageChannel channel)
             throws IOException {
        String key = makeKey(dstdstPort);
        Socket clientSock = getSocket(key);
        if (clientSock == null) {
            clientSock = .getNetworkLayer()
                .createSSLSocket(dstdstPortlocalAddress);
            SSLSocket sslsock = (SSLSocketclientSock;
            if (.isLoggingEnabled(.)) {
                .logDebug(
                        "inaddr = " + dst);
                .logDebug(
                        "port = " + dstPort);
            }
            HandshakeCompletedListener listner
                    = new HandshakeCompletedListenerImpl(channel);
            channel.setHandshakeCompletedListener(listner);
            sslsock.addHandshakeCompletedListener(listner);
            sslsock.setEnabledProtocols(.getEnabledProtocols());
            sslsock.startHandshake();
            if (.isLoggingEnabled(.)) {
                this..logDebug(
                        "Handshake passed");
            }
            // allow application to enforce policy by validating the
            // certificate
            try {
                .getTlsSecurityPolicy().enforceTlsPolicy(
                            channel.getEncapsulatedClientTransaction());
            }
            catch (SecurityException ex) {
                throw new IOException(ex.getMessage());
            }
            if (.isLoggingEnabled(.)) {
                this..logDebug(
                        "TLS Security policy passed");
            }
            putSocket(keyclientSock);
        }
        return clientSock.getLocalSocketAddress();
    }

    
Send an array of bytes.

Parameters:
receiverAddress -- inet address
contactPort -- port to connect to.
transport -- tcp or udp.
isClient -- retry to connect if the other end closed connection
Throws:
java.io.IOException -- if there is an IO exception sending message.
    public Socket sendBytes(InetAddress senderAddress,
            InetAddress receiverAddressint contactPortString transport,
            byte[] bytesboolean isClientMessageChannel messageChannel)
            throws IOException {
        int retry_count = 0;
        int max_retry = isClient ? 2 : 1;
        // Server uses TCP transport. TCP client sockets are cached
        int length = bytes.length;
            .logDebug(
                    "sendBytes " + transport + " inAddr "
                            + receiverAddress.getHostAddress() + " port = "
                            + contactPort + " length = " + length + " isClient " + isClient );
        }
                && .isLogStackTraceOnMessageSend()) {
            .logStackTrace(.);
        }
        if (transport.compareToIgnoreCase() == 0) {
            String key = makeKey(receiverAddresscontactPort);
            // This should be in a synchronized block ( reported by
            // Jayashenkhar ( lucent ).
            Socket clientSock = null;
            enterIOCriticalSection(key);
            try {
                clientSock = getSocket(key);
                while (retry_count < max_retry) {
                    if (clientSock == null) {
                        if (.isLoggingEnabled(.)) {
                            .logDebug(
                                    "inaddr = " + receiverAddress);
                            .logDebug(
                                    "port = " + contactPort);
                        }
                        // note that the IP Address for stack may not be
                        // assigned.
                        // sender address is the address of the listening point.
                        // in version 1.1 all listening points have the same IP
                        // address (i.e. that of the stack). In version 1.2
                        // the IP address is on a per listening point basis.
                        try {
                        	clientSock = .getNetworkLayer().createSocket(
                        			receiverAddresscontactPortsenderAddress);
                        } catch (SocketException e) { // We must catch the socket timeout exceptions here, any SocketException not just ConnectException
                        	.logError("Problem connecting " +
                        			receiverAddress + " " + contactPort + " " + senderAddress + " for message " + new String(bytes"UTF-8"));
                        	// new connection is bad.
                        	// remove from our table the socket and its semaphore
                        	removeSocket(key);
                        	throw new SocketException(e.getClass() + " " + e.getMessage() + " " + e.getCause() + " Problem connecting " +
                        			receiverAddress + " " + contactPort + " " + senderAddress + " for message " + new String(bytes"UTF-8"));
                        }
                        OutputStream outputStream = clientSock
                                .getOutputStream();
                        writeChunks(outputStreambyteslength);
                        putSocket(keyclientSock);
                        break;
                    } else {
                        try {
                            OutputStream outputStream = clientSock
                                    .getOutputStream();
                            writeChunks(outputStreambyteslength);
                            break;
                        } catch (IOException ex) {
                            if (
                                    .isLoggingEnabled(.))
                                .logWarning(
                                        "IOException occured retryCount "
                                                + retry_count);                            
                            try {
                                clientSock.close();
                            } catch (Exception e) {
                            }
                            clientSock = null;
                            retry_count++;
                            // This is a server tx trying to send a response.
                            if ( !isClient ) {
   								removeSocket(key);
                                throw ex;
                            }
                            if(retry_count >= max_retry) {
								// old connection is bad.
								// remove from our table the socket and its semaphore
else {
								// don't remove the semaphore on retry
							}
                        }
                    }
                }
            } catch (IOException ex) {
                if (.isLoggingEnabled(.)) {
                    .logError(
                            "Problem sending: sendBytes " + transport
                                    + " inAddr "
                                    + receiverAddress.getHostAddress()
                                    + " port = " + contactPort +
                            " remoteHost " + messageChannel.getPeerAddress() +
                            " remotePort " + messageChannel.getPeerPort() +
                            " peerPacketPort "
                                    + messageChannel.getPeerPacketSourcePort() + " isClient " + isClient);
                }
                removeSocket(key);
                /*
                 * For TCP responses, the transmission of responses is
                 * controlled by RFC 3261, section 18.2.2 :
                 *
                 * o If the "sent-protocol" is a reliable transport protocol
                 * such as TCP or SCTP, or TLS over those, the response MUST be
                 * sent using the existing connection to the source of the
                 * original request that created the transaction, if that
                 * connection is still open. This requires the server transport
                 * to maintain an association between server transactions and
                 * transport connections. If that connection is no longer open,
                 * the server SHOULD open a connection to the IP address in the
                 * "received" parameter, if present, using the port in the
                 * "sent-by" value, or the default port for that transport, if
                 * no port is specified. If that connection attempt fails, the
                 * server SHOULD use the procedures in [4] for servers in order
                 * to determine the IP address and port to open the connection
                 * and send the response to.
                 */
                if (!isClient) {
                    receiverAddress = InetAddress.getByName(messageChannel
                            .getViaHost());
                    contactPort = messageChannel.getViaPort();
                    if (contactPort == -1)
                        contactPort = 5060;
                    key = makeKey(receiverAddressmessageChannel
                            .getViaPort());
                    clientSock = this.getSocket(key);
                    if (clientSock == null) {
                        if (.isLoggingEnabled(.)) {
                            .logDebug(
                                    "inaddr = " + receiverAddress +
                                    " port = " + contactPort);
                        }
                        clientSock = .getNetworkLayer().createSocket(
                                receiverAddresscontactPortsenderAddress);
                        OutputStream outputStream = clientSock
                                .getOutputStream();
                        writeChunks(outputStreambyteslength);
                        putSocket(keyclientSock);
                        return clientSock;
                    } else {
                        if (.isLoggingEnabled(.)) {
                            .logDebug(
                                    "sending to " + key );
                        }
                        try {
                            OutputStream outputStream = clientSock
                                    .getOutputStream();
                            writeChunks(outputStreambyteslength);
                            return clientSock;
                        } catch (IOException ioe) {
                            if (
                                    .isLoggingEnabled(.))
                                .logError(
                                        "IOException occured  "ioe);
                            if (
                                    .isLoggingEnabled(.))
                                .logDebug(
                                        "Removing and Closing socket");
                            // old connection is bad.
                            // remove from our table.
                            removeSocket(key);
                            try {
                                clientSock.close();
                            } catch (Exception e) {
                            }
                            clientSock = null;
                            throw ioe;
                        }
                    }
                } else {
                    .logError("IOException occured at " , ex);
                    throw ex;
                }
            } finally {
                leaveIOCriticalSection(key);
            }
            if (clientSock == null) {
                if (.isLoggingEnabled(.)) {
                    .logError(
                            this..toString());
                    .logError(
                            "Could not connect to " + receiverAddress + ":"
                                    + contactPort);
                }
                throw new IOException("Could not connect to " + receiverAddress
                        + ":" + contactPort);
            } else {
                return clientSock;
            }
            // Added by Daniel J. Martinez Manzano <dani@dif.um.es>
            // Copied and modified from the former section for TCP
        } else if (transport.compareToIgnoreCase() == 0) {
            String key = makeKey(receiverAddresscontactPort);
            Socket clientSock = null;
            enterIOCriticalSection(key);
            try {
                clientSock = getSocket(key);
                while (retry_count < max_retry) {
                    if (clientSock == null) {
                        clientSock = .getNetworkLayer()
                                .createSSLSocket(receiverAddresscontactPort,
                                        senderAddress);
                        SSLSocket sslsock = (SSLSocketclientSock;
                        if (.isLoggingEnabled(.)) {
                            .logDebug(
                                    "inaddr = " + receiverAddress);
                            .logDebug(
                                    "port = " + contactPort);
                        }
                        HandshakeCompletedListener listner = new HandshakeCompletedListenerImpl(
                                (TLSMessageChannelmessageChannel);
                        ((TLSMessageChannelmessageChannel)
                                .setHandshakeCompletedListener(listner);
                        sslsock.addHandshakeCompletedListener(listner);
                        sslsock.setEnabledProtocols(
                                .getEnabledProtocols());
                        sslsock.startHandshake();
                        if (.isLoggingEnabled(.)) {
                            this..logDebug(
                                    "Handshake passed");
                        }
                        // allow application to enforce policy by validating the
                        // certificate
                        try {
                            
                                    .getTlsSecurityPolicy()
                                    .enforceTlsPolicy(
                                            messageChannel
                                                    .getEncapsulatedClientTransaction());
                        } catch (SecurityException ex) {
                            throw new IOException(ex.getMessage());
                        }
                        if (.isLoggingEnabled(.)) {
                            this..logDebug(
                                    "TLS Security policy passed");
                        }
                        OutputStream outputStream = clientSock
                                .getOutputStream();
                        writeChunks(outputStreambyteslength);
                        putSocket(keyclientSock);
                        break;
                    } else {
                        try {
                            OutputStream outputStream = clientSock
                                    .getOutputStream();
                            writeChunks(outputStreambyteslength);
                            break;
                        } catch (IOException ex) {
                            if (.isLoggingEnabled())
                                .logException(ex);
                            // old connection is bad.
                            // remove from our table.
                            removeSocket(key);
                            try {
                                .logDebug(
                                        "Closing socket");
                                clientSock.close();
                            } catch (Exception e) {
                            }
                            clientSock = null;
                            retry_count++;
                        }
                    }
                }
            } catch (SSLHandshakeException ex) {
                removeSocket(key);
                throw ex;
            } catch (IOException ex) {
                removeSocket(key);
                if (!isClient) {
                    receiverAddress = InetAddress.getByName(messageChannel
                            .getViaHost());
                    contactPort = messageChannel.getViaPort();
                    if (contactPort == -1)
                        contactPort = 5060;
                    key = makeKey(receiverAddressmessageChannel
                            .getViaPort());
                    clientSock = this.getSocket(key);
                    if (clientSock == null) {
                        if (.isLoggingEnabled(.)) {
                            .logDebug(
                                    "inaddr = " + receiverAddress +
                                    " port = " + contactPort);
                        }
                        SSLSocket sslsock = .getNetworkLayer().createSSLSocket(
                                receiverAddresscontactPortsenderAddress);
                        OutputStream outputStream = sslsock
                                .getOutputStream();
                        HandshakeCompletedListener listner = new HandshakeCompletedListenerImpl(
                                (TLSMessageChannelmessageChannel);
                        ((TLSMessageChannelmessageChannel)
                                .setHandshakeCompletedListener(listner);
                        sslsock.addHandshakeCompletedListener(listner);
                        sslsock.setEnabledProtocols(
                                .getEnabledProtocols());
                        sslsock.startHandshake();
                        if (.isLoggingEnabled(.)) {
                            this..logDebug(
                                    "Handshake passed");
                        }
                        writeChunks(outputStreambyteslength);
                        putSocket(keysslsock);
                        return sslsock;
                    } else {
                        if (.isLoggingEnabled(.)) {
                            .logDebug(
                                    "sending to " + key );
                        }
                        try {
                            OutputStream outputStream = clientSock
                                    .getOutputStream();
                            writeChunks(outputStreambyteslength);
                            return clientSock;
                        } catch (IOException ioe) {
                            if (
                                    .isLoggingEnabled(.))
                                .logError(
                                        "IOException occured  "ioe);
                            if (
                                    .isLoggingEnabled(.))
                                .logDebug(
                                        "Removing and Closing socket");
                            // old connection is bad.
                            // remove from our table.
                            removeSocket(key);
                            try {
                                clientSock.close();
                            } catch (Exception e) {
                            }
                            clientSock = null;
                            throw ioe;
                        }
                    }
                } else {
                    throw ex;
                }
            } finally {
                leaveIOCriticalSection(key);
            }
            if (clientSock == null) {
                throw new IOException("Could not connect to " + receiverAddress
                        + ":" + contactPort);
            } else
                return clientSock;
        } else {
            // This is a UDP transport...
            DatagramSocket datagramSock = .getNetworkLayer()
                    .createDatagramSocket();
            datagramSock.connect(receiverAddresscontactPort);
            DatagramPacket dgPacket = new DatagramPacket(bytes, 0, length,
                    receiverAddresscontactPort);
            datagramSock.send(dgPacket);
            datagramSock.close();
            return null;
        }
    }
    /*
     * private void enterIOCriticalSection(String key) throws IOException { try
     * { if ( ! this.ioSemaphore.tryAcquire(10,TimeUnit.SECONDS) ) { throw new
     * IOException("Could not acquire semaphore"); } } catch
     * (InterruptedException e) { throw new
     * IOException("exception in acquiring sem"); } }
     *
     *
     * private void leaveIOCriticalSection(String key) {
     * this.ioSemaphore.release(); }
     */
    private void leaveIOCriticalSection(String key) {
        Semaphore creationSemaphore = .get(key);
        if (creationSemaphore != null) {
            creationSemaphore.release();
        }
    }
    private void enterIOCriticalSection(String keythrows IOException {
        // http://dmy999.com/article/34/correct-use-of-concurrenthashmap
        Semaphore creationSemaphore = .get(key);
        if(creationSemaphore == null) {
            Semaphore newCreationSemaphore = new Semaphore(1, true);
            creationSemaphore = .putIfAbsent(keynewCreationSemaphore);
            if(creationSemaphore == null) {
                creationSemaphore = newCreationSemaphore;       
                if (.isLoggingEnabled(.)) {
                    .logDebug("new Semaphore added for key " + key);
                }
            }
        }
        
        try {
            boolean retval = creationSemaphore.tryAcquire(10, .);
            if (!retval) {
                throw new IOException("Could not acquire IO Semaphore'" + key
                        + "' after 10 seconds -- giving up ");
            }
        } catch (InterruptedException e) {
            throw new IOException("exception in acquiring sem");
        }
    }

    
Close all the cached connections.
    public void closeAll() {
            
                    .logDebug(
                            "Closing " + .size()
                                    + " sockets from IOHandler");
        for (Enumeration<Socketvalues = .elements(); values
                .hasMoreElements();) {
            Socket s = (Socketvalues.nextElement();
            try {
                s.close();
            } catch (IOException ex) {
            }
        }
    }
New to GrepCode? Check out our FAQ X