Flink解讀 | 解讀Flink的宣告式資源管理與自動擴縮容設計
【今晚直播預告 - Flink 進階教程】
第一課、Flink Checkpoint--輕量級分散式快照
講師:唐雲(Apache Flink Contributor)
直播:5月9日20:00-21:00 (UTC+8)
觀看方式:釘釘搜尋群號 21789141 進群即可觀看
備註:視訊、PPT待直播後更新
完整課程見: https://github.com/flink-china/flink-training-course
這篇文章我們來解讀一下Flink還未實現但已納入計劃中的宣告式的資源管理及自動擴縮容的設計。
截止到目前(包括即將釋出的Flink 1.8),Flink都還無法基於資源的可用性來進行彈性地擴縮容。網上提出的一些擴縮容機制,一些是使用者自己根據獲取的指標去實現觸發Job的重啟擴容或藉助於上游系統反饋的指標來讓JobMaster作出針對性的響應(比如結合Dell開源的Pravega)。這些機制都不是Flink自身提供的,而且都有各自的侷限性。
在談論具體的設計之前,我們還需要了解Flink的狀態管理已經通過引入Key Group允許使用者調整運算元並行度。這一步是擴縮容的前提,不瞭解的可以閱讀Flink官方的一篇部落格:
A Deep Dive into Rescalable State in Apache Flink:
https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
Flink也曾為此發過一篇論文。 另外一點需要澄清的是,在“擴縮容”之前有很多修飾詞,包括但不限於:彈性、動態、線上、自動...。這些修飾詞在各種技術文章裡已經被使用得含糊不清了。這裡只澄清一點:下文所談論的擴縮容依然無法做到Job在執行時動態調整並行度不經歷重啟直接拉起新的Task例項執行,而是基於重啟恢復機制來實現的,因為涉及到狀態管理。
接下來,我們就來解讀Flink宣告式的資源管理及自動擴縮容機制。
Active與Reactive模式
資源獲取常見的兩種模式:
-
Active 模式:主動式,Flink可以主動地申請、釋放資源(通過與一些資源管理框架整合,如Yarn, Mesos);
-
Reactive 模式:被動響應式,由外部系統來分配、釋放資源,Flink只是簡單地對可用資源進行響應,這種模式對於基於容器的啟動環境相當有意義。
Flink希望這兩種模式能夠遵循同一套設計且都能支援自動地擴縮容(Flink job能夠動態調整它的並行度以對變動的工作“負荷”進行響應,當可用資源不夠的情況下,應該能夠自動且優雅地降級)。
宣告式的資源管理
其實,現在Flink所支援的資源管理模式就是Active模式,由使用者明確指定Job需要的資源,JobMaster會向ResourceManager去請求使用者指定的全量資源,當ResourceManager無法滿足JobMaster的申請需求時,該job將無法正常啟動,它會陷入失敗重啟->再次嘗試申請的迴圈。
那麼為了能夠同時支援Active/Reactive這兩種模式,就需要換一種方式來跟ResourceManager互動,為此引入了宣告式的資源管理設計。它跟原來的設計最大的差別在於:
-
JobMaster不再去逐個請求Slot,而是宣告它需要的資源的情況;
-
對資源的要求是個彈性的範圍,而不是固定的;
這裡所宣告的彈性資源是個四元組(也可以稱之為“四件套”):(min, target, max, rs),每個元素的含義如下:
-
min : 執行Job的最小資源
-
target : JobMaster期望獲得的常規資源需求,它是可變的,這是擴縮容的主要觸發機制
-
max : 最大資源期望
-
rs : resource spec(資源描述資訊)
宣告式的資源管理設計,可以應對Active以及Reactive模式:
-
active:不再強求像現有設計一樣一定要滿足期望獲取的資源(將它想象為target,只不過現在不允許改變)才能夠成功啟動任務。這種模式使得當無法滿足資源申請期望時,只要滿足最小條件(min)即可啟動執行;
-
reactive:target = max,這是與active最大的不同,因為reactive是被動響應式的,將target設定為max,就可以使得所有的運算元都是資源貪婪的,它能夠確保資源的最大化利用;
這一設計要求對現有的JobMaster與ResourceManager的互動進行重構。它要求JobMaster去週期性地獲取target的槽位數,target的槽位數目可以通過心跳作為載體傳遞給ResourceManager。如果target高於已經分配給JobMaster的槽位數,則為了補齊缺失的槽位,針對不同的模式,ResourceManager會採取不同機制:
-
active : 啟動新的Task executor(看作New slot);
-
reactive : 從可用槽位中分配相應的槽位(Available slot);
這引入了新的槽位分配檢視:
JobMaster基於其“required”的槽位數以及“available”槽位數,來作出啟動Job、從失敗恢復或者週期性地觸發彈性擴縮容的決策。
槽位分配協議的重構
引入宣告式的資源管理設計會導致現有的固定式的槽位申請機制不再適用,因此需要進行重構。
現有的槽位申請是在ExecutionGraph排程的時候進行,每個Execution一個slot。如果SlotPool中有槽位可用,將會直接提供槽位,如果可用的槽位數不夠,將會像ResourceManager申請,這一步也會分配新的TaskExecutor container,大致的流程如下圖所示:
重構後一個很大的不同點是將排程的邏輯從ExecutionGraph中分離出來並單獨形成一個Scheduler元件。另外,槽位的申請也不再是一個個獨立地進行,而是由JobMaster宣告資源需求並傳達給ResourceManager。重構後的大致協議如下圖:
排程器
新引入的排程器將排程與部署的邏輯從ExecutionGraph中解耦出來。這可以使得排程與部署的設計與實現更具擴充套件性並且將ExecutionGraph漸漸弱化為Job執行時狀態跟蹤的資料結構。
排程器充當了資源申請的客戶端與SlotPool互動。SlotPool如果獲得新的資源,將會通知排程器。排程器會檢查當前所擁有的資源是否滿足最小啟動資源(min),如果滿足可以啟動排程。
如此設計的優勢在於:大大減少了逐個申請Slot時大量的RPC請求,提升了申請效率,最小資源啟動也能加快排程的整體進度。
如果獲得新資源可用通知後,發現ExecutionGraph已經啟動(可能是以最小資源先啟動或者啟動後更改了target的值),那麼排程器可以決定擴容來使用額外的可用資源,當然這裡有一個等待Job進入“穩態”的過程(一個簡單的考慮是,如果可用資源在一定的時間內不再發生變化,則視為“穩態” ) ,因為這種場景可能在剛排程一個Job時發生,而且可能會發生多次。
排程器的職責
總結而言,排程器需要承擔如下職責:
-
決定排程策略以及宣告資源需求
-
接受資源變更通知
-
等待資源穩定
-
最小資源 ( min ) 滿足後觸發排程
-
管理擴縮容策略
-
週期性地查詢擴縮容策略並更新target值
-
擴縮容決策
槽位數的計算
排程器會負責計算Job的資源需求,它需要迭代JobGraph中所有的運算元,並根據每個運算元所返回的四元組 ( min, target, max, rs ) 對它們進行累加。
我們知道Flink有“Slot sharing”的機制,它可以讓多個運算元共享槽位,這種情況下的計算方式是對前三個元素求所有共享運算元的最大值,最後一個元素則是求和: ( mins' max, targets' max, maxs' max, sum(rs )) 。
擴縮容策略
為了支援自動擴縮容,我們必須允許一個運算元動態改變它的target值。這可以引入RescalingPolicy來指定,它可以被週期性地查詢並讓排程器獲取當前的target值。target值的改變最終也將會使得ResourceManager登記的Job的資源請求列表得到更新。而一旦SlotPool得到新分配的資源,它將會通知排程器觸發擴縮容。
失敗恢復
在引入宣告式的資源管理之後,失敗容錯也會有相應的變動。在發生失敗的情況下,Job首先需要考慮它是否要“縮容”重啟,比如有些Job的執行剛好使叢集的資源達到了飽和,但如果是因為TaskExecutor宕機導致的失敗,那麼很有可能直接重啟無法使它獲得失敗之前執行時所佔有的資源。這裡就可以允許一旦最小啟動資源(min ) 被滿足後即可重啟。
配置
資源配比在Flink中主要依賴於並行度 ( parallelism ) 的設定,Flink現在只支援並行度的“精確”設定。它會從三種途徑來獲取 ( 優先順序從高到底):
-
如果使用者呼叫setParallelism API,則以此為準
-
否則將會使用Job級別的並行度,如果通過CLI設定了 ( -p ) 引數的話
-
如果以上兩者都無法獲取,則從Flink的配置檔案中讀取預設的Job並行度
在引入宣告式的資源管理機制後,使用者需要為運算元指定資源四元組中的前三個元素的值,包括min, 初始的target以及max。如果使用者沒有呼叫setParallelism為運算元明確指定資源資訊,則Flink需要為運算元指定預設值:
-
Active 模式:(1, -p 或 叢集預設, max parallelism)
-
Reactive 模式:(1, max parallelism, max parallelism)
如果使用者顯示呼叫了setParallelism(p),則這三個值都將為(p, p, p)。
參考:
Declarative Resource Management: Active and reactive mode with re-scaling policies:
https://docs.google.com/document/d/1XKDXnrp8w45k2jIJNHxpNP2Zmr6FGru3H0dTHwYWJyE/edit#
大家工作學習遇到HBase技術問題,把問題釋出到HBase技術社群論壇http://hbase.group,歡迎大家論壇上面提問留言討論。想了解更多HBase技術關注HBase技術社群公眾號(微訊號:hbasegroup),非常歡迎大家積極投稿。
技術社群
【HBase生態+Spark社群大群】
群福利:群內每週進行群直播技術分享及問答
加入方式1:https://dwz.cn/Fvqv066s?spm=a2c4e.11153940.blogcont688191.19.1fcd1351nOOPvI
加入方式2:釘釘掃碼加入