1. 程式人生 > >7、Flink 流計算處理和批處理平臺

7、Flink 流計算處理和批處理平臺

一、Flink 基本概念

Flink 是一個批處理和流處理結合的統一計算框架,其核心是一個提供了資料分發以及並行化計算的流資料處理引擎。它的最大亮點是流處理,是業界最頂級的開源流處理引擎。Flink 與 Storm 類似,屬於事件驅動型實時流系統。

所謂說事件驅動型指的就是一個應用提交之後,除非明確的指定停止,否則,該業務會一直持續的執行,它的執行條件就是觸發了某一個事件,比如在淘寶中,我們付款需要在支付寶付款,但是付款成功與否的條件是從淘寶獲取的,支付寶通過介面向淘寶反饋扣款結果,這個計算的應用是一直存在的,它需要獲取支付寶扣款的結果,將結果進行計算加入到後臺資料庫,記錄日誌並且向淘寶反饋扣款成功的資訊。這個時候,這一系列的操作都是由於使用者觸發了付款這個事件而導致的,之後系統就會進行這個計算,應用是持續存在的,沒有事件驅動的情況下,這個應用是處於靜止狀態的,事件驅動之後,應用進行計算和反饋。

1.批處理和流處理

批處理在大資料世界有著悠久的歷史。批處理主要操作大容量靜態資料集,並在計算過程完成後返回結果。

批處理模式中使用的資料集通常符合下列特徵:

(1) 有界:批處理資料集代表資料的有限集合

(2) 持久:資料通常始終儲存在某種型別的持久儲存位置中

(3) 大量:批處理操作通常是處理極為海量資料集的唯一方法

批處理非常適合需要訪問全套記錄才能完成的計算工作。例如在計算總數和平均數時,必須將資料集作為一個整體加以處理,而不能將其視作多條記錄的集合。這些操作要求在計算進行過程中資料維持自己的狀態。

需要處理大量資料的任務通常最適合用批處理操作進行處理。無論直接從持久儲存裝置處理資料集,或首先將資料集載入記憶體,批處理系統在設計過程中就充分考慮了資料的量,可提供充足的處理資源。由於批處理在應對大量持久資料方面的表現極為出色,因此經常被用於對歷史資料進行分析。大量資料的處理需要付出大量時間,因此批處理不適合對處理時間要求較高的場合。流處理系統會對隨時進入系統的資料進行計算。相比批處理模式,這是一種截然不同的處理方式。流處理方式無需針對整個資料集執行操作,而是對通過系統傳輸的每個資料項執行操作。

流處理中的資料集是“無邊界”的,這就產生了幾個重要的影響:

(1) 完整資料集只能代表截至目前已經進入到系統中的資料總量。

(2) 工作資料集也許更相關,在特定時間只能代表某個單一資料項。

(3) 處理工作是基於事件的,除非明確停止否則沒有“盡頭”。處理結果立刻可用,並會隨著新資料的抵達繼續更新。

流處理系統可以處理幾乎無限量的資料,但同一時間只能處理一條(真正的流處理)或很少量(微批處理,Micro-batch Processing)資料,不同記錄間只維持最少量的狀態。雖然大部分系統提供了用於維持某些狀態的方法,但流處理主要針對副作用更少,更加功能性的處理(Functional processing)進行優化。

功能性操作主要側重於狀態或副作用有限的離散步驟。針對同一個資料執行同一個操作會或略其他因素產生相同的結果,此類處理非常適合流處理,因為不同項的狀態通常是某些困難、限制,以及某些情況下不需要的結果的結合體。因此雖然某些型別的狀態管理通常是可行的,但這些框架通常在不具備狀態管理機制時更簡單也更高效。

此類處理非常適合某些型別的工作負載。有近實時處理需求的任務很適合使用流處理模式。分析、伺服器或應用程式錯誤日誌,以及其他基於時間的衡量指標是最適合的型別,因為對這些領域的資料變化做出響應對於業務職能來說是極為關鍵的。流處理很適合用來處理必須對變動或峰值做出響應,並且關注一段時間內變化趨勢的資料。

2.Flink 特點和應用場景

Flink 最適合的應用場景是低時延的資料處理場景:高併發處理資料,時延毫秒級,且兼具可靠性。

典型應用場景有:

(1) 網際網路金融業務。

(2) 點選流日誌處理。

(3) 輿情(輿論情緒)監控。 Flink 的特點有以下幾種:

(1) 低時延:提供 ms 級時延的處理能力。

(2) Exactly Once:提供非同步快照機制,保證所有資料真正只處理一次

(3) HA:JobManager 支援主備模式,保證無單點故障。

(4) 水平擴充套件能力:TaskManager 支援手動水平擴充套件。

