
Chapter 5: Flink Complex Event Processing

Please credit the source when reposting: [email protected]

Complex Event Processing

In the previous chapter, we talked about the Table API provided by Apache Flink and how we can use it to process relational data structures. From this chapter onwards, we will start learning more about the libraries provided by Apache Flink and how we can use them for specific use cases. To start with, let's try to understand a library called Complex Event Processing (CEP). CEP is a very interesting but complex topic that has its value in various industries. Wherever a stream of events is expected, people naturally want to perform complex event processing. Let's try to understand what CEP is all about.

 

What is complex event processing?

CEP analyzes streams of disparate events occurring with high frequency and low latency. These days, streaming events can be found in various industries, for example:

 

In the oil and gas domain, sensor data comes from various drilling tools or from upstream oil pipeline equipment

In the security domain, activity data, malware information, and usage pattern data come from various endpoints

In the wearable domain, data comes from various wrist bands with information about your heart rate, your activity, and so on

In the banking domain, data comes from credit card usage, banking activities, and so on


 

 

It is very important to analyze variation patterns to get notified in real time about any change in the regular flow of events. CEP can understand patterns across streams of events, sub-events, and their sequences. CEP helps to identify meaningful patterns and complex relationships among seemingly unrelated events, and sends notifications in real and near real time to prevent damage:

 

 

The preceding diagram shows how the CEP flow works. Even though the flow looks simple, CEP has various abilities, such as:

The ability to produce results as soon as the input event stream is available

The ability to provide computations such as aggregation over time and timeouts between two events of interest

The ability to provide real-time/near real-time alerts and notifications on detection of complex event patterns

The ability to connect and correlate heterogeneous sources and analyze patterns across them

The ability to achieve high-throughput, low-latency processing

 

There are various CEP solutions available on the market. With big data technology advancements, we have multiple options such as Apache Spark, Apache Samza, and Apache Beam, among others, but none of them has a dedicated library to fit all solutions. Now let us try to understand what we can achieve with Flink's CEP library.


 

 

Flink CEP

Apache Flink provides the Flink CEP library, which provides APIs to perform complex event processing. The library consists of the following core components:

Event stream

Pattern definition

Pattern detection

Alert generation

 

 

 

 

Flink CEP works on Flink's streaming API, called DataStream. A programmer needs to define the pattern to be detected in the stream of events, and then Flink's CEP engine detects the pattern and takes the appropriate action, such as generating an alert.
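To make these four components concrete before we go into the details, here is a minimal end-to-end sketch in Java. It is only an illustration: it assumes the TemperatureEvent and Alert POJOs that we define later in this chapter, the class name CEPFlowSketch is just a placeholder, and every API used here is explained step by step in the following sections.

import java.util.Map;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CEPFlowSketch {
    public static void main(String[] args) throws Exception {
        // 1. Event stream: a handful of in-memory temperature readings
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<TemperatureEvent> events = env.fromElements(
                new TemperatureEvent("xyz", 22.0), new TemperatureEvent("xyz", 27.0));

        // 2. Pattern definition: a single state matching readings of 26.0 degrees or above
        Pattern<TemperatureEvent, ?> warning = Pattern.<TemperatureEvent>begin("first")
                .where(new FilterFunction<TemperatureEvent>() {
                    @Override
                    public boolean filter(TemperatureEvent value) {
                        return value.getTemperature() >= 26.0;
                    }
                });

        // 3. Pattern detection: run the stream through the pattern
        PatternStream<TemperatureEvent> matches = CEP.pattern(events, warning);

        // 4. Alert generation: map each match to an Alert
        DataStream<Alert> alerts = matches.select(new PatternSelectFunction<TemperatureEvent, Alert>() {
            @Override
            public Alert select(Map<String, TemperatureEvent> pattern) {
                return new Alert("High temperature on machine " + pattern.get("first").getMachineName());
            }
        });

        alerts.print();
        env.execute("CEP flow sketch");
    }
}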

In order to get started, we need to add the following Maven dependency:

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-cep-scala_2.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep-scala_2.11</artifactId>
    <version>1.1.4</version>
</dependency>


 

 

Event streams

A very important component of CEP is its input event stream. In earlier chapters, we saw the details of the DataStream API. Now let's use that knowledge to implement CEP. The very first thing we need to do is define a Java POJO for the event. Let's assume we need to monitor a temperature sensor event stream.

First we define an abstract class and then extend this class.

 

 

The following code snippets demonstrate this. First, we write an abstract class as shown here:

package com.demo.chapter05;

public abstract class MonitoringEvent {

    private String machineName;

    public MonitoringEvent(String machineName) {
        super();
        this.machineName = machineName;
    }

    public String getMachineName() {
        return machineName;
    }

    public void setMachineName(String machineName) {
        this.machineName = machineName;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((machineName == null) ? 0 : machineName.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        MonitoringEvent other = (MonitoringEvent) obj;
        if (machineName == null) {
            if (other.machineName != null)
                return false;
        } else if (!machineName.equals(other.machineName))
            return false;
        return true;
    }
}

 

Then we create a POJO for the actual temperature event:

package com.demo.chapter05;

public class TemperatureEvent extends MonitoringEvent {

    private double temperature;

    public TemperatureEvent(String machineName) {
        super(machineName);
    }

    public TemperatureEvent(String machineName, double temperature) {
        super(machineName);
        this.temperature = temperature;
    }

    public double getTemperature() {
        return temperature;
    }

    public void setTemperature(double temperature) {
        this.temperature = temperature;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = super.hashCode();
        long temp;
        temp = Double.doubleToLongBits(temperature);
        result = prime * result + (int) (temp ^ (temp >>> 32));
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (!super.equals(obj))
            return false;
        if (getClass() != obj.getClass())
            return false;
        TemperatureEvent other = (TemperatureEvent) obj;
        if (Double.doubleToLongBits(temperature) != Double.doubleToLongBits(other.temperature))
            return false;
        return true;
    }

    @Override
    public String toString() {
        return "TemperatureEvent [getTemperature()=" + getTemperature()
                + ", getMachineName()=" + getMachineName() + "]";
    }
}

 

Now we can define the event source as follows.

In Java:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<TemperatureEvent> inputEventStream = env.fromElements(
        new TemperatureEvent("xyz", 22.0), new TemperatureEvent("xyz", 20.1),
        new TemperatureEvent("xyz", 21.1), new TemperatureEvent("xyz", 22.2),
        new TemperatureEvent("xyz", 22.1), new TemperatureEvent("xyz", 22.3),
        new TemperatureEvent("xyz", 22.1), new TemperatureEvent("xyz", 22.4),
        new TemperatureEvent("xyz", 22.7), new TemperatureEvent("xyz", 27.0));


 

 

In Scala:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

val input: DataStream[TemperatureEvent] = env.fromElements(
  new TemperatureEvent("xyz", 22.0), new TemperatureEvent("xyz", 20.1),
  new TemperatureEvent("xyz", 21.1), new TemperatureEvent("xyz", 22.2),
  new TemperatureEvent("xyz", 22.1), new TemperatureEvent("xyz", 22.3),
  new TemperatureEvent("xyz", 22.1), new TemperatureEvent("xyz", 22.4),
  new TemperatureEvent("xyz", 22.7), new TemperatureEvent("xyz", 27.0))

 

 

Pattern API

The Pattern API allows you to define complex event patterns very easily. Each pattern consists of multiple states. To go from one state to another, we generally need to define a condition. A condition can be based on continuity or on filtering out events.
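As an illustration of what such a multi-state pattern looks like, the following Java sketch chains two states on the TemperatureEvent POJO defined earlier: a first state, a follow-up state, and a time constraint. This is only an example of ours (the 26.0 threshold and the 20-second window are arbitrary values), and it assumes the usual Flink CEP imports (Pattern, FilterFunction, Time) shown elsewhere in this chapter.

Pattern<TemperatureEvent, ?> twoHighReadings = Pattern.<TemperatureEvent>begin("first")
        .where(new FilterFunction<TemperatureEvent>() {
            @Override
            public boolean filter(TemperatureEvent value) {
                return value.getTemperature() >= 26.0; // condition to enter the first state
            }
        })
        .followedBy("second") // second state; other events may occur in between
        .where(new FilterFunction<TemperatureEvent>() {
            @Override
            public boolean filter(TemperatureEvent value) {
                return value.getTemperature() >= 26.0; // condition to enter the second state
            }
        })
        .within(Time.seconds(20)); // both events must occur within 20 seconds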

 

 

Let’s try to understand each pattern operation in detail.


 

 

Begin

The initial state can be defined as follows.

In Java:

Pattern<Event, ?> start = Pattern.<Event>begin("start");

 

In Scala:

val start : Pattern[Event, _] = Pattern.begin("start")

 

 

Filter

We can also specify the filter condition for the initial state.

In Java:

start.where(new FilterFunction<Event>() {
    @Override
    public boolean filter(Event value) {
        return ... // condition
    }
});

 

In Scala:

 

start.where(event => ... /* condition */)

 

 

Subtype

We can also filter out events based on their sub-types, using the subtype() method.

In Java:

start.subtype(SubEvent.class).where(new FilterFunction<SubEvent>() {
    @Override
    public boolean filter(SubEvent value) {
        return ... // condition
    }
});

 

In Scala:

 

start.subtype(classOf[SubEvent]).where(subEvent => ... /* condition */)


 

 

OR

The Pattern API also allows us to define multiple conditions together. We can use OR and AND operators.

In Java:

pattern.where(new FilterFunction<Event>() {
    @Override
    public boolean filter(Event value) {
        return ... // condition
    }
}).or(new FilterFunction<Event>() {
    @Override
    public boolean filter(Event value) {
        return ... // or condition
    }
});

 

In Scala:

 

pattern.where(event => ... /* condition */).or(event => ... /* or condition */)

 

 

Continuity

As stated earlier, we do not always need to filter out events. There can be patterns where we need continuity instead of filters.

Continuity can be of two types: strict continuity and non-strict continuity.

Strict continuity

Strict continuity requires two events to succeed each other directly, which means there should be no other event in between. This pattern can be defined by next().

In Java:

Pattern<Event, ?> strictNext = start.next("middle");

 

In Scala:

val strictNext: Pattern[Event, _] = start.next("middle")


 

 

Non-strict continuity

With non-strict continuity, other events are allowed to occur between the two specific events. This pattern can be defined by followedBy().

In Java:

Pattern<Event, ?> nonStrictNext = start.followedBy("middle");

 

In Scala:

val nonStrictNext : Pattern[Event, _] = start.followedBy("middle")

 

 

Within

The Pattern API also allows us to do pattern matching based on time intervals. We can define a time-based constraint as follows.

In Java:

next.within(Time.seconds(10));

 

In Scala:

next.within(Time.seconds(10))

 

 

Detecting patterns

To detect patterns against a stream of events, we need to run the stream through the pattern. CEP.pattern() returns a PatternStream.

The following code snippet shows how we can detect a pattern. First, the pattern is defined to check whether the temperature value is 26.0 degrees or above within 10 seconds.

In Java:

Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent>begin("first")
        .subtype(TemperatureEvent.class)
        .where(new FilterFunction<TemperatureEvent>() {
            public boolean filter(TemperatureEvent value) {
                if (value.getTemperature() >= 26.0) {
                    return true;
                }
                return false;
            }
        }).within(Time.seconds(10));

PatternStream<TemperatureEvent> patternStream = CEP.pattern(inputEventStream, warningPattern);

In Scala:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

 

val input = // data

 

val pattern: Pattern[TemperatureEvent, _] =
  Pattern.begin[TemperatureEvent]("start").where(event => event.getTemperature >= 26.0)

val patternStream: PatternStream[TemperatureEvent] = CEP.pattern(input, pattern)

 

 

Selecting from patterns

Once the pattern stream is available, we need to select matches from it and then take appropriate actions based on them. We can use the select or flatSelect method to select data from the pattern.

 

Select

The select method needs a PatternSelectFunction implementation. It has a select method which is called for each matching event sequence. The select method receives a map of string/event pairs of the matched events, where the string is the name of the state. The select method returns exactly one result.

To collect the results, we need to define an output POJO. In our case, let's say we need to generate alerts as output. Then we need to define the POJO as follows:

 

package com.demo.chapter05;

public class Alert {

    private String message;

    public Alert(String message) {
        super();
        this.message = message;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    @Override
    public String toString() {
        return "Alert [message=" + message + "]";
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((message == null) ? 0 : message.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        Alert other = (Alert) obj;
        if (message == null) {
            if (other.message != null)
                return false;
        } else if (!message.equals(other.message))
            return false;
        return true;
    }
}


 

 

Next we define the select function.

In Java:

class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> {
    @Override
    public OUT select(Map<String, IN> pattern) {
        IN startEvent = pattern.get("start");
        IN endEvent = pattern.get("end");
        return new OUT(startEvent, endEvent); // schematic: construct your output type here
    }
}

 

In Scala:

 

def selectFn(pattern: mutable.Map[String, IN]): OUT = {
  val startEvent = pattern.get("start").get
  val endEvent = pattern.get("end").get
  OUT(startEvent, endEvent)
}
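As a concrete illustration of the same idea, the temperature warning pattern we built in the Detecting patterns section could be turned into Alert objects as sketched below; the alert message text here is just an example of ours, not part of the original snippet.

In Java:

DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<TemperatureEvent, Alert>() {
    @Override
    public Alert select(Map<String, TemperatureEvent> pattern) {
        // "first" is the state name used when warningPattern was defined
        TemperatureEvent first = pattern.get("first");
        return new Alert("Temperature rise detected: " + first.getTemperature()
                + " on machine " + first.getMachineName());
    }
});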

 

 

flatSelect

The flatSelect method is similar to the select method. The only difference between the two is that flatSelect can return an arbitrary number of results. The flatSelect method has an additional Collector parameter which is used to output elements.

The following example shows how we can use the flatSelect method.

In Java:

class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<IN, OUT> {
    @Override
    public void flatSelect(Map<String, IN> pattern, Collector<OUT> collector) {
        IN startEvent = pattern.get("start");
        IN endEvent = pattern.get("end");
        for (int i = 0; i < startEvent.getValue(); i++) {
            collector.collect(new OUT(startEvent, endEvent)); // schematic: construct your output type here
        }
    }
}


 

 

In Scala:

def flatSelectFn(pattern: mutable.Map[String, IN], collector: Collector[OUT]) = {
  val startEvent = pattern.get("start").get
  val endEvent = pattern.get("end").get
  for (i <- 0 to startEvent.getValue) {
    collector.collect(OUT(startEvent, endEvent))
  }
}
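As a concrete illustration, a flatSelect version of the temperature warning selection could emit more than one Alert per match through the collector; the two messages below are just example values of ours.

In Java:

DataStream<Alert> alerts = patternStream.flatSelect(new PatternFlatSelectFunction<TemperatureEvent, Alert>() {
    @Override
    public void flatSelect(Map<String, TemperatureEvent> pattern, Collector<Alert> out) {
        TemperatureEvent first = pattern.get("first");
        // One match can produce any number of output elements through the collector
        out.collect(new Alert("Temperature rise detected on machine " + first.getMachineName()));
        out.collect(new Alert("Reading was " + first.getTemperature() + " degrees"));
    }
});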

 

 

Handling timed-out partial patterns

Sometimes we may miss certain events if we have constrained the pattern with a time boundary: partial matches may be discarded because they exceed the allowed time window. In order to take action on the timed-out events, the select and flatSelect methods allow us to register a timeout handler. This handler is called for each timed-out event pattern.

In this case, the select method takes two parameters: a PatternTimeoutFunction and a PatternSelectFunction. The return type of the timeout function can be different from that of the select function. The timed-out partial matches are wrapped in Either.Left, and the selected (complete) matches in Either.Right.

The following code snippets shows how we do things in practice. In Java:

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

DataStream<Either<TimeoutEvent, ComplexEvent>> result = patternStream.select(
    new PatternTimeoutFunction<Event, TimeoutEvent>() {...},
    new PatternSelectFunction<Event, ComplexEvent>() {...}
);

DataStream<Either<TimeoutEvent, ComplexEvent>> flatResult = patternStream.flatSelect(
    new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {...},
    new PatternFlatSelectFunction<Event, ComplexEvent>() {...}
);


 

 

In Scala, the select API:

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

 

val result: DataStream[Either[TimeoutEvent, ComplexEvent]] = patternStream.select {
  (pattern: mutable.Map[String, Event], timestamp: Long) => TimeoutEvent()
} {
  pattern: mutable.Map[String, Event] => ComplexEvent()
}

 

The flatSelect API is called with the Collector as it can emit an arbitrary number of events:

 

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

 

val result: DataStream[Either[TimeoutEvent, ComplexEvent]] = patternStream.flatSelect {
  (pattern: mutable.Map[String, Event], timestamp: Long, out: Collector[TimeoutEvent]) =>
    out.collect(TimeoutEvent())
} {
  (pattern: mutable.Map[String, Event], out: Collector[ComplexEvent]) =>
    out.collect(ComplexEvent())
}

 

 

Use case - complex event processing on a temperature sensor

In earlier sections, we learnt about the various features provided by the Flink CEP engine. Now it's time to understand how we can use it in a real-world solution. For that, let's assume we work for a mechanical company which produces some products. In the product factory, there is a need to constantly monitor certain machines. The factory has already set up sensors which keep sending the temperature of the machines.

 

Now we will set up a system that constantly monitors the temperature values and generates an alert if the temperature exceeds a certain threshold.


 

 

We can use the following architecture:

 

 

Here we will be using Kafka to collect events from the sensors. In order to write a Java application, we first need to create a Maven project and add the following dependencies:

 

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-cep-scala_2.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep-scala_2.11</artifactId>
    <version>1.1.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java_2.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.1.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala_2.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>

<artifactId>flink-streaming-scala_2.1