1. 程式人生 > >Apache Flink流作業提交流程分析

Apache Flink流作業提交流程分析

提交流程呼叫的關鍵方法鏈

使用者編寫的程式邏輯需要提交給Flink才能得到執行。本文來探討一下客戶程式如何提交給Flink。鑑於使用者將自己利用Flink的API編寫的邏輯打成相應的應用程式包(比如Jar)然後提交到一個目標Flink叢集上去執行是比較主流的使用場景,因此我們的分析也基於這一場景進行。

Flink的API針對不同的執行環境有不同的Environment物件,這裡我們主要基於常用的RemoteStreamEnvironmentRemoteEnvironment進行分析

在前面我們談到了Flink中實現了“惰性求值”,只有當最終呼叫execute方法時,才會“真正”開始執行。因此,execute

方法是我們的切入點。

其原始碼位於org.apache.flink.streaming.api.environment.RemoteStreamEnvironment

首先,我們來看一下其execute方法觸發的關鍵方法呼叫鏈示意圖:

streaming-remoge-execute-method-chain

根據上圖的呼叫鏈,我們針對這些關鍵方法進行剖析,當然一些細節性的內容我們可能會暫時略過,這樣可以保證主路徑一直都很清晰。

getStreamGraph方法用於獲得一個StreamGraph的例項,該例項表示流的完整的拓撲結構並且包含了生成JobGraph所必要的相關資訊(包含了sourcesink的集合以及這些在圖中的“節點”抽象化的表示、一些虛擬的對映關係、執行和檢查點的配置等)。

獲得StreamGraph之後,通過呼叫executeRemotely方法進行遠端執行。該方法首先根據獲取到的使用者程式包的路徑以及類路徑建立載入使用者程式碼的類載入器:

ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(
jarFiles, globalClasspaths,   getClass().getClassLoader());

緊接著根據配置構建Client物件(Client物件是真正跟JobManager對接的內部代理):

Client client;
try {   
    client
= new Client(configuration); client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled()); }catch (Exception e) { throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(), e); }

後面的事情就此被Client接管:

try {   
    return client.runBlocking(streamGraph, jarFiles, globalClasspaths, usercodeClassLoader);
}catch (ProgramInvocationException e) {   
    throw e;
}catch (Exception e) {   
    String term = e.getMessage() == null ? "." : (": " + e.getMessage());   
    throw new ProgramInvocationException("The program execution failed" + term, e);
}finally {   
    client.shutdown();
}

client物件呼叫了runBlocking以阻塞式的行為“執行”使用者程式並等待返回JobExecutionResult物件作為Job的執行結果。執行完成,最終在finally塊中,呼叫shutdown方法關閉並釋放資源。

runBlocking被呼叫後,呼叫鏈跳轉到Client類中。為了適配多種提交方式以及執行模式,runBlocking方法有著非常多的過載。在當前的遠端執行環境下,runBlocking在多個過載方法之間跳轉的過程中,會呼叫getJobGraph方法獲得JobGraph的例項。JobGraph表示Flink dataflow 程式,它將會被JobManager所理解並接收。在某個Job被提交給JobManager之前,通過Flink提供的高層次的API都將會被轉化為JobGraph表示。關於如何獲得JobGraph的實現,我們後面會進行剖析。這裡,讓我們忽視這些細節,進入下一個關鍵方法。

runBlocking_1其實是runBlocking方法的過載,這裡加一個字尾標識,只是為了跟上面的runBlocking進行區別。runBlocking_1方法中,首先利用LeaderRetrievalUtils建立了LeaderRetrievalService這一服務物件:

LeaderRetrievalService leaderRetrievalService;
try {   
    leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config);
} catch (Exception e) {   
    throw new ProgramInvocationException("Could not create the leader retrieval service.", e);
}

顧名思義,LeaderRetrievalService在Flink中提供查詢主節點的服務。它會根據Flink的配置資訊(主要是recovery.mode來判斷基於哪種恢復機制來建立該服務。當前有兩種模式:一種是Standalone的獨立執行模式;另一種是基於Zookeeper的高可用模式)。Flink提供了一個稱之為LeaderRetrievalListener的回撥介面來獲得主節點的資訊。接下來,就是呼叫JobClientsubmitJobAndWait方法將產生的JobGraph以及主節點查詢的服務物件等相關資訊提交給JobManager並等待返回結果:

try {   
    this.lastJobID = jobGraph.getJobID();   
    return JobClient.submitJobAndWait(actorSystem, leaderRetrievalService, jobGraph, 
                                        timeout, printStatusDuringExecution, classLoader);
} catch (JobExecutionException e) {   
    throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
}

上面的submitJobAndWait方法的第一個引數actorSystemActorSystem的例項。在構造Client物件時建立,在Job提交併獲得返回結果後通過呼叫Clientshutdown方法關閉:

