Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package org.kie.remote.services.jms;
  
  import static org.kie.remote.common.jaxb.JaxbRequestStatus.*;
  import static org.kie.services.client.serialization.SerializationConstants.DEPLOYMENT_ID_PROPERTY_NAME;
  import static org.kie.services.client.serialization.SerializationConstants.EXTRA_JAXB_CLASSES_PROPERTY_NAME;
  import static org.kie.services.client.serialization.SerializationConstants.SERIALIZATION_TYPE_PROPERTY_NAME;
  
 import java.util.List;
 import java.util.Set;
 
 import  javax.ejb.TransactionAttribute;
 import  javax.ejb.TransactionAttributeType;
 import  javax.inject.Inject;
 import  javax.jms.BytesMessage;
 import  javax.jms.Connection;
 import  javax.jms.ConnectionFactory;
 import  javax.jms.JMSException;
 import  javax.jms.Message;
 import  javax.jms.MessageListener;
 import  javax.jms.MessageProducer;
 import  javax.jms.Queue;
 import  javax.jms.Session;
 
 import  org.jbpm.services.task.commands.TaskCommand;
 import  org.jbpm.services.task.identity.JAASUserGroupCallbackImpl;
 import  org.jbpm.services.task.identity.adapter.UserGroupAdapter;
 import  org.kie.api.command.Command;
 import  org.kie.services.client.serialization.JaxbSerializationProvider;
 import  org.kie.services.client.serialization.SerializationException;
 import  org.kie.services.client.serialization.SerializationProvider;
 import  org.kie.services.client.serialization.jaxb.impl.JaxbCommandsRequest;
 import  org.kie.services.client.serialization.jaxb.impl.JaxbCommandsResponse;
 import  org.kie.services.shared.AcceptedCommands;
 import  org.slf4j.Logger;
 import  org.slf4j.LoggerFactory;

