結合 Spark 講一下 Flink 的 runtime
Flink 執行時主要角色有兩個: JobManager 和 TaskManager ,無論是 standalone 叢集, on yarn 都是要啟動這兩個角色。有點類似於 MRv1 的架構了, JobManager 主要是負責接受客戶端的 job ,排程 job ,協調 checkpoint 等。 TaskManager 執行具體的 Task 。 TaskManager 為了對資源進行隔離和增加允許的 task 數,引入了 slot 的概念,這個 slot 對資源的隔離僅僅是對記憶體進行隔離,策略是均分,比如 taskmanager 的管理記憶體是 3GB ,假如有三個 slot ,那麼每個 slot 就僅僅有 1GB 記憶體可用。
根據經驗, taskslot 數最佳預設值就是 CPU 核心數。使用超執行緒,每個 task slot 需要 2 個或更多硬體執行緒上下文。
Client 這個角色主要是為 job 提交做些準備工作,比如構建 jobgraph 提交到 jobmanager ,提交完了可以立即退出,當然也可以用 client 來監控進度。
Jobmanager 和 TaskManager 之間通訊類似於 Spark 的早期版本,採用的是 actor 系統。
根據以上描述,繪製出執行架構圖就是下圖:
Task 到底是什麼玩意?
講到這可以先回顧一下 Spark 了,主要三個概念:
1. Shuffle
Spark 任務 job 中 shuffle 個數決定著 stage 個數。
2. 分割槽
Spark 運算元中 RDD 的分割槽數決定者 stage 任務的並行度。
3. 分割槽傳遞
複雜的入 union , join 等暫不提。簡單的呼叫鏈如下:
rdd.map-->filter-->reducebykey-->map。
例子中假設 rdd 有 6 個分割槽, map 到 fliter 的分割槽數傳遞是不變, filter 到 redcuebykey 分割槽就變了, reducebykey 的分割槽有個預設計算公式,星球裡講過了,假設我們在使用 reducebykey 的時候傳入了一個分割槽數 12 。
分割槽數, map 是 6 , filter 也是 6 , reducebykey 後面的 map 就是 12 。
override def getPartitions: Array[Partition] =firstParent[T].partitions
map 這類轉換完全繼承了父 RDD 的分割槽器和分割槽數,預設無法人為設定並行度,只有在 shuffle 的時候,我們才可以傳入並行度。
上述講解主要是想帶著大家搞明白,以下幾個概念:
-
Flink 的並行度由什麼決定的?
-
Flink 的 task 是什麼?
1. Flink 的並行度由什麼決定的?
這個很簡單, Flink 每個運算元都可以設定並行度,然後就是也可以設定全域性並行度。
Api 的設定
.map(new RollingAdditionMapper()).setParallelism(10)
全域性配置在 flink-conf.yaml 檔案中, parallelism.default ,預設是 1 :
2. Flink 的 task 是什麼?
按理說應該是每個運算元的一個並行度例項就是一個 subtask- 在這裡為了區分暫時叫做 substask 。那麼,帶來很多問題,由於 flink 的 taskmanager 執行 task 的時候是每個 task 採用一個單獨的執行緒,這就會帶來很多執行緒切換開銷,進而影響吞吐量。
為了減輕這種情況, flink 進行了優化,也即對 subtask 進行鏈式操作,鏈式操作結束之後得到的 task ,再作為一個排程執行單元,放到一個執行緒裡執行。
如下圖的, source/map 兩個運算元進行了鏈式; keyby/window/apply 有進行了鏈式, sink 單獨的一個。
註釋 :圖中假設是 source/map 的並行度都是 2 , keyby/window/apply 的並行度也都是 2 , sink 的是 1 ,總共 task 有五個,最終需要五個執行緒。
按照到這一步的理解,畫的執行圖應該是這樣的:
有些朋友該說了,據我觀察實際上並不是這樣的呀。。。
這個是實際上是 flink 又一次優化。
預設情況下, flink 允許如果任務是不同的 task 的時候,允許任務共享 slot ,當然,前提是必須在同一個 job 內部。
結果就是,每個 slot 可以執行 job 的一整個 pipeline ,如上圖。這樣做的好處主要有以下幾點:
1. Flink 叢集所需的 taskslots 數與 job 中最高的並行度一致。 也就是說我們不需要再去計算一個程式總共會起多少個 task 了。
2. 更容易獲得更充分的資源利用 。如果沒有 slot 共享,那麼非密集型操作 source/flatmap 就會佔用同密集型操作 keyAggregation/sink 一樣多的資源。如果有 slot 共享,將基線的 2 個並行度增加到 6 個,能充分利用 slot 資源,同時保證每個 TaskManager 能平均分配到重的 subtasks ,比如 keyby/window/apply 操作就會均分到申請的所有 slot 裡,這樣 slot 的負載就均衡了。
鏈式的原則,也即是什麼情況下才會對task進行鏈式操作呢?簡單梗概一下:
-
上下游的並行度一致
-
下游節點的入度為 1 (也就是說下游節點沒有來自其他節點的輸入)
-
上下游節點都在同一個 slot group 中(下面會解釋 slot group )
-
下游節點的 chain 策略為 ALWAYS (可以與上下游連結, map 、 flatmap 、 filter 等預設是 ALWAYS )
-
上游節點的 chain 策略為 ALWAYS 或 HEAD (只能與下游連結,不能與上游連結, Source 預設是 HEAD )
-
兩個節點間資料分割槽方式是 forward (參考理解資料流的分割槽)
-
使用者沒有禁用 chain
推薦閱讀:
ofollow,noindex"> Flink非同步IO第一講
歡迎點贊,轉發,給自己小夥伴們學習的機會。