1. 程式人生 > >Esper學習和原理分析

Esper學習和原理分析

   最近一直有同事跟我說目前開發的資料流平臺僅僅只是把資料推送過來作用不大。希望最好能夠連資料分析也一起做了,告訴他們結果就好。這樣的需求一般交給資料分析組去做就好了,不過了解了一下現在只有離線分析,最快也只能半小時統計一次,實時分析這塊還沒有實現。      去搜了下,看看有哪些開源的實時分析引擎可以用。之前先看了下storm,twitter做的,大公司大品牌,完全開源,看上去的確不錯。但是看了一下功能,相當於是一個實時hadoop,只能幫你計算,但是怎麼算還得你自己寫程式,離我想要的功能上有一些距離。目前想使用實時分析的產品一大堆,大部分產品資料量都不大,每個都給他們寫段分析程式碼,加上除錯,能累死人,維護代價高。      之後發現了Esper,仔細看了下,還真不錯,他是一個強大的支援ESP( Event Stream Process
 )和CEP( Complex Event Process )分析的引擎,特別是他的EPL解析語言吸引了我,可以通過簡單的寫一條類似SQL一樣的語句完成統計。      國內對esper研究的資料很少,因此就從http://esper.codehaus.org/官網下載了esper的原始碼和文件,仔細研究了一下,接下來分享一下我對esper的學習心得。     首先介紹一下ESPER的大體結構,esper從內容上分為兩塊,esper的核心esper-4.x.x.jar和esper-io。      (1)esper的核心包包含了EPL語法解析引擎,事件監聽機制,事件處理等核心模組。     (2)esper的io包含從各種資料來源讀取資料以及將輸出結果寫入各種資料來源,包括excel,database,JMS,http,socket,XML。     貼一張esper官網上的結構圖,方便大家瞭解esper的結構 Esper原理分析 - 顧費勇 - 顧費勇的部落格
    接下來對上述結構圖進行詳細的解釋讓大家加深對ESPER的瞭解     1.  Event物件:ESPER處理的事件的最小單位,一個任意的JavaBean物件,屬性支援簡單的java型別、陣列、map、以及巢狀JavaBean,很靈活,下面是一個簡單的Event物件

publicclassOrderEvent{ privateString itemName; privatedouble price; publicOrderEvent(String itemName,double price){ this.itemName = itemName; this.price = price

; } publicString getItemName(){ return itemName; } publicdouble getPrice(){ return price; } }



   2.EPL:      EPL是ESPER的核心,它類似於SQL,但是和SQL的執行方式不同。      SQL是資料在那裡,你每次執行SQL就會觸發一次查詢;而EPL是查詢在這裡,資料輸入達到一定條件即可觸發查詢。      這個條件可以有多種:     a).每個event物件來就觸發一次查詢,並只處理當前物件

select*fromOrderEvent


這個EPL語句會在每個OrderEvent物件到達後,並將該event交給後續的Listener(後面會降到)來進行處理。但是這種用法不多見,意義不大。   b).視窗處理模式:     EPL最大的特色就是這個視窗處理模式,有兩種視窗,時間視窗和長度視窗。    時間視窗 : 大家想一下,如果有一個場景,要獲取最近3秒內OrderEvent的price的平均值,那該怎麼做呢?一般的做法需要做個後臺執行緒來做3秒的時間統計,時間到了再做後續處理,雖然不復雜,但是也挺繁瑣的。 看看EPL是怎麼做的

select avg(price)from test.OrderEvent.win:time(3 sec)

win:time(3 sec)就是定義了3秒的時間視窗,avg(price)就是統計了3秒內的OrderEvent物件的price的平均值     長度視窗:   長度視窗和時間視窗比較類似

select avg(price)from test.OrderEvent.win:length(100)