Flink 能夠支援 Yarn,能夠從 HDFS 和 HBase 中獲取資料;能夠使用所有的Hadoop 的格式化輸入和輸出;能夠使用 Hadoop 原有的 Mappers 和 Reducers,並且能與 Flink 的操作混合使用;能夠更快的執行 Hadoop 的作業。

二、Flink 架構

1.Flink 元件架構

(1) Data storage 底層是資料儲存

(2) Single node execution 表示的是部署方式

(3) Local Environment 等表示的是不同的執行環境

(4) Flink Local Runtime 表示是執行執行緒

(5) Flink Optimizer,Flink Stream Builder 等表示的是優化器

(6) Common API 表示的是 Flink 平臺的 API

(7) Scala API 和 Java API 表示的是對外提供的 API

該邏輯圖按照從上向下的結構,我們可以看出,最高層的元件都是 API 介面,用於提供使用者的接入。第二層主要是建立編譯工作流,並且對工作流做優化操作。然後將輸入的資料按照建立好的工作流去執行,通過 Flink Local Runtime 元件來執行計算。第三層是環境層,不同的資料和應用的執行環境不同,就比如有一些遊戲需要執行在 Java 環境中,在對不同的資料進行計算的時候,我們需要的底層環境也是不同的。最下邊的一層是部署和資料的最底層,Flink 預設支援兩種部署模式,一種是單獨部署,也就是指 Flink 直接部署在叢集上,作為獨立計算工具執行。另一種是 Yarn 部署,就是將 Flink 認知為是 Hadoop 中的一個元件,和 Yarn 對接來使用。這樣做可以充分利用各個元件的優勢,元件之間互相結合來進行工作的執行。

2.Flink 的資料結構

DataStream 是資料模型,所有的資料在進入 Flink 到從 Flink 輸出都必須要按照 DataStream 的模型來進行計算和資料的轉換。Flink 用類 DataStream 來表示程式中的流式資料。使用者可以認為它們是含有重複資料的不可修改的集合(collection),DataStream 中元素的數量是無限的。這裡有兩個重點,首先, Stream 流式資料可能會存在有重複的資料,這點本身無可厚非,我們在實際寫入資料的時候,不同的使用者提交相同的資料是很有可能的,而系統也必須要對每一個相同的資料做都做相關的計算操作。那麼流資料還有一個特點就是元素是無限的。我們認為流式資料是一個無頭無尾的表,舊的資料已經計算完成就被淘汰,而新的資料會被切分成資料分片新增到資料流的結尾。

DataStream 之間的運算元操作:

(1) 含有 Window 的是視窗操作,與後面的視窗操作相關連,之間的關係可以通過 reduce,fold,sum,max 函式進行管關聯。

(2) connect:進行 Stream 之間的連線,可以通過 flatmap,map 函式進行操作。

(3) JoinedStream :進行 Stream 之間的 join 操作,類似於資料庫中的 join,可以通過 join 函式等進行關聯。

(4) CoGroupedStream:Stream 之間的聯合,類似於關係資料庫中的 group 操作,可以通過 coGroup 函式進行關聯。

(5) KeyedStream:主要是對資料流依據 key 進行處理,可以通過 keyBy 函式進行處理。

DataStream 在計算中一共分為了三個步驟:DataSource、Transformation 和DataSink。

(1) Data source:流資料來源的接入,支援 HDFS 檔案、kafka、文字資料等。

(2) Transformations:流資料轉換。

(3) Data sink:資料輸出,支援 HDFS、kafka、文字等。

在 DataStream 中,資料流轉換流程與 Spark 類似:

(1) 從 HDFS 讀取資料到 DataStream 中

(2) 接下來進行相關運算元操作,如 flatMap,Map,keyBy

(3) 接下來是視窗操作或運算元操作(4) 最後處理結果 sink 到 HDFS

三、Flink 執行流程

(1) Client:Flink  Client 主要給使用者提供向 Flink 系統提交使用者任務(流式作業)的能力。

(2) TaskManager :Flink 系統的業務執行節點,執行具體的使用者任務。TaskManager 可以有多個,各個 TaskManager 都平等。

(3) JobManager:Flink 系統的管理節點,管理所有的 TaskManager,並決策使用者任務在哪些 Taskmanager 執行。JobManager 在 HA 模式下可以有多個,但只有一個主 JobManager。

(4) TaskSlot(任務槽):類似 yarn 中的 container 用於資源隔離,但是該元件只包含記憶體資源,不包含 cpu 資源。每一個 TaskManager 當中包含 3 個 Task Slot,TaskManager 最多能同時併發執行的任務是可以控制的,那就是 3 個,因為不能超過 slot 的數量。 slot 有獨佔的記憶體空間,這樣在一個 TaskManager 中可以執行多個不同的作業,作業之間不受影響。slot之間可以共享 JVM 資源, 可以共享 Dataset 和資料結構,也可以通過多路複用(Multiplexing) 共享 TCP 連線和心跳訊息(Heatbeat Message)。

