1. 程式人生 > >大資料計算引擎之Flink Flink CEP複雜事件程式設計

大資料計算引擎之Flink Flink CEP複雜事件程式設計

原文地址: 大資料計算引擎之Flink Flink CEP複雜事件程式設計
複雜事件程式設計(CEP)是一種基於流處理的技術,將系統資料看作不同型別的事件,通過分析事件之間的關係,建立不同的時事件系序列庫,並利用過濾、關聯、聚合等技術,最終有簡單事件產生高階事件,並通過模式規則的方式對重要資訊進行跟蹤和分析,從實時資料中心發掘有價值的資訊。複雜事件處理主要應用於防範網路欺詐、裝置故障檢測、風險規避和智慧營銷等領域。目前主流的CEP工具具有Esper,Jboss Drools和上夜班的MicroSoft StreamInsight等,Flink基於DataStream API提供了FlinkCEP元件棧,專門用於對複雜事件的處理,幫助使用者從流式資料中發掘有價值的資訊。

基礎概念

FlinkCEP 說明

一個或多個由簡單事件構成的事件流通過一定的規則匹配,然後輸出使用者想得到的資料,滿足規則的複雜事件。具備如下的特徵:

  • 目標:從有序的簡單事件流中發現一些高階特徵
  • 輸入:一個或多個由簡單事件構成的事件流
  • 處理:識別簡單事件之間的內在聯絡,多個符合一定規則的簡單事件構成複雜事件
  • 輸出:滿足規則的複雜事件

CEP用於分析低延遲、頻繁產生的不同來源的事件流。 CEP 可以幫助在複雜的、不相關的事件流中找出有意義的模式和複雜的關係,以接近實時或準實時的獲得通知並阻止一些行為。

CEP支援在流 上進行模式匹配,根據模式的條件不同,分為連續的條件或不連續的條件;模式的條件允許有時間的限制,當在條件範圍內沒有達到滿足的條件時,會導致模式匹配超時。

CEP用於分析低延遲、頻繁產生的不同來源的事件流。 CEP 可以幫助在複雜的、不相關的事件流中找出有意義的模式和複雜的關係,以接近實時或準實時的獲得通知並阻止一些行為。

環境準備

這裡,我們需要引入相關的依賴包。

      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
        <version>1.9.0</version>
      </dependency>

基本概念

事件定義

簡單事件

簡單事件存在於現實場景中,主要特點為處理單一事件,事件的定義可以直接觀察出來,處理過程中無需關注多個事件之間的關係,能夠通過簡單的資料處理手段將結果計算出來。
複雜事件

相對於簡單事件,複雜事件處理的不僅是單一的事件,也處理由多個事件組成的複合事件複雜事件處理監測分析事件流(Event Streaming),當特定事件發生時來觸發某些動作。

事件關係

複雜事件中事件間包含多種關係,常見的有時序關係、聚合關係、層次關係、依賴關係以及因果關係。
時序關係

動作事件與動作事件之間、動作事件與狀態變化事件之間,都存在時間順序。事件和事件的時序關係決定了大部分的時序規則,例如: A 事件狀態持續為 1 的同時 B 事件狀態為 0 等;
聚合關係

動作事件和動作事件之間,狀態事件和狀態事件之間都存在聚合關係,即個體聚合形成整體集合。例如: A事件狀態為 1 的次數為 10 觸發預警。
層次關係

動作事件和動作事件之間,狀態事件和狀態事件之間都存在層次關係,即父類事件和子類事件的層次關係,從父類到子類是具體化的,從子類到父類是泛化的。這個可以對比Java裡面的繼承關係。
依賴關係

事物的狀態屬性之間彼此的依賴關係和約束關係,例如 A事件狀態觸發的條件前提是B事件觸發,則AB之間形成了依賴關係。
因果關係

對於完整的動作過程,結果狀態為果,初始化狀態和動作都可以視為因。

事件處理

複雜事件處理的目的是通過相應的負責對實時資料執行形影的處理策略,這些策略包括了推斷、查因、決策、預測等方面的應用。
事件推斷

主要利用事務狀態之間的約束關係,從一部分狀態屬性值可以推斷出另一部分的狀態屬性值。舉個栗子:1,1,2,3,5,8 ...... ,我們可以推斷出後面的是: 13,21 ......
事件查因