There are thus multiple queues to which an instance of this class could listen to, which is why the (JMS queue) configuration is done in the ejb-jar.xml file. Doing the configuration in the ejb-jar.xml file which allows us to configure instances of one class to listen to more than one queue. Also: responses to requests are not placed on a reply-to queue, but on the specified answer queue.
 
 public class RequestMessageBean implements MessageListener {
 
     private static final Logger logger = LoggerFactory.getLogger(RequestMessageBean.class);
 
     // JMS resources
 
     @Resource(mappedName = "java:/JmsXA")
     private ConnectionFactory factory;
 
     // Initialized in @PostConstruct
     private Session session;
     private Connection connection;
 
     @Inject
 
     // KIE resources
 
     @Inject
     protected DeploymentInfoBean runtimeMgrMgr;
 
     @Inject
 
     @Inject
 
     // Constants / properties
     private String RESPONSE_QUEUE_NAME = null;
     private static String RESPONSE_QUEUE_NAME_PROPERTY = "kie.services.jms.queues.response";
 
     private static final String ID_NECESSARY = "This id is needed to be able to match a request to a response message.";
    public void init() {
         = System.getProperty("queue/KIE.RESPONSE.ALL");
        try {
             = .createConnection();
             = .createSession(false, Session.AUTO_ACKNOWLEDGE);
            .start();
        } catch (JMSException jmse) {
            // Unable to create connection/session, so no need to try send the message (4.) either
            String errMsg = "Unable to open new session to send response messages";
            .error(errMsgjmse);
            throw new KieRemoteServicesRuntimeException(errMsgjmse);
        }
    }
    @PreDestroy
    public void cleanup() {
        try {
            if ( != null) {
                .close();
                 = null;
            }
            if ( != null) {
                .close();
                 = null;
            }
        } catch (JMSException jmse) {
            String errMsg = "Unable to close " + ( == null ? "session" : "connection");
            .error(errMsgjmse);
            throw new KieRemoteServicesRuntimeException(errMsgjmse);
        }
    }
    // See EJB 3.1 fr, 5.4.12 and 13.3.3: BMT for which the (last) ut.commit() confirms message reception
    @TransactionAttribute(TransactionAttributeType.REQUIRED)
    public void onMessage(Message message) {
        String msgId = null;
        boolean redelivered = false;
        try {
            msgId = message.getJMSMessageID();
            redelivered = message.getJMSRedelivered();
        } catch (JMSException jmse) {
            String errMsg = "Unable to retrieve JMS " + (msgId == null ? "redelivered flag" : "message id")
                    + " from JMS message. Message will not be returned to queue.";
            .warn(errMsgjmse);
        }
        ifredelivered ) { 
            if (.maxRetriesReached(msgId)) {
                .warn("Maximum number of retries (" + .getMaximumLimitRetries() + ") reached for message " + msgId );
                .warn("Acknowledging message but NOT processing it.");
                return;
            } else {
                .warn("Retry number " + .incrementRetries(msgId) + " of message " + msgId);
            }
        }
        // 0. Get msg correlation id (for response)
        String msgCorrId = null;
        JaxbCommandsResponse jaxbResponse = null;
        try {
            msgCorrId = message.getJMSCorrelationID();
        } catch (JMSException jmse) {
            String errMsg = "Unable to retrieve JMS correlation id from message! " + ;
            throw new KieRemoteServicesRuntimeException(errMsgjmse);
        }
        // 0. get serialization info
        int serializationType = -1;
        try {
            if (!message.propertyExists(SERIALIZATION_TYPE_PROPERTY_NAME)) {
                // default is JAXB
                serializationType = JaxbSerializationProvider.JMS_SERIALIZATION_TYPE;
            } else {
                serializationType = message.getIntProperty(SERIALIZATION_TYPE_PROPERTY_NAME);
            }
        } catch (JMSException jmse) {
            String errMsg = "Unable to get properties from message " + msgCorrId + ".";
            throw new KieRemoteServicesRuntimeException(errMsgjmse);
        }
        SerializationProvider serializationProvider;
        switch (serializationType) {
        case JaxbSerializationProvider.JMS_SERIALIZATION_TYPE:
            serializationProvider = getJaxbSerializationProvider(message);
            break;
        default:
            throw new KieRemoteServicesInternalError("Unknown serialization type: " + serializationType);
        }
        // 1. deserialize request
        JaxbCommandsRequest cmdsRequest = deserializeRequest(messagemsgCorrIdserializationProviderserializationType);
        // 2. security/identity
        .createBackupIdentityProvider(cmdsRequest.getUser());
        cmdsRequest.setUserPass(getUserPass(message));
        // 3. process request
        jaxbResponse = jmsProcessJaxbCommandsRequest(cmdsRequest);
        
        // 4. serialize response
        Message msg = serializeResponse(msgCorrIdserializationTypeserializationProviderjaxbResponse);
        // 5. send response
        sendResponse(msgCorrIdserializationTypemsg);
        if (redelivered) {
            .clearRetries(msgId);
        }
    }
    private void sendResponse(String msgCorrIdint serializationType, Message msg) {
        // 3b. set correlation id in response messgae
        try {
            msg.setJMSCorrelationID(msgCorrId);
        } catch (JMSException jmse) {
            // Without correlation id, receiver won't know what the response relates to
            String errMsg = "Unable to set correlation id of response to msg id " + msgCorrId;
            .error(errMsgjmse);
            return;
        }
        // 3c. send response message
        try {
            Queue responseQueue = (Queue) (new InitialContext()).lookup();
            MessageProducer producer = .createProducer(responseQueue);
            producer.send(msg);
        } catch (NamingException ne) {
            String errMsg = "Unable to lookup response queue " +  + " to send msg " + msgCorrId 
                    + " (Is " +  + " incorrect?).";
            .error(errMsgne);
        } catch (JMSException jmse) {
            String errMsg = "Unable to send msg " + msgCorrId + " to " + ;
            .error(errMsgjmse);
        }
    }
    // De/Serialization helper methods -------------------------------------------------------------------------------------------
    private static JaxbCommandsRequest deserializeRequest(Message messageString msgId, SerializationProvider serializationProviderint serializationType) {
        JaxbCommandsRequest cmdMsg = null;
        try {
            String msgStrContent = null;
            switch (serializationType) {
            case JaxbSerializationProvider.JMS_SERIALIZATION_TYPE:
                msgStrContent = ((BytesMessage) message).readUTF();
                cmdMsg = (JaxbCommandsRequest) serializationProvider.deserialize(msgStrContent);
                break;
            default:
                throw new KieRemoteServicesRuntimeException("Unknown serialization type when deserializing message " + msgId + ":" + serializationType);
            }
        } catch (JMSException jmse) {
            String errMsg = "Unable to read information from message " + msgId + ".";
            throw new KieRemoteServicesRuntimeException(errMsgjmse);
        } catch (Exception e) {
            String errMsg = "Unable to serialize String to " + JaxbCommandsRequest.class.getSimpleName() + " [msg id: " + msgId + "].";
            throw new KieRemoteServicesInternalError(errMsge);
        }
        return cmdMsg;
    }
    private SerializationProvider getJaxbSerializationProvider(Message message) {
        SerializationProvider serializationProvider;
        Set<Class<?>> serializationClasses = new HashSet<Class<?>>();
        try {
            String deploymentId = null;
            ClassLoader classLoader = null;
            // Add classes from deployment (and get deployment classloader)
            if (message.propertyExists(DEPLOYMENT_ID_PROPERTY_NAME)) {
                deploymentId = message.getStringProperty(DEPLOYMENT_ID_PROPERTY_NAME);
                Collection<Class<?>> deploymentClasses = .getDeploymentClasses(deploymentId);
                if (!deploymentClasses.isEmpty()) {
                    .debug("Added classes from {} to serialization context."deploymentId);
                    serializationClasses.addAll(deploymentClasses);
                    // KieContainer (deployment) classloader
                    classLoader = deploymentClasses.iterator().next().getClassLoader();
                } else {
                    .warn("Deployment id '{}' was included in message but no classes were retrieved from deployment!"deploymentId);
                }
            }
            if (classLoader == null) {
                // Application classloader
                classLoader = this.getClass().getClassLoader();
            }
            // Add other classes that might only have been added to the war/application
            if (message.propertyExists(EXTRA_JAXB_CLASSES_PROPERTY_NAME)) {
                String extraClassesString = message.getStringProperty(EXTRA_JAXB_CLASSES_PROPERTY_NAME);
                Set<Class<?>> moreExtraClasses = JaxbSerializationProvider.commaSeperatedStringToClassSet(classLoaderextraClassesString);
                for (Class<?> extraClass : moreExtraClasses) {
                    .debug("Added {} to serialization context."extraClass.getName());
                }
                serializationProvider = new JaxbSerializationProvider(moreExtraClasses);
            } else {
                serializationProvider = new JaxbSerializationProvider();
            }
        } catch (JMSException jmse) {
            throw new KieRemoteServicesInternalError("Unable to check or read JMS message for property."jmse);
        } catch (SerializationException se) {
            throw new KieRemoteServicesRuntimeException("Unable to load classes needed for JAXB deserialization."se);
        }
        return serializationProvider;
    }
    private static Message serializeResponse(Session sessionString msgIdint serializationType,
            SerializationProvider serializationProvider, JaxbCommandsResponse jaxbResponse) {
        BytesMessage byteMsg = null;
        try {
            byteMsg = session.createBytesMessage();
            byteMsg.setIntProperty(SERIALIZATION_TYPE_PROPERTY_NAME, serializationType);
            String msgStr;
            switch (serializationType) {
            case JaxbSerializationProvider.JMS_SERIALIZATION_TYPE:
                msgStr = (StringserializationProvider.serialize(jaxbResponse);
                Collection<Class<?>> extraJaxbClasses = ((JaxbSerializationProvider) serializationProvider).getExtraJaxbClasses();
                if (!extraJaxbClasses.isEmpty()) {
                    String propValue;
                    try {
                        propValue = JaxbSerializationProvider.classSetToCommaSeperatedString(extraJaxbClasses);
                    } catch (SerializationException se) {
                        throw new KieRemoteServicesRuntimeException("Unable to get class names for extra JAXB classes."se);
                    }
                    byteMsg.setStringProperty(EXTRA_JAXB_CLASSES_PROPERTY_NAME, propValue);
                }
                break;
            default:
                throw new KieRemoteServicesRuntimeException("Unknown serialization type when deserializing message " + msgId + ":" + serializationType);
            }
            byteMsg.writeUTF(msgStr);
        } catch (JMSException jmse) {
            String errMsg = "Unable to create response message or write to it [msg id: " + msgId + "].";
            throw new KieRemoteServicesRuntimeException(errMsgjmse);
        } catch (Exception e) {
            String errMsg = "Unable to serialize " + jaxbResponse.getClass().getSimpleName() + " to a String.";
            throw new KieRemoteServicesInternalError(errMsge);
        }
        return byteMsg;
    }
    // Runtime / KieSession / TaskService helper methods --------------------------------------------------------------------------
    protected JaxbCommandsResponse jmsProcessJaxbCommandsRequest(JaxbCommandsRequest request) {
        // If exceptions are happening here, then there is something REALLY wrong and they should be thrown.
        JaxbCommandsResponse jaxbResponse = new JaxbCommandsResponse(request);
        List<Command> commands = request.getCommands();
      
        if (commands != null) {
            UserGroupAdapter userGroupAdapter = null;
            try { 
                for (int i = 0; i < commands.size(); ++i) {
                    Command<?> cmd = commands.get(i);
                    if (!AcceptedCommands.getSet().contains(cmd.getClass())) {
                        String cmdName = cmd.getClass().getName();
                        String errMsg = cmdName + " is not a supported command and will not be executed.";
                        .warn( errMsg );
                        UnsupportedOperationException uoe = new UnsupportedOperationException(errMsg);
                        jaxbResponse.addException(uoeicmd, FORBIDDEN);
                        continue;
                    }
                    
                    ifcmd instanceof TaskCommand && userGroupAdapter == null ) { 
                        userGroupAdapter = getUserFromMessageAndLookupAndInjectGroups(request.getUserPass());
                    }
                    // if the JTA transaction (in HT or the KieSession) doesn't commit, that will cause message reception to be *NOT* acknowledged!
                    .processCommand(cmdrequestijaxbResponse);
                }
            } finally { 
                clearUserGroupAdapter(userGroupAdapter);
            }
        }
        if (commands == null || commands.isEmpty()) {
            .info("Commands request object with no commands sent!");
        }
        return jaxbResponse;
    }

    