public void shutdown() {   
    if (!this.actorSystem.isTerminated()) {      
        this.actorSystem.shutdown();      
        this.actorSystem.awaitTermination();   
    }
}

該方法的呼叫見上面executeRemotely方法的程式碼段的finally語句塊。

JobClient的出現可能會讓你產生疑惑——它跟Client是什麼關係?作用是什麼?下面這幅示意圖可以用來解釋這些疑問:

Client-JobClient-relationship

上面這幅圖展示了Client物件與其他幾個物件的關係。JobClient在其中起到了“橋接”作用,它在基於API的程式設計層面上橋接了同步的方法呼叫和非同步的訊息通訊。更具體得說,JobClient可以看做是一個“靜態類”提供了一些靜態方法,這裡我們主要關注上面的submitJobAndWait方法,該方法內部封裝了Actor之間的非同步通訊(具體的通訊物件是JobClientActor,它負責跟JobManagerActorSystemActor進行通訊),並以阻塞的形式返回結果。而Client只需呼叫JobClient的這些方法,而無需關注其內部是如何實現的。

通過呼叫JobClient的靜態方法submitJobAndWait,會觸發基於AkkaActor之間的訊息通訊來完成後續的提交JobGraph的動作。JobClient提交Job的基於訊息互動的抽象示意圖如下:

JobClient-Actor-SubmitJobGraph

總體來說這裡總共有兩個ActorSystem,一個歸屬於Client,另一個歸屬於JobManager。在submitJobAndWait方法中,其首先會建立一個JobClientActorActorRef

ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);

然後向其發起一個SubmitJobAndWait訊息,該訊息將JobGraph的例項提交給jobClientActor。該訊息的發起模式是ask,它表示需要一個應答訊息。

JobClient向JobClientActor傳送訊息的程式碼段如下所示:

Future<Object> future = Patterns.ask(jobClientActor,      
                                     new JobClientMessages.SubmitJobAndWait(jobGraph),      
                                     new Timeout(AkkaUtils.INF_TIMEOUT()));
answer = Await.result(future, AkkaUtils.INF_TIMEOUT());

JobClient會阻塞等待該future返回結果。在得到返回結果answer之後,先進行解析判斷它是Job被成功執行返回的結果還是失敗返回的結果。

小結

至此,Client提交Streaming Job的關鍵方法呼叫路徑已梳理完成。這裡為了突出主路線,同時避免被太多的實現細節干擾,我們暫時忽略了一些重要資料結構和關鍵概念的解讀。不過,後續我們會對它們進行分析。

微信掃碼關注公眾號:Apache_Flink

apache_flink_weichat

QQ掃碼關注QQ群:Apache Flink學習交流群(123414680)

qrcode_for_apache_flink_qq_group

相關推薦

Apache Flink作業提交流程分析

提交流程呼叫的關鍵方法鏈 使用者編寫的程式邏輯需要提交給Flink才能得到執行。本文來探討一下客戶程式如何提交給Flink。鑑於使用者將自己利用Flink的API編寫的邏輯打成相應的應用程式包(比如Jar)然後提交到一個目標Flink叢集上去執行是比較主流的使

hdfs 檔案提交與mr作業提交流程分析

看了不少東西,想把他們總結出來,若有錯誤還望不吝賜教 在hdfs架構中可以吧Client認為是送貨人,nn是倉庫管理員,dn是一個個倉庫。當客戶端要送貨(檔案)的時候,進行如下步驟 1.送貨員通過rpc通訊告知倉庫管理員(nn)說我這裡有貨物,請告訴我放在哪裡啊 2.倉

FlinkFlink作業排程流程分析

1. 概述 當向Flink叢集提交使用者作業時,從使用者角度看,只需要作業處理邏輯正確,輸出正確的結果即可;而不用關心作業何時被排程的,作業申請的資源又是如何被分配的以及作業何時會結束;但是瞭解作業在執行時的具體行為對於我們深入瞭解Flink原理有非常大的幫助,並且對我們如何編寫更合理的作業邏輯有指導意義,因

YARN作業提交流程剖析

容器 其他 lin 角色 機架 接受 cati -a let YARN(MapReduce2) Yet Another Resource Negotiator / YARN Application Resource Negotiator對於節點數超出4000的大型集群,Ma

Apache Flink處理(一)

Apache Flink是一個分散式流處理器,它使用直接且富有表現力的API來實現有狀態的流處理應用程式。它以容錯的方式高效地大規模執行此類應用程式。Flink於2014年4月加入Apache軟體基金會作為孵化專案,並於2015年1月成為頂級專案。從一開始,Flink就有一個非常活躍且不斷增

Apache Flink處理(二)

到目前為止,您已經瞭解了流處理如何解決傳統批處理的侷限性,以及它如何支援新的應用程式和體系結構。您已經熟悉了開源的流處理空間的演變,並對Flink流應用程式有了簡單的瞭解。在這一章,你將進入流世界中,並得到本書本書剩下部分所必要的基礎知識。 這一章仍然與Flink無關。它的目標是介紹流處

