Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Conditions Of Use 
   * 
   * This software was developed by employees of the National Institute of
   * Standards and Technology (NIST), an agency of the Federal Government.
   * Pursuant to title 15 Untied States Code Section 105, works of NIST
   * employees are not subject to copyright protection in the United States
   * and are considered to be in the public domain.  As a result, a formal
   * license is not needed to use the software.
  * 
  * This software is provided by NIST as a service and is expressly
  * provided "AS IS."  NIST MAKES NO WARRANTY OF ANY KIND, EXPRESS, IMPLIED
  * OR STATUTORY, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF
  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT
  * AND DATA ACCURACY.  NIST does not warrant or make any representations
  * regarding the use of the software or the results thereof, including but
  * not limited to the correctness, accuracy, reliability or usefulness of
  * the software.
  * 
  * Permission to use this software is contingent upon your acceptance
  * of the terms of this agreement
  *  
  * .
  * 
  */
Product of NIST/ITL Advanced Networking Technologies Division (ANTD). * /
 
 package gov.nist.javax.sip.stack;
 
 import java.net.*;
 import java.io.*;
 
 
 /*
  * Ahmet Uyar <auyar@csit.fsu.edu>sent in a bug report for TCP operation of the JAIN sipStack.
  * Niklas Uhrberg suggested that a mechanism be added to limit the number of simultaneous open
  * connections. The TLS Adaptations were contributed by Daniel Martinez. Hagai Sela contributed a
  * bug fix for symmetric nat. Jeroen van Bemmel added compensation for buggy clients ( Microsoft
  * RTC clients ). Bug fixes by viswashanti.kadiyala@antepo.com, Joost Yervante Damand
  */

This is a stack abstraction for TCP connections. This abstracts a stream of parsed messages. The SIP sipStack starts this from the main SIPStack class for each connection that it accepts. It starts a message parser in its own thread and talks to the message parser via a pipe. The message parser calls back via the parseError or processMessage functions that are defined as part of the SIPMessageListener interface.