Retrieves the user/pass info from the message and authenticates it against the underlying JAAS module:
  • Calls RequestMessageBean.getUserPass(Message) to get the user and password from the Message instance
  • Calls RequestMessageBean.tryLogin(String[]) to create a LoginContext and login
  • Calls RequestMessageBean.getGroupsFromSubject(Subject) to retrieve the Roles info from the JAAS login.
  • Injects the groups information into the underlying framework for use by the human-task code

Parameters:
msg The JMS Message received.
    private UserGroupAdapter getUserFromMessageAndLookupAndInjectGroups(String [] userPass) {
        UserGroupAdapter jmsUserGroupAdapter = null;
        try {
            ifuserPass == null ) { 
                .warn("Unable to retrieve user and password from message: NOT injecting group information.");
                return null;
            }
            Subject msgSubject = tryLogin(userPass);
            ifmsgSubject == null ) { 
                .warn("Unable to login to JAAS with received user and password.");
                return null;
            }
            List<Principalroles = getGroupsFromSubject(msgSubject);
            String [] rolesArr = new String[roles.size()];
            forint i = 0; i < rolesArr.length; ++i ) { 
                rolesArr[i] = roles.get(i).getName();
            }
            UserGroupAdapter newUserGroupAdapter = new JmsUserGroupAdapter(userPass[0], rolesArr);
            JAASUserGroupCallbackImpl.addExternalUserGroupAdapter(newUserGroupAdapter);
            jmsUserGroupAdapter = newUserGroupAdapter;
        } catch (Exception e) {
            .warn("Unable to retrieve group information for user in message: " + e.getMessage(), e);
        } 
        return jmsUserGroupAdapter;
    }
    private void clearUserGroupAdapter(UserGroupAdapter userGroupAdapter) {
        ifuserGroupAdapter != null ) { 
            JAASUserGroupCallbackImpl.clearExternalUserGroupAdapter();
        }
    }
    private static final String USERNAME_PROPERTY = "username";
    private static final String PASSWORD_PROPERTY = "password";
   
    
