In this article, we will look at the very common interaction between
Oracle Event Processing and JMS. JMS is a commonly used channel for decoupled
exchange of messages or events. An OEP Application can both consume messages
from a JMS destination, through a JMS inbound Adapter, and publish findings to
a JMS destination with a JMS outbound Adapter. The Oracle Event Processing JMS
adapters support any JMS service provider that provides a Java client that is
compliant with Java EE. In this demonstration we will use WebLogic as the JMS
provider.
The inbound adapter converts incoming JMS map messages to an Event. The outbound JMS adapter converts events into a JMS map message and sends the JMS message to a JMS destination.
When the JMS message type is not a Map message, we can still use JMS adapters, but we need to convert the incoming type to the desired event type and vice versa in our code.
Our objective in this article is to demonstrate how to customize the inbound
and outbound conversion, using Java Architecture for XML Binding (JAXB) and
EclipseLink MOXy, when the JMS message is a text message in XML format.
Oracle Event Processing provides a simplified interface for JAXB
mapping capabilities in adapters and event beans to marshal and unmarshal
event data between XML and Java objects. The JAXB interface supports the JAXB
2.2 specification and EclipseLink MOXy provider extensions.
EclipseLink MOXy provides extensions that enable you to map between an
existing XML schema and a predefined set of Java classes without modifying the
XML schema or the Java classes, and without adding annotations. We provide the
mapping information in an external metadata file using an XPath-based syntax.
For this demonstration we will create an OEP Application that will
consume messages published in a JMS queue called “OrderError” using a JMS
inbound Adapter and publish them to another queue “OrderErrorOut” using a JMS
outbound adapter.
The published message will be in XML format and will follow the OrderError.xsd schema shown below:
<?xml version="1.0" encoding="windows-1252" ?>
<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns="http://www.example.org"
            targetNamespace="http://www.example.org" elementFormDefault="qualified">
  <xsd:element name="orderError">
    <xsd:complexType>
      <xsd:sequence>
        <xsd:element name="orderId" type="xsd:ID"/>
        <xsd:element name="customerId" type="xsd:ID"/>
        <xsd:element name="errorCode" type="xsd:string"/>
      </xsd:sequence>
    </xsd:complexType>
  </xsd:element>
</xsd:schema>
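Before wiring anything into OEP, it can help to check that a test message actually conforms to this schema. The following is a minimal sketch that uses only the JDK's javax.xml.validation API; the class name OrderErrorSchemaCheck and its isValid helper are hypothetical names chosen for illustration, and the schema and a sample message are inlined as strings (without the XML declaration) so that the example is self-contained.

```java
import java.io.StringReader;
import javax.xml.XMLConstants;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;

// Hypothetical helper, not part of OEP: validates a message against OrderError.xsd.
public class OrderErrorSchemaCheck {

    // The OrderError.xsd schema, inlined so the sketch is self-contained.
    public static final String XSD =
          "<xsd:schema xmlns:xsd='http://www.w3.org/2001/XMLSchema' xmlns='http://www.example.org'"
        + " targetNamespace='http://www.example.org' elementFormDefault='qualified'>"
        + "<xsd:element name='orderError'><xsd:complexType><xsd:sequence>"
        + "<xsd:element name='orderId' type='xsd:ID'/>"
        + "<xsd:element name='customerId' type='xsd:ID'/>"
        + "<xsd:element name='errorCode' type='xsd:string'/>"
        + "</xsd:sequence></xsd:complexType></xsd:element></xsd:schema>";

    // A sample message with every element qualified by the target namespace.
    public static final String SAMPLE =
          "<ns0:orderError xmlns:ns0='http://www.example.org'>"
        + "<ns0:orderId>ID-1</ns0:orderId>"
        + "<ns0:customerId>ID-2</ns0:customerId>"
        + "<ns0:errorCode>ASD234</ns0:errorCode>"
        + "</ns0:orderError>";

    // Returns true when the XML text is well-formed and valid against the schema.
    public static boolean isValid(String xml) {
        try {
            Schema schema = SchemaFactory
                    .newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI)
                    .newSchema(new StreamSource(new StringReader(XSD)));
            // With no ErrorHandler set, validation errors are thrown as exceptions.
            schema.newValidator().validate(new StreamSource(new StringReader(xml)));
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("sample valid: " + isValid(SAMPLE));
        String withoutErrorCode = SAMPLE.replace("<ns0:errorCode>ASD234</ns0:errorCode>", "");
        System.out.println("without errorCode valid: " + isValid(withoutErrorCode));
    }
}
```

Because the schema sets elementFormDefault to "qualified", a message whose elements are not in the http://www.example.org namespace would be rejected by this check.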
We will customize the message conversion using a converter bean. In OEP applications, the adapter or event bean that requires JAXB functionality obtains it through injection of a bean that implements the com.oracle.cep.mappers.api.Mapper interface.
Create JMS Queues in WebLogic
Access the WebLogic console and create the JMS queues and the
connection factory.
Import Packages
Since the OEP server is OSGi-based, a package is available to our application
at runtime only if it is listed in the Import-Package header of the
application's MANIFEST.MF file. For our demonstration we need to
import the following packages:
- com.bea.wlevs.adapters.jms.api
- javax.jms
- com.oracle.cep.mappers.api
- com.oracle.cep.mappers.jaxb
The MANIFEST.MF file will look as shown below:
Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: OrderJMS.JAXBConversionDemo
Bundle-SymbolicName: OrderJMS.JAXBConversionDemo
Bundle-Version: 1.0.0
Bundle-Localization: bundle
Bundle-Vendor: %project.vendor
Bundle-ClassPath: .
Import-Package: com.bea.wlevs.configuration;version="12.1.3",
 com.bea.wlevs.ede.api;version="12.1.3",
 com.bea.wlevs.ede.impl;version="12.1.3",
 com.bea.wlevs.ede.spi;version="12.1.3",
 com.bea.wlevs.ede;version="12.1.3",
 com.bea.wlevs.management.spi;version="12.1.3",
 com.bea.wlevs.spring.support;version="12.1.3",
 com.bea.wlevs.spring;version="12.1.3",
 com.bea.wlevs.util;version="12.1.3",
 com.bea.wlevs.adapters.jms.api;version="12.1.3",
 javax.jms;version="1.1.4",
 com.oracle.cep.mappers.api;version="12.1.3",
 com.oracle.cep.mappers.jaxb;version="12.1.3",
 org.apache.commons.logging;version="1.1.1",
 org.springframework.beans.factory.config;version="3.1.1",
 org.springframework.beans.factory;version="3.1.1",
 org.springframework.beans;version="3.1.1",
 org.springframework.core.annotation;version="3.1.1",
 org.springframework.osgi.context;version="1.2.0",
 org.springframework.osgi.extensions.annotation;version="1.2.0",
 org.springframework.osgi.service;version="1.2.0",
 org.springframework.util;version="3.1.1"
We also need to add them to the application classpath in JDeveloper.
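A missing import usually shows up only at deployment time, so a quick check of the manifest can save a redeploy. The sketch below is one possible way to do that with the JDK's java.util.jar.Manifest class; the class ImportPackageCheck, its method names and the shortened manifest in main are hypothetical and exist only for this illustration. It splits the Import-Package header on commas outside quotes and ignores directives such as ;version="...".

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.Manifest;

// Hypothetical helper, not part of OEP: lists required packages missing from Import-Package.
public class ImportPackageCheck {

    // The four packages this demonstration adds to the manifest.
    public static final String[] REQUIRED = {
        "com.bea.wlevs.adapters.jms.api",
        "javax.jms",
        "com.oracle.cep.mappers.api",
        "com.oracle.cep.mappers.jaxb"
    };

    // Returns the package names in Import-Package, without directives such as ;version="...".
    public static List<String> importedPackages(String manifestText) throws IOException {
        Manifest manifest = new Manifest(
                new ByteArrayInputStream(manifestText.getBytes(StandardCharsets.UTF_8)));
        String header = manifest.getMainAttributes().getValue("Import-Package");
        List<String> names = new ArrayList<>();
        if (header == null) {
            return names;
        }
        StringBuilder clause = new StringBuilder();
        boolean inQuotes = false;
        for (char c : header.toCharArray()) {
            if (c == '"') {
                inQuotes = !inQuotes;
            }
            if (c == ',' && !inQuotes) {   // a comma outside quotes ends one import clause
                addName(names, clause);
                clause.setLength(0);
            } else {
                clause.append(c);
            }
        }
        addName(names, clause);
        return names;
    }

    private static void addName(List<String> names, StringBuilder clause) {
        String name = clause.toString().split(";", 2)[0].trim();
        if (!name.isEmpty()) {
            names.add(name);
        }
    }

    // Returns the required packages that the manifest does not import.
    public static List<String> missing(String manifestText) throws IOException {
        List<String> imported = importedPackages(manifestText);
        List<String> result = new ArrayList<>();
        for (String required : REQUIRED) {
            if (!imported.contains(required)) {
                result.add(required);
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // Shortened manifest for illustration; continuation lines start with one space.
        String manifestText =
              "Manifest-Version: 1.0\n"
            + "Import-Package: com.bea.wlevs.adapters.jms.api;version=\"12.1.3\",\n"
            + " javax.jms;version=\"1.1.4\",\n"
            + " com.oracle.cep.mappers.api;version=\"12.1.3\"\n";
        System.out.println("missing: " + missing(manifestText));
    }
}
```

In a real project the manifest text would be read from META-INF/MANIFEST.MF rather than built in code.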
Event Type
Create the “OrderErrorEvent” JavaBean to map the XML message. The
class will have an attribute for each XML element we need to map. Below is the
Java code:
package com.csc.oep.ordererror;

public class OrderErrorEvent {
    private String customerId;
    private String errorCode;
    private String orderId;

    public OrderErrorEvent() {
        super();
    }

    public void setCustomerId(String customerId) {
        this.customerId = customerId;
    }

    public String getCustomerId() {
        return customerId;
    }

    public void setErrorCode(String errorCode) {
        this.errorCode = errorCode;
    }

    public String getErrorCode() {
        return errorCode;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public String getOrderId() {
        return orderId;
    }
}
To use this bean as an Event Type we need to add it to our Event
Repository. Open the EPN diagram and click on the Event Type tab. To add a new
Event Type, click on the green plus icon, enter “OrderErrorEvent” as the event
type name, select the “Properties defined in a Java Bean” option, click the
magnifying glass icon and select the Java Bean just created.
This operation will add the Event Type to the Event Repository in the
assembly file as shown below:
…
<wlevs:event-type-repository>
  <wlevs:event-type type-name="OrderErrorEvent">
    <wlevs:class>com.csc.oep.ordererror.OrderErrorEvent</wlevs:class>
  </wlevs:event-type>
</wlevs:event-type-repository>
…
Mapper
OEP uses JAXB to handle XML payloads, so it needs to know what
an event type looks like in XML format. We want to use an EclipseLink MOXy
external metadata file to specify the mapping details, so create a new XML file,
call it “orderErrorEventMetadata.xml” and put the following content in it:
<?xml version="1.0"?>
<xml-bindings xmlns="http://www.eclipse.org/eclipselink/xsds/persistence/oxm"
              xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xmlns:xs="http://www.w3.org/2001/XMLSchema"
              xml-accessor-type="PROPERTY"
              package-name="com.csc.oep.ordererror">
  <xml-schema element-form-default="QUALIFIED"
              namespace="http://www.example.org">
    <xml-ns prefix="ns0" namespace-uri="http://www.example.org"/>
  </xml-schema>
  <java-types>
    <java-type name="com.csc.oep.ordererror.OrderErrorEvent">
      <xml-root-element name="orderError"/>
      <java-attributes>
        <xml-element java-attribute="customerId"/>
        <xml-element java-attribute="errorCode"/>
        <xml-element java-attribute="orderId"/>
      </java-attributes>
    </java-type>
  </java-types>
</xml-bindings>
The file above must be created in the wlevs/mappers/jaxb directory.
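To make the correspondence described by the metadata file concrete, the following sketch reads an orderError document by hand and collects the three values under the bean property names customerId, errorCode and orderId. It is only an illustration of the element-to-property mapping: it uses the JDK's DOM parser, not MOXy or the OEP Mapper API, and the class name OrderErrorMappingSketch and the toProperties method are hypothetical.

```java
import java.io.StringReader;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical illustration of the mapping; the real work is done by the JAXB mapper.
public class OrderErrorMappingSketch {

    // Namespace declared in the xml-schema element of the metadata file.
    public static final String NS = "http://www.example.org";

    // Maps each child element of orderError to the bean property of the same name.
    public static Map<String, String> toProperties(String xml) throws Exception {
        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
        factory.setNamespaceAware(true);   // elements are namespace-qualified
        Element root = factory.newDocumentBuilder()
                .parse(new InputSource(new StringReader(xml)))
                .getDocumentElement();
        if (!NS.equals(root.getNamespaceURI()) || !"orderError".equals(root.getLocalName())) {
            throw new IllegalArgumentException("not an orderError document");
        }
        Map<String, String> properties = new LinkedHashMap<>();
        for (String name : new String[] {"customerId", "errorCode", "orderId"}) {
            NodeList nodes = root.getElementsByTagNameNS(NS, name);
            properties.put(name, nodes.getLength() > 0 ? nodes.item(0).getTextContent() : null);
        }
        return properties;
    }

    public static void main(String[] args) throws Exception {
        String xml =
              "<ns0:orderError xmlns:ns0='http://www.example.org'>"
            + "<ns0:orderId>ID-1</ns0:orderId>"
            + "<ns0:customerId>ID-2</ns0:customerId>"
            + "<ns0:errorCode>ASD234</ns0:errorCode>"
            + "</ns0:orderError>";
        System.out.println(toProperties(xml));
    }
}
```

The namespace prefix in the message does not have to be ns0; what matters for the lookup is the namespace URI.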
To start using the XML mapper, we need to configure a bean in the assembly
file that is created by the following factory class: com.oracle.cep.mappers.jaxb.JAXBMapperFactory.
The listing below shows the XML mapper defined in the assembly file:
….
<bean id="xmlMapper"
      class="com.oracle.cep.mappers.jaxb.JAXBMapperFactory"
      factory-method="create">
  <property name="eventTypeName" value="OrderErrorEvent"/>
  <property name="metadata" value="orderErrorEventMetadata.xml"/>
</bean>
….
Converter Bean
The converter bean implements the convert methods of the
following inbound and outbound interfaces:
- Inbound: com.bea.wlevs.adapters.jms.api.InboundMessageConverter. We have to implement its convert method. The return value is a List of events to be passed downstream.
public List convert(Message message)
    throws MessageConverterException, JMSException;
- Outbound: com.bea.wlevs.adapters.jms.api.OutboundMessageConverter. We have to implement its convert method. The return value is a List of JMS messages.
public List<Message> convert(Session session, Object event)
    throws MessageConverterException, JMSException;
Below is the code of our converter bean:
package com.csc.oep.ordererror;

import com.bea.wlevs.adapters.jms.api.InboundMessageConverter;
import com.bea.wlevs.adapters.jms.api.MessageConverterException;
import com.bea.wlevs.adapters.jms.api.OutboundMessageConverter;
import com.oracle.cep.mappers.api.Mapper;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;

public class JmsJaxbMessageConverter
        implements InboundMessageConverter, OutboundMessageConverter {

    // JAXB mapper injected from the assembly file.
    private Mapper xmlMapper;

    public JmsJaxbMessageConverter() {
        super();
    }

    // Inbound: unmarshal the XML body of a JMS text message into an OrderErrorEvent.
    @Override
    public List convert(Message message) throws MessageConverterException {
        List<OrderErrorEvent> eventsList = new ArrayList<>();
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            try {
                StringReader reader = new StringReader(textMessage.getText());
                // Kept in a local variable so the converter holds no per-message state.
                OrderErrorEvent event = (OrderErrorEvent) xmlMapper
                        .createUnmarshaller()
                        .unmarshal(new StreamSource(reader));
                eventsList.add(event);
                System.out.println("JMS Message Converted to OrderErrorEvent");
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }
        } else {
            System.out.println("NO TEXT MESSAGE");
        }
        return eventsList;
    }

    // Outbound: marshal an OrderErrorEvent to XML and wrap it in a JMS text message.
    @Override
    public List<Message> convert(Session session, Object object)
            throws MessageConverterException, JMSException {
        String strResult = null;
        try {
            OrderErrorEvent errorEvent = (OrderErrorEvent) object;
            StringWriter writer = new StringWriter();
            xmlMapper.createMarshaller().marshal(errorEvent, new StreamResult(writer));
            strResult = writer.toString();
            System.out.println("OrderErrorEvent Converted to JMS Message");
        } catch (Exception e) {
            e.printStackTrace();
        }
        TextMessage m = session.createTextMessage();
        m.setText(strResult);
        List<Message> messages = new ArrayList<Message>();
        messages.add(m);
        return messages;
    }

    public void setXmlMapper(Mapper xmlMapper) {
        this.xmlMapper = xmlMapper;
    }

    public Mapper getXmlMapper() {
        return xmlMapper;
    }
}
As you can see from the code above, the converter bean uses the XML
mapper to perform marshal and unmarshal operations, so we need to inject the
mapper into the bean. Below is the converter bean configured in the assembly
file:
…
<bean id="jmsJaxbMessageConverter"
      class="com.csc.oep.ordererror.JmsJaxbMessageConverter">
  <property name="xmlMapper" ref="xmlMapper"/>
</bean>
…
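To reason about the outbound direction without an OEP server, the text that ends up in the JMS message can be approximated with the JDK's StAX writer. The sketch below is a stand-in under stated assumptions, not the Mapper API: the class OrderErrorXmlWriterSketch and its toXml method are hypothetical, and the ns0 prefix is taken from the metadata file. The exact output of the real marshaller (XML declaration, prefix choice, element order) may differ.

```java
import java.io.StringWriter;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;

// Hypothetical stand-in for the marshalling step of the outbound conversion.
public class OrderErrorXmlWriterSketch {

    public static final String NS = "http://www.example.org";

    // Builds a namespace-qualified orderError document from the three event values.
    public static String toXml(String orderId, String customerId, String errorCode)
            throws XMLStreamException {
        StringWriter out = new StringWriter();
        XMLStreamWriter writer = XMLOutputFactory.newInstance().createXMLStreamWriter(out);
        writer.writeStartElement("ns0", "orderError", NS);
        writer.writeNamespace("ns0", NS);          // declares xmlns:ns0 on the root element
        writeChild(writer, "orderId", orderId);
        writeChild(writer, "customerId", customerId);
        writeChild(writer, "errorCode", errorCode);
        writer.writeEndElement();
        writer.flush();
        writer.close();
        return out.toString();
    }

    private static void writeChild(XMLStreamWriter writer, String name, String value)
            throws XMLStreamException {
        writer.writeStartElement("ns0", name, NS);
        writer.writeCharacters(value);             // escapes &, < and > in the value
        writer.writeEndElement();
    }

    public static void main(String[] args) throws XMLStreamException {
        System.out.println(toXml("ID-1", "ID-2", "ASD234"));
    }
}
```

Writing the values through writeCharacters rather than string concatenation keeps the output well-formed when a value contains characters such as & or <.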
Event Processing Network
The diagram below shows the EPN created for our demonstration.
The JMS adapters are configured using the adapter.xml file shown below:
<?xml version="1.0" encoding="windows-1252" ?>
<wlevs:config xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application">
  <jms-adapter>
    <name>jms-inbound-adapter</name>
    <jndi-provider-url>t3://${host}:7001</jndi-provider-url>
    <jndi-factory>weblogic.jndi.WLInitialContextFactory</jndi-factory>
    <connection-jndi-name>jms/OrderErrorCF</connection-jndi-name>
    <destination-jndi-name>jms/OrderErrorQueue</destination-jndi-name>
    <session-transacted>false</session-transacted>
  </jms-adapter>
  <jms-adapter>
    <name>jms-outbound-adapter</name>
    <jndi-provider-url>t3://${host}:7001</jndi-provider-url>
    <jndi-factory>weblogic.jndi.WLInitialContextFactory</jndi-factory>
    <connection-jndi-name>jms/OrderErrorCF</connection-jndi-name>
    <destination-jndi-name>jms/OrderErrorOutQueue</destination-jndi-name>
    <session-transacted>false</session-transacted>
  </jms-adapter>
</wlevs:config>
The CQL processor is configured in the processor.xml file.
<?xml version="1.0" encoding="UTF-8"?>
<wlevs:config xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application">
  <processor>
    <name>OrderErrorProcessor</name>
    <rules>
      <query id="AllErrorQuery"><![CDATA[
        select * from OrderErrorInputChannel
      ]]></query>
    </rules>
  </processor>
</wlevs:config>
The query will pass any event from the input channel to the output
channel. To complete our configuration, we need to instruct OEP to use the
converter bean to convert messages processed by the JMS adapters. We do this
by specifying the instance property “converterBean” in the assembly file for both
the inbound and outbound JMS adapters.
…
<wlevs:adapter id="jms-inbound-adapter" provider="jms-inbound">
  <wlevs:instance-property name="converterBean" ref="jmsJaxbMessageConverter"/>
  <wlevs:listener ref="OrderErrorInputChannel"/>
</wlevs:adapter>
<wlevs:adapter id="jms-outbound-adapter" provider="jms-outbound">
  <wlevs:instance-property name="converterBean" ref="jmsJaxbMessageConverter"/>
</wlevs:adapter>
…
Test the Application
Deploy the application into your OEP server. To test the application,
we need to:
- produce a JMS message in the OrderErrorQueue;
- verify that the converter bean converts it to an OrderErrorEvent object;
- verify that the OrderErrorEvent object is converted back to the XML message;
- verify that the OrderErrorOut queue receives the XML message.
Below is an example message to use for our test:
<?xml version="1.0" encoding="UTF-8" ?>
<ns0:orderError xmlns:ns0="http://www.example.org">
  <ns0:orderId>ID-1</ns0:orderId>
  <ns0:customerId>ID-2</ns0:customerId>
  <ns0:errorCode>ASD234</ns0:errorCode>
</ns0:orderError>
Use the WebLogic console to produce a new JMS message in the
OrderError queue and paste the message above as the body.
The converter bean will print a message to the server console after
each conversion. We can verify from the server console that the message has been
converted to the OrderErrorEvent object and then back to XML.
Now, again using the WebLogic console, we can verify that the message
has been received by the OrderErrorOut queue.
A copy of the source code is available here