1. 程式人生 > >SODBASE CEP學習(十六):CEP與資料庫互動

SODBASE CEP學習(十六):CEP與資料庫互動

一些時候出於專案需求或複用,需要將CEP和資料庫結合起來用。SODBASE CEP可以很好地支援這型別需求。本文將介紹CEP與資料庫互動的兩種常用方式。

1. 示例操作

(1)為示例操作簡單,下載Oracle Express Edition (11g) Windows版,安裝過程中會提示為sys和system使用者設定密碼,設定為123456
安裝完畢後從開始選單執行“Oracle Database 11g Express Edition”->"執行 SQL 命令列"
我們就用Oracle自帶的示例表空間users,自帶的使用者HR
SQL>connect system/123456
SQL>ALTER USER "HR" identified by "123456"
SQL>ALTER USER HR ACCOUNT UNLOCK;
SQL>disconnect
SQL> connect HR/123456
已連線。
SQL> select * from regions;
 REGION_ID REGION_NAME
---------- --------------------------------------------------
         1 Europe
         2 Americas
         3 Asia
         4 Middle East and Africa

方式一:

在畫板上右鍵->檢視程式碼檢視此模型的EPL語句
SELECT * FROM T1:randomEventStream 
PATTERN T1 
WHERE JAVA:f.DB:in(T1.value2,'select distinct region_id from regions','oracle.jdbc.OracleDriver','jdbc:oracle:thin:@localhost:1521:xe','HR','123456')   WITHIN 0 

測試執行,結果如下所示,流的value2欄位的值只有是在REGION_ID裡面才會輸出

方式二:

oracleadaptor01是定時查詢資料庫,oracleadaptor01_output列印輸出,執行結果如下所示


2.工作原理

2.1 方式一:函式呼叫方式

可以把資料庫查詢作為一個函式。比如說一個簡單的需求,我們需要在資料流transacation_flow上查出黑名單使用者的交易記錄。當一筆交易資料請求過來時,我們需要檢查交易賬號是否在黑名單裡,即向資料庫發起查詢 select account from black_list 然後在返回的結果裡做判斷。一般來講這種操作會拉慢CEP引擎的速度,但是如果事件量不大 、tps要求不高的時候,這種方案也是可行的。

SELECT * FROM transacation_flow t 
PATTERN t
WHERE JAVA:f.DB:in(t.from_account,'select distinct account from black_list','oracle.jdbc.OracleDriver','jdbc:oracle:thin:@localhost:1521:xe','user','password') 

其中from_account是transacation_flow的欄位,表示轉出賬號。

JAVA:f.DB:in(String x /*被查值*/, String sql, String jdbcDriver, String url, String user, String password) 是SODBASE系統實現的自定義函式。

一共6個引數,如果x在資料庫上執行sql語句返回的資料集中,如果包含x,此函式返回true,否則返回false。

其它資料庫示例

mysql:

SELECT * FROM transacation_flow t 
PATTERN t
WHERE JAVA:f.DB:in(t.from_account,'select distinct account from black_list','com.mysql.jdbc.Driver','jdbc:mysql://localhost:3306/cep?useUnicode=true&characterEncoding=utf8','username','password')


postgresql:
SELECT * FROM transacation_flow t 
PATTERN t
WHERE JAVA:f.DB:in(t.from_account,'select distinct account from black_list',''org.postgresql.Driver','jjdbc:postgresql://ip:port/mydb','username','password')

Microsoft Sql Server:

SELECT * FROM transacation_flow t 
PATTERN t
WHERE JAVA:f.DB:in(t.from_account,'select distinct account from black_list',''com.microsoft.sqlserver.jdbc.SQLServerDriver','jdbc:sqlserver://localhost:1433;DatabaseName=mydb','username','password')

使用者也參考原始碼可以進行二次開發,開發出滿足自己需求的函式來。

2.2 方式二:事件觸發方式

流查詢 SELECT * FROM T1:oracleadaptor01 的輸出介面卡中,觸發資料庫查詢'select * from regions;'右鍵點選OUTPUT,檢視配置


如果要使用輸出事件的欄位值作為引數,採用?{},如 'select '?{t_id}','from_account','to_account', from black_list where account = ?{from_account} limit 1'

這種方式可以將資料庫互動納入統一的事件建模中。

3. 原始碼


自定義函式JAVA:f.DB:in在sodbase-cep-udfunction-db.jar中,原始碼如下所示,讀者也可以修改二次開發自定義函式,修改完畢後打jar包,放到lib目錄下面即可
/**
 * 
 */
package f;




import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;




/**
 *
 */
public class DB
{
	Connection conn = null;
	
	public boolean in(String x, String sql, String jdbcDriver, String url, String user, String password)
	{
		try
		{
			Connection conn = getConnection(jdbcDriver,  url,  user,  password);
			conn.setAutoCommit(true);
			Statement stmt = conn.createStatement();
			
			ResultSet r = stmt.executeQuery(sql );
			while(r.next())
			{
    				Object o = r.getObject(1);
					if(o!=null&&o.toString().equals(x))
						return true;
			}
			r.close();
			stmt.close();
		} catch (SQLException e)
		{
			e.printStackTrace();
		}
		finally
		{
			if(conn!=null)
				try
				{
					conn.close();
				} catch (SQLException e)
				{
					e.printStackTrace();
				}
			conn=null;
		}
		return false;
	}
	
	
	
	public Connection getConnection(String jdbcDriver, String dburl,String dbusername,
			String dbpassword) 
	{
		
		String dbclass =jdbcDriver;
		try {
			Class.forName(dbclass);
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		}
		
		
		try {
			if(conn!=null&&!conn.isClosed())
				return conn;
		} catch (SQLException e1) {
			e1.printStackTrace();
		}
		
		try {




			conn =DriverManager.getConnection(dburl, dbusername,
					dbpassword);




		} catch (SQLException e) {
			e.printStackTrace();
		}
		return conn;
	}
}