(5) Task:任務執行的單元。執行流程:

(1) 任務的執行流程主要是分成了工作流的下發建立和資料流的執行流程兩個部分。在執行資料流計算之前,必須先把任務的執行流程先做好。所以 Client 收到使用者提交的應用之後,會通過 FlinkProgram 將使用者提交的應用轉換成為流式作業,以 Topology 的形式提交到 JobManager 中,該流式作業的 Topology 如果沒有在使用者的強制指定關閉的情況下,會一直持續的按照事件驅動型進行執行。

(2) JobManager 通過 Actor 程序和其他元件進行聯絡,通過 scheduler 程序檢查當前叢集中所有 TaskManager 中的叢集負載,選擇負載最小的TaskManager,將任務下發到不同的 TaskManager 中。

(3) TaskManager 其實可以理解成為是節點,TaskManager 通過 ActorSystem收到 JobManager 的請求之後,下一步會將提交的作業進行下發執行,但是執行之前 TaskManager 還需要檢測當前叢集資源的使用情況,將記憶體資源封裝成 TaskSlot,下發到其中進行執行。CPU 資源由節點所有程序共享。

(4) 最終 TaskSlot 執行完任務之後,會將執行的結果直接傳送到下一個TaskManager 中,而不是反饋給 JobManager。所以作為 JobManager,其只負責了任務的下發,資料的下發,還有結果的接收,對於所有的中間結果,JobManager 都不負責管理。

(1) Flink YARN Client 首先會檢驗是否有足夠的資源來啟動 YARN 叢集,如果資源足夠的話,會將 jar 包、配置檔案等上傳到 HDFS。

(2) Flink YARN Client 首先與 YARN Resource Manager 進行通訊,申請啟動ApplicationMaster(以下簡稱 AM)。在 Flink YARN 的叢集中,AM 與 Flink JobManager 在同一個 Container 中。

(3) AM 在啟動的過程中會和 YARN 的 RM 進行互動,向 RM 申請需要的 Task ManagerContainer,申請到 Task Manager Container 後,在對應的 NodeManager 節點上啟動 TaskManager 程序。

(4) AM 與 Fink JobManager 在同一個 container 中,AM 會將 JobManager 的 RPC 地址通過 HDFS 共享的方式通知各個 TaskManager,TaskManager 啟動成功後,會向 JobManager 註冊。

(5) 等所有 TaskManager 都向 JobManager 註冊成功後,Flink 基於 YARN 的叢集啟動成功,Flink YARN Client 就可以提交 Flink Job 到 Flink JobManager,並進行後續的對映、排程和計算處理。

四、Flink 技術原理

1.流式資料執行原理

使用者實現的 Flink 程式是由 Stream 資料和 Transformation 運算元組成。Stream 是一箇中間結果資料,而 Transformation 是運算元,它對一個或多個輸入 Stream 進行計算處理,輸出一個或多個結果 Stream。

Flink 程式執行時,它會被對映為 Streaming Dataflow 。一個 Streaming Dataflow 是由一組 Stream 和 Transformation Operator 組成,它類似於一個DAG 圖,在啟動的時候從一個或多個 Source Operator 開始,結束於一個或多個Sink Operator。

(1) Source:流資料來源的接入,支援 HDFS 檔案、kafka、文字資料等。

(2) Sink:資料輸出,支援 HDFS、kafka、文字等。

(3) Stream 是 Flink 計算流程中產生的中間資料。Flink 是按 event 驅動的,每個 event 都有一個 event time 就是事件的時間戳,表明事件發生的時間,這個時間戳對 Flink 的處理效能很重要,後面會講到 Flink 處理亂序資料流時,就是靠時間戳來判斷處理的先後順序。

一個 Stream 可以被分成多個 Stream 分割槽(Stream Partitions),一個 Operator 可以被分成多個 Operator Subtask,每一個 Operator Subtask 是在不同的執行緒中獨立執行的。一個 Operator 的並行度,等於 Operator Subtask 的個數,一個Stream 的並行度等於生成它的 Operator 的並行度。

1.One-to-one 模式比如從 Source[1]到 map()[1],它保持了 Source 的分割槽特性(Partitioning)和分割槽內元素處理的有序性,也就是說 map()[1]的 Subtask 看到資料流中記錄的順序,與 Source[1]中看到的記錄順序是一致的。

2.Redistribution 模式這種模式改變了輸入資料流的分割槽,比如從 map()[1] 、 map()[2] 到 keyBy()/window()/apply()

