Friday, 15 April 2016

Oracle Event Processing JMS Adapters: Custom Conversion using JAXB and EclipseLink Moxy

In this article, we will look at the very common interaction between Oracle Event Processing and JMS. JMS is a commonly used channel for the decoupled exchange of messages or events. An OEP application can both consume messages from a JMS destination, through a JMS inbound adapter, and publish its findings to a JMS destination with a JMS outbound adapter. The Oracle Event Processing JMS adapters support any JMS service provider that provides a Java client that is compliant with Java EE. In this demonstration we will use WebLogic as the JMS provider.
The inbound adapter converts incoming JMS map messages to an Event.


The outbound JMS adapter converts events into a JMS map message and sends the JMS message to a JMS destination.


When the JMS message type is not a Map message, we can still use JMS adapters, but we need to convert the incoming type to the desired event type and vice versa in our code.
Our objective in this article is to demonstrate how to customize the inbound and outbound conversion, using Java Architecture for XML Binding (JAXB) and EclipseLink Moxy, when the JMS message is a text message in XML format.
Oracle Event Processing provides a simplified interface for JAXB mapping capabilities in adapters and event beans to marshal and unmarshal event data between XML and Java objects. The JAXB interface supports the JAXB 2.2 specification and EclipseLink Moxy provider extensions.
EclipseLink Moxy provides extensions that enable you to map between an existing XML schema and a predefined set of Java classes without modifying the XML schema or the Java classes, and without adding annotations. We provide the mapping information in an external metadata file using an XPath syntax.
For this demonstration we will create an OEP Application that will consume messages published in a JMS queue called “OrderError” using a JMS inbound Adapter and publish them to another queue “OrderErrorOut” using a JMS outbound adapter.


The published message will be in XML format and will follow the OrderError.xsd schema shown below:


<?xml version="1.0" encoding="windows-1252" ?>
<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns="http://www.example.org"
            targetNamespace="http://www.example.org" elementFormDefault="qualified">

  <xsd:element name="orderError">
    <xsd:complexType>
      <xsd:sequence>
        <xsd:element name="orderId" type="xsd:ID"/>
        <xsd:element name="customerId" type="xsd:ID"/>
        <xsd:element name="errorCode" type="xsd:string"/>
      </xsd:sequence>
    </xsd:complexType>
  </xsd:element>
</xsd:schema>
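
Before wiring anything into OEP, it can be useful to check that a test message really conforms to this schema. The following is a minimal sketch that uses only the JDK's javax.xml.validation API. The class name OrderErrorSchemaCheck and its members are hypothetical and not part of the demo application, and the schema is inlined as a string only to keep the example self-contained.

```java
import java.io.StringReader;
import javax.xml.XMLConstants;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;

// Hypothetical helper, not part of the OEP application.
final class OrderErrorSchemaCheck {

    // Inline copy of OrderError.xsd so the sketch is self-contained.
    static final String XSD =
          "<xsd:schema xmlns:xsd='http://www.w3.org/2001/XMLSchema'"
        + " xmlns='http://www.example.org'"
        + " targetNamespace='http://www.example.org'"
        + " elementFormDefault='qualified'>"
        + "<xsd:element name='orderError'><xsd:complexType><xsd:sequence>"
        + "<xsd:element name='orderId' type='xsd:ID'/>"
        + "<xsd:element name='customerId' type='xsd:ID'/>"
        + "<xsd:element name='errorCode' type='xsd:string'/>"
        + "</xsd:sequence></xsd:complexType></xsd:element>"
        + "</xsd:schema>";

    // A message with the same content as the test message used in this article.
    static final String SAMPLE =
          "<ns0:orderError xmlns:ns0='http://www.example.org'>"
        + "<ns0:orderId>ID-1</ns0:orderId>"
        + "<ns0:customerId>ID-2</ns0:customerId>"
        + "<ns0:errorCode>ASD234</ns0:errorCode>"
        + "</ns0:orderError>";

    // Returns true when the XML text validates against the schema above.
    static boolean isValid(String xml) {
        try {
            SchemaFactory factory =
                SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
            Schema schema = factory.newSchema(new StreamSource(new StringReader(XSD)));
            // With no ErrorHandler set, validation errors are thrown as exceptions.
            schema.newValidator().validate(new StreamSource(new StringReader(xml)));
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Sample message valid: " + isValid(SAMPLE));
    }
}
```

Note that orderId and customerId are both declared as xsd:ID, so within one message their values must be distinct and must be valid XML names. A purely numeric identifier such as 123 would be rejected by a validating parser.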

We will customize the message conversion using a converter bean. In OEP applications the adapter or event bean that requires JAXB functionality obtains the functionality by injection of a bean that implements the com.oracle.cep.mappers.api.Mapper interface.

Create JMS Queues in WebLogic
Access the WebLogic console and create the JMS queues and the connection factory.

  
Import Packages
To make a package available to our application at runtime, we need to add it to the Import-Package header of the application's MANIFEST.MF file, since the OEP server is OSGi-based. For our demonstration we need to import the following packages:
  • com.bea.wlevs.adapters.jms.api
  • javax.jms
  • com.oracle.cep.mappers.api
  • com.oracle.cep.mappers.jaxb
The MANIFEST.MF file will look as shown below:

                                                                                           
Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: OrderJMS.JAXBConversionDemo
Bundle-SymbolicName: OrderJMS.JAXBConversionDemo
Bundle-Version: 1.0.0
Bundle-Localization: bundle
Bundle-Vendor: %project.vendor
Bundle-ClassPath: .
Import-Package: com.bea.wlevs.configuration;version="12.1.3",
 com.bea.wlevs.ede.api;version="12.1.3",
 com.bea.wlevs.ede.impl;version="12.1.3",
 com.bea.wlevs.ede.spi;version="12.1.3",
 com.bea.wlevs.ede;version="12.1.3",
 com.bea.wlevs.management.spi;version="12.1.3",
 com.bea.wlevs.spring.support;version="12.1.3",
 com.bea.wlevs.spring;version="12.1.3",
 com.bea.wlevs.util;version="12.1.3",
 com.bea.wlevs.adapters.jms.api;version="12.1.3", 
 javax.jms;version="1.1.4",
 com.oracle.cep.mappers.api;version="12.1.3",
 com.oracle.cep.mappers.jaxb;version="12.1.3",
 org.apache.commons.logging;version="1.1.1",
 org.springframework.beans.factory.config;version="3.1.1",
 org.springframework.beans.factory;version="3.1.1",
 org.springframework.beans;version="3.1.1",
 org.springframework.core.annotation;version="3.1.1",
 org.springframework.osgi.context;version="1.2.0",
 org.springframework.osgi.extensions.annotation;version="1.2.0",
 org.springframework.osgi.service;version="1.2.0",
 org.springframework.util;version="3.1.1"


We also need to add them to the application classpath in JDeveloper.
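
A missing import usually only shows up at deployment time, so a small check of the manifest can save a round trip. The sketch below parses a manifest with java.util.jar.Manifest and reports which of the four packages listed above are absent from Import-Package. The class ImportPackageCheck and the shortened sample manifest are hypothetical. The parsing is also simplified: it reads only the first package name of each clause.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.jar.Manifest;

// Hypothetical helper, not part of the OEP application.
final class ImportPackageCheck {

    // The packages this article asks us to import.
    static final List<String> REQUIRED = Arrays.asList(
        "com.bea.wlevs.adapters.jms.api",
        "javax.jms",
        "com.oracle.cep.mappers.api",
        "com.oracle.cep.mappers.jaxb");

    // Shortened manifest used only for the demonstration in main().
    static final String SAMPLE =
          "Manifest-Version: 1.0\n"
        + "Bundle-SymbolicName: OrderJMS.JAXBConversionDemo\n"
        + "Import-Package: com.bea.wlevs.adapters.jms.api;version=\"12.1.3\",\n"
        + " javax.jms;version=\"1.1.4\",\n"
        + " com.oracle.cep.mappers.api;version=\"12.1.3\",\n"
        + " com.oracle.cep.mappers.jaxb;version=\"12.1.3\"\n";

    // Extracts the package names from the Import-Package header.
    static Set<String> importedPackages(String manifestText) {
        Manifest manifest;
        try {
            manifest = new Manifest(new ByteArrayInputStream(
                manifestText.getBytes(StandardCharsets.UTF_8)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Set<String> packages = new LinkedHashSet<>();
        String header = manifest.getMainAttributes().getValue("Import-Package");
        if (header == null) {
            return packages;
        }
        // Split on commas outside quotes, so version ranges such as "[1.0,2.0)" survive.
        StringBuilder clause = new StringBuilder();
        boolean inQuotes = false;
        for (char c : header.toCharArray()) {
            if (c == '"') {
                inQuotes = !inQuotes;
            }
            if (c == ',' && !inQuotes) {
                addPackage(clause.toString(), packages);
                clause.setLength(0);
            } else {
                clause.append(c);
            }
        }
        addPackage(clause.toString(), packages);
        return packages;
    }

    // Keeps the text before the first ';' (the package name) and drops attributes.
    private static void addPackage(String clause, Set<String> packages) {
        int semicolon = clause.indexOf(';');
        String name = (semicolon < 0 ? clause : clause.substring(0, semicolon)).trim();
        if (!name.isEmpty()) {
            packages.add(name);
        }
    }

    // Returns the required packages that the manifest does not import.
    static List<String> missing(String manifestText) {
        Set<String> imported = importedPackages(manifestText);
        List<String> result = new ArrayList<>();
        for (String required : REQUIRED) {
            if (!imported.contains(required)) {
                result.add(required);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println("Missing imports: " + missing(SAMPLE));
    }
}
```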


Event Type
Create the “OrderErrorEvent” JavaBean to map the XML message. The class has a property for each XML element we need to map. Below is the Java code:

                                                                                           
package com.csc.oep.ordererror;

public class OrderErrorEvent {
   
    private String customerId;
    private String errorCode;
    private String orderId;
   
    public OrderErrorEvent() {
        super();
    }

    public void setCustomerId(String customerId) {
        this.customerId = customerId;
    }

    public String getCustomerId() {
        return customerId;
    }

    public void setErrorCode(String errorCode) {
        this.errorCode = errorCode;
    }

    public String getErrorCode() {
        return errorCode;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public String getOrderId() {
        return orderId;
    }
}


To use this bean as an event type, we need to add it to the event type repository. Open the EPN diagram and click on the Event Type tab. To add a new event type, click on the green plus icon, enter “OrderErrorEvent” as the event type name, select “Properties defined in a Java Bean”, click the magnifying glass icon and select the JavaBean just created.

  
This operation adds the event type to the event type repository in the assembly file, as shown below:

…                                                                                          
<wlevs:event-type-repository >
   <wlevs:event-type type-name="OrderErrorEvent">
     <wlevs:class >com.csc.oep.ordererror.OrderErrorEvent</wlevs:class>
   </wlevs:event-type>
</wlevs:event-type-repository>
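
Because the event type's properties come from the JavaBean, a missing getter or setter silently removes a property. As a quick sanity check, the standard java.beans.Introspector can list the read/write properties of the class. This is only the plain JavaBeans view of the class, shown here as a sketch. It is not a description of how OEP inspects the bean internally. EventPropertyLister is a hypothetical name, and the nested class is a trimmed copy of OrderErrorEvent.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of the OEP application.
final class EventPropertyLister {

    // Trimmed copy of the OrderErrorEvent bean from this article.
    public static class OrderErrorEvent {
        private String customerId;
        private String errorCode;
        private String orderId;

        public String getCustomerId() { return customerId; }
        public void setCustomerId(String customerId) { this.customerId = customerId; }
        public String getErrorCode() { return errorCode; }
        public void setErrorCode(String errorCode) { this.errorCode = errorCode; }
        public String getOrderId() { return orderId; }
        public void setOrderId(String orderId) { this.orderId = orderId; }
    }

    // Returns the names of all properties that have both a getter and a setter.
    static Set<String> readWriteProperties(Class<?> beanClass) {
        try {
            // Stop at Object so the implicit "class" property is not reported.
            BeanInfo info = Introspector.getBeanInfo(beanClass, Object.class);
            Set<String> names = new TreeSet<>();
            for (PropertyDescriptor property : info.getPropertyDescriptors()) {
                if (property.getReadMethod() != null && property.getWriteMethod() != null) {
                    names.add(property.getName());
                }
            }
            return names;
        } catch (IntrospectionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readWriteProperties(OrderErrorEvent.class));
    }
}
```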

Mapper
OEP uses JAXB to handle XML payloads, so it needs to know what an event type looks like in XML format. We want to use an EclipseLink Moxy external metadata file to specify the mapping details, so create a new XML file, call it “orderErrorEventMetadata.xml” and put the following content in it:

                                                                                           
<?xml version="1.0"?>
<xml-bindings xmlns="http://www.eclipse.org/eclipselink/xsds/persistence/oxm" 
              xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xmlns:xs="http://www.w3.org/2001/XMLSchema"
              xml-accessor-type="PROPERTY"
              package-name="com.csc.oep.ordererror">
   
       <xml-schema element-form-default="QUALIFIED"
                   namespace="http://www.example.org" >
          <xml-ns prefix="ns0" namespace-uri="http://www.example.org" />
       </xml-schema>
       <java-types>
          <java-type name="com.csc.oep.ordererror.OrderErrorEvent">
             <xml-root-element name="orderError"/>
             <java-attributes>
                <xml-element java-attribute="customerId" />
                <xml-element java-attribute="errorCode" />
                <xml-element java-attribute="orderId" />
             </java-attributes>
          </java-type>
       </java-types>
</xml-bindings>


The file above must be created in the wlevs/mappers/jaxb directory, as shown below:


To start using the XML mapper, we need to configure a bean in the assembly file that is created by the following factory class: com.oracle.cep.mappers.jaxb.JAXBMapperFactory.
The listing below shows the XML mapper defined in the assembly file:


….                                                                                         
<bean id="xmlMapper" class="com.oracle.cep.mappers.jaxb.JAXBMapperFactory"
      factory-method="create">
   <property name="eventTypeName" value="OrderErrorEvent" />
   <property name="metadata" value="orderErrorEventMetadata.xml" />
</bean>
….

Converter Bean
The converter bean implements one method from each of the following two interfaces:
  • Inbound: com.bea.wlevs.adapters.jms.api.InboundMessageConverter. We have to implement its convert method. The return value is a List of events to be passed downstream.
public List convert(Message message)
     throws MessageConverterException, JMSException;
  • Outbound: com.bea.wlevs.adapters.jms.api.OutboundMessageConverter. We have to implement its convert method. The return value is a List of JMS messages.
public List<Message> convert(Session session, Object event)
     throws MessageConverterException, JMSException;

Below is the code of our converter bean:

                                                                                           
package com.csc.oep.ordererror;

import com.bea.wlevs.adapters.jms.api.InboundMessageConverter;
import com.bea.wlevs.adapters.jms.api.MessageConverterException;
import com.bea.wlevs.adapters.jms.api.OutboundMessageConverter;
import com.oracle.cep.mappers.api.Mapper;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;


public class JmsJaxbMessageConverter implements InboundMessageConverter,   
                                                OutboundMessageConverter {
   
    // Injected from the assembly file through setXmlMapper.
    private Mapper xmlMapper;

    public JmsJaxbMessageConverter() {
        super();
    }

    // Inbound: converts a JMS TextMessage carrying XML into an OrderErrorEvent.
    @Override
    public List convert(Message message) throws MessageConverterException {

        List<OrderErrorEvent> eventsList = new ArrayList<>();

        if (message instanceof TextMessage) {

            TextMessage textMessage = (TextMessage) message;

            try {
                StringReader reader = new StringReader(textMessage.getText());
                // Keep the result local so the converter holds no per-message state.
                OrderErrorEvent result = (OrderErrorEvent) xmlMapper
                                              .createUnmarshaller()
                                              .unmarshal(new StreamSource(reader));
                eventsList.add(result);
                System.out.println("JMS Message Converted to OrderErrorEvent");
            } catch (Exception e) {
                // Conversion failed: log the cause and return an empty list.
                System.out.println(e.getMessage());
            }

        } else {
            System.out.println("NO TEXT MESSAGE");
        }

        return eventsList;
    }

    // Outbound: converts an OrderErrorEvent into a JMS TextMessage carrying XML.
    @Override
    public List<Message> convert(Session session, Object object)
                    throws MessageConverterException, JMSException {

        String strResult = null;

        try {
            OrderErrorEvent errorEvent = (OrderErrorEvent) object;
            StringWriter writer = new StringWriter();
            xmlMapper.createMarshaller().marshal(errorEvent, new StreamResult(writer));
            strResult = writer.toString();

            System.out.println("OrderErrorEvent Converted to JMS Message");

        } catch (Exception e) {
            // Marshalling failed: the message below is sent with a null body.
            e.printStackTrace();
        }

        TextMessage m = session.createTextMessage();
        m.setText(strResult);
        List<Message> messages = new ArrayList<Message>();
        messages.add(m);
        return messages;
    }

    public void setXmlMapper(Mapper xmlMapper) {
        this.xmlMapper = xmlMapper;
    }

    public Mapper getXmlMapper() {
        return xmlMapper;
    }

}


As you can see from the code above, the converter bean uses the XML mapper to perform the marshal and unmarshal operations, so we need to inject the mapper into the bean. Below is the converter bean configured in the assembly file:

…                                                                      
<bean id="jmsJaxbMessageConverter"
      class="com.csc.oep.ordererror.JmsJaxbMessageConverter">
   <property name="xmlMapper" ref="xmlMapper"/>
</bean>

Event Processing Network
The diagram below shows the EPN created for our demonstration.

The JMS adapters are configured in the adapter.xml file shown below:

                                                                                           
<?xml version="1.0" encoding="windows-1252" ?>
<wlevs:config xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application">

  <jms-adapter>
    <name>jms-inbound-adapter</name>
    <jndi-provider-url>t3://${host}:7001</jndi-provider-url>
    <jndi-factory>weblogic.jndi.WLInitialContextFactory</jndi-factory>
    <connection-jndi-name>jms/OrderErrorCF</connection-jndi-name>
    <destination-jndi-name>jms/OrderErrorQueue</destination-jndi-name>
    <session-transacted>false</session-transacted>
  </jms-adapter>

  <jms-adapter>
    <name>jms-outbound-adapter</name>
    <jndi-provider-url>t3://${host}:7001</jndi-provider-url>
    <jndi-factory>weblogic.jndi.WLInitialContextFactory</jndi-factory>
    <connection-jndi-name>jms/OrderErrorCF</connection-jndi-name>
    <destination-jndi-name>jms/OrderErrorOutQueue</destination-jndi-name>
    <session-transacted>false</session-transacted>
  </jms-adapter>

</wlevs:config>


The CQL processor is configured in the processor.xml file.

                                                                                           
<?xml version="1.0" encoding="UTF-8"?>
<wlevs:config xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application">

  <processor>
    <name>OrderErrorProcessor</name>
    <rules>
      <query id="AllErrorQuery"><![CDATA[
         select * from OrderErrorInputChannel
    ]]></query>
    </rules>
  </processor>

</wlevs:config>


The query passes every event from the input channel to the output channel. To complete our configuration, we need to instruct OEP to use the converter bean to convert the messages processed by the JMS adapters. We do this by specifying the instance property “converterBean” in the assembly file for both the inbound and outbound JMS adapters.

…                                                                                          
    <wlevs:adapter id="jms-inbound-adapter" provider="jms-inbound">
        <wlevs:instance-property name="converterBean" ref="jmsJaxbMessageConverter"/>
        <wlevs:listener ref="OrderErrorInputChannel"/>
    </wlevs:adapter>

    <wlevs:adapter id="jms-outbound-adapter" provider="jms-outbound">
        <wlevs:instance-property name="converterBean" ref="jmsJaxbMessageConverter"/>
    </wlevs:adapter>

Test the Application
Deploy the application to your OEP server. To test the application, we need to:
  1. produce a JMS message in the OrderError queue;
  2. verify that the converter bean converts it to an OrderErrorEvent object;
  3. verify that the OrderErrorEvent object is converted back to an XML message;
  4. verify that the OrderErrorOut queue receives the XML message.
Below is an example message to use for our test:

                                                                                           
<?xml version="1.0" encoding="UTF-8" ?>
<ns0:orderError xmlns:ns0="http://www.example.org">
   <ns0:orderId>ID-1</ns0:orderId>
   <ns0:customerId>ID-2</ns0:customerId>
   <ns0:errorCode>ASD234</ns0:errorCode>
</ns0:orderError>


Use the WebLogic console to produce a new JMS message in the OrderError queue and paste the message above as the body.
The converter bean prints a message to the server console after each conversion. We can verify from the server console that the message has been converted to an OrderErrorEvent object and then back to XML.


Now, using the WebLogic console again, we can verify that the message has been received by the OrderErrorOut queue.
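
When comparing the message read from the output queue with the one that was sent, a raw string comparison is fragile, because the marshaller may choose different whitespace, a different prefix or a default namespace. The sketch below compares the two payloads by namespace and local element name instead, using only the JDK's DOM parser. OrderErrorPayloadCompare and its methods are hypothetical names, and the received payload in main() is invented for illustration. It is not output captured from the demo.

```java
import java.io.StringReader;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical helper, not part of the OEP application.
final class OrderErrorPayloadCompare {

    static final String NS = "http://www.example.org";

    // Reads the child elements of orderError into a name-to-value map.
    static Map<String, String> fields(String xml) {
        Document doc;
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            factory.setFeature(XMLConstants.FEATURE_SECURE_PROCESSING, true);
            doc = factory.newDocumentBuilder().parse(new InputSource(new StringReader(xml)));
        } catch (Exception e) {
            throw new IllegalArgumentException("Payload is not well-formed XML", e);
        }
        Element root = doc.getDocumentElement();
        if (!NS.equals(root.getNamespaceURI()) || !"orderError".equals(root.getLocalName())) {
            throw new IllegalArgumentException("Unexpected root element: " + root.getNodeName());
        }
        Map<String, String> result = new LinkedHashMap<>();
        NodeList children = root.getChildNodes();
        for (int i = 0; i < children.getLength(); i++) {
            Node child = children.item(i);
            // Whitespace-only text nodes between elements are skipped here.
            if (child.getNodeType() == Node.ELEMENT_NODE && NS.equals(child.getNamespaceURI())) {
                result.put(child.getLocalName(), child.getTextContent().trim());
            }
        }
        return result;
    }

    // True when both payloads carry the same element values, whatever the prefixes.
    static boolean sameData(String sent, String received) {
        return fields(sent).equals(fields(received));
    }

    public static void main(String[] args) {
        String sent =
              "<ns0:orderError xmlns:ns0='http://www.example.org'>"
            + "<ns0:orderId>ID-1</ns0:orderId>"
            + "<ns0:customerId>ID-2</ns0:customerId>"
            + "<ns0:errorCode>ASD234</ns0:errorCode>"
            + "</ns0:orderError>";
        // Invented example of a payload that differs only in prefix and layout.
        String received =
              "<orderError xmlns='http://www.example.org'>\n"
            + "  <orderId>ID-1</orderId>\n"
            + "  <customerId>ID-2</customerId>\n"
            + "  <errorCode>ASD234</errorCode>\n"
            + "</orderError>";
        System.out.println("Same data: " + sameData(sent, received));
    }
}
```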


A copy of the source code is available here