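# [Source Code Analysis] What Exactly Is a Flink Slot? (2)

[ToC]

## 0x00 Abstract

Most readers have heard of Flink's Slot concept, but many are still unclear on the details. What exactly does a Slot represent? How is it implemented in code? What role does it play when the execution graph is generated, and during scheduling, resource allocation, deployment, and execution? This article and the previous one walk through the source code to uncover the mechanics behind Slots.

## 0x01 Recap

This article continues from [[Source Code Analysis] What Exactly Is a Flink Slot? (1)](https://www.cnblogs.com/rossiXYZ/p/13554085.html). The previous article analyzed Slots from the perspective of system architecture and data structures; this one analyzes them from the perspective of the processing flow.

Here is the system architecture diagram again:

![](https://img2020.cnblogs.com/blog/1850883/202008/1850883-20200824153250938-547642498.png)

And here is the diagram of the logical relationships between the data structures:

![](https://img2020.cnblogs.com/blog/1850883/202008/1850883-20200824153310106-751657724.png)

The sections below go through several flows one by one.

## 0x02 Registering / Updating Slots

There are two paths through which a Slot is registered or its state is updated:

- After a TaskExecutor registers successfully and establishes its connection with the ResourceManager (RM), it registers its Slots along the way;
- On each periodic heartbeat, Slot status information is attached to the heartbeat payload.

### 2.1 TaskExecutor Registration Succeeds

After a TaskExecutor registers successfully, it interacts with the RM and registers its Slots with the ResourceManager (SlotManagerImpl) through the call path below. On receiving the message, SlotManagerImpl updates the Slot state: if there is a pendingSlotRequest at that moment, the Slot is assigned directly; otherwise the freeSlots variable is updated.

- TaskExecutor#establishResourceManagerConnection
- TaskSlotTableImpl#createSlotReport: creates the report, which at this point looks like this:

  ```java
  slotReport = {SlotReport@9633}
   0 = {SlotStatus@8969} "SlotStatus{slotID=40d390ec-7d52-4f34-af86-d06bb515cc48_0, resourceProfile=ResourceProfile{managedMemory=64.000mb (67108864 bytes), networkMemory=32.000mb (33554432 bytes)}, allocationID=null, jobID=null}"
    slotID = {SlotID@8629} "40d390ec-7d52-4f34-af86-d06bb515cc48_0"
    resourceProfile = {ResourceProfile@4194} "ResourceProfile{managedMemory=64.000mb (67108864 bytes), networkMemory=32.000mb (33554432 bytes)}"
    allocationID = null
    jobID = null
   1 = {SlotStatus@9638} "SlotStatus{slotID=40d390ec-7d52-4f34-af86-d06bb515cc48_1, resourceProfile=ResourceProfile{managedMemory=64.000mb (67108864 bytes), networkMemory=32.000mb (33554432 bytes)}, allocationID=null, jobID=null}"
    slotID = {SlotID@9643} "40d390ec-7d52-4f34-af86-d06bb515cc48_1"
    resourceProfile = {ResourceProfile@4194} "ResourceProfile{managedMemory=64.000mb (67108864 bytes), networkMemory=32.000mb (33554432 bytes)}"
    allocationID = null
    jobID = null
  ```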
- ResourceManager#sendSlotReport: reached on the RM side via RPC (resourceManagerGateway.sendSlotReport)
- SlotManagerImpl#registerTaskManager: registers the TaskManager with the SlotManager
- SlotManagerImpl#registerSlot
- SlotManagerImpl#createAndRegisterTaskManagerSlot: creates and registers a TaskManagerSlot. The code and variables at this point are shown below; as we can see, it simply registers the TM's Slot information in the SlotManager:

  ```java
  private TaskManagerSlot createAndRegisterTaskManagerSlot(SlotID slotId, ResourceProfile resourceProfile, TaskExecutorConnection taskManagerConnection) {
      final TaskManagerSlot slot = new TaskManagerSlot(
          slotId,
          resourceProfile,
          taskManagerConnection);
      slots.put(slotId, slot);
      return slot;
  }

  // Debugger view of the newly created slot
  slot = {TaskManagerSlot@13322}
   slotId = {SlotID@8629} "40d390ec-7d52-4f34-af86-d06bb515cc48_0"
   resourceProfile = {ResourceProfile@4194}
    cpuCores = {CPUResource@11616} "Resource(CPU: 89884656743115785...0)"
    taskHeapMemory = {MemorySize@11617} "4611686018427387903 bytes"
    taskOffHeapMemory = {MemorySize@11618} "4611686018427387903 bytes"
    managedMemory = {MemorySize@11619} "64 mb"
    networkMemory = {MemorySize@11620} "32 mb"
    extendedResources = {HashMap@11621} size = 0
   taskManagerConnection = {WorkerRegistration@11121}
   allocationId = null
   jobId = null
   assignedSlotRequest = null
   state = {TaskManagerSlot$State@13328} "FREE"
  ```
- SlotManagerImpl#updateSlot
- SlotManagerImpl#updateSlotState: if there is a pendingSlotRequest, the Slot is assigned directly
- SlotManagerImpl#handleFreeSlot: otherwise, the freeSlots variable is updated

When the flow finishes, the SlotManager looks like this. There are two entries in slots and two in freeSlots, which means both Slots are free:

```java
this = {SlotManagerImpl@11120}
 scheduledExecutor = {ActorSystemScheduledExecutorAdapter@11125}
 slotRequestTimeout = {Time@11127} "300000 ms"
 taskManagerTimeout = {Time@11128} "30000 ms"
 slots = {HashMap@11122} size = 2
  {SlotID@9643} "40d390ec-7d52-4f34-af86-d06bb515cc48_1" -> {TaskManagerSlot@19206}
  {SlotID@8629} "40d390ec-7d52-4f34-af86-d06bb515cc48_0" -> {TaskManagerSlot@13322}
 freeSlots = {LinkedHashMap@11129} size = 2
  {SlotID@8629} "40d390ec-7d52-4f34-af86-d06bb515cc48_0" -> {TaskManagerSlot@13322}
  {SlotID@9643} "40d390ec-7d52-4f34-af86-d06bb515cc48_1" -> {TaskManagerSlot@19206}
 taskManagerRegistrations = {HashMap@11130} size = 1
 fulfilledSlotRequests = {HashMap@11131} size = 0
 pendingSlotRequests = {HashMap@11132} size = 0
 pendingSlots = {HashMap@11133} size = 0
 slotMatchingStrategy = {AnyMatchingSlotMatchingStrategy@11134} "INSTANCE"
 slotRequestTimeoutCheck = {ActorSystemScheduledExecutorAdapter$ScheduledFutureTask@11139}
```

### 2.2 Updating Slot State via the Heartbeat Mechanism

Flink's heartbeat mechanism is also used to report Slot information: the Slot Report is included in the heartbeat payload.

First, the Slot Report is created on the TE (TaskExecutor) side:

- TaskExecutor#heartbeatFromResourceManager
- HeartbeatManagerImpl#requestHeartbeat
- TaskExecutor$ResourceManagerHeartbeatListener#retrievePayload
- TaskSlotTableImpl#createSlotReport

Execution then reaches the RM, where SlotManagerImpl#reportSlotStatus is called to update the Slot state:

- ResourceManager#heartbeatFromTaskManager
- HeartbeatManagerImpl#receiveHeartbeat
- ResourceManager$TaskManagerHeartbeatListener#reportPayload
- SlotManagerImpl#reportSlotStatus, where the SlotReport looks like this:

  ```java
  slotReport = {SlotReport@8718}
   slotsStatus = {ArrayList@8717} size = 2
    0 = {SlotStatus@9025} "SlotStatus{slotID=d99e16d7-a30c-4e21-b270-f82884b1813f_0, resourceProfile=ResourceProfile{managedMemory=64.000mb (67108864 bytes), networkMemory=32.000mb (33554432 bytes)}, allocationID=null, jobID=null}"
     slotID = {SlotID@9032} "d99e16d7-a30c-4e21-b270-f82884b1813f_0"
     resourceProfile = {ResourceProfile@4194} "ResourceProfile{managedMemory=64.000mb (67108864 bytes), networkMemory=32.000mb (33554432 bytes)}"
     allocationID = null
     jobID = null
    1 = {SlotStatus@9026} "SlotStatus{slotID=d99e16d7-a30c-4e21-b270-f82884b1813f_1, resourceProfile=ResourceProfile{managedMemory=64.000mb (67108864 bytes), networkMemory=32.000mb (33554432 bytes)}, allocationID=null, jobID=null}"
     slotID = {SlotID@9029} "d99e16d7-a30c-4e21-b270-f82884b1813f_1"
     resourceProfile = {ResourceProfile@4194} "ResourceProfile{managedMemory=64.000mb (67108864 bytes), networkMemory=32.000mb (33554432 bytes)}"
     allocationID = null
     jobID = null
  ```
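- SlotManagerImpl#updateSlot
- SlotManagerImpl#updateSlotState: if there is a pendingSlotRequest, the Slot is assigned directly
- SlotManagerImpl#handleFreeSlot: otherwise, the freeSlots variable is updated

  ```java
  freeSlots.put(freeSlot.getSlotId(), freeSlot);
  ```

## 0x03 ExecutionGraph Generation Phase

After a Job is submitted and goes through a series of processing steps, the Scheduler creates the ExecutionGraph. The ExecutionGraph is the parallelized version of the JobGraph. Only after this analysis can tasks finally be dispatched to the appropriate task slots. The number of slots is configured in advance, typically according to the number of CPU cores, so that the CPU's computing resources are used as fully as possible. If the Slots are exhausted, newly dispatched tasks cannot run.

`ExecutionGraph`: the distributed execution graph that the `JobManager` generates from the `JobGraph`. It is the core data structure of the scheduling layer.

A JobVertex / ExecutionJobVertex represents an operator (or a chain of operators), while each concrete ExecutionVertex represents one Task, that is, a single parallel subtask.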
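To make the operator-to-Task relationship concrete, here is a minimal sketch of the fan-out. It is a toy model, not Flink's real classes: `ExecutionGraphSketch`, `SubtaskSketch`, and `expand` are hypothetical names introduced only for illustration. The fallback to a default parallelism follows the `ExecutionJobVertex` snippet quoted in this article.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (NOT Flink's real classes): one job-level vertex fans out
// into one task-level vertex per parallel subtask.
final class ExecutionGraphSketch {

    /** Hypothetical stand-in for an ExecutionVertex: a single parallel subtask. */
    static final class SubtaskSketch {
        final String operatorName;
        final int subtaskIndex;

        SubtaskSketch(String operatorName, int subtaskIndex) {
            this.operatorName = operatorName;
            this.subtaskIndex = subtaskIndex;
        }

        @Override
        public String toString() {
            // 1-based index for display only
            return operatorName + " (" + (subtaskIndex + 1) + ")";
        }
    }

    /**
     * Expands one operator into its subtasks. When the vertex has no
     * positive parallelism of its own, the default parallelism is used.
     */
    static List<SubtaskSketch> expand(String operatorName, int vertexParallelism, int defaultParallelism) {
        int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;
        List<SubtaskSketch> subtasks = new ArrayList<>(numTaskVertices);
        for (int i = 0; i < numTaskVertices; i++) {
            subtasks.add(new SubtaskSketch(operatorName, i));
        }
        return subtasks;
    }

    public static void main(String[] args) {
        // An operator with parallelism 3 becomes three subtasks.
        System.out.println(expand("Map", 3, 1));
        // An operator without an explicit parallelism falls back to the default.
        System.out.println(expand("Sink", -1, 2));
    }
}
```

Each element of the returned list plays the role of one ExecutionVertex, and each of those is what eventually needs a Slot to run in.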
When the StreamGraph is generated, the `StreamGraph.addOperator` method already determines which task type the operator runs as, for example OneInputStreamTask or SourceStreamTask.

Suppose `OneInputStreamTask.class` is the vertexClass of the generated StreamNode. This value is passed along all the way: when the StreamGraph is converted into a JobGraph, it is passed into the JobVertex's invokableClass; when the JobGraph is converted into an ExecutionGraph, it is passed into ExecutionJobVertex.TaskInformation.invokableClassName; and finally it reaches the Task.

The code for this part runs in the following sequence:

- JobMaster#createScheduler
- DefaultSchedulerFactory#createInstance
- DefaultScheduler#init
- SchedulerBase#init
- SchedulerBase#createAndRestoreExecutionGraph
- SchedulerBase#createExecutionGraph
- ExecutionGraphBuilder#buildGraph
- ExecutionGraph#attachJobGraph
- ExecutionJobVertex#init: the parallelism determines how many Tasks, that is, how many ExecutionVertex instances, are created.

  ```java
  int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;
  this.taskVertices = new ExecutionVertex[numTaskVertices];
  ```
- ExecutionVertex#init: the Execution is created here.

  ```java
  this.currentExecution = new Execution(
      getExecutionGraph().getFutureExecutor(),
      this,
      0,
      initialGlobalModVersion,
      createTimestamp,
      timeout);
  ```

## 0x04 Scheduling Phase

A task's path is as follows: the job is dispatched to a TaskManager and then to a specific Slot for execution.

The scheduling-phase code only uses CompletableFuture to set up the framework in which the program will run; it can be regarded as working top-down.

Once the Job starts scheduling, the code runs in the following sequence:

- JobMaster#startJobExecution
- JobMaster#resetAndStartScheduler
- Future operations
- JobMaster#startScheduling
- SchedulerBase#startScheduling
- DefaultScheduler#startSchedulingInternal
- LazyFromSourcesSchedulingStrategy#startScheduling: resource allocation and deployment for the vertices start here.

  ```java
  allocateSlotsAndDeployExecutionVertices(schedulingTopology.getVertices());
  ```
- LazyFromSourcesSchedulingStrategy#allocateSlotsAndDeployExecutionVertices: iterates over the ExecutionVertex instances and selects those that are in the CREATED state and whose inputs are ready.

  ```java
  private void allocateSlotsAndDeployExecutionVertices(
          final Iterable<? extends SchedulingExecutionVertex<?, ?>> vertices) {
      // Select the ExecutionVertex instances in CREATED state whose inputs are ready
      f