Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
   package com.sleepycat.je.rep.utilint;
   
   import java.io.IOException;
   import java.io.InputStream;
   import java.io.OutputStream;
   import java.io.PrintWriter;
   import java.net.ServerSocket;
   import java.net.Socket;
  import java.util.Map;
  import java.util.Set;
  
ServiceDispatcher listens on a specific socket for service requests and dispatches control to the service that is being requested. A service request message has the format: Service:<one byte ServiceName.length><ServiceName> The format of the message is binary, with all text being encoded in ascii. Upon receipt of service request message, the new SocketChannel is queued for processing by the service in the Queue associated with the service. The SocketChannel is the responsibility of the service after this point. It can configure the channel to best suit the requirements of the specific service. The dispatcher returns a single byte to indicate success or failure. The byte value encodes a ServiceDispatcher.Response enumerator.
  
  public class ServiceDispatcher extends StoppableThread {
  
      /* The socket on which the dispatcher is listening */
      private final InetSocketAddress socketAddress;
  
      /*
       * The selector that watches for accept events on the server socket and
       * on subsequent read events.
       */
      private final Selector selector;
  
      /* The server socket channel */
      private final ServerSocketChannel serverChannel;
  
      /* Determines whether new connections should be accepted. */
      private boolean processAcceptRequests = true;
  
      /* Maintains the error count, used primarily for testing. */
      private int errorCount = 0;
  
      /*
       * Maps the service name to the queue of sockets processed by the
       * service.
       */
      private final Map<StringServiceserviceMap =
          new ConcurrentHashMap<StringService>();
  
      /* The thread pool used to manage the threads used by services */
      private final ExecutorService pool = Executors.newCachedThreadPool();
  
      private final Logger logger;
      private final Formatter formatter;
  
      /* The prefix for a service request. */
      private static final String REQUEST_PREFIX = "Service:";
      private static final byte[] REQUEST_PREFIX_BYTES;
  
      /*
       * A reference to a replicated environment, only used for error
       * propagation when this dispatcher has been created for a replicated
       * node.
       */
      private final RepImpl repImpl;

    
The response to a service request. Do not rearrange the order of the enumerators, since their ordinal values are currently used in messages.
 
     public static enum Response {
 
         OK, BUSY, FORMAT_ERROR, UNKNOWN_SERVICE ;
 
         ByteBuffer byteBuffer() {
             ByteBuffer buffer = ByteBuffer.allocate(1);
             buffer.put((byte)ordinal());
             buffer.flip();
             return buffer;
         }
 
         static Response get(int ordinal) {
             if (ordinal < values().length) {
                 return values()[ordinal];
             }
             return null;
         }
     }
 
     static {
          = StringUtils.toASCII();
     }
 
     /*
      * The initial size is the prefix plus the byte that holds the length of
      * the service name.
      */
     private static final int INITIAL_BUFFER_SIZE =
         .+1;

    
Create a ServiceDispatcher listening on a specific socket for service requests. This service dispatcher has been created on behalf of a replicated environment, and the node will be informed of any unexpected failures seen by the dispatcher.

Parameters:
socketAddress the socket on which it listens for service requests
Throws:
java.io.IOException if the socket could not be bound.
 
     /*
      * Opens a server socket channel and a selector, configures a channel as
      * non-blocking, binds the accept socket (with no SO timeout) to
      * socketAddress, and picks the logger: the "formatter needed" variant
      * when repImpl is null, the regular one otherwise. A
      * ReplicationFormatter is then built from the name/id pair.
      *
      * NOTE(review): this listing has lost several identifiers -- the
      * assignment targets on the lines starting with "=", the receivers of
      * ".configureBlocking" and ".socket", the comma after "repImpl" in the
      * super call, and the constant used when repImpl is null. Restore them
      * from the original source before compiling.
      * NOTE(review): no registration of the server channel with the selector
      * is visible in this constructor -- confirm whether a line was dropped.
      */
     public ServiceDispatcher(InetSocketAddress socketAddress,
                              RepImpl repImpl)
         throws IOException {
 
         super(repImpl"ServiceDispatcher-" + socketAddress.getHostName() +
                        ":" + socketAddress.getPort());
 
         this. = repImpl;
         this. = socketAddress;
          = ServerSocketChannel.open();
         .configureBlocking(false);
          = Selector.open();
         ServerSocket acceptSocket = .socket();
         /* No timeout */
         acceptSocket.setSoTimeout(0);
         acceptSocket.bind(socketAddress);
         /* Logger choice depends on whether an environment is available. */
         if (repImpl == null) {
              = LoggerUtils.getLoggerFormatterNeeded(getClass());
         } else {
              = LoggerUtils.getLogger(getClass());
         }
         NameIdPair nameIdPair =
             (repImpl == null) ? . : repImpl.getNameIdPair();
          = new ReplicationFormatter(nameIdPair);
     }

    
Convenience overloading for when the dispatcher is created without a replicated environment, e.g. when used by the Monitor, and in unit test situations.

 
     public ServiceDispatcher(InetSocketAddress socketAddress)
         throws IOException {
 
         this(socketAddressnull /* repImpl */);
     }

    
Stop accepting new connections, while the individual services quiesce and shut themselves down.
 
     public void preShutdown() {
          = false;
     }

    
Shuts down the dispatcher, so that it's no longer listening for service requests. The port is freed up upon return and the thread used to listen on the port is shutdown.
 
     /*
      * Shutdown sequence as visible here: return at once if shutdownDone()
      * reports true; log the start; stop the dispatcher thread via
      * shutdownThread(); cancel each registered service by name; call
      * shutdownNow() (the comment below says this is for executing and
      * queued service requests); close a socket and a further closeable;
      * log completion. I/O errors during the close are logged and ignored.
      *
      * NOTE(review): receivers and the leading logMsg arguments were
      * dropped from this listing (every "logMsg(.," and every expression
      * starting with "."); restore them from the original source.
      * NOTE(review): the first message has both a trailing "+" and a
      * leading unary "+" around the port expression -- harmless if the
      * operand is an int, but worth tidying.
      */
     public void shutdown() {
         if (shutdownDone()) {
             return;
         }
 
         LoggerUtils.logMsg(.,
                            "ServiceDispatcher shutdown starting. HostPort=" +
                            .getHostName() + ":" +
                            + .getPort() +
                            " Registered services: " + .keySet());
 
         shutdownThread();
 
         /* cancel(String) is applied to every name in the key set. */
         for (String serviceName : .keySet()) {
             cancel(serviceName);
         }
 
         /* Shutdown any executing and queued service requests. */
         .shutdownNow();
         try {
             .socket().close();
             .close();
         } catch (IOException e) {
             LoggerUtils.logMsg
                 (.,
                  "Ignoring I/O error during close: " + e.getMessage());
         }
         LoggerUtils.logMsg(.,
                            "ServiceDispatcher shutdown completed." +
                            " HostPort=" + .getHostName() +
                            ":" + .getPort());
     }
 
     @Override
     protected int initiateSoftShutdown() {
         .wakeup();
         return 0;
     }

    
 
     @Override
     protected Logger getLogger() {
         return ;
     }

    
Builds a service request suitable for sending over to a ServiceDispatcher.

Parameters:
serviceName the service that is being requested.
Returns:
the byte encoding of the service request message
 
     private static byte[] serviceRequestMessage(String serviceName) {
         byte[] serviceNameBytes = StringUtils.toASCII(serviceName);
         int length = . + 1 +
                       serviceNameBytes.length;
         ByteBuffer buffer = ByteBuffer.allocate(length);
         buffer.put().
                put((byte)serviceNameBytes.length).
                put(serviceNameBytes);
         return buffer.array();
     }

    
Used by the client to establish an output stream for the service on the socket. It sends out the request for the service and interprets the response to determine if it was successful.

Parameters:
socket the connected socket that will be the basis for the stream
serviceName the service running on the stream
Returns:
the output stream ready for subsequent output
Throws:
java.io.IOException if the output stream could not be established
ServiceDispatcher.ServiceConnectFailedException if the connection could not be made.
 
     static public OutputStream getServiceOutputStream(Socket socket,
                                                       String serviceName)
         throws IOExceptionServiceConnectFailedException {
 
         assert socket.isConnected();
         byte[] message = ServiceDispatcher.serviceRequestMessage(serviceName);
         OutputStream out = socket.getOutputStream();
         out.write(message);
         out.flush();
         InputStream in = socket.getInputStream();
         int result = in.read();
         if (result < 0) {
             throw new IOException("No service response byte: " + result);
         }
         Response response = Response.get(result);
         if (response == null) {
             throw new IOException("Unexpected read response byte: " + result);
         }
         if (response != .) {
             throw new ServiceConnectFailedException(serviceNameresponse);
         }
         return out;
     }

    
A variation on the method above. It's used by the client to setup a channel for the service. It performs the initial handshake requesting the service and interpreting the response to determine if it was successful.

Parameters:
channel the channel that is the basis for the service
serviceName the service running on the channel
Throws:
ServiceDispatcher.ServiceConnectFailedException if the connection could not be made.
 
     static public void doServiceHandshake(SocketChannel channel,
                                           String serviceName)
         throws IOExceptionServiceConnectFailedException {
 
         ByteBuffer message =
             ByteBuffer.wrap(ServiceDispatcher.serviceRequestMessage(serviceName));
         while (message.remaining() > 0) {
             channel.write(message);
         }
         ByteBuffer buffer = ByteBuffer.allocate(1);
         while (buffer.remaining() > 0) {
             if (channel.read(buffer) < 0) {
                 throw new IOException("EOF in response to service request:" +
                                       serviceName);
             }
         }
         int result = channel.read(buffer);
         if (result < 0) {
             throw new IOException("No service response byte: " + result);
         }
         buffer.flip();
         Response response = Response.get(buffer.get());
         if (response == null) {
             throw new IOException("Unexpected read response byte: " + result);
         }
         if (response != .) {
             throw new ServiceConnectFailedException(serviceNameresponse);
         }
     }

    
Returns the next socketChannel created in response to a request for the service. The socketChannel and the associated socket is configured as requested in the arguments.

Parameters:
serviceName the service for which the channel must be created.
blocking true if the channel must be configured to block
soTimeout the timeout for the underlying socket
Returns:
the configured channel or null if there are no more channels, because the service has been shut down.
Throws:
java.lang.InterruptedException
 
     /*
      * Loops until a channel taken from the named queuing service has been
      * configured successfully, or the EOF pseudo channel is seen (in which
      * case null is returned). A channel that cannot be configured is
      * closed and the loop waits for the next one. An unregistered service,
      * or one that is not a QueuingService, results in an
      * unexpectedState failure.
      *
      * NOTE(review): the map receiver before ".get", the EOF marker
      * compared against "channel", and the leading logMsg arguments were
      * dropped from this listing; restore them from the original source.
      */
     public SocketChannel takeChannel(String serviceName,
                                      boolean blocking,
                                      int soTimeout)
         throws InterruptedException {
 
         while (true) {
             /* The service is looked up afresh on every iteration. */
             Service service = .get(serviceName);
             if (service == null) {
                 throw EnvironmentFailureException.unexpectedState
                 ("Service: " + serviceName + " was not registered");
             }
             if (! (service instanceof QueuingService)) {
                 throw EnvironmentFailureException.unexpectedState
                 ("Service: " + serviceName + " is not a queuing service");
             }
             Socket socket = null;
             SocketChannel channel = null;
             try {
                 channel = ((QueuingService)service).take();
                 assert channel != null;
 
                 if (channel == .) {
                     /* A pseudo channel to indicate EOF, return null */
                     return null;
                 }
 
                 channel.configureBlocking(blocking);
                 socket = channel.socket();
                 socket.setSoTimeout(soTimeout);
                 return channel;
             } catch (IOException e) {
                 LoggerUtils.logMsg(.,
                                    "Unable to configure channel " +
                                    "for '" + serviceName + "' service: " +
                                    e.getMessage());
                 try {
                     channel.close();
                 } catch (IOException e1) {
                     /*
                      * NOTE(review): this logs the outer exception "e", not
                      * the close failure "e1" -- probably unintended.
                      */
                     LoggerUtils.logMsg(.,
                                        "Cleanup failed for service: " +
                                        serviceName + "\n" + e.getMessage());
                 }
                 /* Wait for the next request. */
                 continue;
             }
         }
     }

    
Returns the socket associated with the dispatcher
 
     public InetSocketAddress getSocketAddress() {
         return ;
     }

    
Registers a service queue with the ServiceDispatcher. Requests for a service result in a new SocketChannel being created on which the service can communicate with the requester of the service.

Parameters:
serviceName the name of the service being requested
serviceQueue the queue that will be used to hold channels established for the service.
 
     public void register(String serviceName,
                          BlockingQueue<SocketChannelserviceQueue) {
         if (serviceName == null) {
             throw EnvironmentFailureException.unexpectedState
                 ("The serviceName argument must not be null");
         }
         if (.containsKey(serviceName)) {
             throw EnvironmentFailureException.unexpectedState
                 ("Service: " + serviceName + " is already registered");
         }
         if (serviceQueue == null) {
             throw EnvironmentFailureException.unexpectedState
                 ("The serviceQueue argument must not be null");
         }
         .put(serviceName,
                        new QueuingService(serviceNameserviceQueue));
     }
 
     /*
      * Registers an already constructed service under its own name. A null
      * service, or a name that is already present, results in an
      * unexpectedState failure. The registration is logged before the
      * service is stored.
      *
      * NOTE(review): the map receiver, the leading logMsg arguments and the
      * comma between the put() arguments were dropped from this listing.
      */
     public void register(Service service) {
         if (service == null) {
             throw EnvironmentFailureException.unexpectedState
                 ("The service argument must not be null");
         }
 
         if (.containsKey(service.name)) {
             throw EnvironmentFailureException.unexpectedState
                 ("Service: " + service.name + " is already registered");
         }
         LoggerUtils.logMsg(.,
                            "Service: " + service.name + " registered.");
         .put(service.nameservice);
     }
 
     public boolean isRegistered(String serviceName) {
         if (serviceName == null) {
             throw EnvironmentFailureException.unexpectedState
                 ("The serviceName argument must not be null");
         }
         return .containsKey(serviceName);
     }
 
     public void setSimulateIOException(String serviceName,
                                        boolean simulateException) {
 
         Service service = .get(serviceName);
         if (service == null) {
             throw new IllegalStateException
                 ("Service: " + serviceName + " is not registered");
         }
 
         service.setSimulateIOException(simulateException);
     }

    
Cancels the registration of a service. Subsequent attempts to access the service will be ignored and the channel will be closed and will not be queued.

Parameters:
serviceName the name of the service being cancelled
 
     /*
      * Removes the named service from the registry and then calls its
      * cancel() method; the shutdown of the service is logged afterwards.
      * A null name, or a name with no registered service, results in an
      * unexpectedState failure.
      *
      * NOTE(review): the map receiver before ".remove" and the leading
      * logMsg arguments were dropped from this listing.
      */
     public void cancel(String serviceName) {
         if (serviceName == null) {
             throw EnvironmentFailureException.unexpectedState
                 ("The serviceName argument must not be null.");
         }
         Service service = .remove(serviceName);
 
         if (service == null) {
             throw EnvironmentFailureException.unexpectedState
                 ("Service: " + serviceName + " was not registered.");
         }
         service.cancel();
         LoggerUtils.logMsg(.,
                            "Service: " + serviceName + " shut down.");
     }

    
Processes an accept event on the server socket. As a result of the processing a new socketChannel is created, and the selector is registered with the new channel so that it can process subsequent read events.
 
     /*
      * Accepts a pending connection. When the (stripped) condition below
      * holds, the new channel is closed immediately and nothing else is
      * done; otherwise the channel is made non-blocking and registered
      * with a freshly allocated ByteBuffer as the third register()
      * argument. An IOException is logged and the channel closed.
      *
      * NOTE(review): the accept receiver, the negated condition, the
      * register() arguments, the buffer size and the leading logMsg
      * arguments were all dropped from this listing; restore them from the
      * original source.
      */
     private void processAccept() {
 
         SocketChannel socketChannel = null;
         try {
             socketChannel = .accept();
             if (!) {
                 closeChannel(socketChannel);
                 return;
             }
             socketChannel.configureBlocking(false);
             socketChannel.register
                 (,
                  .,
                  ByteBuffer.allocate());
         } catch (IOException e) {
             LoggerUtils.logMsg(.,
                                "Server accept exception: " + e.getMessage());
             /* closeChannel() tolerates a null channel. */
             closeChannel(socketChannel);
         }
     }

    
Processes read events on newly established socket channels. Input on the channel is verified to ensure that it is a service request. The read is accomplished in two parts: a read for the fixed-size prefix and the name length byte, followed by a read of the variable-length name itself. Errors result in the channel being closed (with the key being canceled as a result) and a null value being returned.

Parameters:
readKey the read key associated with the channel.
Returns:
the ServiceName or null if there was insufficient input, or an error was encountered.
 
     /*
      * Reads into the buffer attached to the key. Nothing is returned until
      * the buffer is full. When a full buffer has the capacity compared
      * against below (first phase), the prefix is validated, the name length
      * byte is checked to be positive, a larger buffer is attached with the
      * bytes read so far copied in, and the method recurses to continue
      * reading. When a full buffer has any other capacity (second phase),
      * the key is cancelled and the text after the prefix and length byte
      * is returned as the service name.
      *
      * A premature EOF, bad prefix or bad length closes the channel and
      * returns null; the latter two first write a response byte. An
      * IOException is logged and the channel closed.
      *
      * NOTE(review): counters being incremented ("++;"), the capacity and
      * index constants, the prefix constant, the response enumerators and
      * the leading logMsg arguments were dropped from this listing, as were
      * the closing parentheses of the two casts; restore them from the
      * original source.
      */
     private String processRead(SelectionKey readKey) {
         SocketChannel socketChannel = null;
         try {
             ByteBuffer readBuffer = (ByteBufferreadKey.attachment();
             socketChannel = (SocketChannelreadKey.channel();
             int readBytes = socketChannel.read(readBuffer);
             if (readBytes < 0 ) {
                 /* Premature EOF */
                 ++;
                 LoggerUtils.logMsg(.,
                                    "Premature EOF on channel: " +
                                    socketChannel + " read() returned: " +
                                    readBytes);
                 socketChannel.close();
                 return null;
             }
             if (readBuffer.remaining() == 0) {
                 readBuffer.flip();
                 if (readBuffer.capacity() == ) {
                     String prefix = StringUtils.fromASCII
                         (readBuffer.array(), 0, .length());
                     if (!prefix.equals()) {
                         ++;
                         LoggerUtils.logMsg
                             (.,
                              "Malformed service request: " + prefix);
                         socketChannel.write
                             (..byteBuffer());
                         socketChannel.close();
                         return null;
                     }
                     /* Enlarge the buffer to read the service name as well */
                     /*
                      * The byte is sign-extended into the int, so a length
                      * byte above 127 shows up as negative and is rejected
                      * by the check below.
                      */
                     int nameLength = readBuffer.get(-1);
                     if (nameLength <= 0) {
                         ++;
                         LoggerUtils.logMsg
                             (.,
                              "Bad service service name length: " + nameLength);
                         socketChannel.write
                             (..byteBuffer());
                         socketChannel.close();
                         return null;
                     }
                     ByteBuffer buffer = ByteBuffer.allocate
                         ( + nameLength);
                     /* Carry over what was read in the first phase. */
                     buffer.put(readBuffer);
                     readKey.attach(buffer);
 
                     return processRead(readKey);
                 }
                 String request = StringUtils.fromASCII(readBuffer.array());
                 readKey.cancel();
                 /* Skip the prefix and the one length byte. */
                 return request.substring(.length()+1);
             }
             /* Buffer not full as yet, keep reading */
             return null;
         } catch (IOException e) {
             LoggerUtils.logMsg(.,
                                "Exception during read: " + e.getMessage());
             closeChannel(socketChannel);
             return null;
         }
     }

    
Closes the channel, logging any resulting exceptions.

Parameters:
channel the channel being closed
 
     /*
      * Null-tolerant close: does nothing for a null channel, otherwise
      * closes it and logs (rather than propagates) any IOException.
      *
      * NOTE(review): the leading logMsg arguments were dropped from this
      * listing.
      */
     private void closeChannel(Channel channel) {
         if (channel != null) {
             try {
                 channel.close();
             } catch (IOException e1) {
                 LoggerUtils.logMsg(.,
                                    "Exception during cleanup: " +
                                    e1.getMessage());
             }
         }
     }

    
The central run method. It dispatches to the "accept" and "read" event processing methods. Upon a completed read, it verifies the validity of the service name and queues the channel for subsequent consumption by the service.
 
     /*
      * Dispatcher loop: blocks in select(), returns when isShutdown() is
      * true, and otherwise walks the selected keys. One case calls
      * processAccept(); the other calls processRead() and, once a service
      * name is available, cancels the key and hands the channel to
      * processService(). Any other readyOps() value is treated as an
      * unexpected state. An IOException from select() is logged and
      * rethrown wrapped via unexpectedException(). On exit, a channel is
      * closed and cleanup() is called.
      *
      * NOTE(review): the selector receiver, both case labels, the argument
      * of the final closeChannel() call, the address receivers and the
      * leading logMsg arguments were dropped from this listing.
      * NOTE(review): the switch is on the whole readyOps() value, so a key
      * with more than one ready bit would reach the default branch.
      */
     @Override
     public void run() {
         LoggerUtils.logMsg(.,
                            "Started ServiceDispatcher. HostPort=" +
                            .getHostName() + ":" +
                            .getPort());
         try {
             while (true) {
                 try {
                     int result = .select();
                     if (isShutdown()) {
                         return;
                     }
                     if (result == 0) {
                         continue;
                     }
                 } catch (IOException e) {
                     LoggerUtils.logMsg
                         (.,
                          "Server socket exception " + e.getMessage());
                     throw EnvironmentFailureException.unexpectedException(e);
                 }
                 Set<SelectionKeyskeys = .selectedKeys();
                 for (SelectionKey key : skeys) {
                     switch (key.readyOps()) {
 
                         case .:
                             processAccept();
                             break;
 
                         case .:
                             String serviceName = processRead(key);
                             /* null: request incomplete or rejected. */
                             if (serviceName == null) {
                                 break;
                             }
                             /*
                              * processRead() already cancels the key before
                              * returning a name, so this call is redundant.
                              */
                             key.cancel();
                             processService((SocketChannel)key.channel(),
                                            serviceName);
                             break;
 
                         default:
                             throw EnvironmentFailureException.unexpectedState
                                 ("Unexpected ops bit set: " + key.readyOps());
                     }
                 }
                 /* All keys have been processed clear them. */
                 skeys.clear();
             }
         } finally {
             closeChannel();
             cleanup();
         }
     }

    
Performs the guts of the work underlying a service request. It validates the service request and writes an appropriate response to the channel.

Parameters:
channel
serviceName
 
     /*
      * Looks up the requested service. If none is registered, a response is
      * written, the channel is closed and the request is logged. Otherwise
      * the response is chosen from service.isBusy(), logged and written; a
      * write of zero bytes is treated as an unexpected state. The channel
      * is handed to service.requestDispatch() only when the response equals
      * the (stripped) enumerator compared below. An IOException closes the
      * channel and is logged.
      *
      * NOTE(review): the comma between the two parameters, the map
      * receivers, the incremented counter, the Response enumerators and the
      * leading logMsg arguments were dropped from this listing.
      */
     private void processService(SocketChannel channelString serviceName) {
         final Service service = .get(serviceName);
         try {
             if (service == null) {
                 ++;
                 channel.write(..byteBuffer());
                 closeChannel(channel);
                 /*
                  * Not unexpected in a distributed app due to calls being made
                  * before a service is actually registered.
                  */
                 LoggerUtils.logMsg(.,
                                    "Request for unknown Service: " +
                                    serviceName + " Registered services: " +
                                    .keySet());
                 return;
             }
             Response response = service.isBusy() ? . : .;
             LoggerUtils.logMsg(.,
                                "Service response: " + response +
                                " for service: " + service.name);
 
             /* A write that accepts no bytes is treated as a failure. */
             if (channel.write(response.byteBuffer()) == 0) {
                 throw EnvironmentFailureException.unexpectedState
                     ("Failed to write byte. Send buffer size: " +
                      channel.socket().getSendBufferSize());
             }
             if (response == .) {
                 service.requestDispatch(channel);
             }
         } catch (IOException e) {
             closeChannel(channel);
             /*
              * NOTE(review): the service name and the exception message are
              * concatenated with no separator between them.
              */
             LoggerUtils.logMsg(.,
                                "IO error writing to channel for " +
                                "service: " +  serviceName + e.getMessage());
         }
     }

    
The abstract class underlying all services.
 
     static private abstract class Service {
 
         /* The name associated with the service. */
         final String name;
 
         private boolean simulateIOException = false;
 
         public Service(String name) {
             super();
             if (name == null) {
                 throw EnvironmentFailureException.unexpectedState
                     ("Service name was null");
             }
             this. = name;
         }

        
Informs the service of a new request. The implementation of the method must not block.

Parameters:
channel the channel on which the request was made
 
         abstract void requestDispatch(SocketChannel channel);

        
Used to limit a particular type of service to avoid excess load.
 
         public boolean isBusy() {
             return false;
         }

        
Used during unit testing to simulate communications problems.
 
         public boolean simulateIOException() {
             return ;
         }
 
         public void setSimulateIOException(boolean simulateIOException) {
             this. = simulateIOException;
         }

        
Cancel the service as part of the registration being canceled.
 
         abstract void cancel();
     }

    
A service where requests are simply added to the supplied queue. It's the responsibility of the service creator to drain the queue. This service is used when the service carries out a long-running dialog with the service requester. For example, a Feeder service.
 
     /*
      * NOTE(review): throughout this class the queue field references, the
      * closing bracket of BlockingQueue<SocketChannel>, the marker added in
      * cancel(), the service name in the close-failure message and the
      * leading logMsg arguments were dropped from this listing; restore
      * them from the original source.
      */
     public class QueuingService extends Service {
         /* Holds the queue of pending requests, one per channel */
         private final BlockingQueue<SocketChannelqueue;
 
         QueuingService(String serviceName,
                        BlockingQueue<SocketChannelqueue) {
             super(serviceName);
             this. = queue;
         }
 
         /* Delegates to a blocking take(); may be interrupted. */
         SocketChannel take() throws InterruptedException {
             return .take();
         }
 
         /*
          * Adds the channel via add(); a false return is treated as an
          * unexpected state. When the simulation flag is set the channel is
          * closed first and is still added afterwards.
          */
         @Override
         void requestDispatch(SocketChannel channel) {
             if (simulateIOException()) {
                 LoggerUtils.logMsg(.,
                                    "Simulated test IO exception");
                 try {
                     /*
                      * This will provoke an IOException later when we try to
                      * use the channel in takeChannel().
                      */ 
                     channel.close();
                 } catch (IOException e) {
                     LoggerUtils.logMsg(.,
                                        "Close failure in '" +  +
                                        "' service: " + e.getMessage());
                 }
             }
             if (!.add(channel)) {
                 throw EnvironmentFailureException.unexpectedState
                     ("request queue overflow");
             }
         }
 
         /*
          * Closes every channel reached by the loop below, ignoring close
          * failures, and then adds one more element.
          * NOTE(review): presumably that element is the EOF pseudo channel
          * that takeChannel() checks for -- confirm against the original.
          * NOTE(review): the loop only iterates; the closed channels are not
          * removed here.
          */
         @Override
         void cancel() {
             /*
              * Drain any existing pending requests. It's safe to just iterate
              * since the service dispatcher has already stopped accepting new
              * requests for the service.
              */
             for (SocketChannel channel : ) {
                 try {
                     channel.close();
                 } catch (IOException e) {
                     // Ignore it, it's only cleanup
                 }
             }
             .add(.);
         }
     }

    
A queuing service that starts the thread that services the requests lazily, upon first request and terminates the thread when the service is unregistered. The thread must be "interrupt aware" and must exit when it receives an interrupt. This type of service is suitable for services that are used infrequently.
 
     /*
      * NOTE(review): throughout this class the serviceThread references,
      * every case label of the thread-state switch, the service name in
      * the messages, the closing bracket of BlockingQueue<SocketChannel>,
      * the comma in the super() call and the logMsg arguments were dropped
      * from this listing; restore them from the original source.
      */
     public class LazyQueuingService extends QueuingService {
 
         private final Thread serviceThread;
 
         public LazyQueuingService(String serviceName,
                                   BlockingQueue<SocketChannelqueue,
                                   Thread serviceThread) {
 
             super(serviceNamequeue);
             this. = serviceThread;
         }
 
         /*
          * Switches on a thread state: the first case starts the thread,
          * the grouped cases only log, and any other state is logged and
          * thrown as an unexpected state. On the non-throwing paths the
          * channel is then queued through the superclass.
          */
         @Override
         void requestDispatch(SocketChannel channel) {
 
             switch (.getState()) {
 
                 case :
                     .start();
                     LoggerUtils.logMsg(.,
                                        "Thread started for service: " + );
                     break;
 
                 case :
                 case :
                 case :
                 case :
                     /* Was previously activated. */
                     /*
                      * NOTE(review): this message is identical to the one
                      * logged when the thread is actually started.
                      */
                     LoggerUtils.logMsg(.,
                                        "Thread started for service: " + );
                     break;
 
                 default:
                     /*
                      * NOTE(review): the message has no space before
                      * "is in state:".
                      */
                     RuntimeException e =
                         EnvironmentFailureException.unexpectedState
                             ("Thread for service:" +  +
                              "is in state:" + .getState());
                     LoggerUtils.logMsg(,
                                        .e.getMessage());
                     throw e;
             }
             super.requestDispatch(channel);
         }
 
         @Override
        
Interrupts the thread to cause it to exit.
 
         /*
          * If the thread is alive it is interrupted and joined; an
          * interrupt received while joining is ignored. The superclass
          * cancel() runs in every case.
          */
         void cancel() {
             if (.isAlive()) {
                 .interrupt();
                 try {
                     .join();
                 } catch (InterruptedException e) {
                     /* Ignore it on shutdown. */
                 }
             }
             super.cancel();
         }
     }

    
A service that is run immediately in a thread allocated to it. Subtypes implement the getRunnable() method, which provides the runnable object for the service. This service frees the caller from managing the threads associated with the service. The runnable must manage interrupts so that it can be shut down by the underlying thread pool.
 
     static public abstract class ExecutingService extends Service {
         final private ServiceDispatcher dispatcher;
 
         public ExecutingService(String serviceName,
                                 ServiceDispatcher dispatcher) {
             super(serviceName);
             this. = dispatcher;
         }
 
         public abstract Runnable getRunnable(SocketChannel socketChannel);
 
         @Override
         void requestDispatch(SocketChannel channel) {
             ..execute(getRunnable(channel));
         }
 
         @Override
         protected void cancel() {
             /* Nothing to do */
         }
     }
 
     @SuppressWarnings("serial")
     static public class ServiceConnectFailedException extends Exception {
         final Response response;
         final String serviceName;
 
         ServiceConnectFailedException(String serviceName,
                                       Response response) {
             assert(response != .);
             this. = response;
             this. = serviceName;
         }
 
         public Response getResponse() {
             return ;
         }
 
         @Override
         public String getMessage() {
             switch () {
                 case :
                     return "Bad message format, for service:" + ;
 
                 case :
                     return "Unknown service request:" + ;
 
                 case :
                     return "Service was busy";
 
                 case :
                     /*
                      * Don't expect an OK response to provoke an exception.
                      * Fall through.
                      */
                 default:
                     throw EnvironmentFailureException.unexpectedState
                         ("Unexpected response:" +  +
                          " for service:" + );
             }
         }
     }
 
     abstract public static class ExecutingRunnable implements Runnable {
         protected final SocketChannel channel;
         protected final TextProtocol protocol;
         protected final boolean expectResponse;
 
         public ExecutingRunnable(SocketChannel channel,
                                  TextProtocol protocol,
                                  boolean expectResponse) {
             this. = channel;
             this. = protocol;
             this. = expectResponse;
         }
 
         /* Read request and send out response. */
         public void run() {
             try {
                 .configureBlocking(true);
                 RequestMessage request = .getRequestMessage();
                 if (request == null) {
                     return;
                 }
                 ResponseMessage response = getResponse(request);
                 if ( && response != null) {
                     PrintWriter out = new PrintWriter
                         (.socket().getOutputStream(), true);
                     out.println(response.wireFormat());
                } else {
                    assert (response == null);
                }
            } catch (IOException e) {
                logMessage("IO error on socket: " + e.getMessage());
                return;
            } finally {
                if (.isOpen()) {
                    try {
                        .close();
                    } catch (IOException e) {
                        logMessage("IO error on socket close: " +
                                   e.getMessage());
                         return;
                    }
                }
            }
        }
        /* Get the response for a request. */
        abstract protected ResponseMessage getResponse(RequestMessage request)
            throws IOException;
        /* Log the message. */
        abstract protected void logMessage(String message);
    }