1. 程式人生 > >Flink之CEP案例分析-網路攻擊檢測

Flink之CEP案例分析-網路攻擊檢測

上一篇我們介紹了Flink CEP的API,這一篇我們將以結合一個案例來練習使用CEP的API編寫應用程式,以強化對API的理解。所選取的案例是對網路遭受的潛在攻擊進行檢測並給出告警。當下網際網路安全形勢仍然嚴峻,網路攻擊屢見不鮮且花樣眾多,這裡我們以DDOS(分散式拒絕服務攻擊)產生的流入流量來作為遭受攻擊的判斷依據。

假定一家雲服務提供商,有多個跨地區的資料中心,每個資料中心會定時向監控中心上報其瞬時流量。

我們將檢測的結果分為三個等級:

  • 正常:流量在預設的正常範圍內;
  • 警告:某資料中心在10秒內連續兩次上報的流量超過認定的正常值;
  • 報警:某資料中心在30秒內連續兩次匹配警告;

首先,我們構建source,這裡我們選擇的是並行source,因此需要繼承RichParallelSourceFunction類。所有的資料通過模擬器隨機生成,其中資料中心編號為整型且取值範圍為[0, 10),資料生成的事件間隔由PAUSE常量指定,預設為100毫秒:

//parallel source
DataStream<MonitorEvent> inputEventStream = env.addSource(
    new MonitorEventSource(
        MAX_DATACENTER_ID,
        STREAM_STD,
        STREAM_MEAN,
        PAUSE
    )
).assignTimestampsAndWatermarks(new IngestionTimeExtractor<MonitorEvent>());

下面,我們來構建警告模式,按照我們設定的警告等級,其模式定義如下:

Pattern<MonitorEvent, ?> warningPattern = Pattern.<MonitorEvent>begin("first")
    .subtype(NetworkStreamEvent.class)
    .where(evt -> evt.getStream() >= STREAM_THRESHOLD)
    .next("second")
    .subtype(NetworkStreamEvent.class)
    .where(evt -> evt.getStream() >= STREAM_THRESHOLD)
    .within
(Time.seconds(10));

根據該模式構建模式流:

PatternStream<MonitorEvent> warningPatternStream =
    CEP.pattern(inputEventStream.keyBy("dataCenterId"), warningPattern);

在警告的模式流中篩選出配對的警告事件對,生成警告事件物件流(告警事件物件會算出,前後兩個匹配的流量事件的平均值):

DataStream<NetworkStreamWarning> warnings = warningPatternStream.select(
    (Map<String, MonitorEvent> pattern) -> {
        NetworkStreamEvent first = (NetworkStreamEvent) pattern.get("first");
        NetworkStreamEvent second = (NetworkStreamEvent) pattern.get("second");

        return new NetworkStreamWarning(first.getDataCenterId(),
            (first.getStream() + second.getStream()) / 2);
    }
);

按照設定的等級,告警模式定義如下:

Pattern<NetworkStreamWarning, ?> alertPattern = Pattern.<NetworkStreamWarning>
    begin("first").next("second").within(Time.seconds(30));

在警告事件流中應用告警模式,得到告警模式流:

PatternStream<NetworkStreamWarning> alertPatternStream = CEP.pattern(warnings.keyBy
    ("dataCenterId"), alertPattern);

在告警模式流中匹配警告模式對,如果模式對中第一個警告物件的平均流量值小於第二個警告物件的平均流量值,則構建告警物件並輸出該物件從而形成告警流:

DataStream<NetworkStreamAlert> alerts = alertPatternStream.flatSelect(
    (Map<String, NetworkStreamWarning> pattern, Collector<NetworkStreamAlert> out) -> {
        NetworkStreamWarning first = pattern.get("first");
        NetworkStreamWarning second = pattern.get("second");

        //first avg < second avg
        if (first.getAverageStream() < second.getAverageStream()) {
            out.collect(new NetworkStreamAlert(first.getDataCenterId()));
        }
    }
);

