If you work in the integration world, you must have come across the word ‘queuing’: an architectural pattern that lets us achieve decoupling (or loose coupling) and agility in the real world. Coupling is a measure of the dependency between the actual service provider and the service consumer(s). For SOA to function well, this dependency should be as low as possible. With queuing, the service consumer can publish requests and make them available to the provider, which may or may not be available at the time the request is published.
Oracle’s AQ (Advanced Queuing) is an extremely powerful (and underused) message queuing feature of Oracle Database, based on Oracle Streams. It is a database feature that is said to be as scalable and reliable as WebLogic’s JMS. This post is less about decoupling and more about the basic implementation of the Oracle AQ adapter (more details) in SOA 12c.
In this tutorial, I have used Oracle 11g Express Edition as my DB.
AQ built-in packages : There are two built-in packages we need to know before we jump in.
- Oracle AQ administration package (DBMS_AQADM), which covers AQ’s administrative tasks such as managing (create/delete/alter) queue tables and queues, managing privileges (grant/revoke), changing the state of a queue (start/stop), etc.
- Oracle AQ package (DBMS_AQ), which provides the interface to enqueue messages into and dequeue messages from a queue.
In short, below are the steps to work with the AQ adapter:
- create the DB user that will own the queue.
- define the structure of the message that passes through the queue. The type is defined as an Object (with a unique name) having multiple fields (defined using varchar, date, timestamp, etc).
- create a queue table for persisting the messages. Every message in the queue table carries the payload type defined in the previous step.
- create the queue by associating it with the queue table created in the previous step.
- start the queue using the DBMS_AQADM package.
- enqueue or dequeue messages as you want. I will describe how to do that using the JCA adapter.
For this tutorial, we will consider the example of a newspaper shop which publishes different newspapers/magazines to its audience. For now, assume that all the customers (OSB) are interested in receiving every newspaper/magazine published by the shop.
Let’s get right into it.
Connect to the DB as sys and create the user :
create user shop_keeper identified by shop_keeper;
grant create session, resource, AQ_ADMINISTRATOR_ROLE,AQ_USER_ROLE to shop_keeper;
grant execute on DBMS_AQADM to shop_keeper;
grant execute on DBMS_AQ to shop_keeper;
Now, connect to the DB as ‘shop_keeper’ and execute the below statements:
-- Define the message structure
CREATE OR REPLACE TYPE ITEM_TYPE AS OBJECT (
-- create the queue table using the DBMS_AQADM.CREATE_QUEUE_TABLE procedure
exec DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => 'SHOP_KEEPER.news_magazine_t', queue_payload_type => 'SHOP_KEEPER.ITEM_TYPE');
exec DBMS_AQADM.CREATE_QUEUE(queue_name => 'SHOP_KEEPER.news_magazine_q', queue_table => 'SHOP_KEEPER.news_magazine_t');
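The statements above can also be collected into one script. The sketch below does that and additionally starts the queue with DBMS_AQADM.START_QUEUE, which the step list earlier calls for. The attribute names and sizes inside ITEM_TYPE are my assumptions, chosen only to match the four-value constructor call used when enqueuing later in this post (a number, two strings and a date); adjust them to your own message structure. Use it instead of, not in addition to, the individual statements.

```sql
-- Run as SHOP_KEEPER.
-- ASSUMPTION: the attribute names and sizes below are hypothetical; only their
-- count and order (number, text, text, date) follow the enqueue call in this post.
CREATE OR REPLACE TYPE ITEM_TYPE AS OBJECT (
  item_id      NUMBER,
  item_name    VARCHAR2(100),
  publisher    VARCHAR2(100),
  published_on DATE
);
/

BEGIN
  -- Queue table that persists messages whose payload is ITEM_TYPE
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table        => 'SHOP_KEEPER.news_magazine_t',
    queue_payload_type => 'SHOP_KEEPER.ITEM_TYPE');

  -- Queue stored in that queue table
  DBMS_AQADM.CREATE_QUEUE(
    queue_name  => 'SHOP_KEEPER.news_magazine_q',
    queue_table => 'SHOP_KEEPER.news_magazine_t');

  -- A newly created queue is stopped; this enables both enqueue and dequeue
  DBMS_AQADM.START_QUEUE(queue_name => 'SHOP_KEEPER.news_magazine_q');
END;
/
```

Until the queue is started, enqueue and dequeue calls against it are rejected, so the START_QUEUE call is worth checking first if nothing seems to flow.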
Now, our queue is ready. It should look like this :
Let’s take a look at some of the basic queries you need to check the messages.
select * from SYS.USER_QUEUE_TABLES; -- list all the queue tables
select * from SYS.USER_QUEUES; -- list all the queues defined
select * from NEWS_MAGAZINE_T ; -- list all the messages in the queue table
Checking the status of (or the messages in) AQ queues and queue tables is as easy as checking records in ordinary database tables using select statements.
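For slightly more targeted checks, something like the following could be used. It is only a sketch: it assumes the queue table name NEWS_MAGAZINE_T from above and relies on the AQ$<queue table> view that Oracle creates alongside each queue table.

```sql
-- Run as SHOP_KEEPER.

-- Is each queue open for enqueue and dequeue?
SELECT name, queue_table, enqueue_enabled, dequeue_enabled
FROM   user_queues;

-- One row per message, with its state (e.g. READY, PROCESSED, EXPIRED)
SELECT queue, msg_id, msg_state, enq_time
FROM   aq$news_magazine_t
ORDER  BY enq_time;
```

The first query is handy when messages cannot be enqueued or dequeued at all; the second shows whether a given message is still waiting or has already been consumed.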
Now, let’s dequeue the message using Oracle’s SOA JCA adapter (in OSB for now). Let’s start by creating the OSB project in JDeveloper.
- Click on File -> New -> Applications. From the gallery pop-up, select Service Bus Application with Service Bus Project. Click Next.
- Type the name of the application (I typed ‘proj-Fusion-Osb’). Click Next.
- In the Project Name text box, type the name of the project. I gave mine as receiveArticles. Click on Finish. You should get an SCA to play with as below :
- Create an AQ adapter in the left lane, named ‘Proxy Services’, by dragging the AQ adapter from the component panel. The AQ adapter configuration wizard now opens. Enter the name of the operation. I typed ‘receiveItemFromQueue’. Click Next.
- Select the connection to the DB where we created the queue (it is the shop_keeper schema), and enter the JNDI name as ‘eis/AQ/newsPaper’. Click Next.
- In the adapter interface step, select the radio button Define from operation and schema (specified later). Click Next.
- Select the Dequeue option. Click Next.
- Now we have to select the queue that we created in the database. Click on the Browse button. Select the schema name ‘SHOP_KEEPER’ and click on the Search button. From the list of queues at the bottom, select the one we created, i.e. NEWS_MAGAZINE_Q. Click OK. Click Next.
- Leave the Correlation ID and Dequeue Condition boxes empty.
- In the object payload dialog, select Whole Object ITEM_TYPE. This means the adapter will get the entire message enqueued in the queue (the other option gets only a specific part of the payload).
- Click Next and press the Finish button. Now, the SCA will look like this.
- We have now configured the SCA to get the message from the AQ queue. Next we need to process it, so let’s create a pipeline in the middle lane. Click and drag the Pipeline resource from the component panel. Type the name you want to give it. I named mine ‘processPurchasedItem’.
13. In step 2 of the wizard, select the WSDL radio button. Click on the button to browse for the WSDL. Select the concrete WSDL as shown below and un-check Expose as a Proxy Service.
14. Now, link the adapter and the pipeline by connecting the interfaces. Then double click on the pipeline. Right click on the root node and select Insert Into -> Pipeline Pair Node. Rename the pipeline pair and stages.
15. In the request pipeline flow, insert a reporting activity and configure it with the expression $body (to report the entire request). Then save the pipeline. The pipeline should look like this :
Before deploying the project, let’s create the data source that points to the shop_keeper schema. I created mine with the JNDI name ‘jdbc/newsMagazineShop’.
16. Now, let’s create a new AQ adapter instance using the JNDI name we used while configuring the adapter in JDeveloper, i.e. eis/AQ/newsPaper. It can be done in the WebLogic console via Domain Structure->Deployments->Deployments->AQAdapter.
17. Update the AQ adapter after the changes are done.
Now, deploy the OSB project. The queue is ready, so the next thing is to enqueue a message. I have written a procedure for that. Here it is :
payload := item_type(1, 'The Times of India', 'The Times Group', sysdate);
dbms_aq.ENQUEUE(queue_name => 'NEWS_MAGAZINE_Q', enqueue_options => queue_option, message_properties => msg_properties, payload => payload, msgid => msg_id);
dbms_output.put_line('Message successfully Enqueued with messageId:'|| msg_id);
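For reference, the three statements above could sit inside an anonymous PL/SQL block like the sketch below. The variable names come from the statements themselves; wrapping them in an anonymous block rather than a stored procedure, the declarations, the COMMIT and the RAWTOHEX call are my assumptions about how to make the fragment runnable on its own.

```sql
-- Run as SHOP_KEEPER (for example in SQL*Plus or SQL Developer).
SET SERVEROUTPUT ON

DECLARE
  queue_option   DBMS_AQ.ENQUEUE_OPTIONS_T;     -- default enqueue options
  msg_properties DBMS_AQ.MESSAGE_PROPERTIES_T;  -- default message properties
  msg_id         RAW(16);                       -- filled in by ENQUEUE
  payload        item_type;
BEGIN
  -- Positional constructor: number, text, text, date
  payload := item_type(1, 'The Times of India', 'The Times Group', SYSDATE);

  DBMS_AQ.ENQUEUE(
    queue_name         => 'NEWS_MAGAZINE_Q',
    enqueue_options    => queue_option,
    message_properties => msg_properties,
    payload            => payload,
    msgid              => msg_id);

  -- With the default ON_COMMIT visibility, the message becomes
  -- available to consumers only after the transaction commits.
  COMMIT;

  DBMS_OUTPUT.PUT_LINE('Message successfully enqueued with messageId: '
                       || RAWTOHEX(msg_id));
END;
/
```

If the message never reaches the adapter, a missing COMMIT is a likely cause.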
If everything is set up right, the messages should now get dequeued (confirm this by querying the queue table), and the message reports should look like this:
Click on the report index (i.e. itemId=1;itemName=The Times of India). From the ‘View Report Details’ link, we should be able to see the message data that we enqueued using the procedure, as below :
That is it ! We have now dequeued the message from the AQ queue using the AQ adapter provided in Oracle SOA 12c. All the resources can be downloaded from the below location:
Thanks, and have a great day.