1. 程式人生 > >Flink架構、原理與部署測試

Flink架構、原理與部署測試

Apache Flink是一個面向分散式資料流處理和批量資料處理的開源計算平臺,它能夠基於同一個Flink執行時,提供支援流處理和批處理兩種型別應用的功能。

現有的開源計算方案,會把流處理和批處理作為兩種不同的應用型別,因為它們所提供的SLA(Service-Level-Aggreement)是完全不相同的:流處理一般需要支援低延遲、Exactly-once保證,而批處理需要支援高吞吐、高效處理。

Flink從另一個視角看待流處理和批處理,將二者統一起來:Flink是完全支援流處理,也就是說作為流處理看待時輸入資料流是無界的;批處理被作為一種特殊的流處理,只是它的輸入資料流被定義為有界的

Flink流處理特性:

  1. 支援高吞吐、低延遲、高效能的流處理
  2. 支援帶有事件時間的視窗(Window)操作
  3. 支援有狀態計算的Exactly-once語義
  4. 支援高度靈活的視窗(Window)操作,支援基於time、count、session,以及data-driven的視窗操作
  5. 支援具有Backpressure功能的持續流模型
  6. 支援基於輕量級分散式快照(Snapshot)實現的容錯
  7. 一個執行時同時支援Batch on Streaming處理和Streaming處理
  8. Flink在JVM內部實現了自己的記憶體管理
  9. 支援迭代計算
  10. 支援程式自動優化:避免特定情況下Shuffle、排序等昂貴操作,中間結果有必要進行快取

一、架構

Flink以層級式系統形式元件其軟體棧,不同層的棧建立在其下層基礎上,並且各層接受程式不同層的抽象形式。

  1. 執行時層以JobGraph形式接收程式。JobGraph即為一個一般化的並行資料流圖(data flow),它擁有任意數量的Task來接收和產生data stream。
  2. DataStream API和DataSet API都會使用單獨編譯的處理方式生成JobGraph。DataSet API使用optimizer來決定針對程式的優化方法,而DataStream API則使用stream builder來完成該任務。
  3. 在執行JobGraph時,Flink提供了多種候選部署方案(如local,remote,YARN等)。
  4. Flink附隨了一些產生DataSet或DataStream API程式的的類庫和API:處理邏輯表查詢的Table,機器學習的FlinkML,影象處理的Gelly,複雜事件處理的CEP。

二、原理

Flink程式是由Stream和Transformation這兩個基本構建塊組成,其中Stream是一箇中間結果資料,而Transformation是一個操作,它對一個或多個輸入Stream進行計算處理,輸出一個或多個結果Stream。

Flink程式被執行的時候,它會被對映為Streaming Dataflow。一個Streaming Dataflow是由一組Stream和Transformation Operator組成,它類似於一個DAG圖,在啟動的時候從一個或多個Source Operator開始,結束於一個或多個Sink Operator。

2. 並行資料流

一個Stream可以被分成多個Stream分割槽(Stream Partitions),一個Operator可以被分成多個Operator Subtask,每一個Operator Subtask是在不同的執行緒中獨立執行的。一個Operator的並行度,等於Operator Subtask的個數,一個Stream的並行度總是等於生成它的Operator的並行度。

One-to-one模式
比如從Source[1]到map()[1],它保持了Source的分割槽特性(Partitioning)和分割槽內元素處理的有序性,也就是說map()[1]的Subtask看到資料流中記錄的順序,與Source[1]中看到的記錄順序是一致的。

Redistribution模式
這種模式改變了輸入資料流的分割槽,比如從map()[1]、map()[2]到keyBy()/window()/apply()[1]、keyBy()/window()/apply()[2],上游的Subtask向下遊的多個不同的Subtask傳送資料,改變了資料流的分割槽,這與實際應用所選擇的Operator有關係。

3. 任務、操作符鏈

Flink分散式執行環境中,會將多個Operator Subtask串起來組成一個Operator Chain,實際上就是一個執行鏈,每個執行鏈會在TaskManager上一個獨立的執行緒中執行。

4. 時間

