This year, I came up with the idea of doing something around Correlation Sets: preparing a series of articles and a talk. So, let's start with an article on Correlation Sets in BPEL. Maybe later on I can pick up those earlier plans again.
You may have read "BPEL" and be tempted to skip this article. But wait, if you use BPM Suite: the Oracle BPM Process Engine is the exact same thing as the BPEL Process Engine! And if you use the Processes module of Oracle Integration Cloud: it can use Correlation Sets too. Surprise: again it uses the exact same Process Engine as Oracle SOA Suite BPEL and Oracle BPM Suite.
Why Correlation Sets?
Now, why Correlation Sets and what are they? You may be familiar with OSB, or maybe Mulesoft or other integration tools. OSB is a stateless engine. What comes in is executed at once, until it is done. So, services in OSB are inherently synchronous and short-lived. You may argue that you can do asynchronous services in OSB. But those are in fact "synchronous" one-way services. Fire & Forget, if you will. They are executed right away (hence the quoted "synchronous"), until they are done. But the calling application does not expect a result (and thus they are asynchronous in the sense that the caller won't wait).
You could, and I have actually done it, create asynchronous request-response services in OSB. Asynchronous request-response services are actually two complementary one-way fire & forget services. In such a WSDL both services are defined in different port types: one for the actual service consumer, and one callback service for the service provider. Using WS-Addressing header elements the calling service provides a ReplyTo callback endpoint and a MessageId, which the responding service has to return as a RelatesTo MessageId.
This RelatesTo MessageId serves as a correlation id that maps to the initiating MessageId. WS-Addressing is a web service standard that describes the SOAP Header elements to use. As said, you can do this in OSB; OSB even has the WS-Addressing namespaces already defined. However, you have to code the determination and the setting of the MessageId and ReplyTo address yourself.
Because of the inherently stateless foundation of OSB, its services are short-lived, and that's why OSB is not suitable for long-running processes. The Oracle SOA Suite BPEL engine, on the other hand, is designed to orchestrate services (WebServices originally, but from 12c onwards REST services as well) in a stateful way. This makes BPEL suitable for long-running transactions as well. Because of that, after the acquisition of Collaxa, the company that created the BPEL engine, Oracle decided to replace its own database-based product Oracle Workflow (OWF) with BPEL. SOA Suite and its BPEL engine natively support WS-Addressing. Based upon an Async Request/Response WSDL, the engine makes sure it adds the proper WS-Addressing elements and has a SOAP endpoint to catch response messages. Based upon the RelatesTo message id in the response, it correlates the incoming response with the proper BPEL process instance that waits for that message.
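To make that header exchange a bit more concrete, here is a rough sketch of the WS-Addressing headers of an asynchronous request and its callback. The element names and the namespace are those of WS-Addressing 1.0 (note that the standard spells the element MessageID); the endpoint URL and the id value are made up for illustration, and other headers such as wsa:To and wsa:Action are left out for brevity.

```xml
<!-- Request: the caller states its message id and where the callback should go. -->
<soap:Header xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
             xmlns:wsa="http://www.w3.org/2005/08/addressing">
  <!-- Hypothetical id, generated by the caller -->
  <wsa:MessageID>urn:uuid:6b29fc40-ca47-1067-b31d-00dd010662da</wsa:MessageID>
  <wsa:ReplyTo>
    <!-- Hypothetical callback endpoint of the caller -->
    <wsa:Address>http://caller.example.com/services/CustomerCallback</wsa:Address>
  </wsa:ReplyTo>
</soap:Header>

<!-- Callback: the provider sends this to the ReplyTo address
     and echoes the original message id in RelatesTo. -->
<soap:Header xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
             xmlns:wsa="http://www.w3.org/2005/08/addressing">
  <wsa:RelatesTo>urn:uuid:6b29fc40-ca47-1067-b31d-00dd010662da</wsa:RelatesTo>
</soap:Header>
```

In OSB you would have to read and set these elements yourself, as described above.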
A BPEL process may run from a few seconds to several minutes, days, months, or potentially even years. However, experience has taught us not to let BPEL services run for longer than a few days. For really long-running processes you should choose BPM Suite or Oracle Integration Cloud/Process.
WS-Addressing helps in correlating response messages to requests that were sent out previously. But it does not correlate ad-hoc messages. When a process runs for more than a few minutes, chances are that the information stored within the process is changed externally. A customer waiting for some process may have relocated or even died. So you may need to interact with a running process. You want to be able to send a message with the changed info to the running process instance, and you want to be sure that the engine correlates the message to the correct instance. Correlation Sets help with these ad-hoc messages, which may or may not be sent at any time while the process is running.
An example BPEL process
Let's make a simple customer processing process that reads an xml file, processes it and writes it back to an xml file. My composite looks like:
It has two File Adapter definitions: an exposed service that polls the /tmp/In folder for customer*.xml files, and a reference service that writes an xml file into the /tmp/Out folder as customer%SEQ%_%yyMMddHHmmss%.xml. I'm not going to explain how to set up the File Adapters; that would be another course chapter.
For both adapters I created the following XSD:
<?xml version="1.0" encoding="UTF-8" ?>
<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema"
            xmlns:cmr="http://xmlns.darwin-it.nl/xsd/demo/Customer"
            targetNamespace="http://xmlns.darwin-it.nl/xsd/demo/Customer"
            elementFormDefault="qualified">
  <xsd:element name="customer" type="cmr:CustomerType">
    <xsd:annotation>
      <xsd:documentation>A customer</xsd:documentation>
    </xsd:annotation>
  </xsd:element>
  <xsd:complexType name="CustomerType">
    <xsd:sequence>
      <xsd:element name="id" maxOccurs="1" type="xsd:string"/>
      <xsd:element name="firstName" maxOccurs="1" type="xsd:string"/>
      <xsd:element name="lastName" maxOccurs="1" type="xsd:string"/>
      <xsd:element name="lastNamePrefixes" maxOccurs="1" type="xsd:string" minOccurs="0"/>
      <xsd:element name="gender" maxOccurs="1" type="xsd:string"/>
      <xsd:element name="streetName" maxOccurs="1" type="xsd:string"/>
      <xsd:element name="houseNumber" maxOccurs="1" type="xsd:string"/>
      <xsd:element name="country" maxOccurs="1" type="xsd:string"/>
    </xsd:sequence>
  </xsd:complexType>
</xsd:schema>
(While finishing this article, I noticed that I missed a city element. It does not matter for the story, but in the rest of the example I use the country field for the city.)
The first iteration of the BPEL process just receives the file from the customerIn adapter, assigns it to the input variable of the invoke of the customerOut adapter and invokes it:
Deploy it to the SOA Server and test it:
[oracle@darlin-ind In]$ ls ../TestFiles/
customer1.xml  customer2.xml
[oracle@darlin-ind In]$ cp ../TestFiles/customer1.xml .
[oracle@darlin-ind In]$ ls
customer1.xml
[oracle@darlin-ind In]$ ls
customer1.xml
[oracle@darlin-ind In]$ ls
customer1.xml
[oracle@darlin-ind In]$ ls
[oracle@darlin-ind In]$ ls ../Out/
customer2_200617125051.xml
[oracle@darlin-ind In]$
The output customer hasn't changed and is just like the input:
[oracle@darlin-ind In]$ cat ../Out/customer2_200617125051.xml
<?xml version="1.0" encoding="UTF-8" ?><customer xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://xmlns.darwin-it.nl/xsd/demo/Customer ../Schemas/Customer.xsd"
          xmlns="http://xmlns.darwin-it.nl/xsd/demo/Customer">
  <id>1001</id>
  <firstName>Jean-Michel</firstName>
  <lastName>Jarre</lastName>
  <gender>M</gender>
  <streetName>Rue d'Oxygene</streetName>
  <houseNumber>4</houseNumber>
  <country>Paris</country>
</customer>
[oracle@darlin-ind In]$
This process is now rather short-lived and doesn't do much except for moving the contents of the file. Now, let's say that the processing of the file takes quite some time, and that during the processing the customer may have relocated, died, or otherwise changed their information.
I expanded my composite with a SOAP Service, based on a One-Way WSDL, that is based upon the same xsd:
And this is how I changed the BPEL:
In this example, after setting the customer to the customerOut variable, there is a long running "customer processing" sequence, that takes "about" 5 minutes.
But in parallel it now also listens to the UpdateCustomer partnerlink using a Receive. This could be done in a loop to also receive more follow-up messages.
This might look a bit unnecessarily complex, with the throw and catch combination. But the thing with the Flow activity is that it completes only when all its branches are completed. So, you need a means to "kill" the Receive_UpdateCustomer activity. Adding a Throw activity does this nicely. Although the activity is colored red, this is not an actual fault exception. I use it here as a flow-control activity. It just has a simple fault name, which I found easiest to enter in the source:
<throw name="ThrowFinished" faultName="client:Finished"/>
This is because in the source you can just use the client namespace prefix, whereas in the Designer you have to provide the complete namespace URI:
The same goes for the Catch: after creating one, it's easier to add the namespace from the source:
<catch faultName="client:Finished">
  <assign name="AssignInputCustomer">
    <copy>
      <from>$ReceiveCustomer_Read_InputVariable.body</from>
      <to expressionLanguage="urn:oasis:names:tc:wsbpel:2.0:sublang:xpath1.0">$Invoke_WriteCustomerOut_Write_InputVariable.body</to>
    </copy>
  </assign>
</catch>
Side note: did you know that if you click on an activity or scope/sequence in the Designer and switch to the source, the cursor will be moved to the definition of the activity you selected? To me this often comes in handy with larger BPELs.
By throwing the Finished exception the Flow activity is left. With that, all its unfinished branches are closed, and so the Receive is terminated too.
When you receive a SOAP message in the BPEL example above, the process still waits for the processing branch to finish. You probably also need to notify the customer processing branch that the data has changed. That can be done in the same way, by throwing a custom exception.
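Since the structure is easier to follow in source than in prose, here is a minimal sketch of how such a flow could look in BPEL 2.0. It is not the actual process source: the scope, sequence, wait and variable names and the execute operation are assumptions, and only Receive_UpdateCustomer, ThrowFinished, the UpdateCustomer partnerlink and client:Finished are taken from the text.

```xml
<scope name="ProcessCustomerScope"> <!-- hypothetical name -->
  <faultHandlers>
    <!-- Catches the flow-control "fault"; the assign from the catch shown earlier would go here. -->
    <catch faultName="client:Finished">
      <empty name="FlowLeft"/>
    </catch>
  </faultHandlers>
  <flow name="FlowProcessCustomer">
    <!-- Branch 1: the long-running customer processing, simulated with a 5 minute wait. -->
    <sequence name="ProcessingBranch">
      <wait name="WaitProcessing">
        <for>'PT5M'</for>
      </wait>
      <!-- Leaves the flow and with that terminates the pending receive in branch 2. -->
      <throw name="ThrowFinished" faultName="client:Finished"/>
    </sequence>
    <!-- Branch 2: listens for an ad-hoc update; operation and variable names are assumed. -->
    <sequence name="UpdateBranch">
      <receive name="Receive_UpdateCustomer" partnerLink="UpdateCustomer"
               operation="execute" variable="Receive_UpdateCustomer_InputVariable"/>
    </sequence>
  </flow>
</scope>
```

Without the throw, the flow would only complete after an update message had actually arrived.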
How to define Correlation Sets
The example above won't work as is, because how does BPEL know which process instance the message has to be sent to? We need to create a Correlation Set. To do so, we need to define how we can correlate the UpdateCustomer message to the customerIn message. Luckily there is a Customer.id field. For this example that will do. But keep in mind: you can have multiple processes running for a customer, so in practice you should add something that identifies the instance.
You can add and edit Correlation Sets on the invoke, receive, and pick/onMessage activities, but also from the BPEL menu:
Then you can define a Correlation Set:
As you can see, you can create multiple Correlation Sets, each with one or more properties. In the last window, create a property, then select it for the Correlation Set and click OK in each dialog, all the way back up to the first one.
You'll see that the Correlation Set isn't valid yet. What is missing, and what I didn't provide in the last dialog, are the property aliases. We need to map the properties to the messages.
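In source form, the result of these dialogs looks roughly like the sketch below. The property name customerId and the correlationset namespace are the ones that show up in the generated WSDLs of this example; the Correlation Set name CustomerCS is a hypothetical name, and the exact layout JDeveloper generates may differ.

```xml
<!-- In the BPEL process: the Correlation Set, referring to one property.
     CustomerCS is a hypothetical name; the cor prefix must be bound to the namespace below. -->
<correlationSets>
  <correlationSet name="CustomerCS" properties="cor:customerId"/>
</correlationSets>

<!-- In ProcessCustomer_properties.wsdl: the property definition itself. -->
<wsdl:definitions
    targetNamespace="http://xmlns.oracle.com/CorrelationDemo/CorrelationDemo/ProcessCustomer/correlationset"
    xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/"
    xmlns:vprop="http://docs.oasis-open.org/wsbpel/2.0/varprop"
    xmlns:xsd="http://www.w3.org/2001/XMLSchema">
  <vprop:property name="customerId" type="xsd:string"/>
</wsdl:definitions>
```

The property only has a name and a type. Where its value comes from in a particular message is what the property aliases define.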
I find it convenient to do that on the activities, since we also need to couple the correlation Sets to particular Invoke, Receive, and/or Pick/OnMessage activities. Let's begin with the first receive:
Select the Correlations tab, and add the Correlation Set. Since this is the activity in which the customer id first appears in a message in the BPEL process, we need to initiate the Correlation Set here. This can also be done on an invoke, when calling a process that may cause multiple ad-hoc follow-up messages. So, set the Initiate property to yes.
Also, here you can have multiple correlation sets on an activity.
Then click the edit (pencil) button to edit the Correlation Set. And add a property alias:
To find the proper message type, I find it convenient to go through the partnerlink node, then select the proper WSDL. From that WSDL choose the proper message type. Now, you would think you could select the particular element. Unfortunately, it is slightly less user-friendly. After choosing the proper message type in the particular WSDL, click in the query field and press Ctrl-Space. A balloon will pop up with the possible fields, and when the field has a child element, a follow-up balloon will pop up. Finish your XPath this way, and click OK as many times as needed to get all the dialogs closed properly.
Another side note: the Ctrl-Space way of working with balloons also works in the regular expression builder when creating Assign copy rules. Sometimes I get the balloons unasked for, which I actually find annoying.
Do the same for the customer update Receive:
Here it is important to select No for Initiate: we now adhere to the already initiated Correlation Set.
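In the BPEL source, the two Receive activities would then look something like the sketch below. Again, CustomerCS is a hypothetical Correlation Set name, and the operation names and the variable of the second Receive are assumptions; the partnerlinks and the first variable name follow the rest of the example.

```xml
<!-- First receive: starts the instance and initiates the Correlation Set
     with the customer id taken from the incoming file. -->
<receive name="ReceiveCustomer" partnerLink="customerIn" operation="Read"
         variable="ReceiveCustomer_Read_InputVariable" createInstance="yes">
  <correlations>
    <correlation set="CustomerCS" initiate="yes"/>
  </correlations>
</receive>

<!-- Mid-process receive: does not initiate, but follows the already initiated set.
     Only a message with the same customer id is delivered to this instance. -->
<receive name="Receive_UpdateCustomer" partnerLink="UpdateCustomer" operation="execute"
         variable="Receive_UpdateCustomer_InputVariable">
  <correlations>
    <correlation set="CustomerCS" initiate="no"/>
  </correlations>
</receive>
```

The only difference between the two correlation entries is the initiate attribute.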
Wrap this up, deploy the composite and test.
Test Correlations
As in the first version, copy an xml file to the /tmp/In folder. This results in the following BPEL flow:
The yellow-highlighted activities are now active. So, apparently it waits for a Receive and for the processing (Wait activity).
From the flow trace you can click on the composite name, next to the instance id, and then click on the Test button:
And enter new values for your customer:
In the bottom right corner you can click on the "Test Web Service" button, and from the resulting Response tab you can click on the launch flow trace.
You'll find that the Receive has been done, and the Assign after that as well. Now, only the Wait activity is active.
After processing the flow it throws a Finished exception, and finishes the BPEL Flow.
In this case the Receive completed before the Wait activity finished. So, in this flow the throw is unnecessary, but when no message has been received, the throw is needed.
Looking in the /tmp/Out folder we see that the file is neatly updated from the Update:
[oracle@darlin-ind In]$ ls ../Out/
customer2_200617125051.xml  customer3_200619160921.xml
[oracle@darlin-ind In]$ cat ../Out/customer3_200619160921.xml
<?xml version="1.0" encoding="UTF-8" ?><ns1:customer xmlns:ns1="http://xmlns.darwin-it.nl/xsd/demo/Customer">
  <ns1:id>1001</ns1:id>
  <ns1:firstName>Jean-Michel</ns1:firstName>
  <ns1:lastName>Jarre</ns1:lastName>
  <ns1:lastNamePrefixes/>
  <ns1:gender>M</ns1:gender>
  <ns1:streetName>Equinoxelane</ns1:streetName>
  <ns1:houseNumber>7</ns1:houseNumber>
  <ns1:country>Paris</ns1:country>
</ns1:customer>[oracle@darlin-ind In]$
A bit of techie-candy
Where is all this beautiful stuff registered?
First of all, for the Correlation properties, you will find a new WSDL has appeared:
At the top of the source of the BPEL you'll find the following snippet:
<bpelx:annotation>
  <bpelx:analysis>
    <bpelx:property name="propertiesFile">
      <![CDATA[../WSDLs/ProcessCustomer_properties.wsdl]]>
    </bpelx:property>
  </bpelx:analysis>
</bpelx:annotation>
<import namespace="http://xmlns.oracle.com/pcbpel/adapter/file/CorrelationDemo/CorrelationDemo/customerIn"
        location="../WSDLs/customerIn.wsdl"
        importType="http://schemas.xmlsoap.org/wsdl/"
        ui:processWSDL="true"/>
Here you see a reference to the properties wsdl. Also an import of the customerIn.wsdl. Let's take a look in there:
<?xml version= '1.0' encoding= 'UTF-8' ?>
<wsdl:definitions name="customerIn"
    targetNamespace="http://xmlns.oracle.com/pcbpel/adapter/file/CorrelationDemo/CorrelationDemo/customerIn"
    xmlns:tns="http://xmlns.oracle.com/pcbpel/adapter/file/CorrelationDemo/CorrelationDemo/customerIn"
    xmlns:jca="http://xmlns.oracle.com/pcbpel/wsdl/jca/"
    xmlns:plt="http://schemas.xmlsoap.org/ws/2003/05/partner-link/"
    xmlns:pc="http://xmlns.oracle.com/pcbpel/"
    xmlns:imp1="http://xmlns.darwin-it.nl/xsd/demo/Customer"
    xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/"
    xmlns:cor="http://xmlns.oracle.com/CorrelationDemo/CorrelationDemo/ProcessCustomer/correlationset"
    xmlns:bpel="http://docs.oasis-open.org/wsbpel/2.0/process/executable"
    xmlns:vprop="http://docs.oasis-open.org/wsbpel/2.0/varprop"
    xmlns:ns="http://oracle.com/sca/soapservice/CorrelationDemo/CorrelationDemo/Customer" >
  <plt:partnerLinkType name="Read_plt">
    <plt:role name="Read_role">
      <plt:portType name="tns:Read_ptt"/>
    </plt:role>
  </plt:partnerLinkType>
  <vprop:propertyAlias propertyName="cor:customerId"
      xmlns:tns="http://xmlns.oracle.com/pcbpel/adapter/file/CorrelationDemo/CorrelationDemo/customerIn"
      messageType="tns:Read_msg" part="body">
    <vprop:query>imp1:id</vprop:query>
  </vprop:propertyAlias>
  <vprop:propertyAlias propertyName="cor:customerId"
      xmlns:ns13="http://oracle.com/sca/soapservice/CorrelationDemo/CorrelationDemo/Customer"
      messageType="ns13:requestMessage" part="part1">
    <vprop:query>imp1:id</vprop:query>
  </vprop:propertyAlias>
  <wsdl:import namespace="http://oracle.com/sca/soapservice/CorrelationDemo/CorrelationDemo/Customer"
      location="Customer.wsdl"/>
  <wsdl:import namespace="http://xmlns.oracle.com/CorrelationDemo/CorrelationDemo/ProcessCustomer/correlationset"
      location="ProcessCustomer_properties.wsdl"/>
Below the partnerLinkType you find the propertyAliases.
Especially with older, migrated processes, this might be a bit tricky, because the property aliases may not end up in the WSDL you want. Then you need to register the proper WSDL in the BPEL and move the property aliases to that other WSDL, together with the vprop namespace declaration.
When you move the WSDL to the MDS for reuse, move the property aliases to another wrapper WSDL. You shouldn't move the property aliases to the MDS with it: they belong to the process and shouldn't be shared, and it would also make it impossible for the Designer to change them. I'm not sure if it would even work. It probably does, but you should not want that.
As I mentioned before, you can have multiple Correlation Sets in your BPEL (or BPMN) process, and even on one activity. In complex interactions this may make perfect sense, for instance when there is overlap. You may have initiated one Correlation Set on an earlier Invoke or Receive, and use that to correlate to another message in a Receive. But that message may have another identifying field that can be used to correlate with other interactions. And so you may have a non-initiating Correlation Set on an activity that initiates another one, maybe even based on different property aliases on the same message.
Pitfalls
Per Correlation Set you can have multiple properties. They are concatenated into one string. Don't use too many properties to make up the Correlation Set; preferably use only one. And use short scalar elements for the properties. In the past the maximum length was around 1000 characters; I have no idea what it is now. Multiple properties and property aliases also make it error-prone: during the concatenation a different formatting may occur, and it is harder to check whether the correlation elements in the messages conform with each other.
In the example above I used the customer id for the correlation property. This results in an initiated Correlation Set that the UpdateCustomer Receive is listening for. If you initiate another process instance for the same customer, the process engine will find at the UpdateCustomer Receive that there already is an identical Receive waiting with the same Correlation Set value, and it will fail. The process engine identifies the particular activity in the process definition, and the combination of process, activity and Correlation Set must be unique. A uniqueness violation at this point results in a runtime fault.
It doesn't matter whether the message arrives before or after the Receive is activated. If you are fast enough to issue an UpdateCustomer request before the process instance has activated the Receive, the message is stored in a table and picked up when the Receive activity is reached.
Conclusion
This may be new to you and sound very sophisticated. Or not, of course, if you were already familiar with it. If this is new to you: it was already in the product when Oracle acquired it in 2004! And not only that: you can use it in OIC Processes as well, and have been able to for years. I wrote about that in 2016.
For more on Correlation Sets, check out the docs.