1. 程式人生 > >Flink 原理與實現:架構和拓撲概覽

Flink 原理與實現:架構和拓撲概覽

要了解一個系統,一般都是從架構開始。我們關心的問題是:系統部署成功後各個節點都啟動了哪些服務,各個服務之間又是怎麼互動和協調的。下方是 Flink 叢集啟動後架構圖。

當 Flink 叢集啟動後,首先會啟動一個 JobManger 和一個或多個的 TaskManager。由 Client 提交任務給 JobManager,JobManager 再排程任務到各個 TaskManager 去執行,然後 TaskManager 將心跳和統計資訊彙報給 JobManager。TaskManager 之間以流的形式進行資料的傳輸。上述三者均為獨立的 JVM 程序。

  • Client 為提交 Job 的客戶端,可以是執行在任何機器上(與 JobManager 環境連通即可)。提交 Job 後,Client 可以結束程序(Streaming的任務),也可以不結束並等待結果返回。
  • JobManager 主要負責排程 Job 並協調 Task 做 checkpoint,職責上很像 Storm 的 Nimbus。從 Client 處接收到 Job 和 JAR 包等資源後,會生成優化後的執行計劃,並以 Task 的單元排程到各個 TaskManager 去執行。
  • TaskManager 在啟動的時候就設定好了槽位數(Slot),每個 slot 能啟動一個 Task,Task 為執行緒。從 JobManager 處接收需要部署的 Task,部署啟動後,與自己的上游建立 Netty 連線,接收資料並處理。

可以看到 Flink 的任務排程是多執行緒模型,並且不同Job/Task混合在一個 TaskManager 程序中。雖然這種方式可以有效提高 CPU 利用率,但是個人不太喜歡這種設計,因為不僅缺乏資源隔離機制,同時也不方便除錯。類似 Storm 的程序模型,一個JVM 中只跑該 Job 的 Tasks 實際應用中更為合理。

Job 例子

本文所示例子為 flink-1.0.x 版本

我們使用 Flink 自帶的 examples 包中的 SocketTextStreamWordCount ,這是一個從 socket 流中統計單詞出現次數的例子。

  • 首先,使用 netcat 啟動本地伺服器:

    $ nc -l 9000
    
  • 然後提交 Flink 程式

    $ bin/flink run examples/streaming/SocketTextStreamWordCount.jar \
      --hostname 10.218.130.9 \
      --port 9000
    

在netcat端輸入單詞並監控 taskmanager 的輸出可以看到單詞統計的結果。

SocketTextStreamWordCount 的具體程式碼如下:

public static void main(String[] args) throws Exception {
  // 檢查輸入
  final ParameterTool params = ParameterTool.fromArgs(args);
  ...

  // set up the execution environment
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  // get input data
  DataStream<String> text =
      env.socketTextStream(params.get("hostname"), params.getInt("port"), '\n', 0);

  DataStream<Tuple2<String, Integer>> counts =
      // split up the lines in pairs (2-tuples) containing: (word,1)
      text.flatMap(new Tokenizer())
          // group by the tuple field "0" and sum up tuple field "1"
          .keyBy(0)
          .sum(1);
  counts.print();
  
  // execute program
  env.execute("WordCount from SocketTextStream Example");
}

我們將最後一行程式碼 env.execute 替換成 System.out.println(env.getExecutionPlan()); 並在本地執行該程式碼(併發度設為2),可以得到該拓撲的邏輯執行計劃圖的 JSON 串,將該 JSON 串貼上到 http://flink.apache.org/visualizer/ 中,能視覺化該執行圖。

但這並不是最終在 Flink 中執行的執行圖,只是一個表示拓撲節點關係的計劃圖,在 Flink 中對應了 SteramGraph。另外,提交拓撲後(併發度設為2)還能在 UI 中看到另一張執行計劃圖,如下所示,該圖對應了 Flink 中的 JobGraph。

Graph

