Monday 2 June 2014

Message Correlation using JMS

Last year I created a few OSB services with the asynchronous request response message exchange pattern. OSB does not support this out of the box, since OSB is in fact synchronous in nature. Although OSB supports the WS-Addressing namespaces, you need to set the WS-Addressing elements programmatically.

Since OSB is synchronous the request and response flows in the Asynchronous Request/Response pattern are completely seperated implemented from eachother. That means that in the response flow you don't know what request message was responsible for the current response. Even worse: you don't know what client did the request and how to respond to that client in a way you can correlate to the initating instance. Using SOA/BPM Suite as a client, you want to correlate to the requesting process instance.

There are of course several ways to solve this. I choose to use a Universal Distributed Queue for several reasons, where knowledge of JMS and performance were a few. I only need to temporarly store a message against a key. Coherence was not on my CV yet. And a database table requires a database(connection) with the query-overhead, etc.

Unfortunately you can't use the OSB transports or SOASuite JMS adapters to get/browse for a message using a correlation-id in a synchronous way. When you create a proxy service on a jms transport or configure a JMS Adapter for read it will be a polling construction. But it's quite easy to do it in Java, so I created a java-method to get a message based on a CorrelationId.

One thing I did not know back then was that if you put a message on the queue from one OSB Server Node (having a JMS Server) it can't be read from the other node, as such. Messages are stored in the local JMS Server member of the Queue.

I found that you can quite easily reach the local member of a Universal Distributed Queue on a certain JMSServer on Weblogic by prefixing the JNDI name of the queue with the JMSServer separated with the at-sign ('@'):

    String jmsSvrQueueJndi = jmsServer +"@" +queueJndi

The problem now is: "how do you know which JMS Servers are available servicing your queue?"
I solved that by providing a comma seperated string of JMS Server names as a parameter in the Java-Callout in my proxy service. But that left me with a more or less hardcoded string of JMS Server names, that needs to be expanded when a second OSB Server node with a JMSServer instance is added. I figured that I should be able to query that from the WebLogic instance. And of course it can: that's I'm writing the blog.

First you need to get a connection to an MBeanServer in WebLogic. You can create a remote server connection, but since my java class is running on the OSB Server within WebLogic, a JNDI-lookup is better.

I reused a class that I created in the past for doing JNDI lookups of JDBC-Connections for both inside or outside the application server. I'll add it at the end of this blog entry.

To get a MBServer connection you simply get a JNDI Context from the JNDI Provider above do a lookup of the JNDI name "java:comp/env/jmx/runtime":
mbServer = (MBeanServer) jndiContext.lookup("java:comp/env/jmx/runtime");
 The documentation for this can be found here.

From the MBean Server you can get a list of JMS Servers by first instantiating a jmx 'ObjectName' using:
ObjectName queryObjName = new ObjectName("com.bea:Type=JMSServer,*");
Here we're about to ask for all MBeans of type 'JMSServer'.

 I named the JMSServers in my setup each with the same prefix 'OSBJMSServer' and a number like 'OSBJMSServer1', 'OSBJMSServer2'. So I want all the JMS Servers that start with 'OSBJMSServer'. For this I need to provide a jmx query expression. Luckily this is quite simple:
QueryExp queryExp = Query.initialSubString(Query.attr("Name"), Query.value("OSBJMSServer"));
Here I create a query expression that queries on the initial substring on the "Name" attribute of the ObjectName, that equals the query value "OSBJMSServer". I found a blog on the jmx query possibilities here.

Having this in place you only need to perform the query:
Set objNames = mbServer.queryNames(queryObjName, queryExp);
This gets you a set with all the ObjectName objects that matches the query-expression.

Now since I only need the plain name's of JMSServers (not their canonical names) I can add them easily into a List:
  List jmsServers = new Vector();
...
  for (ObjectName objName : objNames) {
   String jmsServerCanonicalName = objName.getCanonicalName();
   String jmsServerName = objName.getKeyProperty("Name");
   jmsServers.add(jmsServerName);
   lgr.debug(methodName, "JmsServerCanonicalName [" + jmsServers.size() + "]: " + jmsServerCanonicalName);
   lgr.debug(methodName, "JmsServerName [" + jmsServers.size() + "]: " + jmsServerName);
  }

For the canonical name there is a getter. For the Name property you need to use the getKeyProperty() method.

WebLogicJms Class

 The complete WeblogicJms class:
package nl.darwin-it.jms;

import java.util.List;
import java.util.Set;
import java.util.Vector;

import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.Query;
import javax.management.QueryExp;
import javax.naming.Context;
import javax.naming.NamingException;

import nl.darwin-it.jndi.JndiContextProvider;
import nl.darwin-it.log.Logger;

/**
 * @author Martien van den Akker, Darwin-IT Professionals Version 1.1 , 2014-06
 *         Class with methods to query JMS artifacts from WeblogicJMS
 */