win:length(10)就是定義了10個Event的,avg(price)就是統計了最近10個的OrderEvent物件的price的平均值 以上這些都比較容易理解,雖然知道了處理方法,也比較好用,我還是比較喜歡鑽研一下他的內部實現方式。先來看一張時間視窗模式的圖 Esper原理分析(一) - 顧費勇 - 顧費勇的部落格 他僅保留最近時間視窗的物件內容,但是每個Event到來都會觸發一次UpdateListener的操作   EPL語句會作為一個Statement來監聽事件的到來,當New Events有新事件時就會觸發UpdateListener的操作,下面是一個updateListener的簡單例子,event.get("avg(price))就可以獲得EPL查詢所獲得的price平均值,然後就可以加入自己的程式碼進行處理,比如將結果寫入本地檔案 而New Events和Old Events就是他的輸入,而ave(price)操作所計算的物件就是Length Window中的內容。

publicclassMyListenerimplementsUpdateListener{ publicvoid update(EventBean[] newEvents,EventBean[] oldEvents){ EventBeanevent= newEvents[0]; System.out.println("avg="+event.get("avg(price)")); } }

事件視窗也基本類似。 EPL的時間視窗的計時是怎麼實現的呢?我們來看下他的原始碼

ScheduledThreadPoolExecutor timer;//省略構造

timerTask = new EPLTimerTask(timerCallback);

ScheduledFuture<?> future = timer.scheduleAtFixedRate(timerTask, 0, msecTimerResolution, TimeUnit.MILLISECONDS);//估計每100毫秒執行一次

... 

_lastDrift = Math.abs(future.getDelay(TimeUnit.MILLISECONDS));//計算延遲

...

CurrentTimeEvent currentTimeEvent = new CurrentTimeEvent(msec); sendEvent(currentTimeEvent);//傳送時間控制Event

貼得比較亂 簡單解釋一下,ScheduledThreadPoolExecutor.scheduleAtFixedRate固定每100ms(可配置)執行一次,並且通過計算延遲future.getDelay來確保計時精確,接下來通過傳送一個CurrentTimeEvent來推送時間前進100+delay(ms),也就是說ESPER中的時間不是完全受機器時間控制的,而是通過傳送TimeEvent由應用來進行控制的,這方便做很多的擴充套件。 c)批量視窗處理模式 視窗模式是會在每個Event來都觸發一次UpdateListener操作,如果每秒Event數量達到很大的話這種方式明顯是不行的 CPU消耗會很厲害 批量視窗處理模式正好可以解決這個問題 批量時間視窗模式

select avg(price)from test.OrderEvent.win:time_batch(3 sec)

批量長度視窗模式

select avg(price)from test.OrderEvent.win:length_batch(10)

時間批量模式的操作圖如下 Esper原理分析(一) - 顧費勇 - 顧費勇的部落格  上圖的時間視窗大小為4s,他會在4s的視窗時間到達以後才將視窗中的內容一起扔給UpdateListener來進行處理,效能相對節約很多,特別是大資料量的情況下。長度批量視窗的處理模式也是類似。 上述視窗模式下記憶體使用情況又是如何呢?經過本人測試和研究程式碼發現,它會保留兩個視窗的記憶體使用量,一個儲存當前視窗的Events,一個儲存上一個視窗的Events,因此在估算一個數據分析程式佔用多少記憶體要看上面監聽的EPL語句開的視窗的大小以及資料的TPS,防止記憶體OOM。 掌握了上面的視窗的概念,後面其他的內容都很好理解了 d) 過濾 where過濾

select avg(price)from test.OrderEvent.win:time_batch(3 sec)where price>10

having過濾

select avg(price)from test.OrderEvent.win:time_batch(3 sec) having price>10

似曾相識啊,執行方式也基本和SQL裡的where 和 having差不多。 在EPL裡where 是在incoming Events到window之間進行過濾,having是在window到New Eventing之間進行過濾 e)聚合 count

select count(price)from test.OrderEvent.win:time_batch(3 sec)where price>10


sum

select sum(price)from test.OrderEvent.win:time_batch(3 sec)where price>10

group by

selectitemName,sum(price)from test.OrderEvent.win:time_batch(3 sec)where price>10groupby itemName


都很簡單,瞭解SQL的都狠容易上手 f) 函式 ESPER預設載入 java.lang.* java.math.* java.text.* java.util.* 支援這些包下的函式方法,例如

selectMath.round(sum(price))from test.OrderEvent.win:time_batch(3 sec)where price>10

它還支援自定義函式,舉個例子,做個計算百分比的函式

publicclassUtil{ publicstaticdouble computePercent(double amount,double total){ return amount / total *100; } }

配置一下

<plugin-singlerow-functionname="percent" function-class="mycompany.MyUtilityClass"function-method="computePercent"/>

OK了,可以用了

select percent(price,total)fromOrderEvent


總體來說,ESPER的EPL功能非常強大,而且基本和SQL類似,入門容易,構造一個實時資料分析系統比較簡單,且維護成本低,新應用進來只需要簡單配置一下EPL語句就可以了,方便快捷,對大部分的系統還是比較適合的。