Author(s):
M. Ranganathan
Version:
1.2 $Revision: 1.59 $ $Date: 2009/11/20 04:45:53 $
See also:
gov.nist.javax.sip.parser.PipelinedMsgParser
 
 public class TCPMessageChannel extends MessageChannel implements SIPMessageListenerRunnable,
         RawMessageChannel {
 
     private Socket mySock;
 
     private PipelinedMsgParser myParser;
 
     protected InputStream myClientInputStream// just to pass to thread.
 
     protected OutputStream myClientOutputStream;
 
     protected String key;
 
     protected boolean isCached;
 
     protected boolean isRunning;
 
     private Thread mythread;
 
     protected SIPTransactionStack sipStack;
 
     protected String myAddress;
 
     protected int myPort;
 
     protected InetAddress peerAddress;
 
     protected int peerPort;
 
     protected String peerProtocol;
 
     // Incremented whenever a transaction gets assigned
     // to the message channel and decremented when
     // a transaction gets freed from the message channel.
     // protected int useCount;
 
    protected TCPMessageChannel(SIPTransactionStack sipStack) {
        this. = sipStack;
    }

    
Constructor - gets called from the SIPStack class with a socket on accepting a new client. All the processing of the message is done here with the sipStack being freed up to handle new connections. The sock input is the socket that is returned from the accept. Global data that is shared by all threads is accessible in the Server structure.

Parameters:
sock Socket from which to read and write messages. The socket is already connected (was created as a result of an accept).
sipStack Ptr to SIP Stack
    protected TCPMessageChannel(Socket sockSIPTransactionStack sipStack,
            TCPMessageProcessor msgProcessorthrows IOException {
        if (sipStack.isLoggingEnabled()) {
            sipStack.getStackLogger().logDebug("creating new TCPMessageChannel ");
            sipStack.getStackLogger().logStackTrace();
        }
         = sock;
         = .getInetAddress();
         = msgProcessor.getIpAddress().getHostAddress();
         = new Thread(this);
        .setDaemon(true);
        .setName("TCPMessageChannelThread");
        // Stash away a pointer to our sipStack structure.
        this. = sipStack;
        this. = .getPort();
        this. = msgProcessor;
        this. = this..getPort();
        // Bug report by Vishwashanti Raj Kadiayl
        super. = msgProcessor;
        // Can drop this after response is sent potentially.
        .start();
    }

    
Constructor - connects to the given inet address. Acknowledgement -- Lamine Brahimi (IBM Zurich) sent in a bug fix for this method. A thread was being uncessarily created.

Parameters:
inetAddr inet address to connect to.
sipStack is the sip sipStack from which we are created.
Throws:
java.io.IOException if we cannot connect.
    protected TCPMessageChannel(InetAddress inetAddrint portSIPTransactionStack sipStack,
            TCPMessageProcessor messageProcessorthrows IOException {
        if (sipStack.isLoggingEnabled()) {
            sipStack.getStackLogger().logDebug("creating new TCPMessageChannel ");
            sipStack.getStackLogger().logStackTrace();
        }
        this. = inetAddr;
        this. = port;
        this. = messageProcessor.getPort();
        this. = "TCP";
        this. = sipStack;
        this. = messageProcessor;
        this. = messageProcessor.getIpAddress().getHostAddress();
        // Bug report by Vishwashanti Raj Kadiayl
        this. = MessageChannel.getKey("TCP");
        super. = messageProcessor;
    }

    
Returns "true" as this is a reliable transport.
    public boolean isReliable() {
        return true;
    }

    
Close the message channel.
    public void close() {
        try {
            if ( != null) {
                .close();
                 = null;
            }
            if (.isLoggingEnabled())
                .getStackLogger().logDebug("Closing message Channel " + this);
        } catch (IOException ex) {
            if (.isLoggingEnabled())
                .getStackLogger().logDebug("Error closing socket " + ex);
        }
    }

    
Get my SIP Stack.

Returns:
The SIP Stack for this message channel.
    public SIPTransactionStack getSIPStack() {
        return ;
    }

    
get the transport string.

Returns:
"tcp" in this case.
    public String getTransport() {
        return "TCP";
    }

    
get the address of the client that sent the data to us.

Returns:
Address of the client that sent us data that resulted in this channel being created.
    public String getPeerAddress() {
        if ( != null) {
            return .getHostAddress();
        } else
            return getHost();
    }
    protected InetAddress getPeerInetAddress() {
        return ;
    }
    public String getPeerProtocol() {
        return this.;
    }

    
Send message to whoever is connected to us. Uses the topmost via address to send to.

Parameters:
msg is the message to send.
retry
    private void sendMessage(byte[] msgboolean retrythrows IOException {
        /*
         * Patch from kircuv@dev.java.net (Issue 119 ) This patch avoids the case where two
         * TCPMessageChannels are now pointing to the same socket.getInputStream().
         * 
         * JvB 22/5 removed
         */
       // Socket s = this.sipStack.ioHandler.getSocket(IOHandler.makeKey(
       // this.peerAddress, this.peerPort));
        Socket sock = this...sendBytes(this..getIpAddress(),
                this.this.this.msgretrythis);
        // Created a new socket so close the old one and stick the new
        // one in its place but dont do this if it is a datagram socket.
        // (could have replied via udp but received via tcp!).
        // if (mySock == null && s != null) {
        // this.uncache();
        // } else
        if (sock !=  && sock != null) {
            try {
                if ( != null)
                    .close();
            } catch (IOException ex) {
            }
             = sock;
            this. = .getInputStream();
            this. = .getOutputStream();
            Thread thread = new Thread(this);
            thread.setDaemon(true);
            thread.setName("TCPMessageChannelThread");
            thread.start();
        }
    }

    
Return a formatted message to the client. We try to re-connect with the peer on the other end if possible.

Parameters:
sipMessage Message to send.
Throws:
java.io.IOException If there is an error sending the message
    public void sendMessage(SIPMessage sipMessagethrows IOException {
        byte[] msg = sipMessage.encodeAsBytes(this.getTransport());
        long time = System.currentTimeMillis();
        // JvB: also retry for responses, if the connection is gone we should
        // try to reconnect
        this.sendMessage(msg/* sipMessage instanceof SIPRequest */true);
            logMessage(sipMessagetime);
    }

    
Send a message to a specified address.

Parameters:
message Pre-formatted message to send.
receiverAddress Address to send it to.
receiverPort Receiver port.
Throws:
java.io.IOException If there is a problem connecting or sending.
    public void sendMessage(byte message[], InetAddress receiverAddressint receiverPort,
            boolean retrythrows IOException {
        if (message == null || receiverAddress == null)
            throw new IllegalArgumentException("Null argument");
         Socket sock = this...sendBytes(this..getIpAddress(),
                receiverAddressreceiverPort"TCP"messageretrythis);
        if (sock !=  && sock != null) {
            if ( != null) {
                /*
                 * Delay the close of the socket for some time in case it is being used.
                 */
                .getTimer().schedule(new TimerTask() {
                    @Override
                    public boolean cancel() {
                        try {
                            .close();
                            super.cancel();
                        } catch (IOException ex) {
                        }
                        return true;
                    }
                    @Override
                    public void run() {
                        try {
                            .close();
                        } catch (IOException ex) {
                        }
                    }
                }, 8000);
            }
             = sock;
            this. = .getInputStream();
            this. = .getOutputStream();
            // start a new reader on this end of the pipe.
            Thread mythread = new Thread(this);
            mythread.setDaemon(true);
            mythread.setName("TCPMessageChannelThread");
            mythread.start();
        }
    }

    
Exception processor for exceptions detected from the parser. (This is invoked by the parser when an error is detected).

Parameters:
sipMessage -- the message that incurred the error.
ex -- parse exception detected by the parser.
header -- header that caused the error.
Throws:
java.text.ParseException Thrown if we want to reject the message.
    public void handleException(ParseException exSIPMessage sipMessageClass hdrClass,
            String headerString messagethrows ParseException {
        if (.isLoggingEnabled())
            .getStackLogger().logException(ex);
        // Log the bad message for later reference.
        if ((hdrClass != null)
                && (hdrClass.equals(From.class) || hdrClass.equals(To.class)
                        || hdrClass.equals(CSeq.class) || hdrClass.equals(Via.class)
                        || hdrClass.equals(CallID.class) || hdrClass.equals(RequestLine.class) || hdrClass
                        .equals(StatusLine.class))) {
            if (.isLoggingEnabled()) {
                .getStackLogger().logDebug(
                        "Encountered Bad Message \n" + sipMessage.toString());
            }
            // JvB: send a 400 response for requests (except ACK)
            // Currently only UDP, @todo also other transports
            String msgString = sipMessage.toString();
            if (!msgString.startsWith("SIP/") && !msgString.startsWith("ACK ")) {
                String badReqRes = createBadReqRes(msgStringex);
                if (badReqRes != null) {
                    if (.isLoggingEnabled()) {
                        .getStackLogger().logDebug("Sending automatic 400 Bad Request:");
                        .getStackLogger().logDebug(badReqRes);
                    }
                    try {
                        this.sendMessage(badReqRes.getBytes(), this.getPeerInetAddress(), this
                                .getPeerPort(), false);
                    } catch (IOException e) {
                        this..getStackLogger().logException(e);
                    }
                } else {
                    if (.isLoggingEnabled()) {
                        .getStackLogger().logDebug(
                                "Could not formulate automatic 400 Bad Request");
                    }
                }
            }
            throw ex;
        } else {
            sipMessage.addUnparsed(header);
        }
    }

    
Gets invoked by the parser as a callback on successful message parsing (i.e. no parser errors).

Parameters:
sipMessage Mesage to process (this calls the application for processing the message).
    public void processMessage(SIPMessage sipMessagethrows Exception {
        try {
            if (sipMessage.getFrom() == null
                    || // sipMessage.getFrom().getTag()
                    // == null ||
                    sipMessage.getTo() == null || sipMessage.getCallId() == null
                    || sipMessage.getCSeq() == null || sipMessage.getViaHeaders() == null) {
                String badmsg = sipMessage.encode();
                if (.isLoggingEnabled()) {
                    .getStackLogger().logDebug(">>> Dropped Bad Msg");
                    .getStackLogger().logDebug(badmsg);
                }
                return;
            }
            ViaList viaList = sipMessage.getViaHeaders();
            // For a request
            // first via header tells where the message is coming from.
            // For response, this has already been recorded in the outgoing
            // message.
            if (sipMessage instanceof SIPRequest) {
                Via v = (ViaviaList.getFirst();
                Hop hop = ..resolveAddress(v.getHop());
                this. = v.getTransport();
                try {
                    this. = .getInetAddress();
                    // Check to see if the received parameter matches
                    // the peer address and tag it appropriately.
                    // JvB: dont do this. It is both costly and incorrect
                    // Must set received also when it is a FQDN, regardless
                    // whether
                    // it resolves to the correct IP address
                    // InetAddress sentByAddress =
                    // InetAddress.getByName(hop.getHost());
                    // JvB: if sender added 'rport', must always set received
                    if (v.hasParameter(.)
                            || !hop.getHost().equals(this..getHostAddress())) {
                        v.setParameter(.this..getHostAddress());
                    }
                    // @@@ hagai
                    // JvB: technically, may only do this when Via already
                    // contains
                    // rport
                    v.setParameter(., Integer.toString(this.));
                } catch (java.text.ParseException ex) {
                    InternalErrorHandler.handleException(ex.getStackLogger());
                }
                // Use this for outgoing messages as well.
                if (!this.) {
                    ((TCPMessageProcessorthis.).cacheMessageChannel(this);
                    this. = true;
                    int remotePort = ((java.net.InetSocketAddress.getRemoteSocketAddress()).getPort();
                    String key = IOHandler.makeKey(.getInetAddress(), remotePort);
                    ..putSocket(key);
                }
            }
         
            // Foreach part of the request header, fetch it and process it
            long receptionTime = System.currentTimeMillis();
            if (sipMessage instanceof SIPRequest) {
                // This is a request - process the request.
                SIPRequest sipRequest = (SIPRequestsipMessage;
                // Create a new sever side request processor for this
                // message and let it handle the rest.
                if (.isLoggingEnabled()) {
                    .getStackLogger().logDebug("----Processing Message---");
                }
                // Check for reasonable size - reject message
                // if it is too long.
                if (this..getStackLogger().isLoggingEnabled(.)) {
                    ..logMessage(sipMessagethis.getPeerHostPort().toString(),
                            this.getMessageProcessor().getIpAddress().getHostAddress() + ":"
                                    + this.getMessageProcessor().getPort(), falsereceptionTime);
                }
                if (.getMaxMessageSize() > 0
                        && sipRequest.getSize()
                                + (sipRequest.getContentLength() == null ? 0 : sipRequest
                                        .getContentLength().getContentLength()) > 
                                .getMaxMessageSize()) {
                    SIPResponse sipResponse = sipRequest
                            .createResponse(.);
                    byte[] resp = sipResponse.encodeAsBytes(this.getTransport());
                    this.sendMessage(respfalse);
                    throw new Exception("Message size exceeded");
                }
                ServerRequestInterface sipServerRequest = .newSIPServerRequest(
                        sipRequestthis);
                if (sipServerRequest != null) {
                    try {
                        sipServerRequest.processRequest(sipRequestthis);
                    } finally {
                        if (sipServerRequest instanceof SIPTransaction) {
                            SIPServerTransaction sipServerTx = (SIPServerTransactionsipServerRequest;
                            if (!sipServerTx.passToListener())
                                ((SIPTransactionsipServerRequest).releaseSem();
                        }
                    }
                } else {
                	if (.isLoggingEnabled())
                		this..getStackLogger()
                            .logWarning("Dropping request -- could not acquire semaphore in 10 sec");
                }
            } else {
                SIPResponse sipResponse = (SIPResponsesipMessage;
                // JvB: dont do this
                // if (sipResponse.getStatusCode() == 100)
                // sipResponse.getTo().removeParameter("tag");
                try {
                    sipResponse.checkHeaders();
                } catch (ParseException ex) {
                    if (.isLoggingEnabled())
                        .getStackLogger()
                                .logError("Dropping Badly formatted response message >>> "
                                        + sipResponse);
                    return;
                }
                // This is a response message - process it.
                // Check the size of the response.
                // If it is too large dump it silently.
                if (.getMaxMessageSize() > 0
                        && sipResponse.getSize()
                                + (sipResponse.getContentLength() == null ? 0 : sipResponse
                                        .getContentLength().getContentLength()) > 
                                .getMaxMessageSize()) {
                    if (.isLoggingEnabled())
                        this..getStackLogger().logDebug("Message size exceeded");
                    return;
                }
                ServerResponseInterface sipServerResponse = .newSIPServerResponse(
                        sipResponsethis);
                if (sipServerResponse != null) {
                    try {
                        if (sipServerResponse instanceof SIPClientTransaction
                                && !((SIPClientTransactionsipServerResponse)
                                        .checkFromTag(sipResponse)) {
                            if (.isLoggingEnabled())
                                .getStackLogger()
                                        .logError("Dropping response message with invalid tag >>> "
                                                + sipResponse);
                            return;
                        }
                        sipServerResponse.processResponse(sipResponsethis);
                    } finally {
                        if (sipServerResponse instanceof SIPTransaction
                                && !((SIPTransactionsipServerResponse).passToListener())
                            ((SIPTransactionsipServerResponse).releaseSem();
                    }
                } else {
                    
                            .getStackLogger()
                            .logWarning(
                                    "Application is blocked -- could not acquire semaphore -- dropping response");
                }
            }
        } finally {
        }
    }

    
This gets invoked when thread.start is called from the constructor. Implements a message loop - reading the tcp connection and processing messages until we are done or the other end has closed.
    public void run() {
        Pipeline hispipe = null;
        // Create a pipeline to connect to our message parser.
        hispipe = new Pipeline(.,
                ((SIPTransactionStack).getTimer());
        // Create a pipelined message parser to read and parse
        // messages that we write out to him.
         = new PipelinedMsgParser(thishispipethis..getMaxMessageSize());
        // Start running the parser thread.
        .processInput();
        // bug fix by Emmanuel Proulx
        int bufferSize = 4096;
        this..++;
        this. = true;
        try {
            while (true) {
                try {
                    byte[] msg = new byte[bufferSize];
                    int nbytes = .read(msg, 0, bufferSize);
                    // no more bytes to read...
                    if (nbytes == -1) {
                        hispipe.write("\r\n\r\n".getBytes("UTF-8"));
                        try {
                            if (. != -1) {
                                synchronized () {
                                    .--;
                                    .notify();
                                }
                            }
                            hispipe.close();
                            .close();
                        } catch (IOException ioex) {
                        }
                        return;
                    }
                    hispipe.write(msg, 0, nbytes);
                } catch (IOException ex) {
                    // Terminate the message.
                    try {
                        hispipe.write("\r\n\r\n".getBytes("UTF-8"));
                    } catch (Exception e) {
                        // InternalErrorHandler.handleException(e);
                    }
                    try {
                        if (.isLoggingEnabled())
                            .getStackLogger().logDebug("IOException  closing sock " + ex);
                        try {
                            if (. != -1) {
                                synchronized () {
                                    .--;
                                    // System.out.println("Notifying!");
                                    .notify();
                                }
                            }
                            .close();
                            hispipe.close();
                        } catch (IOException ioex) {
                        }
                    } catch (Exception ex1) {
                        // Do nothing.
                    }
                    return;
                } catch (Exception ex) {
                    InternalErrorHandler.handleException(ex.getStackLogger());
                }
            }
        } finally {
            this. = false;
            this..remove(this);
            this..--;
            .close();
        }
    }
    protected void uncache() {
    	if ( && !) {
    		this..remove(this);
    	}
    }

    
Equals predicate.

Parameters:
other is the other object to compare ourselves to for equals
    public boolean equals(Object other) {
        if (!this.getClass().equals(other.getClass()))
            return false;
        else {
            TCPMessageChannel that = (TCPMessageChannelother;
            if (this. != that.mySock)
                return false;
            else
                return true;
        }
    }

    
Get an identifying key. This key is used to cache the connection and re-use it if necessary.
    public String getKey() {
        if (this. != null) {
            return this.;
        } else {
            this. = MessageChannel.getKey(this.this."TCP");
            return this.;
        }
    }

    
Get the host to assign to outgoing messages.

Returns:
the host to assign to the via header.
    public String getViaHost() {
        return ;
    }

    
Get the port for outgoing messages sent from the channel.

Returns:
the port to assign to the via header.
    public int getViaPort() {
        return ;
    }

    
Get the port of the peer to whom we are sending messages.

Returns:
the peer port.
    public int getPeerPort() {
        return ;
    }
    public int getPeerPacketSourcePort() {
        return this.;
    }
        return this.;
    }

    
TCP Is not a secure protocol.
    public boolean isSecure() {
        return false;
    }
New to GrepCode? Check out our FAQ X