回顧2016--Apache Flink處理在生產中的實踐

從2016年4月底開始接觸Flink,到現在已經8個多月了。從瞭解到熟悉,再到實際開發,這個過程就是我從0到實際開發使用Flink的過程。 上週,我們的Flink流計算程式終於上線了。也算是在實時流計算方面的一個成果。 下面,我將簡要介紹下公司如何使用Fli

MapReduce作業提交流程

Job Submission 1.客戶端呼叫job.submit方法提交作業,該方法內部建立一個JobSubmitter物件例項,該例項物件呼叫submitJobInternal方法提交作業。當作業成功提交後,客戶端呼叫的waitForCompletion方法

Spark(三)————作業提交流程

1、Spark核心API [SparkContext] 連線到spark叢集,入口點. [RDD] 它是一個分割槽的集合. 彈性分散式資料集. 不可變的資料分割槽集合. 基本操作(map filter , persist) 分割槽列表

spark入門之二 spark作業提交流程

spark作業提交流程   在執行Spar應用程式時,會將Spark應用程式打包後使用spark-submit指令碼提交到Spark中執行,執行提交命令如下: ./bin/spark-submit examples/src/main/r/dataframe. 1.1為弄清

Apache Flink 處理例項

維基百科在 IRC 頻道上記錄 Wiki 被修改的日誌,我們可以通過監聽這個 IRC 頻道,來實時監控給定時間視窗內的修改事件。Apache Flink 作為流計算引擎,非常適合處理流資料,並且,類似於 Hadoop MapReduce 等框架,Flink 提供了非常良好的抽象,使得業務邏輯程式碼編寫非常簡單

Flink on Yarn模式啟動流程分析

cin XML images ont list action -i 多個 信息 此文已由作者嶽猛授權網易雲社區發布。歡迎訪問網易雲社區,了解更多網易技術產品運營經驗。Flink On Yarn 架構Paste_Image.png前提條件首先需要配置YARN_CONF_DIR

Apache Flink-資料之上的有狀態的計算

官網給出的Flink應用場景圖: 1.狀態計算。 2.從上圖看出的應用場景有? -----事件驅動式的流處理 -----ETL管道 -----資料分析 3.一般性架構 從架構上來說與一般的流式架構沒有太大的不同,任務排程和資源管理可以放在我們熟悉的yarn上進

Flink視頻教程_基於Flink處理的動態實時電商實時分析系統

分布 業務 電商分析 apr 進行 處理 密碼 教程 包括 Flink視頻教程_基於Flink流處理的動態實時電商實時分析系統 課程分享地址鏈接:https://pan.baidu.com/s/1cX7O-45y6yUPT4B-ACfliA 密碼:jqmk 在開始學習前給

Spark2.x原始碼分析---spark-submit提交流程

本文以spark on yarn的yarn-cluster模式進行原始碼解析,如有不妥之處,歡迎吐槽。 步驟1.spark-submit提交任務指令碼 spark-submit  --class 主類路徑 \ --master yarn \ --deploy-mode c

Flink處理過程的部分原理分析

文章目錄 前言 流的時間有序性保證 視窗序列對齊 流資料的容錯:Checkpoint機制 Barrier State 引用 前言 在分散式領域,計算和儲存一直是兩大子領域。很多分散式

數字影象處理第一次作業——JPEG格式與壓縮流程分析

歡迎閱讀 此篇部落格是由曹老師數字影象處理課程佈置的第一次作業(2018年9月16日) 作業內容: 分析JPEG格式、原理、壓縮流程、下載實現程式碼並調通執行、計算壓縮率。 此篇部落格以分析原理為主,在每個演算法之後會貼出對應的C語言程式碼。 本文程式碼使用的

Apache Flink 1.5.6 釋出,處理框架

   Apache Flink 1.5.6 釋出了,Flink 是一個流處理框架,應用於分散式、高效能、始終可用的與準確的資料流應用程式。 主要更新如下: [FLINK-4173] - flink-metrics 中用 maven-shade-plugin 替換

基於Flink處理的動態實時電商實時分析系統

在開始學習前給大家說下什麼是Flink? 1.Flink是一個針對流資料和批資料的分散式處理引擎,主要用Java程式碼實現。 2.Apache Flink作為Apache的頂級專案,Flink集眾多優點於一身,包括快速、可靠可擴充套件、完全相容Hadoop、使

Apache Flink處理中Window的概念

Apache Flink–DataStream–Window 什麼是Window?有哪些用途?  下面我們結合一個現實的例子來說明。 我們先提出一個問題:統計經過某紅綠燈的汽車數量之和?  假設在一個紅綠燈處,我們每隔15秒統計一次通過此紅綠燈的汽車數量,如下圖:   可