1. 程式人生 > >MOQL-複雜事件處理(CEP)

MOQL-複雜事件處理(CEP)

    MOQL做為一款基於Java的面向記憶體物件的過濾、查詢及統計分析工具,有非常豐富的應用場景。複雜事件處理就是其應用場景之一。MOQL工程在遷移到GitHub之後,進行了一次大的調整。整個專案被拆分成了moql-core和moql-cep兩個模組。其中moql-core即原來的moql主工程,而moql-cep則是基於moql開發的複雜事件處理模組,其功能類似Esper,但待處理的資料是無模式的。所謂無模式即不用像Esper一樣,事先指定待處理的資料的模式,對於事件型別比較多的應用場景更適用,開發也更簡單。

    示例程式碼如下(注:該程式碼在moql-cep的測試類中可以找到):

//構造測試資料,測試資料為物件陣列列表。資料共5*20=100條
List<Object[]> dataList = DataSimulator.createDataList(5, 20);
//構造複雜事件處理規則
CeperMetadata metadata = new CeperMetadata();
//設定規則名
metadata.setName("cep1");
//設定規則的視窗型別,關於規則支援的視窗型別見後文
metadata.setWinType(SlideWindowEnum.SW_BATCH.name());
/*基於Moql語法設定匹配規則。該規則含義為以事件陣列的0下標索引值做名字,對下標為2的值
*求和。當有任何一組值的和大於100時,則匹配規則,並輸出name和sum兩個欄位。注:from
*關鍵字後跟兩個evt,第一個表示事件流的名字,第二個表示別名。Moql語法要求必須有別名,
*參見Moql相關語法文件。
*/
metadata.setMoql("select evt[0] name, sum(evt[2]) sum from evt evt group by name having sum > 100");
//設定視窗桶的個數 
metadata. setBucketCount(5);
/*設定桶大小,每個桶可以快取的資料條目。每個視窗由bucketCount個桶組成,其容量為
* bucketCount*bucketSize。每當視窗滑動時,引發計算並按桶進行資料淘汰,即桶的大小約定了
*視窗的資料處理粒度。該示例表示視窗最多可以容納50/10=5個桶,當第6桶資料來時,第1桶被
*淘汰,只保留最新的5桶資料
*/
metadata.setBucketSize(10);
/*建立複雜事件規則物件,其中引數evt表示傳給ceper的事件流的名字。該名字是規則定義時
*moql語句中from後的表被命名為evt的原因。
*/
Ceper<Object[]> ceper = new MoqlCeper<Object[]>("evt", metadata);
//設定事件監聽器,當有匹配條件的事件發生時通過監聽器可以訂閱到
ceper.addCepListener(new CepPrintListener());
//開始進行復雜事件處理
for(Object[] data : dataList) {
  ceper.operate(data);
}

    moql-cep目前支援4種滑動視窗型別。每種視窗都是由bucketCount個桶(bucket)組成,其容量為 bucketCount*bucketSize。桶表示了視窗的最小處理粒度,每當有新的一桶資料產生時,就會引發一次計算(視窗已滿的情況下),同時會淘汰掉最老的那桶資料。

Ø  BatchWindow

    批處理視窗,以事件的數目做為視窗滑動的觸發條件。bucketCount表示視窗能容納的桶的總量;bucketSize表示一桶能容納的事件的總量。

Ø  TimeWindow

    時間視窗,以時間做為視窗滑動的觸發條件。bucketCount表示視窗能容納的桶的總量;bucketSize表示每桶能夠容納多長時間的事件,單位秒。

Ø  BatchAndTimeWindow

    批處理與時間並行視窗,同時以事件數及時間做為視窗滑動的觸發條件,即那個條件先滿足,就用哪個條件觸發視窗滑動及相關計算。bucketCount與bucketSize引數的相關含義見BatchWindow。另外,在上例的程式碼中CeperMetadata還有一個Map型別的parameters屬性沒有介紹,各視窗可以通過該屬性設定視窗執行時特有的引數。BatchAndTimeWindow有一個名為win.batchandtime.timebucketsize的屬性,該屬性約定了時間視窗的大小。這意味著,當一個桶只要滿足bucketSize屬性或者timebucketsize屬性設定的任一條件就會觸發視窗滑動。

Ø  MatcherWindow

    值匹配視窗,即當某個指定值發生改變時引發視窗滑動。bucketCount表示視窗能容納的桶的總量;bucketSize在此時沒有實際意義,設定為0。MatcherWindow有一個名為win.matcher.expression的引數,該引數是一個表示式,符合Moql中Operand的相關定義。當用該表示式在事件流中計算出的值發生變化時,產生一個以新值為當前值的新桶,並觸發視窗的滑動和相關計算。如:當對儲存後的資料進行事件流回放時,由於事件已不能正常反映實時的流動情況,此時可以用該視窗對事件中的時間欄位按秒進行匹配,可模擬出實時流動時的計算效果。

    有沒講清楚的,可以給我留言。MOQL工程換了新地址,地址如下:

專案地址:https://github.com/colorknight/moql