Get the user and password information.

Parameters:
msg The JMS Message received.
Returns:
A String array, with the user and password in that order. In the case that something goes wrong, null is returned.
    private String[] getUserPass(Message msg) {
        String prop = ;
        try { 
            String user = null;
            String pass = null;
            ifmsg.propertyExists(prop) ) { 
               user = msg.getStringProperty(prop);
            } 
            prop = ;
            ifmsg.propertyExists(prop) ) { 
               pass = msg.getStringProperty(prop);
            } 
            ifuser != null && pass != null ) { 
                String [] userPass = { userpass };
                return userPass;
            }
        } catch(Exception e) { 
           .error( "Unable to retrieve '" + prop + "' from JMS message."e);
        }
        return null;
    }

    
Try to login to the underlying JAAS module via a LoginException.

Parameters:
userPass A String array containing the user and password information.
Returns:
The logged-in Subject
Throws:
LoginException If something goes wrong when trying to login.
    private Subject tryLogin(String[] userPassthrows LoginException {
        try { 
            CallbackHandler handler = new UserPassCallbackHandler(userPass);
            LoginContext lc = new LoginContext("kie-jms-login-context"handler);
            lc.login();
            return lc.getSubject();
        } catchException e ) { 
            .error( "Unable to login via JAAS with message supplied user and password"e);
            return null
        }
    }

    
Extracts the list of roles from the subject.

Parameters:
subject The JAAS login subject
Returns:
A list of Principal objects, that are the role/groups.
    private List<PrincipalgetGroupsFromSubject(Subject subject) {
        List<PrincipaluserGroups = new ArrayList<Principal>();
        for (Principal principal : subject.getPrincipals()) {
            if (principal instanceof Group && "Roles".equalsIgnoreCase(principal.getName())) {
                Enumeration<? extends Principalgroups = ((Groupprincipal).members();
                while (groups.hasMoreElements()) {
                    Principal groupPrincipal = (Principalgroups.nextElement();
                    userGroups.add(groupPrincipal);
                }
            }
        }
        return userGroups;
    }
New to GrepCode? Check out our FAQ X