Monday, 10 October 2016

Configure JPA in Oracle Event Processing applications using Spring

Introduction


Oracle Event Processing (OEP) is a powerful platform for processing events. In the Event Processing Network (EPN) we define the OEP components needed to process our events: inbound and outbound adapters, channels, CQL processors, tables and event beans. The CQL processors are the brain of the application, because they hold the business logic. To define that logic we use Oracle CQL (Continuous Query Language), a query language based on SQL with added constructs that support streaming data. With Oracle CQL we express queries on event data streams to perform complex event processing (CEP). Oracle CQL also lets us enrich event data by combining events with RDBMS table data: we define a table in the EPN assembly file, connect it to the CQL processor and use it in our query. However, Oracle CQL does not define INSERT or UPDATE statements, so if, as a result of our event processing, we need to update a field or insert a row in the database, we have to define an event bean and do that in Java.
As in any Java application, we can design the persistence layer with plain JDBC or with JPA and an Object-Relational Mapping (ORM) provider such as EclipseLink. Oracle Event Processing supports both approaches and, most importantly, it supports Spring. In fact, every OEP application is an OSGi bundle based on Spring. The assembly file, where we define our components, is an extension of a Spring context file, so it can contain Spring beans alongside the OEP components and we can benefit from the power of Spring.
In this post I will show you how to configure JPA in an OEP application using Spring. The code throughout this post is based on the Hello World example provided by Oracle. If you selected Oracle Event Processing with examples during the OEP installation, the source code should be available at:

ORACLE_HOME\oep\examples\source\applications\helloworld

SpringJPADemo project


In this demo project I use an Oracle database and EclipseLink as the JPA provider. The image below shows the project structure.


Looking at the project structure in JDeveloper we can see two logical folders, Application Sources and OEP Content. Application Sources contains all the Java source files, and OEP Content contains all the OEP project files. I will discuss the source code and the configuration separately.

Application Source


In the project I defined only one event type. The event properties are defined in the JavaBean HelloWorldEvent.
The HelloWorldAdapter generates events and sends them to the downstream component. Each event message is built by concatenating the string HelloWorld - the current time is: with the current date. The adapter implements the StreamSource and RunnableBean interfaces. StreamSource lets the adapter obtain a StreamSender instance by implementing the method:

public void setEventSender(StreamSender sender) {
    eventSender = sender;
}

The RunnableBean interface lets the adapter run in its own thread: by implementing its run() method, the adapter generates events and sends them through the StreamSender object.
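The adapter class is not listed in this post, so here is a minimal sketch of what such a class could look like. It is not the code from the project: the three interfaces are simplified stand-ins for the OEP ones in com.bea.wlevs.ede.api, declared locally so that the sketch compiles without the OEP libraries. The names HelloWorldAdapterSketch and createEvent(), the id counter and the 300 ms pause are assumptions made for illustration. The id and message properties of HelloWorldEvent are inferred from the getters used later in the event bean.

```java
import java.util.Date;

// Event JavaBean; the id and message properties are inferred from the
// getId()/getMessage() calls made by the event bean later in this post.
class HelloWorldEvent {
    private long id;
    private String message;

    public long getId() { return id; }
    public void setId(long id) { this.id = id; }
    public String getMessage() { return message; }
    public void setMessage(String message) { this.message = message; }
}

// Simplified stand-ins for the OEP interfaces (assumption: declared here
// only so the sketch runs without the OEP libraries on the classpath).
interface StreamSender { void sendInsertEvent(Object event); }
interface StreamSource { void setEventSender(StreamSender sender); }
interface RunnableBean extends Runnable { void suspend(); }

class HelloWorldAdapterSketch implements StreamSource, RunnableBean {
    private static final long SLEEP_MILLIS = 300; // assumed pause between events

    private String message;              // prefix configured on the adapter
    private StreamSender eventSender;    // handed over by the container
    private volatile boolean suspended;
    private long nextId = 1;             // hypothetical id counter

    public void setMessage(String message) { this.message = message; }

    @Override
    public void setEventSender(StreamSender sender) { eventSender = sender; }

    // Builds one event: the configured prefix followed by the current date.
    HelloWorldEvent createEvent() {
        HelloWorldEvent event = new HelloWorldEvent();
        event.setId(nextId++);
        event.setMessage(message + " " + new Date());
        return event;
    }

    // Generates and sends events until suspend() is called.
    @Override
    public void run() {
        suspended = false;
        while (!suspended) {
            eventSender.sendInsertEvent(createEvent());
            try {
                Thread.sleep(SLEEP_MILLIS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    @Override
    public void suspend() { suspended = true; }
}
```

In the real application the OEP server calls setEventSender() and run(); in the sketch any lambda can play the role of the StreamSender, which makes the event generation easy to try outside the server.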
Here I want to persist events with JPA, so I have to create an entity class that represents the event using JPA annotations. The code below shows the HelloWorldEntity class:


package com.csc.oep.entity;

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.SequenceGenerator;
import javax.persistence.Table;

@Entity
@Table(name="HELLO_WORLD")
public class HelloWorldEntity {
    @Id
    @SequenceGenerator(name="SEQ_GEN", sequenceName="SEQ_HELLO_WORLD", allocationSize=1, initialValue=1)
    @GeneratedValue(strategy=GenerationType.SEQUENCE, generator="SEQ_GEN")
    private long id;
   
    private String message;

    public HelloWorldEntity() {
        super();
     }

    public HelloWorldEntity(long id, String message) {
        super();
        this.id = id;
        this.message = message;
    }

    public String getMessage() {
        return message;
    }
   
    public void setMessage (String message) {
        this.message = message;
    }

    public void setId(long id) {
        this.id = id;
    }

    public long getId() {
        return id;
    }
    
    @Override
    public String toString(){
      return "HelloWorldEvent [ id = "+ id + " message = "+message+ "] ";
    }
}

The entity id is generated using the SEQ_HELLO_WORLD sequence defined in my Oracle database.
To persist these entities, I have created a generic DAO interface, a generic DAO implementation and the HelloWorldEventDao, as shown below:


//DAO Interface

package com.csc.oep.dao;

import java.util.List;

public interface Dao<E,K> {
  
   public E findOne(K id);
   public List<E> findAll();
   public E save(E entity);
   public E update(E entity);
   public void delete(E entity);
   public void deleteById(K Id);
  
}

//Generic DAO Implementation

package com.csc.oep.dao;

import java.util.List;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;

public class GenericDaoImpl<E,K>  implements Dao<E,K>{
   
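    @PersistenceContext
    private EntityManager entityManager;
   
    protected Class<E> type;
   
    // The entity class is passed in by the concrete DAO (see HelloWorldEventDao)
    public GenericDaoImpl(Class<E> type) {
        this.type = type;
    }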

    @Override
    public E findOne(K id) {
      return  entityManager.find(type, id);
    }

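    @Override
    public List<E> findAll() {
        // Standard JPQL needs a select clause and an alias; the entity name
        // defaults to the unqualified class name
        return entityManager.createQuery(
                   "select e from " + type.getSimpleName() + " e", type)
                       .getResultList();
    }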

    @Override
    public E save(E entity) {
        entityManager.persist( entity );
        return entity;
    }

    @Override
    public E update(E entity) {
        return entityManager.merge( entity );
    }

    @Override
    public void delete(E entity) {
        entityManager.remove( entity );  
    }

    @Override
    public void deleteById(K Id) {
        E entity = findOne( Id );
        delete( entity );
    }
}

//HelloWorldEventDAO Implementation
package com.csc.oep.dao;

import com.csc.oep.entity.HelloWorldEntity;

public class HelloWorldEventDao extends GenericDaoImpl<HelloWorldEntity,Long> {
    public HelloWorldEventDao() {
        super(HelloWorldEntity.class);
    }

}
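For unit tests that should not touch the database, the Dao contract above can also be backed by a plain map. The sketch below is not part of the demo project: InMemoryDao and its key-extractor argument are hypothetical, and the Dao interface is repeated only so that the block is self-contained. Any code written against the Dao interface, rather than against a concrete class, could use it as a drop-in replacement.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Same contract as the Dao interface shown in this post.
interface Dao<E, K> {
    E findOne(K id);
    List<E> findAll();
    E save(E entity);
    E update(E entity);
    void delete(E entity);
    void deleteById(K id);
}

// Hypothetical test double: keeps entities in a map instead of a database.
class InMemoryDao<E, K> implements Dao<E, K> {
    private final Map<K, E> store = new LinkedHashMap<>();
    private final Function<E, K> idOf; // extracts the key from an entity

    InMemoryDao(Function<E, K> idOf) {
        this.idOf = idOf;
    }

    @Override
    public E findOne(K id) { return store.get(id); }

    @Override
    public List<E> findAll() { return new ArrayList<>(store.values()); }

    @Override
    public E save(E entity) {
        store.put(idOf.apply(entity), entity);
        return entity;
    }

    // In this sketch an update simply overwrites the stored instance.
    @Override
    public E update(E entity) { return save(entity); }

    @Override
    public void delete(E entity) { store.remove(idOf.apply(entity)); }

    @Override
    public void deleteById(K id) { store.remove(id); }
}
```

For the entity in this post the key extractor would be HelloWorldEntity::getId. Note that, unlike the JPA version, nothing here generates ids, so a test has to set them itself.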

Note the @PersistenceContext annotation in the generic DAO. It works because the Spring configuration declares an EntityManagerFactory bean, registers it as an OSGi service, and enables the bean post-processor for JPA annotations.
In the Spring context file I create a bean of type HelloWorldEventDao and inject it into my service class. The code below shows the definition of the HelloWorldService interface and its implementation:



// HelloWorldService interface

package com.csc.oep.service;

import com.csc.oep.entity.HelloWorldEntity;

public interface HelloWorldService {
   
    public HelloWorldEntity saveEvent(HelloWorldEntity event);
    public HelloWorldEntity getEvent(Long id);
    
}

// HelloWorldService implementation

package com.csc.oep.service;

import com.csc.oep.dao.Dao;
import com.csc.oep.dao.HelloWorldEventDao;
import com.csc.oep.entity.HelloWorldEntity;
import org.springframework.transaction.annotation.Transactional;


@Transactional
public class HelloWorldServiceImpl implements HelloWorldService {  
   
    private HelloWorldEventDao helloWorldEventDao;
   
    public HelloWorldServiceImpl() {
        super();
    }

    public void setHelloWorldEventDao(HelloWorldEventDao helloWorldEventDao) {
        this.helloWorldEventDao = helloWorldEventDao;
    }

    public HelloWorldEntity saveEvent(HelloWorldEntity event) {
       return helloWorldEventDao.save(event);
    }

    @Override
    public HelloWorldEntity getEvent(Long id) {
       return helloWorldEventDao.findOne(id);
    }
}



Note the @Transactional annotation, which lets Spring know that I want the service methods to execute in transactions. I will configure the Spring transaction manager in the Spring configuration file.
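To make the effect of @Transactional more concrete, the sketch below imitates the idea with a plain JDK dynamic proxy: every call on the service interface is wrapped between a begin and a commit, or a rollback if the method throws an unchecked exception, which is Spring's default rollback rule. This is only an illustration of the principle, not Spring's implementation. GreetingService, TransactionalProxySketch and the log list are hypothetical names, and the log entries stand in for calls to a real transaction manager.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical service contract, standing in for HelloWorldService.
interface GreetingService {
    String save(String message);
}

class TransactionalProxySketch {
    // Records what a real transaction manager would be asked to do.
    static final List<String> log = new ArrayList<>();

    // Returns a proxy that runs every call on target between a begin
    // and a commit (or a rollback, for unchecked exceptions).
    static <T> T transactional(T target, Class<T> contract) {
        Object proxy = Proxy.newProxyInstance(
            contract.getClassLoader(),
            new Class<?>[] { contract },
            (self, method, args) -> {
                log.add("begin");
                try {
                    Object result = method.invoke(target, args);
                    log.add("commit");
                    return result;
                } catch (InvocationTargetException e) {
                    Throwable cause = e.getCause();
                    // Default rule: roll back on RuntimeException and Error,
                    // commit when a checked exception is thrown.
                    boolean unchecked = cause instanceof RuntimeException
                                     || cause instanceof Error;
                    log.add(unchecked ? "rollback" : "commit");
                    throw cause;
                }
            });
        return contract.cast(proxy);
    }
}
```

The caller only ever sees the interface, which is why the event bean later in this post can depend on HelloWorldService without knowing whether it holds the implementation or a transactional proxy around it.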
I inject this service bean into the HelloWorldBean. This is an event bean that implements StreamSink, the interface that lets us receive events from the upstream component by implementing its method:


@Override
public void onInsertEvent(Object object) throws EventRejectedException {
}

The code below shows my HelloWorldBean:


package com.csc.oep.bean;

import com.bea.wlevs.ede.api.EventRejectedException;
import com.bea.wlevs.ede.api.StreamSink;
import com.csc.oep.entity.HelloWorldEntity;
import com.csc.oep.event.HelloWorldEvent;
import com.csc.oep.service.HelloWorldService;

public class HelloWorldBrean implements StreamSink {

 private HelloWorldService helloWorldService;
  
 @Override
 public void onInsertEvent(Object object) throws EventRejectedException {
       
   if(object instanceof HelloWorldEvent ) {
              
      HelloWorldEvent event = (HelloWorldEvent) object;
           
      //Create the entity object to save
      HelloWorldEntity tmp =
          new HelloWorldEntity(event.getId(), event.getMessage());
      //Save the entity to DB
      tmp = helloWorldService.saveEvent(tmp);
          
      //Retrieve the entity from DB
      HelloWorldEntity entity = helloWorldService.getEvent(tmp.getId());
           
      //Print the entity
      System.out.println(entity);      
   }
 }
 public void setHelloWorldService(HelloWorldService helloWorldService) {
        this.helloWorldService = helloWorldService;
 }    
}

Configuration Files


To configure my project and make it work I created or edited the following files:
  • persistence.xml
  • beans-jpa-config.xml
  • SpringJPADemo.context.xml
  • processor.xml
  • MANIFEST.MF

In the persistence.xml file I configured the helloworld persistence unit as follows:


<?xml version="1.0" encoding="windows-1252" ?>
<persistence xmlns="http://java.sun.com/xml/ns/persistence"   
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="
             http://java.sun.com/xml/ns/persistence   
             http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd"
             version="2.0">
            
    <persistence-unit name="helloworld" transaction-type="RESOURCE_LOCAL">
     
      <provider>org.eclipse.persistence.jpa.PersistenceProvider</provider>
     
      <class>com.csc.oep.entity.HelloWorldEntity</class>
     
      <properties>
        <!-- Eclipselink Configuration -->
        <property name="eclipselink.ddl-generation" value="create-or-extend-tables"/>
        <property name="eclipselink.ddl-generation.output-mode" value="database"/>
        <property name="eclipselink.weaving" value="false"/>
        <property name="eclipselink.jdbc.read-connections.min" value="1" />
        <property name="eclipselink.jdbc.write-connections.min" value="1" />
       
        <!-- Database Configuration -->
        <property name="javax.persistence.jdbc.driver" value="oracle.jdbc.OracleDriver"/>
        <property name="javax.persistence.jdbc.url"  
                  value="jdbc:oracle:thin:@my-dbhostname:1521:XE"/>
        <property name="javax.persistence.jdbc.user" value="JPADEMO"/>
        <property name="javax.persistence.jdbc.password" value="jpademo"/>
       
      </properties>

    </persistence-unit>

</persistence>

I separated the JPA and EPN component configuration into two different files. The beans-jpa-config.xml file shown below contains the definitions of the beans that are not event beans and are related to my persistence layer.


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:osgi="http://www.springframework.org/schema/osgi"
       xmlns:wlevs="http://www.bea.com/ns/wlevs/spring"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="
  http://www.springframework.org/schema/beans
  http://www.springframework.org/schema/beans/spring-beans.xsd
  http://www.springframework.org/schema/osgi
  http://www.springframework.org/schema/osgi/spring-osgi.xsd
  http://www.bea.com/ns/wlevs/spring
  http://www.bea.com/ns/wlevs/spring/ocep-epn.xsd
  http://www.springframework.org/schema/tx
  http://www.springframework.org/schema/tx/spring-tx-3.1.xsd">
  <!--Spring Bean definitions go here-->

  <osgi:service id ="em" interface="javax.persistence.EntityManagerFactory"
                         ref="entityManagerFactory" />
 
  <!-- bean post-processor for JPA annotations  -->
  <bean  
     class="org.springframework.orm.jpa.support.PersistenceAnnotationBeanPostProcessor"/>


  <bean id="entityManagerFactory"
        class="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean" >
    <property name="persistenceUnitName" value="helloworld" />  
   </bean>
  
   <bean id="txManager" class="org.springframework.orm.jpa.JpaTransactionManager">
        <property name="entityManagerFactory" ref="entityManagerFactory" />
    </bean>
  
   <bean id="helloWorldEventDao" class="com.csc.oep.dao.HelloWorldEventDao" />
  
   <bean id="helloWorldService" class="com.csc.oep.service.HelloWorldServiceImpl"
                                autowire="byName" />
   
    <tx:annotation-driven transaction-manager="txManager" />
 
</beans>

Using the osgi namespace I registered my EntityManagerFactory as an OSGi service. This enables the injection of the EntityManager into the GenericDaoImpl class through the @PersistenceContext annotation. In this file I also configure the JpaTransactionManager and my DAO and service beans.

All my OEP components are defined in the SpringJPADemo.context.xml file:


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:osgi="http://www.springframework.org/schema/osgi"
       xmlns:wlevs="http://www.bea.com/ns/wlevs/spring"
       xmlns:jdbc="http://www.oracle.com/ns/ocep/jdbc"
       xmlns:hadoop="http://www.oracle.com/ns/oep/hadoop"
       xmlns:nosqldb="http://www.oracle.com/ns/oep/nosqldb"
       xsi:schemaLocation="
  http://www.springframework.org/schema/beans
  http://www.springframework.org/schema/beans/spring-beans.xsd
  http://www.springframework.org/schema/osgi
  http://www.springframework.org/schema/osgi/spring-osgi.xsd
  http://www.bea.com/ns/wlevs/spring
  http://www.bea.com/ns/wlevs/spring/ocep-epn.xsd
  http://www.oracle.com/ns/ocep/jdbc
  http://www.oracle.com/ns/ocep/jdbc/ocep-jdbc.xsd
  http://www.oracle.com/ns/oep/hadoop
  http://www.oracle.com/ns/oep/hadoop/oep-hadoop.xsd
  http://www.oracle.com/ns/oep/nosqldb
  http://www.oracle.com/ns/oep/nosqldb/oep-nosqldb.xsd">

<wlevs:event-type-repository>
    <wlevs:event-type type-name="HelloWorldEvent">
      <wlevs:class>com.csc.oep.event.HelloWorldEvent</wlevs:class>
    </wlevs:event-type>
</wlevs:event-type-repository>
 
  <wlevs:adapter id="hello-world-adapter" class="com.csc.oep.adapter.HelloWorldAdapter">
    <wlevs:instance-property name="message" value="HelloWorld - the current time is:"/>
    <wlevs:listener ref="inputChannel"/>
  </wlevs:adapter>
 
  <wlevs:channel id="inputChannel" event-type="HelloWorldEvent">
    <wlevs:listener ref="processor"/>
  </wlevs:channel>
 
  <wlevs:processor id="processor"/>

  <wlevs:event-bean id="eventBean" class="com.csc.oep.bean.HelloWorldBrean">
    <wlevs:instance-property name="helloWorldService" ref="helloWorldService"/>
  </wlevs:event-bean>
 
  <wlevs:channel id="outputChannel" event-type="HelloWorldEvent">
    <wlevs:listener ref="eventBean"/>
    <wlevs:source ref="processor"/>
  </wlevs:channel>
  
</beans>

Note that I am injecting the service bean I defined in the other context file without explicitly importing it. This works because the OEP server automatically merges all the context files under the logical folder OEP Content/spring into the same application context.
This file creates the following Event Processing Network (EPN):



In the processor.xml file I defined the CQL ExampleQuery. In this query, I want to send all the events from the input to the output channel.


<?xml version="1.0" encoding="UTF-8"?>
<wlevs:config xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application">
  <processor>
    <name>processor</name>
    <rules>
      <query id="ExampleQuery"><![CDATA[
        select * from inputChannel [now]
      ]]></query>
    </rules>
  </processor>
</wlevs:config>

As I stated at the beginning of this post, the application we create is an OSGi bundle. We will deploy this bundle in our OEP server. However, first we have to declare all our bundle and package dependencies in the bundle Manifest.


Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: DatabaseDemo.JPADemo
Bundle-SymbolicName: DatabaseDemo.JPADemo
Bundle-Version: 1.0.0
Bundle-Localization: bundle
Bundle-Vendor: %project.vendor
Bundle-ClassPath: .
Require-Bundle: org.eclipse.persistence.jpa;version="2.5.2",
 javax.persistence;jpa="2.1"
Import-Package: com.bea.wlevs.configuration;version="12.1.3",
 com.bea.wlevs.ede.api;version="12.1.3",
 com.bea.wlevs.ede.impl;version="12.1.3",
 com.bea.wlevs.ede.spi;version="12.1.3",
 com.bea.wlevs.ede;version="12.1.3",
 com.bea.wlevs.management.spi;version="12.1.3",
 com.bea.wlevs.spring.support;version="12.1.3",
 com.bea.wlevs.spring;version="12.1.3",
 com.bea.wlevs.util;version="12.1.3",
 org.apache.commons.logging;version="1.1.1",
 org.springframework.beans.factory.config;version="3.1.1",
 org.springframework.beans.factory;version="3.1.1",
 org.springframework.beans;version="3.1.1",
 org.springframework.core.annotation;version="3.1.1",
 org.springframework.osgi.context;version="1.2.0",
 org.springframework.osgi.extensions.annotation;version="1.2.0",
 org.springframework.osgi.service;version="1.2.0",
 org.springframework.orm.jpa;version="3.1.1",
 org.springframework.orm.jpa.support;version="3.1.1",
 org.springframework.transaction;version="3.1.1",
 org.springframework.transaction.annotation;version="3.1.1",
 org.springframework.transaction.interceptor;version="3.1.1",
 org.springframework.aop;version="3.1.1",
 org.springframework.aop.framework;version="3.1.1",
 org.springframework.aop.framework.autoproxy;version="3.1.1",
 org.springframework.util;version="3.1.1",
 org.springframework.jdbc.datasource;version="3.1.1",
 org.aopalliance.aop,
 oracle.jdbc
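A missing Import-Package entry typically only shows up when the bundle is deployed, so it can help to check the manifest beforehand. The sketch below is a small helper of my own invention, not part of the project or of any OSGi tooling: ManifestImports and importedPackages() are hypothetical names. It reads the manifest text with the JDK's java.util.jar.Manifest class and returns the imported package names so that they can be compared with the packages the code actually uses. It assumes one package per clause, which is how the manifest above is written.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.jar.Manifest;

class ManifestImports {

    // Returns the package names listed in Import-Package, without attributes.
    static Set<String> importedPackages(String manifestText) {
        Manifest manifest;
        try {
            manifest = new Manifest(new ByteArrayInputStream(
                manifestText.getBytes(StandardCharsets.UTF_8)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String header = manifest.getMainAttributes().getValue("Import-Package");
        Set<String> packages = new LinkedHashSet<>();
        if (header == null) {
            return packages;
        }
        // Split on commas outside double quotes, because version ranges
        // such as "[3.1,4.0)" contain commas of their own.
        StringBuilder clause = new StringBuilder();
        boolean quoted = false;
        for (char c : header.toCharArray()) {
            if (c == '"') {
                quoted = !quoted;
            }
            if (c == ',' && !quoted) {
                addPackage(packages, clause);
                clause.setLength(0);
            } else {
                clause.append(c);
            }
        }
        addPackage(packages, clause);
        return packages;
    }

    // Keeps only the part before the first ';' (assumes one package per clause).
    private static void addPackage(Set<String> packages, StringBuilder clause) {
        String name = clause.toString().split(";")[0].trim();
        if (!name.isEmpty()) {
            packages.add(name);
        }
    }
}
```

With the manifest above, the returned set should contain entries such as org.springframework.orm.jpa and oracle.jdbc, and a quick contains() check on it can flag a package that was forgotten.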

Test the Application


The source code of this project is available in my GitHub repository here.
To test the project, deploy it to the OEP server. After the deployment you should see the events printed in the console, as in the image below:




The same events should be stored in the HELLO_WORLD table in the database.


Enjoy :)