最終,sink到控制檯:

warnings.print();
alerts.print();

從上面的程式碼段可見,CEP的關鍵是定義合適的模式。關於模式的相關的API,我們之前已進行過分析。為了節省篇幅,本文只列出了核心程式碼片段。

需要注意的是,因為包含Java 8的lambdas,當你使用javac作為編譯器時,將會得到錯誤提示:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: 
The generic type parameters of 'Map' are missing. 
It seems that your compiler has not stored them into the .class file. 
Currently, only the Eclipse JDT compiler preserves the type information necessary to 
use the lambdas feature type-safely. 
See the documentation for more information about how to compile jobs containing lambda expressions.
at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameter(TypeExtractor.java:1331)
at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameters(TypeExtractor.java:1317)
at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:347)
at org.apache.flink.cep.PatternStream.select(PatternStream.java:81)
at com.diveintoapacheflink.chapter11.NetworkAttackMonitor.main(NetworkAttackMonitor.java:55)
at ...

解決方案是使用Eclipse JDT來編譯程式碼。

微信掃碼關注公眾號:Apache_Flink

apache_flink_weichat

QQ掃碼關注QQ群:Apache Flink學習交流群(123414680)

qrcode_for_apache_flink_qq_group

相關推薦

FlinkCEP案例分析-網路攻擊檢測

