Flink架構、原理與部署測試
Apache Flink是一個面向分散式資料流處理和批量資料處理的開源計算平臺,它能夠基於同一個Flink執行時,提供支援流處理和批處理兩種型別應用的功能。
現有的開源計算方案,會把流處理和批處理作為兩種不同的應用型別,因為它們所提供的SLA(Service-Level-Aggreement)是完全不相同的:流處理一般需要支援低延遲、Exactly-once保證,而批處理需要支援高吞吐、高效處理。
Flink從另一個視角看待流處理和批處理,將二者統一起來:Flink是完全支援流處理,也就是說作為流處理看待時輸入資料流是無界的;批處理被作為一種特殊的流處理,只是它的輸入資料流被定義為有界的
Flink流處理特性:
- 支援高吞吐、低延遲、高效能的流處理
- 支援帶有事件時間的視窗(Window)操作
- 支援有狀態計算的Exactly-once語義
- 支援高度靈活的視窗(Window)操作,支援基於time、count、session,以及data-driven的視窗操作
- 支援具有Backpressure功能的持續流模型
- 支援基於輕量級分散式快照(Snapshot)實現的容錯
- 一個執行時同時支援Batch on Streaming處理和Streaming處理
- Flink在JVM內部實現了自己的記憶體管理
- 支援迭代計算
- 支援程式自動優化:避免特定情況下Shuffle、排序等昂貴操作,中間結果有必要進行快取
一、架構
Flink以層級式系統形式元件其軟體棧,不同層的棧建立在其下層基礎上,並且各層接受程式不同層的抽象形式。
- 執行時層以JobGraph形式接收程式。JobGraph即為一個一般化的並行資料流圖(data flow),它擁有任意數量的Task來接收和產生data stream。
- DataStream API和DataSet API都會使用單獨編譯的處理方式生成JobGraph。DataSet API使用optimizer來決定針對程式的優化方法,而DataStream API則使用stream builder來完成該任務。
- 在執行JobGraph時,Flink提供了多種候選部署方案(如local,remote,YARN等)。
- Flink附隨了一些產生DataSet或DataStream API程式的的類庫和API:處理邏輯表查詢的Table,機器學習的FlinkML,影象處理的Gelly,複雜事件處理的CEP。
二、原理
Flink程式是由Stream和Transformation這兩個基本構建塊組成,其中Stream是一箇中間結果資料,而Transformation是一個操作,它對一個或多個輸入Stream進行計算處理,輸出一個或多個結果Stream。
Flink程式被執行的時候,它會被對映為Streaming Dataflow。一個Streaming Dataflow是由一組Stream和Transformation Operator組成,它類似於一個DAG圖,在啟動的時候從一個或多個Source Operator開始,結束於一個或多個Sink Operator。
2. 並行資料流
一個Stream可以被分成多個Stream分割槽(Stream Partitions),一個Operator可以被分成多個Operator Subtask,每一個Operator Subtask是在不同的執行緒中獨立執行的。一個Operator的並行度,等於Operator Subtask的個數,一個Stream的並行度總是等於生成它的Operator的並行度。
One-to-one模式
比如從Source[1]到map()[1],它保持了Source的分割槽特性(Partitioning)和分割槽內元素處理的有序性,也就是說map()[1]的Subtask看到資料流中記錄的順序,與Source[1]中看到的記錄順序是一致的。
Redistribution模式
這種模式改變了輸入資料流的分割槽,比如從map()[1]、map()[2]到keyBy()/window()/apply()[1]、keyBy()/window()/apply()[2],上游的Subtask向下遊的多個不同的Subtask傳送資料,改變了資料流的分割槽,這與實際應用所選擇的Operator有關係。
3. 任務、操作符鏈
Flink分散式執行環境中,會將多個Operator Subtask串起來組成一個Operator Chain,實際上就是一個執行鏈,每個執行鏈會在TaskManager上一個獨立的執行緒中執行。
4. 時間
處理Stream中的記錄時,記錄中通常會包含各種典型的時間欄位:
- Event Time:表示事件建立時間
- Ingestion Time:表示事件進入到Flink Dataflow的時間
- Processing Time:表示某個Operator對事件進行處理的本地系統時間
Flink使用WaterMark衡量時間的時間,WaterMark攜帶時間戳t,並被插入到stream中。
- WaterMark的含義是所有時間t'< t的事件都已經發生。
- 針對亂序的的流,WaterMark至關重要,這樣可以允許一些事件到達延遲,而不至於過於影響window視窗的計算。
- 並行資料流中,當Operator有多個輸入流時,Operator的event time以最小流event time為準。
5. 視窗
Flink支援基於時間視窗操作,也支援基於資料的視窗操作:
視窗分類:
- 按分割標準劃分:timeWindow、countWindow
- 按視窗行為劃分:Tumbling Window、Sliding Window、自定義視窗
Tumbling/Sliding Time Window
// Stream of (sensorId, carCnt)
val vehicleCnts: DataStream[(Int, Int)] = ...
val tumblingCnts: DataStream[(Int, Int)] = vehicleCnts
// key stream by sensorId
.keyBy(0)
// tumbling time window of 1 minute length
.timeWindow(Time.minutes(1))
// compute sum over carCnt
.sum(1)
val slidingCnts: DataStream[(Int, Int)] = vehicleCnts
.keyBy(0)
// sliding time window of 1 minute length and 30 secs trigger interval
.timeWindow(Time.minutes(1), Time.seconds(30))
.sum(1)
Tumbling/Sliding Count Window
// Stream of (sensorId, carCnt)
val vehicleCnts: DataStream[(Int, Int)] = ...
val tumblingCnts: DataStream[(Int, Int)] = vehicleCnts
// key stream by sensorId
.keyBy(0)
// tumbling count window of 100 elements size
.countWindow(100)
// compute the carCnt sum
.sum(1)
val slidingCnts: DataStream[(Int, Int)] = vehicleCnts
.keyBy(0)
// sliding count window of 100 elements size and 10 elements trigger interval
.countWindow(100, 10)
.sum(1)
自定義視窗
基本操作:
- window:建立自定義視窗
- trigger:自定義觸發器
- evictor:自定義evictor
- apply:自定義window function
6. 容錯
Barrier機制:
- 出現一個Barrier,在該Barrier之前出現的記錄都屬於該Barrier對應的Snapshot,在該Barrier之後出現的記錄屬於下一個Snapshot。
- 來自不同Snapshot多個Barrier可能同時出現在資料流中,也就是說同一個時刻可能併發生成多個Snapshot。
- 當一箇中間(Intermediate)Operator接收到一個Barrier後,它會發送Barrier到屬於該Barrier的Snapshot的資料流中,等到Sink Operator接收到該Barrier後會向Checkpoint Coordinator確認該Snapshot,直到所有的Sink Operator都確認了該Snapshot,才被認為完成了該Snapshot。
對齊:
當Operator接收到多個輸入的資料流時,需要在Snapshot Barrier中對資料流進行排列對齊:
- Operator從一個incoming Stream接收到Snapshot Barrier n,然後暫停處理,直到其它的incoming Stream的Barrier n(否則屬於2個Snapshot的記錄就混在一起了)到達該Operator
- 接收到Barrier n的Stream被臨時擱置,來自這些Stream的記錄不會被處理,而是被放在一個Buffer中。
- 一旦最後一個Stream接收到Barrier n,Operator會emit所有暫存在Buffer中的記錄,然後向Checkpoint Coordinator傳送Snapshot n。
- 繼續處理來自多個Stream的記錄
基於Stream Aligning操作能夠實現Exactly Once語義,但是也會給流處理應用帶來延遲,因為為了排列對齊Barrier,會暫時快取一部分Stream的記錄到Buffer中,尤其是在資料流並行度很高的場景下可能更加明顯,通常以最遲對齊Barrier的一個Stream為處理Buffer中快取記錄的時刻點。在Flink中,提供了一個開關,選擇是否使用Stream Aligning,如果關掉則Exactly Once會變成At least once。
CheckPoint:
Snapshot並不僅僅是對資料流做了一個狀態的Checkpoint,它也包含了一個Operator內部所持有的狀態,這樣才能夠在保證在流處理系統失敗時能夠正確地恢復資料流處理。狀態包含兩種:
- 系統狀態:一個Operator進行計算處理的時候需要對資料進行緩衝,所以資料緩衝區的狀態是與Operator相關聯的。以視窗操作的緩衝區為例,Flink系統會收集或聚合記錄資料並放到緩衝區中,直到該緩衝區中的資料被處理完成。
- 一種是使用者自定義狀態(狀態可以通過轉換函式進行建立和修改),它可以是函式中的Java物件這樣的簡單變數,也可以是與函式相關的Key/Value狀態。
7. 排程
在JobManager端,會接收到Client提交的JobGraph形式的Flink Job,JobManager會將一個JobGraph轉換對映為一個ExecutionGraph,ExecutionGraph是JobGraph的並行表示,也就是實際JobManager排程一個Job在TaskManager上執行的邏輯檢視。
物理上進行排程,基於資源的分配與使用的一個例子:
- 左上子圖:有2個TaskManager,每個TaskManager有3個Task Slot
- 左下子圖:一個Flink Job,邏輯上包含了1個data source、1個MapFunction、1個ReduceFunction,對應一個JobGraph
- 左下子圖:使用者提交的Flink Job對各個Operator進行的配置——data source的並行度設定為4,MapFunction的並行度也為4,ReduceFunction的並行度為3,在JobManager端對應於ExecutionGraph
- 右上子圖:TaskManager 1上,有2個並行的ExecutionVertex組成的DAG圖,它們各佔用一個Task Slot
- 右下子圖:TaskManager 2上,也有2個並行的ExecutionVertex組成的DAG圖,它們也各佔用一個Task Slot
- 在2個TaskManager上執行的4個Execution是並行執行的
8. 迭代
機器學習和圖計算應用,都會使用到迭代計算,Flink通過在迭代Operator中定義Step函式來實現迭代演算法,這種迭代演算法包括Iterate和Delta Iterate兩種型別。
Iterate
Iterate Operator是一種簡單的迭代形式:每一輪迭代,Step函式的輸入或者是輸入的整個資料集,或者是上一輪迭代的結果,通過該輪迭代計算出下一輪計算所需要的輸入(也稱為Next Partial Solution),滿足迭代的終止條件後,會輸出最終迭代結果。
流程虛擬碼:
IterationState state = getInitialState();
while (!terminationCriterion()) {
state = step(state);
}
setFinalState(state);
Delta Iterate
Delta Iterate Operator實現了增量迭代。
流程虛擬碼:
IterationState workset = getInitialState();
IterationState solution = getInitialSolution();
while (!terminationCriterion()) {
(delta, workset) = step(workset, solution);
solution.update(delta)
}
setFinalState(solution);
最小值傳播:
流處理系統中,當下遊Operator處理速度跟不上的情況,如果下游Operator能夠將自己處理狀態傳播給上游Operator,使得上游Operator處理速度慢下來就會緩解上述問題,比如通過告警的方式通知現有流處理系統存在的問題。
Flink Web介面上提供了對執行Job的Backpressure行為的監控,它通過使用Sampling執行緒對正在執行的Task進行堆疊跟蹤取樣來實現。
預設情況下,JobManager會每間隔50ms觸發對一個Job的每個Task依次進行100次堆疊跟蹤呼叫,過計算得到一個比值,例如,radio=0.01,表示100次中僅有1次方法呼叫阻塞。Flink目前定義瞭如下Backpressure狀態:
OK: 0 <= Ratio <= 0.10
LOW: 0.10 < Ratio <= 0.5
HIGH: 0.5 < Ratio <= 1
三、庫
1. Table
Flink的Table API實現了使用類SQL進行流和批處理。
2. CEP
Flink的CEP(Complex Event Processing)支援在流中發現複雜的事件模式,快速篩選使用者感興趣的資料。
3. Gelly
Gelly是Flink提供的圖計算API,提供了簡化開發和構建圖計算分析應用的介面。
FlinkML是Flink提供的機器學習庫,提供了可擴充套件的機器學習演算法、簡潔的API和工具簡化機器學習系統的開發。
四、部署
當Flink系統啟動時,首先啟動JobManager和一至多個TaskManager。JobManager負責協調Flink系統,TaskManager則是執行並行程式的worker。當系統以本地形式啟動時,一個JobManager和一個TaskManager會啟動在同一個JVM中。
當一個程式被提交後,系統會建立一個Client來進行預處理,將程式轉變成一個並行資料流的形式,交給JobManager和TaskManager執行。
1. 啟動測試
編譯flink,本地啟動。
$ java -version
java version "1.8.0_111"
$ git clone https://github.com/apache/flink.git
$ git checkout release-1.1.4 -b release-1.1.4
$ cd flink
$ mvn clean package -DskipTests
$ cd flink-dist/target/flink-1.1.4-bin/flink-1.1.4
$ ./bin/start-local.sh
編寫本地流處理demo。
SocketWindowWordCount.java
public class SocketWindowWordCount {
publicstaticvoidmain(String[] args) throws Exception {
// the port to connect to
final int port;
try {
final ParameterTool params = ParameterTool.fromArgs(args);
port = params.getInt("port");
} catch (Exception e) {
System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
return;
}
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream("localhost", port, "\n");
// parse the data, group it, window it, and aggregate the counts
DataStream<WordWithCount> windowCounts = text
.flatMap(new FlatMapFunction<String, WordWithCount>() {
publicvoidflatMap(String value, Collector<WordWithCount> out) {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.reduce(new ReduceFunction<WordWithCount>() {
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
// Data type for words with count
public static class WordWithCount {
public String word;
public long count;
publicWordWithCount() {}
publicWordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}
pom.xml
<!-- Use this dependency if you are using the DataStream API -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
<version>1.1.4</version>
</dependency>
<!-- Use this dependency if you are using the DataSet API -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.10</artifactId>
<version>1.1.4</version>
</dependency>
執行mvn構建。
$ mvn clean install
$ ls target/flink-demo-1.0-SNAPSHOT.jar
開啟9000埠,用於輸入資料:
$ nc -l 9000
提交flink任務:
$ ./bin/flink run -c com.demo.florian.WordCount $DEMO_DIR/target/flink-demo-1.0-SNAPSHOT.jar --port 9000
在nc裡輸入資料後,檢視執行結果:
$ tail -f log/flink-*-jobmanager-*.out
檢視flink web頁面:localhost:8081
2. 程式碼結構
Flink系統核心可分為多個子專案。分割專案旨在減少開發Flink程式需要的依賴數量,並對測試和開發小元件提供便捷。
Flink當前還包括以下子專案:
- Flink-dist:distribution專案。它定義瞭如何將編譯後的程式碼、指令碼和其他資源整合到最終可用的目錄結構中。
- Flink-quick-start:有關quickstart和教程的指令碼、maven原型和示例程式
- flink-contrib:一系列有使用者開發的早起版本和有用的工具的專案。後期的程式碼主要由外部貢獻者繼續維護,被flink-contirb接受的程式碼的要求低於其他專案的要求。
3. Flink On YARN
Flink在YARN叢集上執行時:Flink YARN Client負責與YARN RM通訊協商資源請求,Flink JobManager和Flink TaskManager分別申請到Container去執行各自的程序。
YARN AM與Flink JobManager在同一個Container中,這樣AM可以知道Flink JobManager的地址,從而AM可以申請Container去啟動Flink TaskManager。待Flink成功執行在YARN叢集上,Flink YARN Client就可以提交Flink Job到Flink JobManager,並進行後續的對映、排程和計算處理。
- 設定Hadoop環境變數
$ export HADOOP_CONF_DIR=/etc/hadoop/conf
- 以叢集模式提交任務,每次都會新建flink叢集
$ ./bin/flink run -m yarn-cluster -c com.demo.florian.WordCount $DEMO_DIR/target/flink-demo-1.0-SNAPSHOT.jar
- 啟動共享flink叢集,提交任務
$ ./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096 -d
$ ./bin/flink run -c com.demo.florian.WordCount $DEMO_DIR/target/flink-demo-1.0.SNAPSHOT.jar