1. 程式人生 > >Akka在Flink中的使用剖析

Akka在Flink中的使用剖析

Akka與Actor 模型

Akka是一個用來開發支援併發容錯擴充套件性的應用程式框架。它是actor model的實現,因此跟Erlang的併發模型很像。在actor模型的上下文中,所有的活動實體都被認為是互不依賴的actor。actor之間的互相通訊是通過彼此之間傳送非同步訊息來實現的。每個actor都有一個郵箱來儲存接收到的訊息。因此每個actor都維護著自己獨立的狀態。

flink-akka-actor-model

每個actor是一個單一的執行緒,它不斷地從其郵箱中poll(拉取)訊息,並且連續不斷地處理。對於已經處理過的訊息的結果,actor可以改變它自身的內部狀態或者傳送一個新訊息或者孵化一個新的actor。儘管單個的actor是自然有序的,但一個包含若干個actor的系統卻是高度併發的並且極具擴充套件性的。因為那些處理執行緒是所有actor之間共享的。這也是我們為什麼不該在actor執行緒裡呼叫可能導致阻塞的“呼叫”。因為這樣的呼叫可能會阻塞該執行緒使得他們無法替其他actor處理訊息。

Actor系統

一個actor系統是所有actor存活的容器。它也提供一些共享的服務,比如排程配置日誌記錄等。一個actor系統也同時維護著一個為所有actor服務的執行緒池。多個actor系統可以在一臺主機上共存。如果一個actor系統以RemoteActorRefProvider的身份啟動,那麼它可以被某個遠端主機上的另一個actor系統訪問。actor系統會自動得識別actor訊息被路由到處於同一個actor系統內的某個actor還是處於一個遠端actor系統內的actor。如果是本地通訊的情況(同一個actor系統),那麼訊息的傳輸可以有效得利用共享記憶體的方式;如果是遠端通訊,那麼訊息將通過網路棧來傳輸。

actor基於層次化的組織形式(也就是說它基於樹形結構)。每個新建立的actor都將以建立它的actor作為父節點。層次結構有利於監督、管理(父actor管理其子actor)。如果某個actor的子actor產生錯誤,該actor將會得到通知,如果它有能力處理這個錯誤,那麼它會嘗試處理否則它會負責重啟該子actor。系統建立的首個actor將託管於系統提供的guardian actor/user

Flink為什麼要用Akka來代替RPC

原先的RPC服務存在的問題:

  • 沒有帶回調的非同步呼叫功能,這也是為什麼Flink的多個執行時元件需要poll狀態的原因,這導致了不必要的延時。
  • 沒有exception forwarding,產生的異常都只能簡單地吞噬掉,這使得在執行時產生一些非常難除錯的古怪問題
  • 處理器的執行緒數受到限制,RPC只能處理一定量的併發請求,這迫使你不得不隔離執行緒池
  • 引數不支援原始資料型別(或者原始資料型別的裝箱型別),所有的一切都必須有一個特殊的序列化類
  • 棘手的執行緒模型,RPC會持續的產生或終止執行緒

採用Akka的actor模型帶來的好處:

  • Akka解決上述的所有問題,並對外透明
  • supervisor模型允許你對actor做失效檢測,它提供一個統一的方式來檢測與處理失敗(比如心跳丟失、呼叫失敗…)
  • Akka有工具來持久化有狀態的actor,一旦失敗可以在其他機器上重啟他們。這個機制在master fail-over的場景下將會變得非常有用並且很重要。
  • 你可以定義許多call target(actor),在TaskManager上的任務可以直接在JobManager上呼叫它們的ExecutionVertex,而不是呼叫JobManager,讓其產生一個執行緒來檢視執行狀態。
  • actor模型接近於在actor上採用佇列模型一個接一個的執行,這使得狀態機的併發模型變得簡單而又健壯

Akka在Flink中的使用

Akka在Flink中用於三個分散式技術元件之間的通訊,他們是JobClientJobManagerTaskManager。Akka在Flink中主要的作用是用來充當一個coordinator的角色。

JobClient獲取使用者提交的job,然後將其提交給JobManagerJobManager隨後對提交的job進行執行的環境準備。首先,它會分配job的執行需要的大量資源,這些資源主要是在TaskManager上的execution slots。在資源分配完成之後,JobManager會部署不同的task到特定的TaskManager上。在接收到task之後,TaskManager會建立執行緒來執行。所有的狀態改變,比如開始計算或者完成計算都將給發回給JobManager。基於這些狀態的改變,JobManager將引導task的執行直到其完成。一旦job完成執行,其執行結果將會返回給JobClient,進而告知使用者

它們之間的一些通訊流程如下圖所示:

flink-actor-arch

上圖中三個使用Akka通訊的分散式元件都具有自己的actor系統。

程式碼分析

當前關於Akka相關的程式碼,都在runtimemodule下,但實現的程式碼是JavaScala混合的(也許這塊的邏輯Flink正在過渡階段,後續會有更多的邏輯改為用Scala實現)。

其中,只有JobClient的Akka程式碼是用Java實現的。JobManager以及TaskManager跟Akka相關的邏輯以Scala實現。

訊息定義

  • Messages : 三個分散式元件都會用到的訊息定義
  • JobClientMessages : JobClient相關的message,將會被org.apache.flink.runtime.client.JobClientActor使用
  • JobManagerMessages : JobManager相關的message
  • TaskManagerMessages : TaskManager相關的message定義

當然不止這麼多訊息,還有垂直劃分的幾種定義,比如:RegistrationMessages用於定義TaskManagerJobManager相關的register訊息。

下面我們看看在Java和Scala中,Flink實現的actor的基類。

基類FlinkUntypedActor

在Akka提供的Java lib中,實現一個actor通常是靠繼承UntypedActor來實現。FlinkUntypedActor也不例外。繼承自UntypedActor的類,通常要覆蓋onReceive方法,該方法的完整簽名如下:

    public final void onReceive(Object message) throws Exception {}

然後,通常在這個方法裡會判斷具體的訊息型別,根據不同的訊息型別來實現不同的處理邏輯。而在FlinkUntypedActor類中,它先對訊息進行一輪驗證,過濾掉非法的訊息後,再處理各種訊息的型別。驗證主要是比對sessionID是否合法(即是否等同於leader session id),然後才會呼叫核心處理邏輯方法handleMessage。該方法是抽象方法,有待子類具體實現,目前只有涉及到JobClient處理的JobClientActor類繼承了該類。

由scala實現的FlinkActor幾乎具有相同的語義,這裡不再囉嗦。

總結

本篇主要介紹了Akka,並對Akka在Flink中的使用進行了大致的介紹。其實,就原始碼而言倒沒有太多值得關注的地方,主要還是三個分散式元件之間的通訊/協同邏輯,下篇我們會談這方面的話題。

apache_flink_weichat