At my current customer we use JMS queues that are implemented with AQ queues based on sys.aq$_jms_text_message. In WebLogic you can create a so-called Foreign Server that interacts with these queues over a datasource. To a WebLogic application, like SOA Suite or OSB, it looks like a regular WebLogic JMS queue. Pretty smart, because unlike with a JDBC-based WebLogic JMS Server, you can use the sys.aq$_jms_text_message type to query the AQ table, as I described earlier. You can also use the AQ PL/SQL APIs to enqueue and dequeue these messages.
This comes in handy when you need to purge the tables to remove expired messages. But this morning there was a hiccup in OSB, so it couldn't process these messages successfully. Because of the persistent rollbacks, AQ moved the messages to the exception queue with the reason 'MAX_RETRY_EXCEEDED'. After I investigated the issue and consulted our admins, OSB was restarted, which solved the problem.
But the earlier expired messages were still in the exception queue and processes were waiting for the response. So I thought it would be fun to have my own script to re-enqueue the expired messages.
Although the admins turned out to have scripts for this, I wanted to have my own. Theirs may be smarter, or at least they had more time to develop them.
This script is at least publishable and might be a good starting point if you have to do something with AQ.
declare
  l_except_queue  varchar2(30) := 'AQ$_DWN_OUTBOUND_TABLE_E';
  l_dest_queue    varchar2(30) := 'DWN_OUTBOUND';
  l_message_type  varchar2(30) := 'registersomethingmessage';
  -- Expired messages of the given type, oldest first.
  cursor c_qtb is
    select qtb.queue_table
    ,      qtb.queue
    ,      qtb.msg_id
    ,      qtb.corr_id correlation_id
    ,      qtb.msg_state
    ,      qtb.enq_timestamp
    ,      qtb.user_data
    ,      qtb.user_data.header.replyto
    ,      qtb.user_data.header.type type
    ,      qtb.user_data.header.userid userid
    ,      qtb.user_data.header.appid appid
    ,      qtb.user_data.header.groupid groupid
    ,      qtb.user_data.header.groupseq groupseq
    ,      qtb.user_data.header.properties properties
    ,      (select str_value from table (qtb.user_data.header.properties) prp where prp.name = 'JMSCorrelationID') JMSCorrelationID
    ,      (select str_value from table (qtb.user_data.header.properties) prp where prp.name = 'JMSMessageID') JMSMsgID
    ,      (select str_value from table (qtb.user_data.header.properties) prp where prp.name = 'tracking_compositeInstanceId') tracking_compositeInstanceId
    ,      (select str_value from table (qtb.user_data.header.properties) prp where prp.name = 'JMS_OracleDeliveryMode') JMS_OracleDeliveryMode
    ,      (select str_value from table (qtb.user_data.header.properties) prp where prp.name = 'tracking_ecid') tracking_ecid
    ,      (select num_value from table (qtb.user_data.header.properties) prp where prp.name = 'JMS_OracleTimestamp') JMS_OracleTimestamp
    ,      (select str_value from table (qtb.user_data.header.properties) prp where prp.name = 'tracking_parentComponentInstanceId') tracking_prtCptInstanceId
    ,      (select str_value from table (qtb.user_data.header.properties) prp where prp.name = 'tracking_conversationId') tracking_conversationId
    ,      (select str_value from table (qtb.user_data.header.properties) prp where prp.name = 'BPEL_SENSOR_NAME') bpel_sensor_name
    ,      (select str_value from table (qtb.user_data.header.properties) prp where prp.name = 'BPEL_PROCESS_NAME') bpel_process_name
    ,      (select str_value from table (qtb.user_data.header.properties) prp where prp.name = 'BPEL_PROCESS_REVISION') bpel_process_rev
    ,      (select str_value from table (qtb.user_data.header.properties) prp where prp.name = 'BPEL_DOMAIN') bpel_domain
    ,      (select str_value from table (qtb.user_data.header.properties) prp where prp.name = 'SBLCorrelationID') SBLCorrelationID
    ,      qtb.user_data.header
    ,      qtb.user_data.text_lob text_lob
    ,      qtb.user_data.text_vc text_vc
    ,      qtb.expiration_reason
    --,      qtb.*
    from   ( select 'DWN_OUTBOUND_TABLE' queue_table
             ,      qtb.*
             from   AQ$DWN_OUTBOUND_TABLE qtb
           ) qtb
    where  qtb.user_data.text_vc like '<'||l_message_type||'%'
    and    qtb.msg_state = 'EXPIRED'
    and    qtb.expiration_reason = 'MAX_RETRY_EXCEEDED'
    order by queue_table, enq_timestamp asc;
  l_payload                    SYS.AQ$_JMS_TEXT_MESSAGE;
  l_sbl_correlation_id         varchar2(100);
  l_parentComponentInstanceId  varchar2(100);
  l_jms_type                   varchar2(100);
  --
  -- Return the string value of a JMS property, or null if it is not set.
  function get_jms_property(p_payload in SYS.AQ$_JMS_TEXT_MESSAGE, p_property_name in varchar2)
  return varchar2
  as
    l_property varchar2(32767);
  begin
    -- Read from the p_payload parameter (the original read the outer l_payload variable).
    select str_value
    into   l_property
    from   table (p_payload.header.properties) prp
    where  prp.name = p_property_name;
    return l_property;
  exception
    when no_data_found then
      return null;
  end get_jms_property;
  --
  -- Remove one message, identified by its message id, from the given queue.
  procedure dequeue_msg(p_queue in varchar2, p_msg_id in raw)
  is
    l_dequeue_options     dbms_aq.DEQUEUE_OPTIONS_T;
    l_payload             SYS.AQ$_JMS_TEXT_MESSAGE;
    l_message_properties  dbms_aq.message_properties_t;
    l_msg_id              raw(32);
  begin
    --l_dequeue_options.visibility := dbms_aq.immediate;
    l_dequeue_options.visibility := dbms_aq.on_commit;
    l_dequeue_options.msgid      := p_msg_id;
    DBMS_AQ.DEQUEUE ( queue_name         => p_queue
                    , dequeue_options    => l_dequeue_options
                    , message_properties => l_message_properties
                    , payload            => l_payload
                    , msgid              => l_msg_id);
  end dequeue_msg;
  --
  -- Enqueue the payload on the given queue with default message properties.
  procedure enqueue_msg(p_queue in varchar2, p_payload SYS.AQ$_JMS_TEXT_MESSAGE)
  is
    l_enqueue_options     dbms_aq.ENQUEUE_OPTIONS_T;
    l_message_properties  dbms_aq.message_properties_t;
    l_msg_id              raw(32);
  begin
    --l_enqueue_options.visibility := dbms_aq.immediate;
    l_enqueue_options.visibility := dbms_aq.on_commit;
    DBMS_AQ.ENQUEUE ( queue_name         => p_queue
                    , enqueue_options    => l_enqueue_options
                    , message_properties => l_message_properties
                    , payload            => p_payload
                    , msgid              => l_msg_id);
  end enqueue_msg;
  --
begin
  for r_qtb in c_qtb loop
    l_payload                   := r_qtb.user_data;
    l_jms_type                  := r_qtb.user_data.header.type;
    l_sbl_correlation_id        := get_jms_property(l_payload, 'SBLCorrelationID');
    l_parentComponentInstanceId := get_jms_property(l_payload, 'tracking_parentComponentInstanceId');
    dbms_output.put_line(r_qtb.queue||' - '||' - '||l_jms_type||' - '||r_qtb.msg_id||' - '||l_sbl_correlation_id||' - '||l_parentComponentInstanceId);
    -- Re-enqueue on the destination queue first, then remove from the exception queue.
    enqueue_msg(l_dest_queue, l_payload);
    dequeue_msg(l_except_queue, r_qtb.msg_id);
  end loop;
end;
This script starts with a cursor that is based on the query described in the post mentioned above. It selects only the expired messages whose root tag starts with a concatenation of '<' and the message type declared at the top. If the messages carry a JMS type, you could also select on the user_data.header.type attribute.
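As a minimal sketch of that alternative, the query below filters on the JMS type header instead of the root tag of the payload. It assumes the producer actually sets the JMS type; the value 'registersomethingmessage' is only the illustrative message type from the script, not a known JMS type at this customer.

```sql
-- Sketch: select expired messages by JMS type instead of by payload root tag.
-- Assumption: the producer fills user_data.header.type with the message type.
select qtb.msg_id
,      qtb.enq_timestamp
,      qtb.user_data.header.type jms_type
from   AQ$DWN_OUTBOUND_TABLE qtb
where  qtb.user_data.header.type = 'registersomethingmessage'
and    qtb.msg_state             = 'EXPIRED'
and    qtb.expiration_reason     = 'MAX_RETRY_EXCEEDED'
order by qtb.enq_timestamp;
```

This avoids the LIKE on text_vc, which only helps if the type header is reliably filled.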
It logs a few attributes, merely for me to check whether the base of the script worked before I added the dequeue and the enqueue. The subqueries that select particular JMS properties are taken from the earlier script and are examples of properties you could use to determine more precisely whether a message is eligible to be re-enqueued.
The found message is enqueued and then dequeued, both with visibility set to on_commit. This ensures that the enqueue and dequeue are done within the same transaction. The script does not commit by itself, so you should hit the commit button in SQL Developer (or your other favorite database IDE) once you are satisfied with the result.
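Because nothing is committed yet, you can check the result in the same session before making it permanent. The queries below are a sketch of such a check. They assume that your own session sees its uncommitted enqueues and dequeues in the AQ$ view, and that re-enqueued messages show up with state 'READY'.

```sql
-- Expired messages still left behind in the exception queue.
select count(*) still_expired
from   AQ$DWN_OUTBOUND_TABLE qtb
where  qtb.msg_state         = 'EXPIRED'
and    qtb.expiration_reason = 'MAX_RETRY_EXCEEDED';

-- Messages waiting on the destination queue (assumed state: READY).
select count(*) ready_on_destination
from   AQ$DWN_OUTBOUND_TABLE qtb
where  qtb.queue     = 'DWN_OUTBOUND'
and    qtb.msg_state = 'READY';

-- Happy with the counts? Then make it permanent; otherwise roll back.
commit;
-- rollback;
```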
The from clause construct:
from ( select 'DWN_OUTBOUND_TABLE' queue_table
       ,      qtb.*
       from   AQ$DWN_OUTBOUND_TABLE qtb ) qtb
comes from a script I created at the customer that queries all the available queue tables with a union all over their views. That is why the first column names the queue table that is the source of the record.
This script can be made more dynamic by putting it in a package and making a pipelined function for the query, so that you can provide the queue table to query from as a parameter. You could even loop over all the user_queue_tables to dynamically select all the messages from all the tables, without having to do union alls over the familiar queue tables. See my Object Oriented PL/SQL article for more info and inspiration.
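A minimal sketch of the looping idea, short of a full pipelined function: the anonymous block below walks over user_queue_tables and counts the expired messages per queue table with dynamic SQL. It assumes that every queue table has a view named AQ$ followed by the queue table name, and that the object_type column holds the payload type as 'SYS.AQ$_JMS_TEXT_MESSAGE'; check both in your own schema.

```sql
declare
  l_count pls_integer;
begin
  -- Assumption: object_type identifies JMS text message queue tables this way.
  for r_tab in ( select queue_table
                 from   user_queue_tables
                 where  object_type = 'SYS.AQ$_JMS_TEXT_MESSAGE'
                 order by queue_table )
  loop
    -- Build the view name AQ$<queue_table> and quote it safely.
    execute immediate
         'select count(*) from '
      || dbms_assert.enquote_name('AQ$' || r_tab.queue_table, false)
      || ' qtb where qtb.msg_state = ''EXPIRED'''
      into l_count;
    dbms_output.put_line(r_tab.queue_table || ': ' || l_count || ' expired');
  end loop;
end;
/
```

The same loop could feed a pipelined function that pipes one row per message, which would replace the hand-written union all.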
You might even have fun with Polymorphic Table Functions; the Patrick-ACE-Director-Bar-solutions is the expert on those.