Akka在Flink中的使用剖析
Akka與Actor 模型
Akka是一個用來開發支援併發、容錯、擴充套件性的應用程式框架。它是actor model
的實現,因此跟Erlang的併發模型很像。在actor模型的上下文中,所有的活動實體都被認為是互不依賴的actor。actor之間的互相通訊是通過彼此之間傳送非同步訊息來實現的。每個actor都有一個郵箱來儲存接收到的訊息。因此每個actor都維護著自己獨立的狀態。
每個actor是一個單一的執行緒,它不斷地從其郵箱中poll(拉取)訊息,並且連續不斷地處理。對於已經處理過的訊息的結果,actor可以改變它自身的內部狀態或者傳送一個新訊息或者孵化一個新的actor。儘管單個的actor是自然有序的,但一個包含若干個actor的系統卻是高度併發的並且極具擴充套件性的。因為那些處理執行緒是所有actor之間共享的。這也是我們為什麼不該在actor執行緒裡呼叫可能導致阻塞的“呼叫”。因為這樣的呼叫可能會阻塞該執行緒使得他們無法替其他actor處理訊息。
Actor系統
一個actor系統是所有actor存活的容器。它也提供一些共享的服務,比如排程,配置,日誌記錄等。一個actor系統也同時維護著一個為所有actor服務的執行緒池。多個actor系統可以在一臺主機上共存。如果一個actor系統以RemoteActorRefProvider
的身份啟動,那麼它可以被某個遠端主機上的另一個actor系統訪問。actor系統會自動得識別actor訊息被路由到處於同一個actor系統內的某個actor還是處於一個遠端actor系統內的actor。如果是本地通訊的情況(同一個actor系統),那麼訊息的傳輸可以有效得利用共享記憶體的方式;如果是遠端通訊,那麼訊息將通過網路棧來傳輸。
actor基於層次化的組織形式(也就是說它基於樹形結構)。每個新建立的actor都將以建立它的actor作為父節點。層次結構有利於監督、管理(父actor管理其子actor)。如果某個actor的子actor產生錯誤,該actor將會得到通知,如果它有能力處理這個錯誤,那麼它會嘗試處理否則它會負責重啟該子actor。系統建立的首個actor將託管於系統提供的guardian actor/user
。
Flink為什麼要用Akka來代替RPC
原先的RPC服務存在的問題:
- 沒有帶回調的非同步呼叫功能,這也是為什麼Flink的多個執行時元件需要poll狀態的原因,這導致了不必要的延時。
- 沒有
exception forwarding
,產生的異常都只能簡單地吞噬掉,這使得在執行時產生一些非常難除錯的古怪問題 - 處理器的執行緒數受到限制,RPC只能處理一定量的併發請求,這迫使你不得不隔離執行緒池
- 引數不支援原始資料型別(或者原始資料型別的裝箱型別),所有的一切都必須有一個特殊的序列化類
- 棘手的執行緒模型,RPC會持續的產生或終止執行緒
採用Akka的actor模型帶來的好處:
- Akka解決上述的所有問題,並對外透明
supervisor
模型允許你對actor做失效檢測,它提供一個統一的方式來檢測與處理失敗(比如心跳丟失、呼叫失敗…)- Akka有工具來持久化有狀態的actor,一旦失敗可以在其他機器上重啟他們。這個機制在master fail-over的場景下將會變得非常有用並且很重要。
- 你可以定義許多call target(actor),在TaskManager上的任務可以直接在JobManager上呼叫它們的
ExecutionVertex
,而不是呼叫JobManager,讓其產生一個執行緒來檢視執行狀態。 - actor模型接近於在actor上採用佇列模型一個接一個的執行,這使得狀態機的併發模型變得簡單而又健壯
Akka在Flink中的使用
Akka在Flink中用於三個分散式技術元件之間的通訊,他們是JobClient
,JobManager
,TaskManager
。Akka在Flink中主要的作用是用來充當一個coordinator
的角色。
JobClient
獲取使用者提交的job,然後將其提交給JobManager
。JobManager
隨後對提交的job進行執行的環境準備。首先,它會分配job的執行需要的大量資源,這些資源主要是在TaskManager
上的execution slots。在資源分配完成之後,JobManager
會部署不同的task到特定的TaskManager
上。在接收到task之後,TaskManager
會建立執行緒來執行。所有的狀態改變,比如開始計算或者完成計算都將給發回給JobManager
。基於這些狀態的改變,JobManager
將引導task的執行直到其完成。一旦job完成執行,其執行結果將會返回給JobClient
,進而告知使用者
它們之間的一些通訊流程如下圖所示:
上圖中三個使用Akka通訊的分散式元件都具有自己的actor系統。
程式碼分析
當前關於Akka相關的程式碼,都在runtime
module下,但實現的程式碼是Java
跟Scala
混合的(也許這塊的邏輯Flink正在過渡階段,後續會有更多的邏輯改為用Scala實現)。
其中,只有JobClient
的Akka程式碼是用Java實現的。JobManager
以及TaskManager
跟Akka相關的邏輯以Scala實現。
訊息定義
- Messages : 三個分散式元件都會用到的訊息定義
- JobClientMessages :
JobClient
相關的message,將會被org.apache.flink.runtime.client.JobClientActor
使用 - JobManagerMessages :
JobManager
相關的message - TaskManagerMessages :
TaskManager
相關的message定義
當然不止這麼多訊息,還有垂直劃分的幾種定義,比如:
RegistrationMessages
用於定義TaskManager
和JobManager
相關的register訊息。
下面我們看看在Java和Scala中,Flink實現的actor的基類。
基類FlinkUntypedActor
在Akka提供的Java lib中,實現一個actor通常是靠繼承UntypedActor
來實現。FlinkUntypedActor
也不例外。繼承自UntypedActor
的類,通常要覆蓋onReceive
方法,該方法的完整簽名如下:
public final void onReceive(Object message) throws Exception {}
然後,通常在這個方法裡會判斷具體的訊息型別,根據不同的訊息型別來實現不同的處理邏輯。而在FlinkUntypedActor
類中,它先對訊息進行一輪驗證,過濾掉非法的訊息後,再處理各種訊息的型別。驗證主要是比對sessionID是否合法(即是否等同於leader session id),然後才會呼叫核心處理邏輯方法handleMessage
。該方法是抽象方法,有待子類具體實現,目前只有涉及到JobClient
處理的JobClientActor
類繼承了該類。
由scala實現的FlinkActor
幾乎具有相同的語義,這裡不再囉嗦。
總結
本篇主要介紹了Akka,並對Akka在Flink中的使用進行了大致的介紹。其實,就原始碼而言倒沒有太多值得關注的地方,主要還是三個分散式元件之間的通訊/協同邏輯,下篇我們會談這方面的話題。