當出現結果狀態,並且知道初識狀態,可以查明某個動作的原因;同樣,知道結果,知道過程,就可以查明初始狀態的原因。這個相當於:f(x) = kx + b ,知道f(x) , 知道 kx+b , 那我們就知道 x .
事件決策

想得到某個結果狀態,知道初始狀態,決定執行什麼動作。該過程和規則引擎相似,例如某個規則符合條件後出發行動,然後執行報警等操作。
事件預測

該種情況知道事件初始狀態,以及將要做的動作,預測未來發生的結果狀態。例如:天氣預報。

Pattern API

FlinkCEP 提供了 Pattern API 用於對輸入流資料的複雜事件規則定義,並從事件流中抽取事件結果。

每個Pattern 都應該包含幾個步驟,或者叫做 state 。從一個 state 到另一個 state . 例如:

    Pattern.begin[LoginEvent]("begin")
    .where(_.eventType.equals("fail"))
    .next("next")
    .where(_.eventType.equals("fail"))
    .within(Time.seconds(5))

    // 或者如下
    Pattern.begin[Event]("start")
    .where(_.typeEvent.equals("temperature"))
    .next("middle")
    .subtype(classOf[TempEvent])
    .where(_.temp > 35.0)
    .followedBy("end")
    .where(_.name.equals("end"))

說明:

  1. 每一個state都應該有一個標識,比如:begin[LoginEvent]("begin")這裡的 "begin" 和 begin[Event]("start") 這裡的 "start".
  2. 每個state 都需要有一個唯一的名字,而且需要一個 filter 來過濾條 件,這個過濾條件定義事件需要符合的條件.例如:.where(_.eventType.equals("fail"))
  3. 我們也可以通過 subtype 來限制 Event的子型別,例如:.subtype(classOf[TempEvent])
  4. 事實上,你可以多次呼叫subtype 和 where 方法;而且如果 where 條件是不相關的,你可以通過 or 來指定一個單獨的 filter 函式:pattern.where(...).or(...);
  5. 之後,我們可以在此條件基礎上,通過next 或者 follow edBy 方法切換到下一個state next 的意思是說上一步符合條件的元素之後緊挨著的元素;而 followedBy 並不要求一定是挨著的元素。這兩者分別稱為嚴格近鄰和非嚴格近鄰。
  6. 最後,我們可以將所有的Pattern 的條件限定在一定的時間範圍內:within(Time.seconds(5))
  7. 時間可以是 Processing Time , 也可以是 Event Time.

Pattern檢測

    val patternStream: PatternStream[LoginEvent] = CEP.pattern(loginEventSource, pattern)
    patternStream.select(loginEventSource.keyBy(_.userID) , loginfailPattern)

一旦獲得PatternStream ,我們就可以通過 select 或 flatSelect ,從一個 Map 序列找到我們需要的警告資訊。

select

select方法需要實現一個 PatternSelectFunction ,通過 select 方法來輸出需要的警告。它接受一個 Map 對,包含 string/event ,其中 key 為 state 的名字, event 則為真實的 Event。

    val loginfailPattern = patternStream.select(
    (pattern: Map[String, Iterable[LoginEvent]]) => {
      val first = pattern.getOrElse("begin", null).iterator.next()
      val second = pattern.getOrElse("next", null).iterator.next()

      Warning(first.userID, first.eventTime, second.eventTime, "warning")
    })

其返回值僅為1 條記錄。

flatSelect

通過實現PatternFlatSelectFunction ,實現與 select 相似的功能。唯一的區別就是 flatSelect 方法可以返回多條記錄, 它通過一個 Collector[OUT] 型別的引數來將要輸出的資料傳遞到下游。
超時事件的處理

通過 within 方法,我們的 parttern 規則 將匹配的事件 限定在一定的視窗範圍內。當有超過視窗時間 之 後 到達的 event ,我們可以通過在 select 或 flatSelect 中,實現PatternTimeoutFunction 和 PatternFlatTimeoutF unction 來處理這種情況。

 val out: OutputTag[String] = OutputTag[String]("side-output")
    patternStream.select(out){
      (pattern:Map[String,Iterable[Event]],timestamp:Long)=>{
        TimeoutEvent()
      }{
          (pattern:Map[String,Iterable[Event]],timestamp:Long)=>{
            ComplexEvent()
          }
        }
    }

大體的看完之後,我們FlinkCEP程式設計也基本就是醬紫。那現在就來詳細一點的說一下。

模式定義

