1. 程式人生 > >Flink中的CEP複雜事件處理 (原始碼分析)

Flink中的CEP複雜事件處理 (原始碼分析)

其實CEP複雜事件處理,簡單來說你可以用通過類似正則表示式的方式去表示你的邏輯,表現能力非常的強,用過的人都知道

開篇先偷一張圖,整體瞭解FlinkCEP中的  一種重要的圖  NFA

FlinkCEP在執行時會將使用者的邏輯轉化成這樣的一個NFA Graph (nfa物件)

graph 中包含狀態(Flink中State物件),以及連線狀態的邊(Flink中StateTransition物件)

當從一個State跳變到另一個State時需要通過一條邊StateTransition,這條邊中包含一個Condition物件包含了使用者的邏輯就是我們使用者程式碼中.where()中返回Boolean的方法

也就是說Condition物件中包含是否可以完成狀態跳變的條件,A狀態要跳變到B狀態就必須滿足連線AB的邊中的條件(邊StateTransition物件屬於B state)

其中邊StateTransition分為三種

  take: 狀態滿足跳變條件後直接跳變到B狀態

  ignore: 狀態滿足跳變條件以後又回到原來狀態,狀態保持不變

  process: 這條邊可以忽略也可以不忽略

後面原始碼分析的時候可以看到他們之間的區別

接著從原始碼來看一下如何用這個NFA圖實現Flink中的CEP複雜事件處理的

因為CEP在Flink中被設計成運算元的一種而不是單獨的計算引擎,所以直接找到CepOperator.java中

來看一下它的初始化Open()

這裡看到有一個NFAFactory的工廠建立了一個NFA,這裡的這個工廠是在Driver端通過使用者編寫的程式碼返回的Patten物件轉換得到的,也就是使用者env.exection()的時候解析的,工廠物件還包含了使用者所有的State集合

繼續,在createNFA()方法中

 將工廠中的所有頂點也就是狀態States放到了NFA物件的一個Map中

 Key為這個States的Name(其實就是使用者程式碼中的.next("Name"))

接著看CepOperator.java中接收到資料processElement()方法做了什麼

 這裡是處理時間的,這裡其實就是直接執行了,這裡就不看了,直接看事件時間是如何處理的

先是取出資料的事件時間,判斷是不是小於當前水印了,小於這條資料就證明遲到太久了,如果有側輸出丟給側輸出處理,沒有就直接丟棄了,和WindowOperater一樣

然後看saveRegisterWatermarkTimer()方法

將 (當前水印+1) 註冊成了一個定時器timer用於觸發計算,和window原理一樣(不知道的可以看看前面的文章)

這裡主要是因為視窗是一批一批觸發而CEP需要逐個觸發,所以用(當前水印+1)當做定時器,也就是說只要水印往前推進了就觸發推進這段時間的所有計算

然後bufferEvent()將這條資料加入到了一個Queue中

現在來看觸發計算的具體邏輯

來到onEventTime()方法中

先是拿到一個用時間排序的優先佇列PriorityQueue裡面就是排序的事件時間

getNFAState()這裡比較重要,這裡通過nfa得到了一個nfaState具體來看一下

這裡這個NFAstate會初始化,NFAstate裡面包含了一個ComputationState的queue,主要目的是用於每條資料來的時候都會去遍歷這個queue,看這條資料是否能匹配上裡面的state如果匹配上了就更新下一個準備匹配的狀態

這裡就知道他為什麼NFAstate初始化的時候會把使用者所有的State中可以作為開始start的狀態放queue了吧

因為一開始沒資料,當來資料的時候我要判斷這條資料是不是屬於我CEP的Begin頭,這個state也就是我們使用者的begin()方法,所以才把所有的可以作為開始的狀態都放到這個PartialMatches這個queue中去,這個PartialMatches後面計算的時候會用到,注意

NFAState的初始化就講完了

繼續,回到處理邏輯

然後根據事件時間作為key拉取前面將資料放入的那個queue中資料,返回的是一個List包含這個事件時間的所有資料

然後排序,這裡是二次排序,第一次排序是用的事件時間,二次排序排的是同一時間的資料按什麼順序處理

然後這裡ProcessEvent()方法就是具體執行的邏輯了,這裡同時會把剛剛初始化好的NFAState傳遞進去

 一開始會獲取一個共享的緩衝區主要是為了減小CEP重複資料儲存的記憶體佔用,這裡不講了因為CEP論文裡面有,比較複雜

這裡process()方法就是具體邏輯了,返回了一個map這個map包含了process()方法這條資料匹配成功結束的資料也就是結果,而processMatchedSequences(patterns, timestamp)就是執行使用者的.select()邏輯了

既然這裡就得到了CEP匹配的結果,來看下具體計算邏輯nfa.process()

這裡又初始化兩個優先佇列

分別用於

  newPartialMatches  裝nfa匹配到一半沒有結束資料,也就是半匹配,

  potentialMatches     裝成功匹配完成的資料,用於返回,呼叫使用者的方法去處理結果

接著

這裡就直接去初始化好的NFAState中拿剛剛的那個PartialMatches,並且遍歷它,通過傳入這個computeNextStates()方法,用於判斷這條資料是否可以滿足這個ComputationState完成匹配

注意! 一開始時初始化裡面只有所有可作為CEP匹配頭的ComputationState,可想而知當後面匹配上了以後肯定會更新這個用於看資料是否匹配的queue

 

這裡就可以知道了整個CEP的處理方式了:  

   一開始會把所有可以作為CEP匹配頭的狀態State先放入queue,每來一條資料就會遍歷queue中所有state,看這條資料是否能能匹配上,能匹配上就在queue中加入下一個用於匹配的狀態,  用於看下一條資料能否繼續匹配上

   比如一個正則"abc"用於CEP匹配 當來了一條a資料,就匹配上CEP頭了,會把b state加入queue中,接著來了一條b資料,又繼續匹配上了,又把c state加入queue 直到來了一條c資料整個就匹配完成,返回結果

 

   總結 : 處理過程就是兩步

        1.來一條資料,遍歷queue中所有state,看哪些state能匹配上就匹配

        2.根據1的結果更新queue,用於下一條資料的匹配 

    

而判斷是否能匹配上就是這個computerNextStates()方法中

先把這個狀態state壓棧

從棧中取state遍歷它所有的邊 StateTransitions

呼叫使用者的方法看是否能滿足邊條件,也就是說是否能跳變到這個狀態

當滿足時,會根據邊

  ignore: 啥都不做

  take:       加入結果集中

  process:  又把這個狀態的下一個狀態state壓棧了,繼續迴圈處理

 

結果返回這條資料匹配上的狀態們,於是

遍歷所有匹配上的狀態得結果集,會把匹配上的狀態的下一個(target)用於匹配的狀態加進queue去

如果是結束,預設NFAstate中是有一個自帶"&end"的結束state

遍歷所有完成的狀態,當匹配上最後一個狀態時就是上面說的“&end”就證明完成了,丟到完成queue中

當匹配失敗了就清空狀態

當匹配上了但還沒有結束就丟到半匹配queue

接著

會先執行跳過策略把結果篩選一遍

然後

就是用我們前面說的那個半匹配queue了,用它又繼續更新了NFAState中的PartialMatches了

下一條資料來了以後就會用遍歷這個新queue集合來判斷是否可以繼續匹配了

然後返回這次匹配成功的資料,呼叫使用者select方法處理結