上一篇我們介紹了Flink CEP的API,這一篇我們將以結合一個案例來練習使用CEP的API編寫應用程式,以強化對API的理解。所選取的案例是對網路遭受的潛在攻擊進行檢測並給出告警。當下網際網路安全形勢仍然嚴峻,網路攻擊屢見不鮮且花樣眾多,這裡我們以DDOS(

【 MATLAB 】訊號處理工具箱 fft 案例分析

上篇博文:【 MATLAB 】訊號處理工具箱之fft簡介及案例分析介紹了MATLAB訊號處理工具箱中的訊號變換 fft 並分析了一個案例,就是被噪聲汙染了的訊號的頻譜分析。 這篇博文繼續分析幾個小案例: Gaussian Pulse 這個案例是將高斯脈衝從時域變換到頻域,高斯脈衝的資

使用Spark Mlib K-Means演算法分析網路攻擊資料

package apache.spark.mlib.rdd.kmeanclustering import org.apache.spark.ml.clustering.KMeans import org.apache.spark.ml.feature.{StandardScaler, Vect

JavaScript學習筆記圖片庫案例分析

all 指定節點 學習 snapshot att tle art you lse 本文實例講述了JavaScript圖片庫。分享給大家供大家參考,具體如下: 一、一個javascript 圖片庫實例,下面是效果圖 點擊頂部導航,會在本頁面進行刷新圖片,然後,在底部會顯示文

吳恩達深度學習筆記卷積神經網路(目標檢測)

3.1 目標定位 定位分類問題意味著我們不僅要用演算法判斷出圖片中是否有該物體,還要標記出它的位置,例如圖片有一輛汽車,我們需要用邊框把汽車圈起來。 影象分類問題已不陌生,例如輸入一張圖片到多層卷積神經網路,它會輸出一個特徵向量,並反饋給softmax來預測

Wireshark進階網路問題案例分析

本文假定的基礎是閱讀者會使用Wireshark了,這裡就對一些應用的場景以及一些不正常的網路環境來進行分析的案例~ 這裡先列一下篩選器的語法: 過濾語法: 限定詞 例子 Type host、net、port Dir src、dst Protocol ether、ip、tcp

Elasticsearch學習深入聚合分析三---案例實戰

引用 實戰 avg buck oba core 電視 針對 過濾 1. 統計指定品牌下每個顏色的銷量 任何的聚合,都必須在搜索出來的結果數據中進行,搜索結果,就是聚合分析操作的scope GET /tvs/sales/_search { "size": 0, "

Elasticsearch學習深入聚合分析五---案例實戰

ppi ont doc indices 理解 req eve 同步 nod 1. fielddata核心原理   fielddata加載到內存的過程是lazy加載的,對一個analzyed field執行聚合時,才會加載,而且是field-level加載的,一個index的

案例分析運行順序

結束 per color void 結果 方法 word pre subclass public class shunXu { private static shunXu d = new shunXu(); private SubClass t = new

APP案例分析華為瀏覽器

logs es2017 感覺 尋找 nbsp 5.1 是否 華為 分析 第一部分 對華為瀏覽器的調研,評測 1、對華為瀏覽器的第一次上手體驗   我會使用華為瀏覽器呢,是因為我買的手機是華為nova,該瀏覽器也是手機裏面預裝的。剛開始用的時候也沒太註意,感覺跟以前用的其他

第2次作業:軟件案例分析王者榮耀

好聽 來源 英雄 http 數據 了解 碎片 img 身邊 一、產品相關信息   隨著時代的不斷發展,我們的生活越來越豐富多彩。以小見大,我們接觸的手機軟件越來越多而且花樣百出:微信、網易雲音樂、王者榮耀、淘寶、STEAM、支付寶、微博等。這些軟件有好有懷,他們不斷地發現問

第二次作業:軟件案例分析微信

padding 自己的 習慣 作用 郵箱 1.2 store 漂流瓶 用戶體驗 1.產品相關信息  1.1我選擇的產品是:微信  1.2為什麽選擇該產品作為分析? 微信是時下廣泛使用的聊天軟件之一,具有很強的代表性,其功能已較為完善,具有更多的借鑒和學習作

ADO.NetSqlConnection、 Sqlcommand的應用學習心得(思維導圖,知識解析,案例分析

ros statistic system 啟動 clone() 連接字符串 esp ans 用戶 ADO.Net之SqlConnection、 Sqlcommand的應用 一、思維導圖: ADO.NET與SQL連接: 二、知識點介紹: SqlConnecti

Linux網路程式設計案例分析

本程式碼來自於博主:輝夜星辰  本篇主要對執行程式碼中出現的問題進行分析,程式碼本身的內容後續展開討論。 伺服器端程式碼 1 /* 2 Linux網路程式設計之TCP程式設計,伺服器端讀資料 3 socket函式之後,返回值serfd,作為後面所有網路程式設計函式

大資料電話日誌分析callLog案例(四)

一、修改kafka資料在主題中的貯存時間,預設是7天 ------------------------------------------------- [kafka/conf/server.properties] log.retention.hours=1 二、使用hive進行聚

大資料電話日誌分析callLog案例(三)

一、查詢使用者最近的通話資訊 -------------------------------------------- 1.實現分析 使用ssm可視介面提供查詢串 -- controller連線 hiveserver2 -- 將命令轉化成hsql語句 -- hive繫結hba

大資料電話日誌分析callLog案例(二)

一、前端實現 -- 按照時間段查詢通話記錄 ----------------------------------------- 1.完善calllog的dao類calllog.class ----------------------------------------------

Unity載入網路圖片並顯示在UGUI上,解決載入網路圖片出現問號的問題及其案例分析,例項Demo親測可用

Unity載入網路圖片並顯示在UGUI上,解決載入網路圖片出現問號的問題及其案例分析,例項Demo親測可用 最近自己在載入網路圖片的時候也遇到了載入的圖片無法顯示或者是問號的問題。下面就分析下為什麼會出現這樣的情況。   首先我們直接上程式碼(比較簡單) using U

【 MATLAB 】訊號處理工具箱fft簡介及案例分析

目錄   Syntax Description Y = fft(X) Y = fft(X,n) Y = fft(X,n,dim) Examples Noisy Signal Syntax Y = fft(X) Y = fft(X

【 MATLAB 】訊號處理工具箱 ifft 簡介及案例分析

這篇博文和上篇博文對應:【 MATLAB 】訊號處理工具箱之fft簡介及案例分析 目錄 ifft Syntax Description 案例分析 Inverse Transform of Vector Padded Inverse Transform of Matri