Tuesday 2 November 2010

B2B Queue to JMS

Lately I was asked to help with routing messages from an object-type based AQ queue to JMS.
The thing is that a JMS text message carries text, so you have to transform the object type to text.
When the object type is one of your own, you could extend it with a method "toXML" that returns an XML message based on the attributes of the object.
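
As a minimal sketch of that idea: the type below, demo_message_type, and its attributes are hypothetical and only serve as an illustration. It assumes plain VARCHAR2 attributes and uses dbms_xmlgen.convert to entity-encode the values.

```plsql
-- Hypothetical object type, not part of B2B: for illustration only.
create or replace type demo_message_type as object
( msg_id  varchar2(50)
, subject varchar2(200)
, member function toXML return varchar2
);
/
create or replace type body demo_message_type as
  member function toXML return varchar2 is
  begin
    -- Build the XML by hand; dbms_xmlgen.convert escapes characters such as < and >.
    return '<message>'
        || '<msgId>'   || dbms_xmlgen.convert(self.msg_id)  || '</msgId>'
        || '<subject>' || dbms_xmlgen.convert(self.subject) || '</subject>'
        || '</message>';
  end toXML;
end;
/
-- Usage: construct an instance and print its XML representation.
declare
  l_msg demo_message_type := demo_message_type('Aap', 'Lezen');
begin
  dbms_output.put_line(l_msg.toXML());
end;
/
```

For larger or nested types a CLOB return value would probably be the safer choice.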

In this case it was about an Oracle Integration B2B AQ queue, which is based on the B2B object type "IP_MESSAGE_TYPE".

It turns out that it is not too hard to translate the object type to JMS. I created a package for it, which I provided for download here.

You can test it with the following code:
declare
  -- Non-scalar parameters require additional processing
  result     sys.aq$_jms_text_message;
  ip_message ip_message_type;
  payload    clob;
begin
  payload    := def_b2b.varchar_to_clob('Jet, Teun, Vuur, Schapen');
  ip_message := ip_message_type(MSG_ID           => 'Aap'
                               ,INREPLYTO_MSG_ID => 'noot'
                               ,FROM_PARTY       => 'Mies'
                               ,TO_PARTY         => 'Zus'
                               ,ACTION_NAME      => 'Lezen'
                               ,DOCTYPE_NAME     => 'Leesplankje'
                               ,DOCTYPE_REVISION => '1.0'
                               ,MSG_TYPE         => 1
                               ,PAYLOAD          => payload
                               ,ATTACHMENT       => null);
  -- Call the function
  result := def_b2b.ip_message2jms_msg(ip_message => ip_message);
  result.get_text(:msg);
end;


As you can see, a JMS-accessible AQ queue has a special system object type: 'sys.aq$_jms_text_message'. There are several others, for different kinds of JMS queues or topics. Also note that the object types differ between Oracle 10g or 11g Enterprise Edition (or equivalent) and Oracle XE. In XE you won't find a 'construct' method. You could try the solution of Peter Ebell for this.

Another thing is that the guys who asked me for help had to enqueue the message on the JMS queue in response to the enqueue on the source B2B queue.
Oracle gave them permission to use a trigger on the queue table. To begin with they used a Before Row Insert trigger. Apart from the fact that triggers on queue tables are not supported and certainly not the way to go, they ran into a problem with it, which lies in the fact that the payload attribute is a CLOB. I have always found the way Oracle handles CLOBs at least "a little remarkable". On insert you create a row with an empty CLOB and then query it for update. In the queried row you upload the content to the CLOB column. Since an AQ queue is based on a table, it works essentially the same way. So on Before Row Insert the payload attribute is still empty. They solved it by using an After Delete trigger (which fires when the message is consumed by the subscribing process).
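
To illustrate the CLOB pattern described above, here is a small sketch on an ordinary table. The table demo_docs is hypothetical and has nothing to do with the B2B queue table; it only shows that the row exists with an empty CLOB before the content is written.

```plsql
-- Assumes a hypothetical table:
--   create table demo_docs (id number primary key, body clob);
declare
  l_body clob;
  l_text varchar2(100) := 'Jet, Teun, Vuur, Schapen';
begin
  -- Step 1: insert the row with an empty CLOB.
  -- A Before Row Insert trigger fires here and sees no content yet.
  insert into demo_docs (id, body) values (1, empty_clob());

  -- Step 2: query the row for update to get a writable LOB locator.
  select body into l_body from demo_docs where id = 1 for update;

  -- Step 3: only now upload the content into the CLOB.
  dbms_lob.writeappend(l_body, length(l_text), l_text);
  commit;
end;
/
```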

The way to go is actually to register a notification service on the queue using code like:
declare
  lc_reg_info      SYS.AQ$_REG_INFO;
  lc_reg_info_list SYS.AQ$_REG_INFO_LIST;
begin
  lc_reg_info := SYS.AQ$_REG_INFO('B2B.IP_IN_QUEUE:'
                                 ,DBMS_AQ.NAMESPACE_AQ
                                 ,'plsql://b2b.def_b2b.handle_inbound_notification?PR=1'
                                 ,HEXTORAW('FF'));

  lc_reg_info_list := SYS.AQ$_REG_INFO_LIST(lc_reg_info);
  dbms_aq.register(lc_reg_info_list, 1);
end;

Such a PL/SQL notification callback is a procedure that is required to have a particular signature:
PROCEDURE handle_inbound_notification(context  RAW
                                       ,reginfo  sys.aq$_reg_info
                                       ,descr    sys.aq$_descriptor
                                       ,payload  RAW
                                       ,payloadl NUMBER)

These parameters provide you with the data to fetch/dequeue the message this procedure is called for. You won't get the message itself; you have to dequeue it explicitly. See the package for an example of how to implement this procedure.
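
A rough sketch of what such a callback could look like is given below. It is not the code from the package: the procedure name handle_inbound_sketch and the target queue 'B2B.JMS_OUT_QUEUE' are assumptions made for the example. It uses the queue name, consumer name and message id from the descriptor to dequeue the message, and then converts it with def_b2b.ip_message2jms_msg as in the test script above.

```plsql
create or replace procedure handle_inbound_sketch  -- hypothetical name
( context  raw
, reginfo  sys.aq$_reg_info
, descr    sys.aq$_descriptor
, payload  raw
, payloadl number
) is
  l_deq_options dbms_aq.dequeue_options_t;
  l_enq_options dbms_aq.enqueue_options_t;
  l_deq_props   dbms_aq.message_properties_t;
  l_enq_props   dbms_aq.message_properties_t;
  l_msg_id      raw(16);
  l_ip_message  ip_message_type;
  l_jms_message sys.aq$_jms_text_message;
begin
  -- The descriptor tells us which message triggered the notification.
  l_deq_options.msgid         := descr.msg_id;
  l_deq_options.consumer_name := descr.consumer_name;
  l_deq_options.wait          := dbms_aq.no_wait;

  -- Dequeue the message explicitly; it is not passed to the callback.
  dbms_aq.dequeue( queue_name         => descr.queue_name
                 , dequeue_options    => l_deq_options
                 , message_properties => l_deq_props
                 , payload            => l_ip_message
                 , msgid              => l_msg_id);

  -- Convert to a JMS text message and forward it.
  l_jms_message := def_b2b.ip_message2jms_msg(ip_message => l_ip_message);
  dbms_aq.enqueue( queue_name         => 'B2B.JMS_OUT_QUEUE'  -- assumed queue name
                 , enqueue_options    => l_enq_options
                 , message_properties => l_enq_props
                 , payload            => l_jms_message
                 , msgid              => l_msg_id);
  commit;
end;
/
```

Dequeue and enqueue happen in one transaction here, so the message is either rerouted completely or stays on the source queue.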

You should perform the registration for every queue consumer whose messages you want to reroute. It is not too hard to put this in a parameterized procedure and call it based on a query that fetches the consumers from either the data dictionary or (better) the B2B repository. In fact this code is extracted from such a construct, but it was a little too much work (I had already put a reasonable amount of time into the package) to anonymize it and make it more generic.
If you need more help with it, I could of course provide some consultancy.
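
For what it is worth, a sketch of such a parameterized registration could look like this. It is not the original construct: the local procedure register_consumer is hypothetical, and it assumes that the consumers can be read from the dictionary view ALL_QUEUE_SUBSCRIBERS rather than from the B2B repository.

```plsql
declare
  -- Hypothetical helper: registers the notification callback for one consumer.
  procedure register_consumer(p_queue in varchar2, p_consumer in varchar2) is
    lc_reg_info      sys.aq$_reg_info;
    lc_reg_info_list sys.aq$_reg_info_list;
  begin
    -- The subscription name has the form schema.queue:consumer.
    lc_reg_info := sys.aq$_reg_info(p_queue || ':' || p_consumer
                                   ,dbms_aq.namespace_aq
                                   ,'plsql://b2b.def_b2b.handle_inbound_notification?PR=1'
                                   ,hextoraw('FF'));
    lc_reg_info_list := sys.aq$_reg_info_list(lc_reg_info);
    dbms_aq.register(lc_reg_info_list, 1);
  end register_consumer;
begin
  -- Assumption: the subscribers of the queue are visible in the dictionary.
  for r in (select owner, queue_name, consumer_name
              from all_queue_subscribers
             where owner = 'B2B'
               and queue_name = 'IP_IN_QUEUE')
  loop
    register_consumer(r.owner || '.' || r.queue_name, r.consumer_name);
  end loop;
end;
/
```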