處理Stream中的記錄時,記錄中通常會包含各種典型的時間欄位:

  1. Event Time:表示事件建立時間
  2. Ingestion Time:表示事件進入到Flink Dataflow的時間
  3. Processing Time:表示某個Operator對事件進行處理的本地系統時間

Flink使用WaterMark衡量時間的時間,WaterMark攜帶時間戳t,並被插入到stream中。

  1. WaterMark的含義是所有時間t'< t的事件都已經發生。
  2. 針對亂序的的流,WaterMark至關重要,這樣可以允許一些事件到達延遲,而不至於過於影響window視窗的計算。
  3. 並行資料流中,當Operator有多個輸入流時,Operator的event time以最小流event time為準。

5. 視窗

Flink支援基於時間視窗操作,也支援基於資料的視窗操作:

視窗分類:

  1. 按分割標準劃分:timeWindow、countWindow
  2. 按視窗行為劃分:Tumbling Window、Sliding Window、自定義視窗

Tumbling/Sliding Time Window

// Stream of (sensorId, carCnt)
val vehicleCnts: DataStream[(Int, Int)] = ...

val tumblingCnts: DataStream[(Int, Int)] = vehicleCnts
  // key stream by sensorId
  .keyBy(0) 
  // tumbling time window of 1 minute length
  .timeWindow(Time.minutes(1))
  // compute sum over carCnt
  .sum(1) 

val slidingCnts: DataStream[(Int, Int)] = vehicleCnts
  .keyBy(0) 
  // sliding time window of 1 minute length and 30 secs trigger interval
  .timeWindow(Time.minutes(1), Time.seconds(30))
  .sum(1)

Tumbling/Sliding Count Window

// Stream of (sensorId, carCnt)
val vehicleCnts: DataStream[(Int, Int)] = ...

val tumblingCnts: DataStream[(Int, Int)] = vehicleCnts
  // key stream by sensorId
  .keyBy(0)
  // tumbling count window of 100 elements size
  .countWindow(100)
  // compute the carCnt sum 
  .sum(1)

val slidingCnts: DataStream[(Int, Int)] = vehicleCnts
  .keyBy(0)
  // sliding count window of 100 elements size and 10 elements trigger interval
  .countWindow(100, 10)
  .sum(1)

自定義視窗

基本操作:

  1. window:建立自定義視窗
  2. trigger:自定義觸發器
  3. evictor:自定義evictor
  4. apply:自定義window function

6. 容錯

Barrier機制:

  1. 出現一個Barrier,在該Barrier之前出現的記錄都屬於該Barrier對應的Snapshot,在該Barrier之後出現的記錄屬於下一個Snapshot。
  2. 來自不同Snapshot多個Barrier可能同時出現在資料流中,也就是說同一個時刻可能併發生成多個Snapshot。
  3. 當一箇中間(Intermediate)Operator接收到一個Barrier後,它會發送Barrier到屬於該Barrier的Snapshot的資料流中,等到Sink Operator接收到該Barrier後會向Checkpoint Coordinator確認該Snapshot,直到所有的Sink Operator都確認了該Snapshot,才被認為完成了該Snapshot。

對齊:

當Operator接收到多個輸入的資料流時,需要在Snapshot Barrier中對資料流進行排列對齊:

  1. Operator從一個incoming Stream接收到Snapshot Barrier n,然後暫停處理,直到其它的incoming Stream的Barrier n(否則屬於2個Snapshot的記錄就混在一起了)到達該Operator
  2. 接收到Barrier n的Stream被臨時擱置,來自這些Stream的記錄不會被處理,而是被放在一個Buffer中。
  3. 一旦最後一個Stream接收到Barrier n,Operator會emit所有暫存在Buffer中的記錄,然後向Checkpoint Coordinator傳送Snapshot n。
  4. 繼續處理來自多個Stream的記錄

基於Stream Aligning操作能夠實現Exactly Once語義,但是也會給流處理應用帶來延遲,因為為了排列對齊Barrier,會暫時快取一部分Stream的記錄到Buffer中,尤其是在資料流並行度很高的場景下可能更加明顯,通常以最遲對齊Barrier的一個Stream為處理Buffer中快取記錄的時刻點。在Flink中,提供了一個開關,選擇是否使用Stream Aligning,如果關掉則Exactly Once會變成At least once。

