Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
   /*-
    * See the file LICENSE for redistribution information.
    *
    * Copyright (c) 2002, 2013 Oracle and/or its affiliates.  All rights reserved.
    *
    */
   
   package com.sleepycat.je.rep.impl;
   
  import java.net.Socket;
  import java.util.HashMap;
  import java.util.Map;
  import java.util.Set;
  
TextProtocol provides the support for implementing simple low performance protocols involving replication nodes. The protocol is primarily text based, and checks group membership and version matches with every message favoring flexibility over performance. The base class is primarily responsible for the message formatting and message envelope validation. The subclasses define the specific messages that constitute the protocol and the request/response semantics. Every message has the format: <version>|<name>|<id>|<op>|<op-specific payload> <version> is the version of the protocol in use. <name> identifies a group participating in an election. It avoids accidental cross-talk across groups holding concurrent elections. <id> identifies the originator of the message within the group. <op> the operation identified by the specific message. <op-specific payload> the payload associated with the particular operation.
  
  
  public abstract class TextProtocol {
  
      /*
       * Protocol version string. Format: <major version>.<minor version>
       * It's used to ensure compatibility across versions.
       */
      private final String VERSION;
  
      /* The name of the group executing this protocol. */
      private final String groupName;
  
      /*
       * The set of ids constituting the entire group. It's updated as nodes
       * enter and leave the dynamic group.
       */
      private Set<IntegermemberIds;
  
      /* The id associated with this protocol participant. */
      private final NameIdPair nameIdPair;
  
      /*
       * The suffix of message prefix constituting the "fixed" part of the
       * message for this group and node, it does not include the version
       * information, which goes in front of this prefix.
       */
      protected final String messageNocheckSuffix;
  
      /*
       * Timeouts used for network communications. Use setTimeouts() to override
       * the defaults.
       */
      private int openTimeoutMs = 10000; // Default to 10 sec
      private int readTimeoutMs = 10000; // Default to 10 sec
  
      /* The token separator in messages */
      public static final String SEPARATOR = "|";
      public static final String SEPARATOR_REGEXP="\\" + ;
  
      /* A message defined by the base class to deal with all errors. */
     public final MessageOp PROTOCOL_ERROR =
                 new MessageOp("PE"ProtocolError.class);
     public final MessageOp OK_RESP = new MessageOp("OK"OK.class);
     public final MessageOp FAIL_RESP = new MessageOp("FAIL"Fail.class);
 
     /* The number of message types defined by the subclass. */
     private int nonDefaultMessageCount;
 
     /* Maps request Ops to the corresponding enumerator. */
     private final Map<String,MessageOpops = new HashMap<String,MessageOp>();
 
     protected final Logger logger;
     protected final Formatter formatter;
     protected final EnvironmentImpl envImpl;

    
Creates an instance of the Protocol.

Parameters:
nameIdPair a unique identifier for this node
envImpl for logging, may be null
Parameter:
version the protocol version number
Parameter:
groupName the name of the group executing this protocol
 
     public TextProtocol(String version,
                         String  groupName,
                         NameIdPair nameIdPair,
                         EnvironmentImpl envImpl) {
         this. = version;
         this. = groupName;
         this. = nameIdPair;
         this. = envImpl;
 
          =
             groupName +  + .;
 
         if (envImpl != null) {
             this. = LoggerUtils.getLogger(getClass());
         } else {
             this. = LoggerUtils.getLoggerFormatterNeeded(getClass());
         }
         this. = new ReplicationFormatter(nameIdPair);
     }

    
Set the network timeouts associated with uses of this protocol instance.
 
     protected void setTimeouts(RepImpl repImpl,
                                DurationConfigParam openTimeoutConfig,
                                DurationConfigParam readTimeoutConfig) {
         if (repImpl == null) {
             return;
         }
         final DbConfigManager configManager = repImpl.getConfigManager();
          = configManager.getDuration(openTimeoutConfig);
          = configManager.getDuration(readTimeoutConfig);
     }

    
The messages as defined by the subclass. Note that PROTOCOL_ERROR is a pre-defined message that is defined by this class. The initialization is not considered until this method after been invoked typically in the constructor itself. This two-step is unfortunately necessary since the creation of MessageOps instances requires that this class be completely initialized, otherwise the MessageOp list could have been passed in as a constructor argument.

Parameters:
protocolOps the message ops defined by the subclass.
 
     protected void initializeMessageOps(MessageOp[] protocolOps) {
         for (MessageOp op : protocolOps) {
             .put(op.opIdop);
         }
          = protocolOps.length;
         .put(.);
         .put(.);
     }
 
     public int getOpenTimeout() {
         return ;
     }
 
     public int getReadTimeout() {
         return ;
     }
 
     public NameIdPair getNameIdPair() {
         return ;
     }
 
     /* The total number of nonDefault messages defined by the protocol. */
     public int messageCount() {
         return ;
     }

    
Updates the current set of nodes that constitutes the group. Also update the id used in the header, in case it was null and isn't anymore.

Parameters:
newMemberIds
 
     public void updateNodeIds(Set<IntegernewMemberIds) {
         /* Update the set of known members as well. */
          = newMemberIds;
     }

    
Returns the Integer number which represents a Protocol version.
 
     public int getMajorVersionNumber(String version) {
         return Double.valueOf(version).intValue();
     }

    
The Operations that are part of the protocol.
 
     public static class MessageOp {
 
         /* The string denoting the operation for the request message. */
         private final String opId;
 
         /* The class used to represent the message. */
         private final Class<? extends MessagemessageClass;
 
         public MessageOp(String opIdClass<? extends MessagemessageClass) {
             this. = opId;
             this. = messageClass;
         }
 
         String getOpId() {
             return ;
         }
 
         Class<? extends MessagegetMessageClass() {
             return ;
         }
 
         @Override
         public String toString() {
             return ;
         }
     }

    
Represents the tokens on a message line. The order of the enumerators represents the order of the tokens in the wire format.
 
     public enum TOKENS {
         VERSION_TOKEN,
         NAME_TOKEN,
         ID_TOKEN,
         OP_TOKEN,
         FIRST_PAYLOAD_TOKEN;
     }
 
     /* Used to indicate that an entity is formatable and can be serialized and
      * de-serialized.
      */
     protected interface WireFormatable {
 
         /*
          * Returns the string representation suitable for use in a network
          * request.
          */
         abstract String wireFormat();
     }

    
Parses a line into a Request/Response message.

Parameters:
line containing the message
Returns:
a message instance
Throws:
TextProtocol.InvalidMessageException
 
     public Message parse(String line)
         throws InvalidMessageException {
 
         String[] tokens = line.split();
 
         final int index = ..ordinal();
         if (index >= tokens.length) {
             throw new InvalidMessageException(.,
                                              "Missing message op");
         }
         MessageOp op = .get(tokens[index]);
         assert(op != null);
         try {
             Class<? extends Messagec = op.getMessageClass();
             Constructor<? extends Messagecons =
                 c.getConstructor(c.getEnclosingClass(),
                                  line.getClass(),
                                  tokens.getClass());
             Message message = cons.newInstance(thislinetokens);
             return message;
         } catch (InstantiationException e) {
             throw EnvironmentFailureException.unexpectedException(e);
         } catch (IllegalAccessException e) {
             throw EnvironmentFailureException.unexpectedException(e);
         } catch (SecurityException e) {
             throw EnvironmentFailureException.unexpectedException(e);
         } catch (NoSuchMethodException e) {
             throw EnvironmentFailureException.unexpectedException(e);
         } catch (InvocationTargetException e) {
             /* Unwrap the exception. */
             Throwable target = e.getTargetException();
             if (target instanceof RuntimeException) {
                 throw (RuntimeExceptione.getTargetException();
             } else if (target instanceof InvalidMessageException) {
                 throw (InvalidMessageExceptiontarget;
             }
             throw EnvironmentFailureException.unexpectedException(e);
         }
     }

    
Base message class for all messages exchanged in the protocol.
 
     public abstract class Message implements WireFormatable {
         /* The sender of the message. */
         private int senderId = 0;
 
         /*
          * The version of this message, as it's deserialized and sent across
          * the network. The default is that messages are sent in the VERSION of
          * the current protocol, but in cases of mixed-version upgrades, the
          * message may be sent in an earlier version format.
          *
          * When this message is a RequestMessage, the sender will always
          * initially send it out its own native version, but may resend it in
          * an earlier version, if the recipient can't understand the native
          * version. When the message is a ResponseMessage, the sender can reply
          * either in its native version, or in an earlier version if the
          * requester is an older version of JE.
          */
         protected String sendVersion;
 
         /* The line representing the message. */
         private final String line;
 
         /* The tokenized form of the above line. */
         private final String[] tokens;
 
         /* The current variable arg token */
         private int currToken = ..ordinal();
 
         protected String messagePrefixNocheck;

        
The constructor used for the original non-serialized instance of the message, which does not use the line or tokens.
 
         protected Message() {
              = null;
              = null;
             setSendVersion();
         }

        
Every message must define a constructor of this form so that it can be de-serialized. The constructor is invoked using reflection by the parse() method.

Parameters:
line the line constituting the message
tokens the line in token form
Throws:
TextProtocol.InvalidMessageException
com.sleepycat.je.EnvironmentFailureException on format errors
 
         protected Message(String lineString[] tokens)
             throws InvalidMessageException {
 
             this. = line;
             this. = tokens;
 
             /* Validate the leading fixed fields. */
             final String version = getTokenString(.);
             if (new Double() < new Double(version)) {
                 throw new InvalidMessageException
                     (.,
                      "Version argument mismatch." +
                      " Expected: " +  + " Found: " + version);
             }
 
             /*
              * Set the sender version of a request message. This version
              * information will be used by the receiver to determine what
              * version should be used for the response message.
              */
             setSendVersion(version);
 
             final String messageGroupName = getTokenString(.);
             if (!.equals(messageGroupName)) {
                 throw new InvalidMessageException
                 (.,
                  "Group name mismatch; this group name: " +  +
                  ", message group name: " + messageGroupName);
             }
 
              =
                 new Integer(getTokenString(.)).intValue();
             if (( != null) &&
                 (.size() > 0) &&
                 (.getId() != .) &&
                 ( != .) &&
                 ( != .getId()) &&
                 !.contains()) {
                 throw new InvalidMessageException
                    (.,
                     "Sender's message id: " +  +
                     " message op: " + getTokenString(.) +
                     ", was not a member of the group: " + );
             }
         }
 
         public int getSenderId() {
             return ;
         }
 
         /*
          * Set the version of the message that we have just received. This
          * version information will be used by the receiver to determine what
          * version should be used for the response message.
          */
         public void setSendVersion(String version) {
             if (new Double() < new Double(version)) {
                 throw new IllegalStateException
                     ("Send version: " + version + " shouldn't be larger " +
                      "than the current version: " + );
             }
 
             if (!version.equals()) {
                  = version;
                  =
                      +  + ;
             }
         }
 
         /* Get the send version of a message. */
         public String getSendVersion() {
             return ;
         }
 
         protected String getMessagePrefix() {
             return  +  +  +  +
                    .getId();
         }
 
         public abstract MessageOp getOp();

        
Returns the protocol associated with this message
 
         public TextProtocol getProtocol() {
             return TextProtocol.this;
         }

        
Returns the token value associated with the token type.

Parameters:
tokenType identifies the token in the message
Returns:
the associated token value
 
         private String getTokenString(TOKENS tokenType) {
             final int index = tokenType.ordinal();
             if (index >= .) {
                 throw EnvironmentFailureException.unexpectedState
                     ("Bad format; missing token: " + tokenType +
                      "at position: " + index + "in message: " + );
             }
             return [index];
         }

        
Returns the next token in the payload.

Returns:
the next payload token
Throws:
TextProtocol.InvalidMessageException
 
         protected String nextPayloadToken()
             throws InvalidMessageException {
 
             if ( >= .) {
                 throw new InvalidMessageException
                 (.,
                  "Bad format; missing token at position: " +  +
                  " in message: " + );
             }
             return [++];
         }

        
Returns the current token position in the payload.

Returns:
the current token position
 
         protected int getCurrentTokenPosition() {
             return ;
         }
     }

    
Base classes for response messages.
 
     public abstract class ResponseMessage extends Message {
 
         protected ResponseMessage() {
             super();
         }
 
         protected ResponseMessage(String lineString[] tokens)
             throws InvalidMessageException {
 
             super(linetokens);
         }

        
Returns the version id and Op concatenation that starts every message.
 
         protected String wireFormatPrefix() {
             return getMessagePrefix() +  + getOp().;
         }
 
         @Override
         public boolean equals(Object obj) {
             if (this == obj) {
                 return true;
             }
             if (obj == null) {
                 return false;
             }
             if (!(obj instanceof ResponseMessage)) {
                 return false;
             }
             return getOp().equals(((ResponseMessage)obj).getOp());
         }
 
         @Override
         public int hashCode() {
             return getOp().getOpId().hashCode();
         }
     }
 
     public class ProtocolError extends ResponseMessage {
         private final String message;
         private final MessageError errorType;
 
         public ProtocolError(InvalidMessageException messageException) {
             this(messageException.getErrorType(),
                  messageException.getMessage());
         }
 
         public ProtocolError(MessageError messageErrorString message) {
             this. = message;
             this. = messageError;
         }
 
         public ProtocolError(String responseLineString[] tokens)
             throws InvalidMessageException {
 
             super(responseLinetokens);
              = MessageError.valueOf(nextPayloadToken());
              = nextPayloadToken();
         }
 
         @Override
         public int hashCode() {
             final int prime = 31;
             int result = super.hashCode();
             result = prime * result
                     + (( == null) ? 0 : .hashCode());
             return result;
         }
 
         @Override
         public boolean equals(Object obj) {
             if (this == obj) {
                 return true;
             }
             if (!super.equals(obj)) {
                 return false;
             }
             if (!(obj instanceof ProtocolError)) {
                 return false;
             }
             final ProtocolError other = (ProtocolErrorobj;
             if ( == null) {
                 if (other.message != null) {
                     return false;
                 }
             } else if (!.equals(other.message)) {
                 return false;
             }
 
             return true;
         }
 
         @Override
         public MessageOp getOp() {
             return ;
         }
 
         public String wireFormat() {
             return wireFormatPrefix() +  +
                 .toString() +  + ;
         }
 
         public MessageError getErrorType() {
             return ;
         }
 
         public String getMessage() {
             return ;
         }
     }
 
     public class OK extends ResponseMessage {
         public OK() {
         }
 
         public OK(String lineString[] tokens)
             throws InvalidMessageException {
 
             super(linetokens);
         }
 
         @Override
         public MessageOp getOp() {
            return ;
         }
 
         @Override
         protected String getMessagePrefix() {
             return ;
         }
 
         public String wireFormat() {
            return wireFormatPrefix();
         }
     }
 
     public class Fail extends ResponseMessage {
         private final String message;
 
         public Fail(String message) {
             this. = sanitize(message);
         }
 
         public Fail(String lineString[] tokens)
             throws InvalidMessageException {
             super(linetokens);
 
              = nextPayloadToken();
         }
 
         public String getMessage() {
             return ;
         }
 
         @Override
         public int hashCode() {
             final int prime = 31;
             int result = super.hashCode();
             result = prime * result + getOuterType().hashCode();
             result = prime * result
                     + (( == null) ? 0 : .hashCode());
             return result;
         }
 
         @Override
         public boolean equals(Object obj) {
             if (this == obj) {
                 return true;
             }
             if (!super.equals(obj)) {
                 return false;
             }
             if (!(obj instanceof Fail)) {
                 return false;
             }
             Fail other = (Failobj;
             if (!getOuterType().equals(other.getOuterType())) {
                 return false;
             }
             if ( == null) {
                 if (other.message != null) {
                     return false;
                 }
             } else if (!.equals(other.message)) {
                 return false;
             }
             return true;
         }
 
         @Override
         public MessageOp getOp() {
            return ;
         }
 
         @Override
         protected String getMessagePrefix() {
             return ;
         }
 
         public String wireFormat() {
            return wireFormatPrefix() +  + ;
         }
 
         private TextProtocol getOuterType() {
             return TextProtocol.this;
         }

        
Removes any newline characters. Embedded newlines are not supported by TextProtocol, but exception messages sometimes have them, and the payload of a Fail response often comes from an exception message.
 
         private String sanitize(String message) {
             return message.replace("\n""  ");
         }
     }

    
Base class for all Request messages
 
     public abstract class RequestMessage extends Message {
 
         protected RequestMessage() {}
 
         protected RequestMessage(String lineString[] tokens)
             throws InvalidMessageException {
             super(linetokens);
         }

        
Returns the version id and Op concatenation that form the prefix for every message.
 
         protected String wireFormatPrefix() {
             return getMessagePrefix() +  + getOp().;
         }
 
         @Override
         public boolean equals(Object obj) {
             if (this == obj) {
                 return true;
             }
             if (obj == null) {
                 return false;
             }
             if (!(obj instanceof RequestMessage)) {
                 return false;
             }
             return getOp().equals(((RequestMessageobj).getOp());
         }
 
         @Override
         public int hashCode() {
             return getOp().getOpId().hashCode();
         }
     }

    
Converts a response line into a ResponseMessage.

Parameters:
responseLine
Returns:
the response message
Throws:
TextProtocol.InvalidMessageException
 
     ResponseMessage parseResponse(String responseLine)
         throws InvalidMessageException {
 
         return (ResponseMessageparse(responseLine);
     }

    
Converts a request line into a requestMessage.

Parameters:
requestLine
Returns:
the request message
Throws:
TextProtocol.InvalidMessageException
 
     public RequestMessage parseRequest(String requestLine)
         throws InvalidMessageException {
 
         return (RequestMessageparse(requestLine);
     }

    
Reads the channel and returns a read request. If the message format was bad, it sends a ProtocolError response back over the channel and no further action is needed by the caller.

Parameters:
channel the channel delivering the request
Returns:
null if EOF was reached or the message format was bad
Throws:
java.io.IOException
 
         throws IOException {
 
         final Socket socket = channel.socket();
         BufferedReader in = new BufferedReader
             (new InputStreamReader(socket.getInputStream()));
 
         String requestLine = in.readLine();
         if (requestLine == null) {
             /* EOF */
             return null;
         }
         try {
             return parseRequest(requestLine);
         } catch (InvalidMessageException e) {
             LoggerUtils.logMsg(.,
                                "Message format error:" + e.getMessage());
             PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
             out.println(new ProtocolError(e).wireFormat());
             out.close();
             return null;
         }
     }
 
     public ResponseMessage process(Object requestProcessor,
                                    RequestMessage requestMessage) {
 
         Class<? extends Objectcl = requestProcessor.getClass();
         try {
             Method method =
                 cl.getMethod("process"requestMessage.getClass());
             return (ResponseMessagemethod.invoke
                 (requestProcessorrequestMessage);
         } catch (NoSuchMethodException e) {
             LoggerUtils.logMsg(.,
                                "Method: process(" +
                                requestMessage.getClass().getName() +
                                ") was missing");
             throw EnvironmentFailureException.unexpectedException(e);
         } catch (Exception e) {
             LoggerUtils.logMsg(.,
                                "Unexpected exception: " + e.getMessage());
             throw EnvironmentFailureException.unexpectedException(e);
         }
     }

    
A single request/response interaction, targetted at a given service running at a particular remote socket address. Since it implements Runnable it can be used with thread pools, Futures, etc. But its run() method can also simply be called directly.
 
     public class MessageExchange implements Runnable {
 
         public final InetSocketAddress target;
         private final RequestMessage requestMessage;
         private final String serviceName;
         private ResponseMessage responseMessage;
         public Exception exception;
 
         public MessageExchange(InetSocketAddress target,
                                String serviceName,
                                RequestMessage request) {
             this. = target;
             this. = serviceName;
             this. = request;
         }
 
         /*
          * Get the response message for a request message.
          *
          * If the response message is a ProtocolError message which caused by
          * protocol version mismatch, it resets the request message's
          * sendVersion as the ResponseMessage ProtocolError's version and send
          * again.
          */
         public void run() {
             messageExchange();
 
             if ( != null &&
                 .getOp() == ) {
                 ProtocolError error = (ProtocolError;
                 if (error.getErrorType() == .) {
                     .setSendVersion(error.getSendVersion());
                     messageExchange();
                     LoggerUtils.logMsg
                         (.,
                          "Resend message: " + .toString() +
                          " in version: " + .getSendVersion() +
                          " while protocol version is: " +  +
                          " because of the version mismatch, the returned" +
                          " response message is: " + );
                 }
             }
         }

        
Run a message exchange. A successful exchange results in a response message being set. All failures result in the response message being null and an exception being set.
 
         public void messageExchange() {
 
             Socket socket = new Socket();
             BufferedReader in = null;
             PrintWriter out = null;
             try {
                 socket.setSoTimeout();
                 socket.setTcpNoDelay(true);
                 socket.setReuseAddress(true);
                 socket.connect();
                 OutputStream ostream =
                     ServiceDispatcher.getServiceOutputStream(socket,
                                                              );
                 out = new PrintWriter(ostreamtrue);
                 out.println(.wireFormat());
                 out.flush();
                 in = new BufferedReader
                     (new InputStreamReader(socket.getInputStream()));
                 final String line = in.readLine();
                 if (line == null) {
                     setResponseMessage
                         (new ProtocolError(.,
                                            "Premature EOF for request: " +
                                            .wireFormat()));
                 } else {
                     setResponseMessage(parseResponse(line));
                 }
             } catch (java.net.SocketTimeoutException e){
                 this. = e;
             } catch (SocketException e) {
                 this. = e;
             } catch (IOException e) {
                 this. = e;
             } catch (TextProtocol.InvalidMessageException e) {
                 this. = e;
             } catch (ServiceConnectFailedException e) {
                 this. = e;
             } catch (Exception e) {
                 this. = e;
                 LoggerUtils.logMsg(.,
                                    "Unexpected exception:" + e.getMessage());
                 throw EnvironmentFailureException.unexpectedException
                     ("Service: " +  +
                      " failed; attempting request: " + .getOp(),
                      e);
             } finally {
                 Utils.cleanup(socketinout);
             }
         }
 
         public void setResponseMessage(ResponseMessage responseMessage) {
             this. = responseMessage;
         }

        
Returns the response message. The null may be returned as part of the protocol exchange, or it may be null if an exception was encountered say because of some IO problem. It's the caller's responsibility to check for an exception in that circumstance.

Note: there may be some protocols (e.g., Monitor) that define null to be a proper, expected response upon success. It might be preferable to redefine them to return an explicit OK response, if possible.

Returns:
the response
 
         public ResponseMessage getResponseMessage() {
             /* Make sure the response was from a member of the same group. */
             if  (( instanceof InvalidMessageException) &&
                  (((InvalidMessageException).getErrorType() ==
                   .)) {
                 throw EnvironmentFailureException.
                 unexpectedState(.getMessage());
             }
             return ;
         }
 
         public RequestMessage getRequestMessage() {
             return ;
         }
 
         public Exception getException() {
             return ;
         }
     }
 
     protected static class StringFormatable implements WireFormatable {
         protected String s;
 
         StringFormatable() {}
 
         protected StringFormatable(String s) {
             this. = s;
         }
 
         public void init(String wireFormat) {
              = wireFormat;
         }
 
         public String wireFormat() {
             return ;
        }
        @Override
        public int hashCode() {
            return (( == null) ? 0 : .hashCode());
        }
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (!(obj instanceof StringFormatable)) {
                return false;
            }
            final StringFormatable other = (StringFormatableobj;
            if ( == null) {
                if (other.s != null) {
                    return false;
                }
            } else if (!.equals(other.s)) {
                return false;
            }
            return true;
        }
    }
    /*
     * The type associated with an invalid Message. It's used by the exception
     * below and by ProtocolError.
     */
    static public enum MessageError
        {BAD_FORMAT, VERSION_MISMATCH, GROUP_MISMATCH, NOT_A_MEMBER}

    
Used to indicate a message format or invalid content exception.
    @SuppressWarnings("serial")
    public static class InvalidMessageException extends Exception {
        private final MessageError errorType;
        InvalidMessageException(MessageError errorTypeString message) {
            super(message);
            this. = errorType;
        }
        public MessageError getErrorType() {
            return ;
        }
    }