1. 程式人生 > >[原始碼解析] 從TimeoutException看Flink的心跳機制

[原始碼解析] 從TimeoutException看Flink的心跳機制

# [原始碼解析] 從TimeoutException看Flink的心跳機制 [TOC] ## 0x00 摘要 本文從一個除錯時候常見的異常 "TimeoutException: Heartbeat of TaskManager timed out"切入,為大家剖析Flink的心跳機制。文中程式碼基於Flink 1.10。 ## 0x01 緣由 大家如果經常除錯Flink,當進入斷點看到了堆疊和變數內容之後,你容易陷入了沉思。當你發現了問題可能所在,高興的讓程式Resume的時候,你發現程式無法執行,有如下提示: ```java Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 93aa1740-cd2c-4032-b74a-5f256edb3217 timed out. ``` 這實在是很鬱悶的事情。作為程式猿不能忍啊,既然異常提示中有 Heartbeat 字樣,於是我們就來一起看看Flink的心跳機制,看看有沒有可以修改的途徑。 ## 0x02 背景概念 ### 2.1 四大模組 Flink有核心四大元件:Dispatcher,JobMaster,ResourceManager,TaskExecutor。 - **Dispatcher**(Application Master)用於接收client提交的任務和啟動相應的JobManager。其提供REST介面來接收client的application提交,負責啟動JM和提交application,同時執行Web UI。 - **ResourceManager**:主要用於資源的申請和分配。當TM有空閒的slot就會告訴JM,沒有足夠的slot也會啟動新的TM。kill掉長時間空閒的TM。 - **JobMaster** :功能主要包括(舊版本中JobManager的功能在新版本中以JobMaster形式出現,可能本文中會混淆這兩個詞,請大家諒解): - 將JobGraph轉化為ExecutionGraph(physical dataflow graph,並行化)。 - 向RM申請資源、schedule tasks、儲存作業的元資料。 - **TaskManager**:類似Spark的executor,會跑多個執行緒的task、資料快取與交換。Flink 架構遵循 Master - Slave 架構設計原則,JobMaster 為 Master 節點,TaskManager 為Slave節點。 這四大元件彼此之間的通訊需要依賴RPC實現。 ### 2.2 Akka Flink底層RPC基於Akka實現。Akka是一個開發併發、容錯和可伸縮應用的框架。它是Actor Model的一個實現,和Erlang的併發模型很像。在Actor模型中,所有的實體被認為是獨立的actors。actors和其他actors通過傳送非同步訊息通訊。 Actor模型的強大來自於非同步。它也可以顯式等待響應,這使得可以執行同步操作。但是強烈不建議同步訊息,因為它們限制了系統的伸縮性。 ### 2.3 RPC機制 RPC作用是:讓非同步呼叫看起來像同步呼叫。 Flink基於Akka構建了其底層通訊系統,引入了RPC呼叫,各節點通過GateWay方式回撥,隱藏通訊元件的細節,實現解耦。Flink整個通訊框架的元件主要由RpcEndpoint、RpcService、RpcServer、AkkaInvocationHandler、AkkaRpcActor等構成。 RPC相關的主要介面如下: - RpcEndpoint - RpcService - RpcGateway #### 2.3.1 RpcEndpoint:RPC的基類 RpcEndpoint是Flink RPC終端的基類,所有提供遠端過程呼叫的分散式元件必須擴充套件RpcEndpoint,其功能由RpcService支援。 RpcEndpoint的子類只有四類元件:Dispatcher,JobMaster,ResourceManager,TaskExecutor,即Flink中只有這四個元件有RPC的能力,換句話說只有這四個元件有RPC的這個需求。 每個RpcEndpoint對應了一個路徑(endpointId和actorSystem共同確定),每個路徑對應一個Actor,其實現了RpcGateway介面, #### RpcService:RPC服務提供者 RpcServer是RpcEndpoint的成員變數,為RpcService提供RPC服務/連線遠端Server,其只有一個子類實現:AkkaRpcService(可見目前Flink的通訊方式依然是Akka)。 RpcServer用於啟動和連線到RpcEndpoint, 連線到rpc伺服器將返回一個RpcGateway,可用於呼叫遠端過程。 Flink四大元件Dispatcher,JobMaster,ResourceManager,TaskExecutor,都是RpcEndpoint的實現,所以構建四大元件時,同步需要初始化RpcServer。如JobManager的構造方式,第一個引數就是需要知道RpcService。 #### RpcGateway:RPC呼叫的閘道器 Flink的RPC協議通過RpcGateway來定義;由前面可知,若想與遠端Actor通訊,則必須提供地址(ip和port),如在Flink-on-Yarn模式下,JobMaster會先啟動ActorSystem,此時TaskExecutor的Container還未分配,後面與TaskExecutor通訊時,必須讓其提供對應地址。 Dispatcher,JobMaster,ResourceManager,TaskExecutor 這四大元件通過各種方式實現了Gateway。以JobMaster為例,JobMaster實現JobMasterGateway介面。各元件類的成員變數都有需要通訊的其他元件的GateWay實現類,這樣可通過各自的Gateway實現RPC呼叫。 ### 2.4 常見心跳機制 常見的心跳檢測有兩種: - socket 套接字SO_KEEPALIVE本身帶有的心跳機制,定期向對方傳送心跳包,對方收到心跳包後會自動回覆; - 應用自身實現心跳機制,同樣也是使用定期傳送請求的方式; Flink實現的是第二種方案。 ## 0x03 Flink心跳機制 ### 3.1 程式碼和機制 Flink的心跳機制程式碼在: ```java Flink-master/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat ``` 四個介面: ```java HeartbeatListener.java HeartbeatManager.java HeartbeatTarget.java HeartbeatMonitor.java ``` 以及如下幾個類: ```java HeartbeatManagerImpl.java HeartbeatManagerSenderImpl.java HeartbeatMonitorImpl.java HeartbeatServices.java NoOpHeartbeatManager.java ``` Flink叢集有多種業務流程,比如Resource Manager, Task Manager, Job Manager。每種業務流程都有自己的心跳機制。Flink的心跳機制只是提供介面和基本功能,具體業務功能由各業務流程自己實現
。 我們首先設定 心跳系統中有兩種節點:sender和receiver。心跳機制是sender和receivers彼此相互檢測。但是檢測動作是Sender主動發起,即Sender主動傳送請求探測receiver是否存活,因為Sender已經發送過來了探測心跳請求,所以這樣receiver同時也知道Sender是存活的,然後Reciver給Sender迴應一個心跳錶示自己也是活著的。 因為Flink的幾個名詞和我們常見概念有所差別,所以流程上需要大家仔細甄別,即: - Flink Sender 主動傳送Request請求給Receiver,要求Receiver迴應一個心跳; - Flink Receiver 收到Request之後,通過Receive函式迴應一個心跳請求給Sender; ### 3.2 靜態架構 #### 3.2.1 HeartbeatTarget :監控目標抽象 HeartbeatTarget是對監控目標的抽象。心跳機制在行為上而言有兩種動作: - 向某個節點發送請求。 - 處理某個節點發來的請求。 HeartbeatTarget的函式就是這兩個動作: - receiveHeartbeat :向某個節點(Sender)傳送心跳回應,其引數heartbeatOrigin 就是 Receiver。 - requestHeartbeat :向某個節點(Receiver)要求其迴應一個心跳,其引數requestOrigin 就是 Sender。requestHeartbeat這個函式是Sender的函式,其中Sender通過RPC直接呼叫到Receiver
。 這兩個函式的引數也很簡單:分別是請求的傳送放和接收方,還有Payload載荷。對於一個確定節點而言,接收的和傳送的載荷是同一型別的。 ```java public interface HeartbeatTarget { /** * Sends a heartbeat response to the target. * @param heartbeatOrigin Resource ID identifying the machine for which a heartbeat shall be reported. */ // heartbeatOrigin 就是 Receiver void receiveHeartbeat(ResourceID heartbeatOrigin, I heartbeatPayload); /** * Requests a heartbeat from the target. * @param requestOrigin Resource ID identifying the machine issuing the heartbeat request. */ // requestOrigin 就是 Sender void requestHeartbeat(ResourceID requestOrigin, I heartbeatPayload); } ``` #### 3.2.2 HeartbeatMonitor : 管理heartbeat target的心跳狀態 對HeartbeatTarget的封裝,這樣Manager對Target的操作是通過對Monitor完成,後續會在其繼承類中詳細說明。 ```java public interface HeartbeatMon