看起來有點亂,怎麼有這麼多不一樣的圖。實際上,還有更多的圖。Flink 中的執行圖可以分成四層:StreamGraph -> JobGraph -> ExecutionGraph -> 物理執行圖。

  • StreamGraph: 是根據使用者通過 Stream API 編寫的程式碼生成的最初的圖。用來表示程式的拓撲結構。
  • JobGraph: StreamGraph經過優化後生成了 JobGraph,提交給 JobManager 的資料結構。主要的優化為,將多個符合條件的節點 chain 在一起作為一個節點,這樣可以減少資料在節點之間流動所需要的序列化/反序列化/傳輸消耗。
  • ExecutionGraph: JobManager 根據 JobGraph 生成的分散式執行圖,是排程層最核心的資料結構。
  • 物理執行圖: JobManager 根據 ExecutionGraph 對 Job 進行排程後,在各個TaskManager 上部署 Task 後形成的“圖”,並不是一個具體的資料結構。

例如上文中的 2個併發度的 SocketTextStreamWordCount 四層執行圖的演變過程如下圖所示(點選檢視大圖):

這裡對一些名詞進行簡單的解釋。

  • StreamGraph: 根據使用者通過 Stream API 編寫的程式碼生成的最初的圖。
    • StreamNode:用來代表 operator 的類,並具有所有相關的屬性,如併發度、入邊和出邊等。
    • StreamEdge:表示連線兩個StreamNode的邊。
  • JobGraph: StreamGraph經過優化後生成了 JobGraph,提交給 JobManager 的資料結構。
    • JobVertex:經過優化後符合條件的多個StreamNode可能會chain在一起生成一個JobVertex,即一個JobVertex包含一個或多個operator,JobVertex的輸入是JobEdge,輸出是IntermediateDataSet。
    • IntermediateDataSet:表示JobVertex的輸出,即經過operator處理產生的資料集。producer是JobVertex,consumer是JobEdge。
    • JobEdge:代表了job graph中的一條資料傳輸通道。source 是 IntermediateDataSet,target 是 JobVertex。即資料通過JobEdge由IntermediateDataSet傳遞給目標JobVertex。
  • ExecutionGraph: JobManager 根據 JobGraph 生成的分散式執行圖,是排程層最核心的資料結構。
    • ExecutionJobVertex:和JobGraph中的JobVertex一一對應。每一個ExecutionJobVertex都有和併發度一樣多的 ExecutionVertex。
    • ExecutionVertex:表示ExecutionJobVertex的其中一個併發子任務,輸入是ExecutionEdge,輸出是IntermediateResultPartition。
    • IntermediateResult:和JobGraph中的IntermediateDataSet一一對應。每一個IntermediateResult有與下游ExecutionJobVertex相同併發數的IntermediateResultPartition。
    • IntermediateResultPartition:表示ExecutionVertex的一個輸出分割槽,producer是ExecutionVertex,consumer是若干個ExecutionEdge。
    • ExecutionEdge:表示ExecutionVertex的輸入,source是IntermediateResultPartition,target是ExecutionVertex。source和target都只能是一個。
    • Execution:是執行一個 ExecutionVertex 的一次嘗試。當發生故障或者資料需要重算的情況下 ExecutionVertex 可能會有多個 ExecutionAttemptID。一個 Execution 通過 ExecutionAttemptID 來唯一標識。JM和TM之間關於 task 的部署和 task status 的更新都是通過 ExecutionAttemptID 來確定訊息接受者。
  • 物理執行圖: JobManager 根據 ExecutionGraph 對 Job 進行排程後,在各個TaskManager 上部署 Task 後形成的“圖”,並不是一個具體的資料結構。
    • Task:Execution被排程後在分配的 TaskManager 中啟動對應的 Task。Task 包裹了具有使用者執行邏輯的 operator。
    • ResultPartition:代表由一個Task的生成的資料,和ExecutionGraph中的IntermediateResultPartition一一對應。
    • ResultSubpartition:是ResultPartition的一個子分割槽。每個ResultPartition包含多個ResultSubpartition,其數目要由下游消費 Task 數和 DistributionPattern 來決定。
    • InputGate:代表Task的輸入封裝,和JobGraph中JobEdge一一對應。每個InputGate消費了一個或多個的ResultPartition。
    • InputChannel:每個InputGate會包含一個以上的InputChannel,和ExecutionGraph中的ExecutionEdge一一對應,也和ResultSubpartition一對一地相連,即一個InputChannel接收一個ResultSubpartition的輸出。

後續的文章,將會詳細介紹 Flink 是如何生成這些執行圖的。主要有以下內容:

  1. 如何生成 StreamGraph
  2. 如何生成 JobGraph
  3. 如何生成 ExecutionGraph
  4. 如何進行排程(如何生成物理執行圖)