CheckPoint:
Snapshot並不僅僅是對資料流做了一個狀態的Checkpoint,它也包含了一個Operator內部所持有的狀態,這樣才能夠在保證在流處理系統失敗時能夠正確地恢復資料流處理。狀態包含兩種:

  1. 系統狀態:一個Operator進行計算處理的時候需要對資料進行緩衝,所以資料緩衝區的狀態是與Operator相關聯的。以視窗操作的緩衝區為例,Flink系統會收集或聚合記錄資料並放到緩衝區中,直到該緩衝區中的資料被處理完成。
  2. 一種是使用者自定義狀態(狀態可以通過轉換函式進行建立和修改),它可以是函式中的Java物件這樣的簡單變數,也可以是與函式相關的Key/Value狀態。

7. 排程

在JobManager端,會接收到Client提交的JobGraph形式的Flink Job,JobManager會將一個JobGraph轉換對映為一個ExecutionGraph,ExecutionGraph是JobGraph的並行表示,也就是實際JobManager排程一個Job在TaskManager上執行的邏輯檢視。

物理上進行排程,基於資源的分配與使用的一個例子:

  1. 左上子圖:有2個TaskManager,每個TaskManager有3個Task Slot
  2. 左下子圖:一個Flink Job,邏輯上包含了1個data source、1個MapFunction、1個ReduceFunction,對應一個JobGraph
  3. 左下子圖:使用者提交的Flink Job對各個Operator進行的配置——data source的並行度設定為4,MapFunction的並行度也為4,ReduceFunction的並行度為3,在JobManager端對應於ExecutionGraph
  4. 右上子圖:TaskManager 1上,有2個並行的ExecutionVertex組成的DAG圖,它們各佔用一個Task Slot
  5. 右下子圖:TaskManager 2上,也有2個並行的ExecutionVertex組成的DAG圖,它們也各佔用一個Task Slot
  6. 在2個TaskManager上執行的4個Execution是並行執行的

8. 迭代

機器學習和圖計算應用,都會使用到迭代計算,Flink通過在迭代Operator中定義Step函式來實現迭代演算法,這種迭代演算法包括Iterate和Delta Iterate兩種型別。

Iterate

Iterate Operator是一種簡單的迭代形式:每一輪迭代,Step函式的輸入或者是輸入的整個資料集,或者是上一輪迭代的結果,通過該輪迭代計算出下一輪計算所需要的輸入(也稱為Next Partial Solution),滿足迭代的終止條件後,會輸出最終迭代結果。

流程虛擬碼:

IterationState state = getInitialState();

while (!terminationCriterion()) {
    state = step(state);
}

setFinalState(state);

Delta Iterate

Delta Iterate Operator實現了增量迭代。

流程虛擬碼:

IterationState workset = getInitialState();
IterationState solution = getInitialSolution();

while (!terminationCriterion()) {
    (delta, workset) = step(workset, solution);

    solution.update(delta)
}

setFinalState(solution);

最小值傳播:

流處理系統中,當下遊Operator處理速度跟不上的情況,如果下游Operator能夠將自己處理狀態傳播給上游Operator,使得上游Operator處理速度慢下來就會緩解上述問題,比如通過告警的方式通知現有流處理系統存在的問題。

Flink Web介面上提供了對執行Job的Backpressure行為的監控,它通過使用Sampling執行緒對正在執行的Task進行堆疊跟蹤取樣來實現。

預設情況下,JobManager會每間隔50ms觸發對一個Job的每個Task依次進行100次堆疊跟蹤呼叫,過計算得到一個比值,例如,radio=0.01,表示100次中僅有1次方法呼叫阻塞。Flink目前定義瞭如下Backpressure狀態:
OK: 0 <= Ratio <= 0.10
LOW: 0.10 < Ratio <= 0.5
HIGH: 0.5 < Ratio <= 1

三、庫

1. Table

Flink的Table API實現了使用類SQL進行流和批處理。

2. CEP

