Thursday 27 August 2020

Requeue expired JMS-AQ Messages

At my current customer we use JMS queues that are implemented with AQ queues based on sys.aq$_jms_text_message. In Weblogic you can create a so-called Foreign server that is able to interact with these queues over a datasource. For a Weblogic application, like SOA Suite or OSB, it is as if it is a regular Weblogic JMS queue. Pretty smart, because unlike a JDBC based Weblogic JMS Server, you can not only use the sys.aq$_jms_text_message type to query the aq table, as I described earlier. Not only that, you can also use the AQ PL/Sql api's to enqueue and dequeue these messages.

This can come in handy when you need to purge the tables, to remove the expired messages. But this morning there was a hickup in OSB, so that it couldn't process these messages succesfully. Because of the persisting rollbacks the messages are moved to the exception queue by AQ with the reason 'MAX_RETRY_EXCEEDED'. After I investigated the issue and some interaction with our admins the OSB was restarted which solved the problem.

But the earlier expired messages were still in the exception queue and processes were waiting for the response. So I thought it would be fun to have my own script to re-enqueue the expired messages. 

Although the admins turned out to have scripts for this, I would like to have my own. Theirs maybe smarter or at least they had more time to develop.

This script is at least publishable and might be a good starting point if you have to do something with AQ.

declare
  l_except_queue varchar2(30) := 'AQ$_DWN_OUTBOUND_TABLE_E';
  l_dest_queue varchar2(30) := 'DWN_OUTBOUND';
  l_message_type varchar2(30) := 'registersomethingmessage';
  cursor c_qtb 
    is select  qtb.queue_table 
      , qtb.queue 
      , qtb.msg_id
      , qtb.corr_id correlation_id
      , qtb.msg_state
      , qtb.enq_timestamp
      , qtb.user_data
      , qtb.user_data.header.replyto
      , qtb.user_data.header.type type
      , qtb.user_data.header.userid userid
      , qtb.user_data.header.appid appid
      , qtb.user_data.header.groupid groupid
      , qtb.user_data.header.groupseq groupseq
      , qtb.user_data.header.properties properties
      , (select str_value from table (qtb.user_data.header.properties) prp where prp.name = 'JMSCorrelationID') JMSCorrelationID
      , (select str_value from table (qtb.user_data.header.properties) prp where prp.name = 'JMSMessageID') JMSMsgID
      , (select str_value from table (qtb.user_data.header.properties) prp where prp.name = 'tracking_compositeInstanceId') tracking_compositeInstanceId
      , (select str_value from table (qtb.user_data.header.properties) prp where prp.name = 'JMS_OracleDeliveryMode') JMS_OracleDeliveryMode
      , (select str_value from table (qtb.user_data.header.properties) prp where prp.name = 'tracking_ecid') tracking_ecid
      , (select num_value from table (qtb.user_data.header.properties) prp where prp.name = 'JMS_OracleTimestamp') JMS_OracleTimestamp
      , (select str_value from table (qtb.user_data.header.properties) prp where prp.name = 'tracking_parentComponentInstanceId') tracking_prtCptInstanceId
      , (select str_value from table (qtb.user_data.header.properties) prp where prp.name = 'tracking_conversationId') tracking_conversationId
      , (select str_value from table (qtb.user_data.header.properties) prp where prp.name = 'BPEL_SENSOR_NAME') bpel_sensor_name
      , (select str_value from table (qtb.user_data.header.properties) prp where prp.name = 'BPEL_PROCESS_NAME') bpel_process_name
      , (select str_value from table (qtb.user_data.header.properties) prp where prp.name = 'BPEL_PROCESS_REVISION') bpel_process_rev
      , (select str_value from table (qtb.user_data.header.properties) prp where prp.name = 'BPEL_DOMAIN') bpel_domain
      , (select str_value from table (qtb.user_data.header.properties) prp where prp.name = 'SBLCorrelationID') SBLCorrelationID
      , qtb.user_data.header
      , qtb.user_data.text_lob text_lob
      , qtb.user_data.text_vc text_vc
      , qtb.expiration_reason
      --, qtb.*
      from (
        select 'DWN_OUTBOUND_TABLE' queue_table
        , qtb.* 
        from AQ$DWN_OUTBOUND_TABLE qtb
      ) qtb
      where qtb.user_data.text_vc  like '<'||l_message_type||'%'
      and qtb.msg_state = 'EXPIRED'
      and qtb.expiration_reason = 'MAX_RETRY_EXCEEDED'
      order by queue_table, enq_timestamp asc;
  l_payload SYS.AQ$_JMS_TEXT_MESSAGE;
  l_sbl_correlation_id varchar2(100);
  l_parentComponentInstanceId varchar2(100);
  l_jms_type varchar2(100);
  --
  function get_jms_property(p_payload in SYS.AQ$_JMS_TEXT_MESSAGE, p_property_name in varchar2)
  return varchar2
  as
    l_property varchar2(32767);
  begin
    select str_value into l_property from table (l_payload.header.properties) prp where prp.name = p_property_name;
    return l_property;
  exception
    when no_data_found then
      return null;
  end get_jms_property;
  --
  procedure dequeue_msg(p_queue in varchar2, p_msg_id in raw)
  is
    l_dequeue_options dbms_aq.DEQUEUE_OPTIONS_T ;
    l_payload SYS.AQ$_JMS_TEXT_MESSAGE;
    l_message_properties dbms_aq.message_properties_t ;
    l_msg_id raw(32);
  begin
    --l_dequeue_options.visibility := dbms_aq.immediate;
    l_dequeue_options.visibility := dbms_aq.on_commit;
    l_dequeue_options.msgid := p_msg_id;    
    DBMS_AQ.DEQUEUE (
     queue_name          => p_queue,
     dequeue_options     => l_dequeue_options,
     message_properties  => l_message_properties,
     payload             => l_payload,
     msgid               => l_msg_id);
  end dequeue_msg;
  --
  procedure enqueue_msg(p_queue in varchar2, p_payload SYS.AQ$_JMS_TEXT_MESSAGE)
  is
    l_enqueue_options dbms_aq.ENQUEUE_OPTIONS_T ;
    l_message_properties dbms_aq.message_properties_t ;
    l_msg_id raw(32);
  begin
    --l_enqueue_options.visibility := dbms_aq.immediate;
    l_enqueue_options.visibility := dbms_aq.on_commit;
    DBMS_AQ.ENQUEUE (
     queue_name          => p_queue,
     enqueue_options     => l_enqueue_options,
     message_properties  => l_message_properties,
     payload             => p_payload,
     msgid               => l_msg_id);
  end enqueue_msg;
  --