個體Pattern可以是單次執行模式,也可以是迴圈執行模式。單次執行模式一次只接受一個事件,迴圈模式可以接受多個事件。通常情況下,可以指定迴圈次數將單次執行模式變為迴圈執行模式。每種模式能夠將多個條件組合應用到同一事件之上,條件組合可以通過 where 方法進行疊合。

個體 Pattern 都是通過 begin 方法定義的,例如以下通過 Pattern.begin 方法基於 Event 事件型別的 Pattern , 其中

val start = Pattern.begin[Event]("start_pattern")

下一步通過 Pattern.where() 方法在 Pattern 指定 Condition , 只有當 Condition 滿足之後,當前的 Pattern 才會接收事件。

start.where(_.typeEvent.equals("temperature"))

指定迴圈次數

對於已經建立好的 Pattern , 可以指定迴圈次數,形成迴圈執行的 Pattern , 且有 3 種 方式來指定迴圈方式。

  • times : 可以通過 times 指定固定的迴圈執行次數
// 指定2迴圈觸發 4 次
start.times(4)
// 可以指定迴圈次數範圍
start.times(2 , 4)
  • optional : 也可以通過 optional 關鍵字指定要麼不觸發,要麼觸發指定次數
// 指定2迴圈觸發 4 次
start.times(4).optional()
// 可以指定迴圈次數範圍
start.times(2 , 4).optional()
  • greedy: 可以通過 greedy 將 Pattern 標記為 貪婪模式,在 Pattern 匹配成功的前提下,會盡可能多的觸發
// 觸發 2,3,4 次,儘可能重複執行
start.times(2 , 4).optional()
// 觸發 0,2,3,4 次,儘可能重複執行
start.times(2 , 4).optional().greedy()
  • oneOrMore: 可以通過 oneOrMore 方法指定觸發一次或多次
// 觸發一次或者多次
start.oneOrMore()
// 觸發一次或者多次,儘可能重複執行
start.oneOrMore().greedy()
// 觸發 0 次或者 多次
start.oneOrMore().optional()
// 觸發 0 次或者 多次 , 儘可能多次執行
start.oneOrMore().optional().greedy()
  • timesOrMore: 通過 timesOrMore 方法可以指定觸發固定次數以上,例如執行兩次以上:
// 觸發兩次或者多次
start.timesOrMore(2)
// 觸發兩次或者多次,儘可能多次重複執行
start.timesOrMore(2).greedy()

模式條件

每個模式都需要指定觸發條件,作為時間進入到該模式是否接受的判斷依據,當時間中的數值滿足了條件,便進行下一步操作。在FlinkCEP中通過 patter.where()、pattern.or()、及patter.until()方法來為 Pattern 指定條件,且 Pattern 條件有 Iterative Conditions 、 Simple Conditions 及 Combining Conditions 三中型別。

迭代條件

Iterative Conditions 能夠對前面模式所有接收的事件進行處理,根據接收的事件集合統計出計算指標,並作為本次模式匹配中的條件輸入引數。如:

 .oneOrMore
      .subtype(classOf[TempEvent])
      .where(
        (value , ctx) => {
            // the condition for you
        }
      )

通過 subtype 將 Event 事件轉換為 TempEvent 事件,然後在 where 條件中通過使用 ctx.getEventsForPattern(...) 方法獲取 “middle” 模式所有接收得到 Event 記錄,並基於這些 Event 資料之上對溫度求取平均值,然後判斷當前事件的溫度是否小於平均值,然後判斷當前事件的溫度是否小於平均值。

簡單條件

Simple Condition 繼承於 Iternative Condition 類,其主要根據事件中的欄位資訊進行判斷,決定是否接受該事件。如下:

start.where(event=>event.enevtType.equals("temperature"))

同樣,我們可以通過 subtype 對事件進行子類型別轉換,然後在 where 方法中針對子類定義模式條件。

組合條件

組合條件是將簡單條件進行合併,通常情況也可以使用 where 方法進行條件組合,預設每個條件通過 AND 邏輯相連。如果需要使用 OR 邏輯 , 如:

pattern.where(event => event.name.startWith("foo").or(event => enevt.eventType.equals("temperature")))

終止條件

如果程式中使用了 oneOrMore 或者 oneOrMore().optional() 方法,則必須指定終止條件,否則模式中的規則會一直迴圈下去,如:

patern.oneOrMore().until(event => event.name.equals("end"))

請注意:在上述的迭代條件通過呼叫 ctx.getEventsForPattern("middle")