Flink的CEP(Complex Event Processing)支援在流中發現複雜的事件模式,快速篩選使用者感興趣的資料。

3. Gelly

Gelly是Flink提供的圖計算API,提供了簡化開發和構建圖計算分析應用的介面。

FlinkML是Flink提供的機器學習庫,提供了可擴充套件的機器學習演算法、簡潔的API和工具簡化機器學習系統的開發。

四、部署

當Flink系統啟動時,首先啟動JobManager和一至多個TaskManager。JobManager負責協調Flink系統,TaskManager則是執行並行程式的worker。當系統以本地形式啟動時,一個JobManager和一個TaskManager會啟動在同一個JVM中。
當一個程式被提交後,系統會建立一個Client來進行預處理,將程式轉變成一個並行資料流的形式,交給JobManager和TaskManager執行。

1. 啟動測試

編譯flink,本地啟動。

$ java -version
java version "1.8.0_111"
$ git clone https://github.com/apache/flink.git
$ git checkout release-1.1.4 -b release-1.1.4
$ cd flink
$ mvn clean package -DskipTests
$ cd flink-dist/target/flink-1.1.4-bin/flink-1.1.4
$ ./bin/start-local.sh

編寫本地流處理demo。

SocketWindowWordCount.java

public class SocketWindowWordCount {
    publicstaticvoidmain(String[] args) throws Exception {

        // the port to connect to
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream("localhost", port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    publicvoidflatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce(new ReduceFunction<WordWithCount>() {
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // Data type for words with count
    public static class WordWithCount {

        public String word;
        public long count;

        publicWordWithCount() {}

        publicWordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}

pom.xml

<!-- Use this dependency if you are using the DataStream API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>1.1.4</version>
</dependency>
<!-- Use this dependency if you are using the DataSet API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.1.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.10</artifactId>
    <version>1.1.4</version>
</dependency>

執行mvn構建。

$ mvn clean install
$ ls target/flink-demo-1.0-SNAPSHOT.jar

開啟9000埠,用於輸入資料:

$ nc -l 9000

提交flink任務:

$ ./bin/flink run -c com.demo.florian.WordCount  $DEMO_DIR/target/flink-demo-1.0-SNAPSHOT.jar --port 9000

在nc裡輸入資料後,檢視執行結果:

$ tail -f log/flink-*-jobmanager-*.out

檢視flink web頁面:localhost:8081

2. 程式碼結構

Flink系統核心可分為多個子專案。分割專案旨在減少開發Flink程式需要的依賴數量,並對測試和開發小元件提供便捷。

Flink當前還包括以下子專案:

  1. Flink-dist:distribution專案。它定義瞭如何將編譯後的程式碼、指令碼和其他資源整合到最終可用的目錄結構中。
  2. Flink-quick-start:有關quickstart和教程的指令碼、maven原型和示例程式
  3. flink-contrib:一系列有使用者開發的早起版本和有用的工具的專案。後期的程式碼主要由外部貢獻者繼續維護,被flink-contirb接受的程式碼的要求低於其他專案的要求。

Flink在YARN叢集上執行時:Flink YARN Client負責與YARN RM通訊協商資源請求,Flink JobManager和Flink TaskManager分別申請到Container去執行各自的程序。

YARN AM與Flink JobManager在同一個Container中,這樣AM可以知道Flink JobManager的地址,從而AM可以申請Container去啟動Flink TaskManager。待Flink成功執行在YARN叢集上,Flink YARN Client就可以提交Flink Job到Flink JobManager,並進行後續的對映、排程和計算處理。

  1. 設定Hadoop環境變數
$ export HADOOP_CONF_DIR=/etc/hadoop/conf
  1. 以叢集模式提交任務,每次都會新建flink叢集
$ ./bin/flink run -m yarn-cluster -c com.demo.florian.WordCount  $DEMO_DIR/target/flink-demo-1.0-SNAPSHOT.jar
  1. 啟動共享flink叢集,提交任務
$ ./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096 -d
$ ./bin/flink run -c com.demo.florian.WordCount $DEMO_DIR/target/flink-demo-1.0.SNAPSHOT.jar