Introduction
Oracle Event Processing (OEP) is a powerful platform for processing events. In the Event Processing Network (EPN) we define the OEP components needed to process our events: inbound and outbound adapters, channels, CQL processors, tables and event beans. The CQL processors are the brain of the application: in them we define the business logic according to our application requirements. To define the business logic, we use Oracle CQL (Continuous Query Language), a query language based on SQL with added constructs that support streaming data. Using Oracle CQL, we express queries on event data streams to perform complex event processing (CEP). Oracle CQL also allows us to enrich event data by combining event and RDBMS table data: we define a table in the EPN assembly file, connect the table to the CQL processor and use the table in our query. However, Oracle CQL doesn't define any INSERT or UPDATE statements, so if, as a result of our event processing, we need to update a field in the database or insert a row, we have to define an event bean and do that in Java.
As in any Java application, we can design our persistence layer with plain JDBC or with JPA and an object-relational mapping (ORM) provider such as EclipseLink. Oracle Event Processing supports both approaches and, most importantly, it supports Spring. Any OEP application is, in fact, an OSGi bundle based on Spring. The assembly file, where we define our components, is an extension of a Spring context file, so in it we can define not only our components but also Spring beans, and benefit from the power of Spring.
In this post I will show you how to configure JPA in an OEP application using Spring. The code throughout this post is based on the Hello World example provided by Oracle. If you selected Oracle Event Processing with examples during the OEP installation, the source code should be available at:
    ORACLE_HOME\oep\examples\source\applications\helloworld
SpringJPADemo project
In this demo project I will use an Oracle database and EclipseLink as the JPA provider. The image below shows the project structure.
Looking at the project structure in JDeveloper we can see two logical folders, Application Sources and OEP Content. Application Sources shows all the Java source files, OEP Content all the OEP project files. I will discuss the source code and the configuration separately.
Application Source
In the project I defined only one event type. The event properties are defined in the Java bean HelloWorldEvent.
The HelloWorldAdapter generates events and sends them to the downstream component. Here the events are created by concatenating the string "HelloWorld - the current time is:" with the current date. The adapter implements the StreamSource and RunnableBean interfaces. The StreamSource interface lets the adapter obtain a StreamSender instance by implementing the method:
    public void setEventSender(StreamSender sender) {
        eventSender = sender;
    }
The RunnableBean interface allows us to run a new thread that generates events and sends them through the StreamSender object; the logic goes in its run() method.
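As a rough illustration of how these two interfaces fit together, here is a minimal, self-contained sketch. The nested StreamSender, StreamSource and RunnableBean interfaces are simplified stand-ins for the OEP types in com.bea.wlevs.ede.api, written only so that the example compiles without the OEP libraries. The adapter body, the sleep interval and the use of a plain String as the event are my assumptions, not the code of the Oracle example.

```java
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

class AdapterSketch {

    // Simplified stand-ins for the OEP interfaces (assumption: the real ones
    // live in com.bea.wlevs.ede.api and declare more than is shown here).
    interface StreamSender { void sendInsertEvent(Object event); }
    interface StreamSource { void setEventSender(StreamSender sender); }
    interface RunnableBean extends Runnable { void suspend(); }

    static class HelloWorldAdapter implements RunnableBean, StreamSource {
        private final String message;
        private final long sleepMillis;
        private volatile boolean suspended;
        private StreamSender eventSender;

        HelloWorldAdapter(String message, long sleepMillis) {
            this.message = message;
            this.sleepMillis = sleepMillis;
        }

        // The container hands over the sender before the thread starts
        @Override
        public void setEventSender(StreamSender sender) {
            eventSender = sender;
        }

        // Keeps emitting "message + current date" until suspend() is called
        @Override
        public void run() {
            while (!suspended) {
                eventSender.sendInsertEvent(message + new Date());
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        @Override
        public void suspend() {
            suspended = true;
        }
    }

    public static void main(String[] args) {
        List<Object> received = new ArrayList<>();
        HelloWorldAdapter adapter =
                new HelloWorldAdapter("HelloWorld - the current time is: ", 10);
        // A fake downstream component that stops the adapter after three events
        adapter.setEventSender(event -> {
            received.add(event);
            if (received.size() == 3) {
                adapter.suspend();
            }
        });
        adapter.run();
        received.forEach(System.out::println);
    }
}
```

In a real application the OEP server creates the thread and injects the sender; the main method above only plays that role for the demo.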
Here, I want to persist events with JPA, so I have to create an entity class representing the event using JPA annotations. The code below shows the HelloWorldEntity class:
    package com.csc.oep.entity;

    import javax.persistence.Entity;
    import javax.persistence.GeneratedValue;
    import javax.persistence.GenerationType;
    import javax.persistence.Id;
    import javax.persistence.SequenceGenerator;
    import javax.persistence.Table;

    @Entity
    @Table(name="HELLO_WORLD")
    public class HelloWorldEntity {

        @Id
        @SequenceGenerator(name="SEQ_GEN", sequenceName="SEQ_HELLO_WORLD", allocationSize=1, initialValue=1)
        @GeneratedValue(strategy=GenerationType.SEQUENCE, generator="SEQ_GEN")
        private long id;
        private String message;

        public HelloWorldEntity() {
            super();
        }

        public HelloWorldEntity(long id, String message) {
            super();
            this.id = id;
            this.message = message;
        }

        public String getMessage() {
            return message;
        }

        public void setMessage(String message) {
            this.message = message;
        }

        public void setId(long id) {
            this.id = id;
        }

        public long getId() {
            return id;
        }

        @Override
        public String toString() {
            return "HelloWorldEvent [ id = " + id + " message = " + message + "]";
        }
    }
The entity id is generated using the SEQ_HELLO_WORLD sequence defined in my Oracle database.
To persist these entities, I created a generic DAO interface, a generic DAO implementation and the HelloWorldEventDao, as shown below:
    // DAO interface
    package com.csc.oep.dao;

    import java.util.List;

    public interface Dao<E,K> {
        public E findOne(K id);
        public List<E> findAll();
        public E save(E entity);
        public E update(E entity);
        public void delete(E entity);
        public void deleteById(K Id);
    }
    // Generic DAO implementation
    package com.csc.oep.dao;

    import java.util.List;
    import javax.persistence.EntityManager;
    import javax.persistence.PersistenceContext;

    public class GenericDaoImpl<E,K> implements Dao<E,K> {

        @PersistenceContext
        private EntityManager entityManager;

        // Entity class handled by this DAO, supplied by the concrete subclass
        protected Class<E> type;

        public GenericDaoImpl(Class<E> type) {
            super();
            this.type = type;
        }

        @Override
        public E findOne(K id) {
            return entityManager.find(type, id);
        }

        @Override
        public List<E> findAll() {
            // JPQL needs a select clause and an alias; the default entity
            // name is the unqualified class name
            return entityManager
                    .createQuery("select e from " + type.getSimpleName() + " e", type)
                    .getResultList();
        }

        @Override
        public E save(E entity) {
            entityManager.persist( entity );
            return entity;
        }

        @Override
        public E update(E entity) {
            return entityManager.merge( entity );
        }

        @Override
        public void delete(E entity) {
            entityManager.remove( entity );
        }

        @Override
        public void deleteById(K Id) {
            E entity = findOne( Id );
            delete( entity );
        }
    }
    // HelloWorldEventDao implementation
    package com.csc.oep.dao;

    import com.csc.oep.entity.HelloWorldEntity;

    public class HelloWorldEventDao extends GenericDaoImpl<HelloWorldEntity,Long> {

        public HelloWorldEventDao() {
            super(HelloWorldEntity.class);
        }
    }
Note the @PersistenceContext annotation in the generic DAO: it works because the configuration defines an EntityManagerFactory bean, registers it as an OSGi service and declares Spring's PersistenceAnnotationBeanPostProcessor.
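The super(HelloWorldEntity.class) call in HelloWorldEventDao matters because Java erases generic type parameters at runtime, so a generic DAO cannot derive the entity class from E alone and needs it passed in as a Class token. The sketch below shows the idea in isolation. FakeEntityManager, GenericDao, Greeting and GreetingDao are hypothetical names invented for this illustration; the fake manager only mimics the find(type, id) lookup and is not the JPA EntityManager.

```java
import java.util.HashMap;
import java.util.Map;

class TypeTokenDaoSketch {

    // Hypothetical stand-in for EntityManager: rows are looked up by class and id
    static class FakeEntityManager {
        private final Map<Class<?>, Map<Object, Object>> tables = new HashMap<>();

        void persist(Object id, Object entity) {
            tables.computeIfAbsent(entity.getClass(), c -> new HashMap<>()).put(id, entity);
        }

        <T> T find(Class<T> type, Object id) {
            Map<Object, Object> table = tables.get(type);
            return table == null ? null : type.cast(table.get(id));
        }
    }

    // Generic DAO: E is erased at runtime, so the class token is kept in a field
    static class GenericDao<E, K> {
        private final Class<E> type;
        private final FakeEntityManager entityManager;

        GenericDao(Class<E> type, FakeEntityManager entityManager) {
            this.type = type;
            this.entityManager = entityManager;
        }

        E findOne(K id) {
            return entityManager.find(type, id);
        }
    }

    static class Greeting {
        final String text;

        Greeting(String text) {
            this.text = text;
        }
    }

    // The concrete DAO fixes both the type parameters and the class token
    static class GreetingDao extends GenericDao<Greeting, Long> {
        GreetingDao(FakeEntityManager entityManager) {
            super(Greeting.class, entityManager);
        }
    }

    public static void main(String[] args) {
        FakeEntityManager entityManager = new FakeEntityManager();
        entityManager.persist(1L, new Greeting("hello"));
        GreetingDao dao = new GreetingDao(entityManager);
        System.out.println(dao.findOne(1L).text);
        System.out.println(dao.findOne(2L));
    }
}
```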
In the Spring context file, I will create the bean of type HelloWorldEventDao and inject it into my service class. The code below shows the definition of the HelloWorldService interface and its implementation:
    // HelloWorldService interface
    package com.csc.oep.service;

    import com.csc.oep.entity.HelloWorldEntity;

    public interface HelloWorldService {
        public HelloWorldEntity saveEvent(HelloWorldEntity event);
        public HelloWorldEntity getEvent(Long id);
    }
    // HelloWorldService implementation
    package com.csc.oep.service;

    import com.csc.oep.dao.HelloWorldEventDao;
    import com.csc.oep.entity.HelloWorldEntity;
    import org.springframework.transaction.annotation.Transactional;

    @Transactional
    public class HelloWorldServiceImpl implements HelloWorldService {

        private HelloWorldEventDao helloWorldEventDao;

        public HelloWorldServiceImpl() {
            super();
        }

        public void setHelloWorldEventDao(HelloWorldEventDao helloWorldEventDao) {
            this.helloWorldEventDao = helloWorldEventDao;
        }

        @Override
        public HelloWorldEntity saveEvent(HelloWorldEntity event) {
            return helloWorldEventDao.save(event);
        }

        @Override
        public HelloWorldEntity getEvent(Long id) {
            return helloWorldEventDao.findOne(id);
        }
    }
Note the @Transactional annotation, which lets Spring know that I want the service methods to execute in transactions. I will configure the Spring transaction manager in the Spring configuration file.
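To give an intuition of what annotation-driven transactions do, the sketch below wraps a service in a JDK dynamic proxy that begins a transaction before each call and commits or rolls back afterwards. It is a deliberately simplified illustration and not Spring's implementation: Tx, RecordingTx, MessageService and transactional are hypothetical names, and the sketch rolls back on any exception thrown by the target, whereas Spring by default rolls back only on unchecked exceptions.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class TransactionalProxySketch {

    // Hypothetical transaction handle (Spring delegates to a transaction manager instead)
    interface Tx {
        void begin();
        void commit();
        void rollback();
    }

    // Records the calls it receives so the demo can show what happened
    static class RecordingTx implements Tx {
        final List<String> log = new ArrayList<>();
        @Override public void begin() { log.add("begin"); }
        @Override public void commit() { log.add("commit"); }
        @Override public void rollback() { log.add("rollback"); }
    }

    interface MessageService {
        String save(String message);
    }

    // Returns a proxy that brackets every call on target with begin/commit,
    // or begin/rollback when the call throws
    static <T> T transactional(Class<T> iface, T target, Tx tx) {
        Object proxy = Proxy.newProxyInstance(
                iface.getClassLoader(),
                new Class<?>[] { iface },
                (self, method, args) -> {
                    tx.begin();
                    try {
                        Object result = method.invoke(target, args);
                        tx.commit();
                        return result;
                    } catch (InvocationTargetException e) {
                        tx.rollback();
                        throw e.getCause();
                    }
                });
        return iface.cast(proxy);
    }

    public static void main(String[] args) {
        RecordingTx tx = new RecordingTx();

        MessageService working = transactional(MessageService.class, m -> m.trim(), tx);
        working.save("  hello  ");

        MessageService failing = transactional(MessageService.class,
                m -> { throw new IllegalStateException("boom"); }, tx);
        try {
            failing.save("x");
        } catch (IllegalStateException expected) {
            // the exception still reaches the caller after the rollback
        }

        System.out.println(tx.log);
    }
}
```

The caller only sees the service interface, which is why the event bean later in this post can depend on HelloWorldService without knowing anything about transactions.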
I inject this service bean into the HelloWorldBean. This is an event bean that implements StreamSink, which allows us to receive events from the upstream component by implementing its method:
    @Override
    public void onInsertEvent(Object object) throws EventRejectedException {
        …
    }
The code below shows my HelloWorldBean:
    package com.csc.oep.bean;

    import com.bea.wlevs.ede.api.EventRejectedException;
    import com.bea.wlevs.ede.api.StreamSink;
    import com.csc.oep.entity.HelloWorldEntity;
    import com.csc.oep.event.HelloWorldEvent;
    import com.csc.oep.service.HelloWorldService;

    public class HelloWorldBrean implements StreamSink {

        private HelloWorldService helloWorldService;

        @Override
        public void onInsertEvent(Object object) throws EventRejectedException {
            if (object instanceof HelloWorldEvent) {
                HelloWorldEvent event = (HelloWorldEvent) object;
                // Create the entity object to save
                HelloWorldEntity tmp = new HelloWorldEntity(event.getId(), event.getMessage());
                // Save the entity to the DB
                tmp = helloWorldService.saveEvent(tmp);
                // Retrieve the entity from the DB
                HelloWorldEntity entity = helloWorldService.getEvent(tmp.getId());
                // Print the entity
                System.out.println(entity);
            }
        }

        public void setHelloWorldService(HelloWorldService helloWorldService) {
            this.helloWorldService = helloWorldService;
        }
    }
Configuration Files
To configure my project and make it work I created or edited the following files:
- persistence.xml
- beans-jpa-config.xml
- SpringJPADemo.context.xml
- processor.xml
- MANIFEST.MF
In the persistence.xml file I configured the helloworld persistence unit as follows:
    <?xml version="1.0" encoding="windows-1252" ?>
    <persistence xmlns="http://java.sun.com/xml/ns/persistence"
                 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                 xsi:schemaLocation="http://java.sun.com/xml/ns/persistence
                                     http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd"
                 version="2.0">
      <persistence-unit name="helloworld" transaction-type="RESOURCE_LOCAL">
        <provider>org.eclipse.persistence.jpa.PersistenceProvider</provider>
        <class>com.csc.oep.entity.HelloWorldEntity</class>
        <properties>
          <!-- EclipseLink configuration -->
          <property name="eclipselink.ddl-generation" value="create-or-extend-tables"/>
          <property name="eclipselink.ddl-generation.output-mode" value="database"/>
          <property name="eclipselink.weaving" value="false"/>
          <property name="eclipselink.jdbc.read-connections.min" value="1" />
          <property name="eclipselink.jdbc.write-connections.min" value="1" />
          <!-- Database configuration -->
          <property name="javax.persistence.jdbc.driver" value="oracle.jdbc.OracleDriver"/>
          <property name="javax.persistence.jdbc.url" value="jdbc:oracle:thin:@my-dbhostname:1521:XE"/>
          <property name="javax.persistence.jdbc.user" value="JPADEMO"/>
          <property name="javax.persistence.jdbc.password" value="jpademo"/>
        </properties>
      </persistence-unit>
    </persistence>
I separated the JPA and EPN component configuration into two different files. The beans-jpa-config.xml file shown below contains the definitions for the beans that are not event beans and are related to my persistence layer.
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:osgi="http://www.springframework.org/schema/osgi"
           xmlns:wlevs="http://www.bea.com/ns/wlevs/spring"
           xmlns:tx="http://www.springframework.org/schema/tx"
           xsi:schemaLocation="
             http://www.springframework.org/schema/beans
             http://www.springframework.org/schema/beans/spring-beans.xsd
             http://www.springframework.org/schema/osgi
             http://www.springframework.org/schema/osgi/spring-osgi.xsd
             http://www.bea.com/ns/wlevs/spring
             http://www.bea.com/ns/wlevs/spring/ocep-epn.xsd
             http://www.springframework.org/schema/tx
             http://www.springframework.org/schema/tx/spring-tx-3.1.xsd">

      <!-- Spring bean definitions go here -->
      <osgi:service id="em"
                    interface="javax.persistence.EntityManagerFactory"
                    ref="entityManagerFactory" />

      <!-- bean post-processor for JPA annotations -->
      <bean class="org.springframework.orm.jpa.support.PersistenceAnnotationBeanPostProcessor"/>

      <bean id="entityManagerFactory"
            class="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean">
        <property name="persistenceUnitName" value="helloworld" />
      </bean>

      <bean id="txManager"
            class="org.springframework.orm.jpa.JpaTransactionManager">
        <property name="entityManagerFactory" ref="entityManagerFactory" />
      </bean>

      <bean id="helloWorldEventDao"
            class="com.csc.oep.dao.HelloWorldEventDao" />

      <bean id="helloWorldService"
            class="com.csc.oep.service.HelloWorldServiceImpl"
            autowire="byName" />

      <tx:annotation-driven transaction-manager="txManager" />
    </beans>
Using the osgi namespace I registered my EntityManagerFactory as an OSGi service. This enables the injection of the EntityManager into the GenericDaoImpl class through the @PersistenceContext annotation. Here I also configure the JpaTransactionManager and my DAO and service beans.
All my OEP components are defined in the SpringJPADemo.context.xml file:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:osgi="http://www.springframework.org/schema/osgi"
           xmlns:wlevs="http://www.bea.com/ns/wlevs/spring"
           xmlns:jdbc="http://www.oracle.com/ns/ocep/jdbc"
           xmlns:hadoop="http://www.oracle.com/ns/oep/hadoop"
           xmlns:nosqldb="http://www.oracle.com/ns/oep/nosqldb"
           xsi:schemaLocation="
             http://www.springframework.org/schema/beans
             http://www.springframework.org/schema/beans/spring-beans.xsd
             http://www.springframework.org/schema/osgi
             http://www.springframework.org/schema/osgi/spring-osgi.xsd
             http://www.bea.com/ns/wlevs/spring
             http://www.bea.com/ns/wlevs/spring/ocep-epn.xsd
             http://www.oracle.com/ns/ocep/jdbc
             http://www.oracle.com/ns/ocep/jdbc/ocep-jdbc.xsd
             http://www.oracle.com/ns/oep/hadoop
             http://www.oracle.com/ns/oep/hadoop/oep-hadoop.xsd
             http://www.oracle.com/ns/oep/nosqldb
             http://www.oracle.com/ns/oep/nosqldb/oep-nosqldb.xsd">

      <wlevs:event-type-repository>
        <wlevs:event-type type-name="HelloWorldEvent">
          <wlevs:class>com.csc.oep.event.HelloWorldEvent</wlevs:class>
        </wlevs:event-type>
      </wlevs:event-type-repository>

      <wlevs:adapter id="hello-world-adapter"
                     class="com.csc.oep.adapter.HelloWorldAdapter">
        <wlevs:instance-property name="message"
                                 value="HelloWorld - the current time is:"/>
        <wlevs:listener ref="inputChannel"/>
      </wlevs:adapter>

      <wlevs:channel id="inputChannel" event-type="HelloWorldEvent">
        <wlevs:listener ref="processor"/>
      </wlevs:channel>

      <wlevs:processor id="processor"/>

      <wlevs:event-bean id="eventBean"
                        class="com.csc.oep.bean.HelloWorldBrean">
        <wlevs:instance-property name="helloWorldService"
                                 ref="helloWorldService"/>
      </wlevs:event-bean>

      <wlevs:channel id="outputChannel" event-type="HelloWorldEvent">
        <wlevs:listener ref="eventBean"/>
        <wlevs:source ref="processor"/>
      </wlevs:channel>
    </beans>
Note that I am injecting the service bean I defined in the other context file without explicitly importing it. This works because the OEP server automatically merges into a single application context all the context files under the logical folder OEP Content/spring.
This file creates the following Event Processing Network:
In the processor.xml file I defined the CQL query ExampleQuery. This query sends all the events from the input channel to the output channel.
    <?xml version="1.0" encoding="UTF-8"?>
    <wlevs:config xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application">
      <processor>
        <name>processor</name>
        <rules>
          <query id="ExampleQuery"><![CDATA[
            select * from inputChannel [now]
          ]]></query>
        </rules>
      </processor>
    </wlevs:config>
As I stated at the beginning of
this post, the application we create is an OSGi bundle. We will deploy this
bundle in our OEP server. However, first we have to declare all our bundle and
package dependencies in the bundle Manifest.
    Manifest-Version: 1.0
    Bundle-ManifestVersion: 2
    Bundle-Name: DatabaseDemo.JPADemo
    Bundle-SymbolicName: DatabaseDemo.JPADemo
    Bundle-Version: 1.0.0
    Bundle-Localization: bundle
    Bundle-Vendor: %project.vendor
    Bundle-ClassPath: .
    Require-Bundle: org.eclipse.persistence.jpa;version="2.5.2",
     javax.persistence;jpa="2.1"
    Import-Package:
     com.bea.wlevs.configuration;version="12.1.3",
     com.bea.wlevs.ede.api;version="12.1.3",
     com.bea.wlevs.ede.impl;version="12.1.3",
     com.bea.wlevs.ede.spi;version="12.1.3",
     com.bea.wlevs.ede;version="12.1.3",
     com.bea.wlevs.management.spi;version="12.1.3",
     com.bea.wlevs.spring.support;version="12.1.3",
     com.bea.wlevs.spring;version="12.1.3",
     com.bea.wlevs.util;version="12.1.3",
     org.apache.commons.logging;version="1.1.1",
     org.springframework.beans.factory.config;version="3.1.1",
     org.springframework.beans.factory;version="3.1.1",
     org.springframework.beans;version="3.1.1",
     org.springframework.core.annotation;version="3.1.1",
     org.springframework.osgi.context;version="1.2.0",
     org.springframework.osgi.extensions.annotation;version="1.2.0",
     org.springframework.osgi.service;version="1.2.0",
     org.springframework.orm.jpa;version="3.1.1",
     org.springframework.orm.jpa.support;version="3.1.1",
     org.springframework.transaction;version="3.1.1",
     org.springframework.transaction.annotation;version="3.1.1",
     org.springframework.transaction.interceptor;version="3.1.1",
     org.springframework.aop;version="3.1.1",
     org.springframework.aop.framework;version="3.1.1",
     org.springframework.aop.framework.autoproxy;version="3.1.1",
     org.springframework.util;version="3.1.1",
     org.springframework.jdbc.datasource;version="3.1.1",
     org.aopalliance.aop,
     oracle.jdbc
Test the Application
The source code of this project is available in my GitHub repository here.
To test the project, deploy it to the OEP server. After the deployment you should see the events printed in the console, as in the image below:
The same events should be stored in the HELLO_WORLD table in the database.
Enjoy :)