public class WeblogicJms {
 public static final String MBSVR_JNDI = "java:comp/env/jmx/runtime";
 public static final String JMSSVR_OBJNAME_BASE_QRY = "com.bea:Type=JMSServer,*";
 public static final String JMSSVR_QRY_ATTR = "Name";
 public static final String JSBJMSSVR_NAME_PREFIX = "OSBJMSServer";

 private static final String className = "WeblogicJms";
 private static MBeanServer mbServer = null;
 private static Logger lgr = new Logger(className);

 /**
  * Get a lazy instantiation for the MBServer and cache it.
  * 
  * @return
  * @throws NamingException
  */
 private static MBeanServer getMBServer() throws NamingException {
  if (mbServer == null) {
   final Context jndiContext = JndiContextProvider.getJndiContext();
   mbServer = (MBeanServer) jndiContext.lookup(MBSVR_JNDI);
  }
  return mbServer;
 }

 /**
  * Get a list of JMS Servers belonging to OSB (OracleService Bus)
  * 
  * @return
  * @throws NamingException
  * @throws MalformedObjectNameException
  * @throws NullPointerException
  */
 public static List getOSBJMSServers() throws NamingException, MalformedObjectNameException,
   NullPointerException {
  final String methodName = "getOSBJMSServers";
  lgr.debugStart(methodName);

  List jmsServers = new Vector();

  final MBeanServer mbServer = getMBServer();

  ObjectName queryObjName = new ObjectName(JMSSVR_OBJNAME_BASE_QRY);

  QueryExp queryExp = Query.initialSubString(Query.attr(JMSSVR_QRY_ATTR), Query.value(JSBJMSSVR_NAME_PREFIX));

  Set objNames = mbServer.queryNames(queryObjName, queryExp);
  lgr.debug(methodName, "Found " + objNames.size() + " objects");
  for (ObjectName objName : objNames) {
   String jmsServerCanonicalName = objName.getCanonicalName();
   String jmsServerName = objName.getKeyProperty("Name");
   jmsServers.add(jmsServerName);
   lgr.debug(methodName, "JmsServerCanonicalName [" + jmsServers.size() + "]: " + jmsServerCanonicalName);
   lgr.debug(methodName, "JmsServerName [" + jmsServers.size() + "]: " + jmsServerName);
  }

  lgr.debugEnd(methodName);
  return jmsServers;
 }

 /**
  * Get the list of JSB JMS Servers in XML format serialized to string.
  * 
  * @return
  * @throws MalformedObjectNameException
  * @throws NullPointerException
  * @throws NamingException
  */
 public static String getOSBJMSServersXML() throws MalformedObjectNameException, NullPointerException,
   NamingException {
  final String methodName = "getOMSServersXML";
  lgr.debugStart(methodName);
  StringBuffer jmsServersXMLBuf = new StringBuffer("");
  List jmsServers = getOSBJMSServers();
  for (String jmsServer : jmsServers) {
   jmsServersXMLBuf.append("");
   jmsServersXMLBuf.append(jmsServer);
   jmsServersXMLBuf.append("");
  }

  jmsServersXMLBuf.append("");
  lgr.debugEnd(methodName);

  return jmsServersXMLBuf.toString();

 }

}

JNDI ProviderClass

The JNDI Provider Class:
package nl.darwin-it.jndi;

import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import nl.darwin-it.log.Logger;

/**
 * Class providing and initializing JndiContext.
 * 
 * @author Martien van den Akker
 * @author Darwin IT Professionals
 */
public abstract class JndiContextProvider {
 private static final String className = "JndiContextProvider";
 private static Logger lgr = new Logger(className);
 private static Context jndiContext = null;

 /**
  * Get an Initial Context based on the jndiContextFactory. If
  * jndiContextFactory is null the Initial context is fetched from the J2EE
  * environment. When not in a J2EE environment, the factory class is fetched
  * from the jndi.properties file that should be in the class path.
  * 
  * @return Context
  * @throws NamingException
  */
 private static Context getInitialContext() throws NamingException {
  final String methodName = "getInitialContext";
  lgr.debugStart(methodName);
  Context ctx = new InitialContext();
  lgr.debugEnd(methodName);
  return ctx;
 }

 /**
  * Create a Subcontext within the Initial Context
  * 
  * @param subcontext
  * @throws NamingException
  */
 public static void createSubcontext(String subcontext)
   throws NamingException {
  final String methodName = "createSubcontext";
  lgr.debugStart(methodName);
  lgr.debug(methodName, "Create Subcontext " + subcontext);
  Context ctx = getJndiContext();
  ctx.createSubcontext(subcontext);
  lgr.debugEnd(methodName);
 }

 /**
  * Set jndiContext;
  */
 public static void setJndiContext(Context newJndiContext) {
  jndiContext = newJndiContext;
 }

 /**
  * Get jndiContext
  */
 public static Context getJndiContext() throws NamingException {
  final String methodName = "getJndiContext";
  lgr.debugStart(methodName);
  if (jndiContext == null) {
   lgr.debug(methodName, "Get Initial Context");
   Context ctx = getInitialContext();
   setJndiContext(ctx);
  }
  lgr.debugEnd(methodName);
  return jndiContext;
 }
}