模式序列

將互相獨立的模式進行組合然後形成模式序列。模式序列基本的編寫方式和獨立模式一致,各個模式之間通過鄰近條件進行連線即可。其中有嚴格鄰近,寬鬆臨近,非確定寬鬆臨近三種臨近連線條件,如下:

val start : Pattern[]

嚴格鄰近

嚴格鄰近條件中,需要所有的事件都按照滿足模式條件,不允許忽略任意不滿足的模式。如下:在start Pattern 後使用 next 方法指定 下一個 Pattern ,生成嚴格鄰近的 Pattern.

val strict : Pattern[Event,_] = start.next("middle").where(...)

寬鬆鄰近

在寬鬆鄰近條件下,會忽略沒有成功匹配模式條件,並不會像嚴格鄰近要求的那麼高,可以簡單理解為 OR 的邏輯關係。如下:

val strict : Pattern[Event,_] = start.followeBy("middle").where(...)

非確定寬鬆鄰近

和寬鬆鄰近條件相比,非確定寬鬆鄰近條件指在模式匹配過程中可以忽略已經匹配的條件。如下:

​```scala
val nonDetermin : Pattern[Event,_] = start.followerByAny("middle").where(....)

除了上述條件外, Flink 還提供了 notNext()、notFollowerBy()等連結條件 。notNext() 表示不想讓某一模式跟另一個模式之後不發生;notFollowerBy() 強調不想讓某一模式觸發處於兩個模式之間觸發。

注意點:模式序列不能以 notFollowerBy() 結尾,且 not 型別的模式不能和 optional 關鍵字同時使用

模式組

模式序列可以作為 begin , followerBy , floowerByAny 及 next 等連線條件的輸入引數從而形成的模式組。在GroupPattern 上可以指定 oneOrMore 、 times 、 optional 等迴圈條件,應用在 GroupPattern 中的模式序列上,每個模式序列完成自己內部的條件匹配,最後在米歐式組層面對模型序列結果進行彙總。如:

    val value: GroupPattern[Event, _] = Pattern.begin(Pattern.begin[Event]("start")
      .where(_.name.equals("name"))
      .followedBy("start_middle")
      .where(_.name.equals("yang")))

    val value1: Pattern[Event, _] = Pattern.begin(Pattern.begin[Event]("start")
      .next("next_start")
      .where(_.name.equals("name"))
      .followedBy("next_middle")
      .where(_.name.equals("yang"))).times(3)

AfterMatchSkipStrategy

在給定的 Pattern 中,當同一事件符合多種模式條件組合之後,需要指定 AfterMatchSkipStrategy 策略以處理已經匹配的事件。在 AfterMatchSkipStrategy 配置中有四件事件處理策略,分別為 NO_SKIP / SKIP_PAST_LAST_EVENT / SKIP_TO_FIRST / SKIP_TO_LAST 。 每種策略的定義和使用方式如下:其中SKIP_TO_FIRST 和 SKIP_TO_LAST 在定義過程中需要指定有效的PatternName.

  • [ ] NO_SKIP: 該策略表示將所有可能匹配的事件進行輸出,不忽略任何一條。
AfterMatchSkipStrategy.noSkip()
  • [ ] SKIP_PAST_LAST_EVENT: 該策略表示忽略從模式條件開始觸發到當前觸發 Pattern 中的所有部分匹配事件。
AfterMatchSkipStrategy.skipPastLastEvent()
  • [ ] SKIP_TO_FIRST: 該策略表示忽略第一個匹配指定 PatternName 的 Pattern 其之前的部分匹配事件。
AfterMatchSkipStrategy.skipToFirst(patternName)
  • [ ] SKIP_TO_LAST 該策略表示忽略最後一個匹配指定 PatternName 的 Pattern 之前的部分匹配之間
AfterMatchSkipStrategy.skipToLast(patternName)
  • [ ] SKIP_TO_NEXT: 該策略表示忽略指定 PatternName 的 Pattern 之後的部分匹配事件
AfterMatchSkipStrategy.skipToNext(patternName)

選擇完 AfterMatchSkipStrategy 之後,可以再建立 Pattern 時 , 通過 begin 方法中指定 skipStrategy , 然後就可以將 AfterMatchSkipStrategy 應用到當前的 Pattern 中。

val skipStrategy = { }
Pattern.begin("pattern_name" , skipStrategy)

事件獲取

對於前面已經定義的模式序列或模式組,需要和輸入資料流進行結合,才能發現事件中潛在的匹配關係。如:

val input : DataStream[Event] = ...
val pattern : Pattern[Event, _] = ...
var comparator : EventComparator[Event] = ... // optional

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern, comparator)

FlinkCEP 提供了 CEP.pattern 方法將 DataStream 和 Pattern 應用在一起,得到 PatternStream 型別資料集,且後續時間資料獲取都基於PatternStream 進行。另外可以選擇建立 EventComparator , 對傳入的 Pattern 中的事件 進行排序,當 Event Time 相等或者同時 到達 Pattern 時 , EventComparator 鍾定一的排序策略可以幫助事件的先後順序。

當可以 CEP.pattern 方法被執行後,會生成 PatternStream 資料集,該資料集中包含了所有匹配事件。目前在FlinkCEP中提供了 select 和 flatSelect 兩種方法從 PatternStream 提取事件結果。

通過 Select Function 抽取正常事件

可以通過在 PatternStream 的 Select 方法中傳入自定義 Seclect Function 完成對匹配事件的轉換與輸出。其中 Select Function 的輸入引數為 Map[String,Iterable[IN]],Map 中的 Key 為模式序列中的 Pattern 名稱, Value 為對應 Pattern 所接受的事件集合,格式為輸入事件的資料型別。需要注意的是: Select Funtion將會在每次呼叫後僅輸出一條結果 如下:

  def selectFunction (pattern:Map[String,Iterable[IN]]):OUT = {
    // 獲取 pattern 中的 startEvent
    val startEvent = pattern.get("start_pattern").get.next
    // 獲取 pattern 中的 middleEvent
    val middleEvent = pattern.get("middle_pattern").get.next
    // 返回結果
    OUT(startEvent , middleEvent)
  }

通過 Select Function 抽取超時事件

  val patternStream: PatternStream[LoginEvent] = CEP.pattern(loginEventSource, pattern)
    // 建立 OutputTag ,並命名為 timeout-output
    val timeoutTag: OutputTag[String] = OutputTag[String]("timeout-output")
    // 呼叫 PatternStream Select() 並指定 timeoutTag
    patternStream.select(timeoutTag) {
      // 超時時間獲取
      (pattern: Map[String, Iterable[LoginEvent]], timestamp: Long) => {
        TimeOutEvent()
      }

    }{
      (pattern: Map[String, Iterable[LoginEvent]], timestamp: Long) => {
        NormalEvent()
      }
        // 呼叫 getSideOutput 方法,並指定 timeoutTag 將超時事件輸出
        val timeoutResult : DataStream[TimeOutEvent] = result.getSideOutput(timeoutTag)
    }

通過 Flat Select Function 抽取正常事件

Flat Seclect Function 和 Select Function 相似,不過 Flat Select Function 在每次呼叫可以返回任意數量的結果。因為 Flat Select Function 使用 Collector 作為返回結果的容器,可以將需要輸出的事件都放置在 Collector 中返回。如下:

  def faltSelectFunction(pattern:Map[String,Iterable[IN]],collector:Collector[OUT])={
    // 獲取 pattern 中的 startEvent
    val startEvent = pattern.get("start_pattern").get.next
    // 獲取 pattern 中的 middleEvent
    val middleEvent = pattern.get("middle_pattern").get.next
    // 根據 startEvent 返回結果
    for (i <- 0 to startEvent.value){
      collector.collect( OUT(startEvent , middleEvent))
    }
  }

通過 Flat Select Function 抽取超時事件

  val patternStream: PatternStream[LoginEvent] = CEP.pattern(loginEventSource, pattern)
    // 建立 OutputTag ,並命名為 timeout-output
    val timeoutTag: OutputTag[String] = OutputTag[String]("timeout-output")
    // 呼叫 PatternStream Select() 並指定 timeoutTag
    patternStream.select(timeoutTag) {
      // 超時時間獲取
      (pattern: Map[String, Iterable[LoginEvent]], timestamp: Long , out:Collector[TimeoutEvent]) => {
        out.collect(TimeOutEvent())
      }

    }{
      (pattern: Map[String, Iterable[LoginEvent]], timestamp: Long) => {
        out.collect(NormalEvent())
      }
        // 呼叫 getSideOutput 方法,並指定 timeoutTag 將超時事件輸出
        val timeoutResult : DataStream[TimeOutEvent] = result.getSideOutput(timeoutTag)
    }