Tuesday 7 December 2010

Reinvoke BPEL Process

Back in 2008 I wrote an article on creating a BPEL Scheduler. I later used the principle from that article in a real scheduler implementation. In several blog posts, among others by Clemens Utschig and Lucas Jellema, I have read all kinds of arguments for and against using BPEL for scheduling.

I think, however, that BPEL is perfectly suitable for scheduling jobs, especially if you create a data model with a front-end, for instance in Apex (I used JHeadstart for it myself), to register the schedule metadata that determines the next schedule dates.

There are a few considerations, though. One of them is reinvoking the scheduler for a new scheduled date after performing a task. In my earlier article I used a plain invoke on the client partnerlink. A big disadvantage of that is that every task instance is created under the same root instance. After a few iterations the process tree shown in the tree finder becomes exorbitantly large.

This is solved quite easily by replacing the invoke with a piece of embedded Java:


<bpelx:exec name="ReInvokeRH" language="Java" version="1.4"><![CDATA[//Get logger
                org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog("my.log");
                // Get singletonId
                log.info("ReInvoke-Get SingletonId.");
                String singletonId = (String)getVariableData("singletonId");
                //Construct xml message
                log.info("ReInvoke - Construct XML message.");
                String xml = "<BPELProcessProcessRequest xmlns=\"http://xmlns.oracle.com/BPELProcessProcessRequest\">\n<singletonId>"
                           + singletonId
                           + "</singletonId>\n</BPELProcessProcessRequest>";
                log.info("ReInvoke message: " + xml);
                //Get a locator and delivery service.
                log.info("ReInvoke - Get Locator.");
                try {
                  Locator locator = getLocator();
                  log.info("ReInvoke - Initiate IDeliveryService.");
                  IDeliveryService deliveryService = (IDeliveryService)locator.lookupService(IDeliveryService.SERVICE_NAME);
                  // Construct a normalized message and send to Oracle BPEL Process Manager
                  log.info("ReInvoke - Construct a normalized message and send to Oracle BPEL Process Manager.");
                  NormalizedMessage nm = new NormalizedMessage();
                  nm.addPart("payload", xml);
                  // Initiate the BPEL process
                  log.info("ReInvoke - Initiate the BPEL process.");
                  deliveryService.post("BPELProcess", "initiate", nm);
                } catch (RemoteException e) {
                  log.error(e);
                  setVariableData("invokeError", "true");
                  setVariableData("invokeErrorMessage", "ReInvokeRH-RemoteException: "+e.toString());  
                } catch (ServerException e) {
                  log.error(e);
                  setVariableData("invokeError", "true");
                  setVariableData("invokeErrorMessage", "ReInvokeRH-ServerException: "+e.toString());
                }
                log.info("ReInvoke - Return.");]]>
              </bpelx:exec>
This builds a String message, wraps it in a NormalizedMessage, and posts it to the "initiate" operation of "BPELProcess", using the post method of the IDeliveryService that is looked up through the locator of the BPEL instance. The snippet assumes that the process imports the classes it uses (Locator, IDeliveryService, NormalizedMessage, ServerException and RemoteException).

Doing so breaks the parent-child relation: the new instance runs in a new process instance tree.

Another consideration is that you might want to "listen" for external messages to do an abort or a reschedule. This can be done in several ways, for example by doing a correlated receive (using a correlation set on a field in the request message, like the "singletonId" in the reinvoke example) in a parallel flow.
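
As a rough sketch, such a listener branch could look like this in BPEL 1.1. The correlation set name "SingletonCS", the property "client:singletonId", the "abort" operation, the port type "client:BPELProcess" and the variable "Receive_abort_InputVariable" are assumptions for illustration and are not taken from the original process. The property and its alias on the singletonId element would still have to be declared in the WSDL.

```xml
<!-- Sketch only: all names except singletonId and the initiate input variable are assumed -->
<correlationSets>
  <!-- assumed property, aliased in the WSDL to the singletonId element of the request -->
  <correlationSet name="SingletonCS" properties="client:singletonId"/>
</correlationSets>

<!-- Initiating receive: creates the instance and initiates the correlation set -->
<receive name="ReceiveInput" partnerLink="client" portType="client:BPELProcess"
         operation="initiate" variable="ReceiveInput_initiate_InputVariable"
         createInstance="yes">
  <correlations>
    <correlation set="SingletonCS" initiate="yes"/>
  </correlations>
</receive>

<flow name="Flow_Schedule">
  <!-- Branch 1: wait for an external abort message carrying the same singletonId -->
  <sequence name="Seq_Listen">
    <receive name="Receive_Abort" partnerLink="client" portType="client:BPELProcess"
             operation="abort" variable="Receive_abort_InputVariable"
             createInstance="no">
      <correlations>
        <correlation set="SingletonCS" initiate="no"/>
      </correlations>
    </receive>
    <!-- handle the abort or reschedule here -->
  </sequence>
  <!-- Branch 2: wait for the scheduled date and execute the task (placeholder) -->
  <sequence name="Seq_Task">
    <empty name="ExecuteTask"/>
  </sequence>
</flow>
```

As long as the Receive_Abort activity is pending, the flow cannot complete, which is exactly the issue discussed next.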

The problem is that on a normal reinvoke this receive has to be released. If it is not, the parallel flow will not end and/or the correlation set will not be freed. In the new instance you might then get a runtime message that there is already a receive on the same correlation id. In another project I solved a similar problem by calling the process instance with an abort message from within the process instance itself, before doing the reinvoke. This is quite costly in terms of performance, since it involves a message to the BPEL service with all the overhead (even if it is in fact a WSIF call). So recently I found a somewhat smarter solution.

What I basically did was wrap the parallel flow, containing the receive and the task-execution logic, in a scope.
Instead of ending the branches normally, I throw a "finish" or "abort" fault:

<throw name="Throw_Finish" faultName="client:Finish" faultVariable="ReceiveInput_initiate_InputVariable"/> 

This can then be caught in the surrounding scope:
<catch faultName="client:Finish" faultVariable="ReceiveInput_initiate_InputVariable">

It might look strange at first sight to end your normal (or abort) flow with a business exception. Ending your flow is not an exception, is it?
But throwing the fault terminates the activities that are still pending in the scope, so it releases your receive activities and the correlation id they hold, which makes a reinvoke on the same correlation id possible.
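
Put together, the structure could look roughly like the following BPEL 1.1 sketch. Only the "client:Finish" throw and catch come from the process described here. The scope, flow and sequence names, the "client:Abort" fault name, the "abort" operation, the correlation set "SingletonCS" and the variable "Receive_abort_InputVariable" are assumptions for illustration.

```xml
<!-- Sketch only: names other than client:Finish and the fault variable are assumed -->
<scope name="Scope_Schedule">
  <faultHandlers>
    <!-- Normal end of the task branch: the pending receive has been terminated by now -->
    <catch faultName="client:Finish" faultVariable="ReceiveInput_initiate_InputVariable">
      <empty name="Finished"/>
    </catch>
    <!-- An external abort message was received -->
    <catch faultName="client:Abort" faultVariable="ReceiveInput_initiate_InputVariable">
      <empty name="Aborted"/>
    </catch>
  </faultHandlers>
  <flow name="Flow_Schedule">
    <!-- Branch 1: correlated receive that listens for an abort -->
    <sequence name="Seq_Listen">
      <receive name="Receive_Abort" partnerLink="client" portType="client:BPELProcess"
               operation="abort" variable="Receive_abort_InputVariable"
               createInstance="no">
        <correlations>
          <correlation set="SingletonCS" initiate="no"/>
        </correlations>
      </receive>
      <throw name="Throw_Abort" faultName="client:Abort"
             faultVariable="ReceiveInput_initiate_InputVariable"/>
    </sequence>
    <!-- Branch 2: execute the task (placeholder), then leave the scope via a fault -->
    <sequence name="Seq_Task">
      <empty name="ExecuteTask"/>
      <throw name="Throw_Finish" faultName="client:Finish"
             faultVariable="ReceiveInput_initiate_InputVariable"/>
    </sequence>
  </flow>
</scope>
<!-- Once the scope has been left this way, the reinvoke can be done -->
```

Whichever branch throws first ends the whole scope, so the other branch never has to be completed explicitly.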