begin
  for r_qtb in c_qtb loop
    l_payload := r_qtb.user_data;
    l_jms_type := r_qtb.user_data.header.type;
    l_sbl_correlation_id := get_jms_property(l_payload, 'SBLCorrelationID');
    l_parentComponentInstanceId := get_jms_property(l_payload, 'tracking_parentComponentInstanceId');
    dbms_output.put_line(r_qtb.queue||' - '||' - '||l_jms_type||' - '||r_qtb.msg_id||' - '||l_sbl_correlation_id||' - '||l_parentComponentInstanceId);
    enqueue_msg(l_dest_queue , l_payload);
    dequeue_msg(l_except_queue , r_qtb.msg_id);
  end loop;
end;

This script starts with a cursor that is based on the query described in the post mentioned above. It selects only the Expired messages, where the root-tag starts with a concatenation of '<' and the message type declared in the top. If there was a JMS type you could also select on the userdata.header.type attribute.

It logs a few attributes, merely for me to check if the base of the script worked, without the dequeue and the enqueue. The selecting of the particular JMS properties are taken from the earlier script and are an example on properties that you could use to more granularly determine if a message is eligable to be re-enqueued.

The found message is enqueued and then dequeued, both with visibility set to on_commit. This ensures that the enqueue and dequeue is done within the same transaction. You should hit the commit button in SQL Developer (or your other favorite Database IDE).

The from clause construct:

      from (
        select 'DWN_OUTBOUND_TABLE' queue_table
        , qtb.* 
        from AQ$DWN_OUTBOUND_TABLE qtb
      ) qtb

is from a script I created at the customer to query over all the available queue tables, by doing a union-all over all the queue-tables. That's why the first column names the queue table that is source for the record. 

This script can be made more dynamic by putting it in a package and make a pipelined function for the query, so that you can provide the queuetable to query from as a parameter. You could even loop over all the user_queue_tables to dynamically select all the message from all the tables without having to do union alls over the familiar queue tables. See my Object Oriented Pl/Sql article for more info and inspiration.

You might even have fun with Polymorphic Table Functions, the Patrick-ACE-Director-Bar-solutions is expert on that.


No comments :