[1] 、keyBy()/window()/apply()[2] , 上 遊 的Subtask 向下遊的多個不同的 Subtask 傳送資料,改變了資料流的分割槽,這與實際應用所選擇的 Operator 有關係。 Subtask 的個數,一個 Stream 的並行度總是等於生成它的 Operator 的並行度。

Flink 內部有一個優化的功能,根據上下游運算元的緊密程度來進行優化。緊密度高的運算元可以進行優化,優化後可以將多個 Operator Subtask 串起想·來組成一個 Operator Chain,實際上就是一個執行鏈,每個執行鏈會在 TaskManager 上一個獨立的執行緒中執行。上半部分表示的是將兩個緊密度高的運算元優化後串成一個 Operator Chain,實際上一個 Operator Chain 就是一個大的 Operator 的概念。途中的 Operator Chain 表示一個 Operator,keyBy 表示一個 Operator,Sink 表示一個 Operator,他們通過 Stream 連線,而每個 Operator 在執行時對應一個 Task,也就是說圖中的上半部分 3 個 Operator 對應的是 3 個 Task。下半部分是上半部分的一個並行版本,對每一個 Task 都並行華為多個 Subtask,這裡只是演示了 2 個並行度,sink 運算元是 1 個並行度。

2.Flink 視窗技術

Flink 支援基於時間視窗操作,也支援基於資料的視窗操作:

(1) 按分割標準劃分:timeWindow、countWindow。

(2) 按視窗行為劃分:Tumbling Window、Sliding Window、自定義視窗。視窗按驅動的型別分為時間視窗(timeWindow)和事件視窗(countWindow)。視窗可以是時間驅動的(Time Window,例如:每 30 秒鐘),也可以是資料驅動的(Count Window,例如:每一百個元素)。

視窗按照其想要實現的功能分為:

 翻滾視窗(Tumbling Window,無時間重疊,固定時間劃分或者固定事件個數劃分)

滾動視窗(Sliding Window,有時間重疊)

會話視窗(Session Window,將事件聚合到會話視窗中,由非活躍的間隙分隔開)。

3.Flink 容錯機制

checkpoint 機制是 Flink 執行過程中容錯的重要手段。 checkpoint 機制不斷繪製流應用的快照,流應用的狀態快照被儲存在配置的位置(如:JobManager 的記憶體裡,或者 HDFS 上)。Flink 分散式快照機制的核心是 barriers,這些 barriers 週期性插入到資料流中,並作為資料流的一部分隨之流動。barrier 是一個特殊的元組,這些元組被週期性注入到流圖中並隨資料流在流圖中流動。每個 barrier 是當前快照和下一個快照的分界線。在同一條流中 barriers 並不會超越其前面的資料,嚴格的按照線性流動。一個 barrier 將屬於本週期快照的資料與下一個週期快照的資料分隔開來。每個 barrier 均攜帶所屬快照週期的 ID,barrier 並不會阻斷資料流,因此十分輕量。Checkpoint 機制是 Flink 可靠性的基石,可以保證 Flink 叢集在某個運算元因為某些原因(如異常退出)出現故障時,能夠將整個應用流圖的狀態恢復到故障之前的某一狀態,保證應用流圖狀態的一致性。該機制可以保證應用在執行過程中出現失敗時,應用的所有狀態能夠從某一個檢查點恢復,保證資料僅被處理一次(Exactly Once)。另外,也可以選擇至少處理一次(at least once)。

每個需要 checkpoint 的應用在啟動時,Flink 的 JobManager 為其建立一個CheckpointCoordinator,CheckpointCoordinator 全權負責本應用的快照製作。用 戶 通 過 CheckpointConfig 中 的 setCheckpointInterval() 接 口 設 置checkpoint 的週期。

CheckPoint 機制

CheckpointCoordinator 週期性的向該流應用的所有 source 運算元傳送barrier。當某個 source 運算元收到一個 barrier 時,便暫停資料處理過程,然後將自己的當前狀態製作成快照,並儲存到指定的持久化儲存中,最後向 CheckpointCoordinator 報告自己快照製作情況,同時向自身所有下游運算元廣播該 barrier,恢復資料處理。下游運算元收到 barrier 之後,會暫停自己的資料處理過程,然後將自身的相關狀態製作成快照,並儲存到指定的持久化儲存中,最後向 CheckpointCoordinator 報告自身快照情況,同時向自身所有下游運算元廣播該barrier,恢復資料處理。每個運算元按照步驟 3 不斷製作快照並向下遊廣播,直到最後 barrier 傳遞到 sink 運算元,快照製作完成。當 CheckpointCoordinator 收到所有運算元的報告之後,認為該週期的快照製作成功;否則,如果在規定的時間內沒有收到所有運算元的報告,則認為本週期快照製作失敗。