1. 程式人生 > >精通Apache Flink讀書筆記--1、2

精通Apache Flink讀書筆記--1、2

1、Apache Flink介紹

既然有了Apache Spark,為什麼還要使用Apache Flink?

因為Flink是一個純流式計算引擎,而類似於Spark這種微批的引擎,只是Flink流式引擎的一個特例。其他的不同點之後會陸續談到。

1.1 歷史

Flink起源於一個叫做Stratosphere的研究專案,目標是建立下一代大資料分析引擎,其在2014年4月16日成為Apache的孵化專案,從Stratosphere 0.6開始,正式更名為Flink。Flink 0.7中介紹了最重要的特性:Streaming API。最初只支援Java API,後來增加了Scala API。

1.2 架構

Flink 1.X版本的包含了各種各樣的元件,包括部署、flink core(runtime)以及API和各種庫。

這裡寫圖片描述

從部署上講,Flink支援local模式、叢集模式(standalone叢集或者Yarn叢集)、雲端部署。Runtime是主要的資料處理引擎,它以JobGraph形式的API接收程式,JobGraph是一個簡單的並行資料流,包含一系列的tasks,每個task包含了輸入和輸出(source和sink例外)。

DataStream API和DataSet API是流處理和批處理的應用程式介面,當程式在編譯時,生成JobGraph。編譯完成後,根據API的不同,優化器(批或流)會生成不同的執行計劃。根據部署方式的不同,優化後的JobGraph被提交給了executors去執行。

1.3 分散式執行

Flink分散式程式包含2個主要的程序:JobManager和TaskManager.當程式執行時,不同的程序就會參與其中,包括Jobmanager、TaskManager和JobClient。

這裡寫圖片描述

首先,Flink程式提交給JobClient,JobClient再提交到JobManager,JobManager負責資源的協調和Job的執行。一旦資源分配完成,task就會分配到不同的TaskManager,TaskManager會初始化執行緒去執行task,並根據程式的執行狀態向JobManager反饋,執行的狀態包括starting、in progress、finished以及canceled和failing等。當Job執行完成,結果會返回給客戶端。

1.3.1 JobManager

Master程序,負責Job的管理和資源的協調。包括任務排程,檢查點管理,失敗恢復等。

當然,對於叢集HA模式,可以同時多個master程序,其中一個作為leader,其他作為standby。當leader失敗時,會選出一個standby的master作為新的leader(通過zookeeper實現leader選舉)。
JobManager包含了3個重要的元件:

(1)Actor系統
(2)排程
(3)檢查點

1.3.1.1 Actor系統

Flink內部使用Akka模型作為JobManager和TaskManager之間的通訊機制。

Actor系統是個容器,包含許多不同的Actor,這些Actor扮演者不同的角色。Actor系統提供類似於排程、配置、日誌等服務,同時包含了所有actors初始化時的執行緒池。

所有的Actors存在著層級的關係。新加入的Actor會被分配一個父類的Actor。Actors之間的通訊採用一個訊息系統,每個Actor都有一個“郵箱”,用於讀取訊息。如果Actors是本地的,則訊息在共享記憶體中共享;如果Actors是遠端的,則訊息通過RPC遠端呼叫。

每個父類的Actor都負責監控其子類Actor,當子類Actor出現錯誤時,自己先嚐試重啟並修復錯誤;如果子類Actor不能修復,則將問題升級並由父類Actor處理。
在Flink中,actor是一個有狀態和行為的容器。Actor的執行緒持續的處理從“郵箱”中接收到的訊息。Actor中的狀態和行為則由收到的訊息決定。

這裡寫圖片描述

1.3.1.2 排程器

Flink中的Executors被定義為task slots(執行緒槽位)。每個Task Manager需要管理一個或多個task slots。

Flink通過SlotSharingGroup和CoLocationGroup來決定哪些task需要被共享,哪些task需要被單獨的slot使用。

1.3.1.3 檢查點

Flink的檢查點機制是保證其一致性容錯功能的骨架。它持續的為分散式的資料流和有狀態的operator生成一致性的快照。其改良自Chandy-Lamport演算法,叫做ABS(輕量級非同步Barrier快照),具體參見論文:
Lightweight Asynchronous Snapshots for Distributed Dataflows

Flink的容錯機制持續的構建輕量級的分散式快照,因此負載非常低。通常這些有狀態的快照都被放在HDFS中儲存(state backend)。程式一旦失敗,Flink將停止executor並從最近的完成了的檢查點開始恢復(依賴可重發的資料來源+快照)。

Barrier作為一種Event,是Flink快照中最主要的元素。它會隨著data record一起被注入到流資料中,而且不會超越data record。每個barrier都有一個唯一的ID,將data record分到不同的檢查點的範圍中。下圖展示了barrier是如何被注入到data record中的:
這裡寫圖片描述

每個快照中的狀態都會報告給Job Manager的檢查點協調器;快照發生時,flink會在某些有狀態的operator上對data record進行對齊操作(alignment),目的是避免失敗恢復時重複消費資料。這個過程也是exactly once的保證。通常對齊操作的時間僅是毫秒級的。但是對於某些極端的應用,在每個operator上產生的毫秒級延遲也不能允許的話,則可以選擇降級到at least once,即跳過對齊操作,當失敗恢復時可能發生重複消費資料的情況。Flink預設採用exactly once意義的處理。

1.3.2 TaskManager

Task Managers是具體執行tasks的worker節點,執行發生在一個JVM中的一個或多個執行緒中。Task的並行度是由執行在Task Manager中的task slots的數量決定。如果一個Task Manager有4個slots,那麼JVM的記憶體將分配給每個task slot 25%的記憶體。一個Task slot中可以執行1個或多個執行緒,同一個slot中的執行緒又可以共享相同的JVM。在相同的JVM中的tasks,會共享TCP連線和心跳訊息:
這裡寫圖片描述

1.3.3 Job Client

Job Client並不是Flink程式執行中的內部元件,而是程式執行的入口。Job Client負責接收使用者提交的程式,並建立一個data flow,然後將生成的data flow提交給Job Manager。一旦執行完成,Job Client將返回給使用者結果。

Data flow就是執行計劃,比如下面一個簡單的word count的程式:

這裡寫圖片描述

當用戶將這段程式提交時,Job Client負責接收此程式,並根據operator生成一個data flow,那麼這個程式生成的data flow也許看起來像是這個樣子:

這裡寫圖片描述

預設情況下,Flink的data flow都是分散式並行處理的,對於資料的並行處理,flink將operators和資料流進行partition。Operator partitions叫做sub-tasks。資料流又可以分為一對一的傳輸與重分佈的情況。

這裡寫圖片描述

我們看到,從source到map的data flow,是一個一對一的關係,沒必要產生shuffle操作;而從map到groupBy操作,flink會根據key將資料重分佈,即shuffle操作,目的是聚合資料,產生正確的結果。

1.4 特性

1.4.1 高效能

Flink本身就被設計為高效能和低延遲的引擎。不像Spark這種框架,你沒有必要做許多手動的配置,用以獲得最佳效能,Flink管道式(pipeline)的資料處理方式已經給了你最佳的效能。

1.4.2 有狀態的支援Exactly once的計算

通過檢查點+可重發的資料來源,使得Flink對於stateful的operator,支援exactly once的計算。當然你可以選擇降級到at least once。

1.4.3 靈活的流處理視窗

Flink支援資料驅動的視窗,這意味著我們可以基於時間(event time或processing time)、count和session來構建視窗;視窗同時可以定製化,通過特定的pattern實現。

1.4.4 容錯機制

通過輕量級、分散式快照實現。

1.4.5 記憶體管理

Flink在JVM內部進行記憶體的自我管理,使得其獨立於java本身的垃圾回收機制。當處理hash、index、caching和sorting時,Flink自我的記憶體管理方式使得這些操作很高效。但是,目前自我的記憶體管理只在批處理中實現,流處理程式並未使用。

1.4.6 優化器

為了避免shuffle、sort等操作,Flink的批處理API進行了優化,它可以確保避免過度的磁碟IO而儘可能使用快取。

1.4.7 流和批的統一

Flink中批和流有各自的API,你既可以開發批程式,也可以開發流處理程式。事實上,Flink中的流處理優先原則,認為批處理是流處理的一種特殊情況。

1.4.8 Libraries庫

Flink提供了用於機器學習、圖計算、Table API等庫,同時Flink也支援複雜的CEP處理和警告。

1.4.9 Event Time語義

Flink支援Event Time語義的處理,這有助於處理流計算中的亂序問題,有些資料也許會遲到,我們可以通過基於event time、count、session的視窗來處於這樣的場景。

1.5 快速安裝

1.6 Standalone 叢集安裝

1.7 例子

1.8 總結

Flink細節上的討論和處理模型。下一章將介紹Flink Streaming API。

2、用DataStream API處理資料

許多領域需要資料的實時處理,物聯網驅動的應用程式在資料的儲存、處理和分析上需要實時或準實時的進行。

Flink提供流處理的API叫做DataStream API,每個Flink程式都可以按照下面的步驟進行開發:

這裡寫圖片描述

2.1 執行環境

我們首先要獲得已經存在的執行環境或者建立它。有3種方法得到執行環境:

1)通過getExecutionEnvironment()獲得;這將根據上下文得到執行環境,假如local模式,則它會建立一個local的執行環境;假如是叢集模式,則會建立一個分散式的執行環境;
(2)通過createLocalEnvironment() 建立一個本地的執行環境;
(3)通過createRemoteEnvironment (String host, int port, String, and .jar files)建立一個遠端的執行環境。

2.2 資料來源

Flink支援許多預定義的資料來源,同時也支援自定義資料來源。下面我們看看有哪些預定義的資料來源。

2.2.1 基於socket

DataStream API支援從socket讀取資料,有如下3個方法:

socketTextStream(hostName, port);
socketTextStream(hostName,port,delimiter)
socketTextStream(hostName,port,delimiter, maxRetry)

2.2.2 基於檔案

你可以使用readTextFile(String path)來消費檔案中的資料作為流資料的來源,預設情況下的格式是TextInputFormat。當然你也可以通過readFile(FileInputFormat inputFormat, String path)來指定FileInputFormat的格式。

Flink同樣支援讀取檔案流:

readFileStream(String filePath, long intervalMillis,
FileMonitoringFunction.WatchType watchType)

readFile(fileInputFormat, path, watchType, interval, pathFilter,
typeInfo)。

關於基於檔案的資料流,這裡不再過多介紹。

2.2.3 Transformation

Transformation允許將資料從一種形式轉換為另一種形式,輸入可以是1個源也可以是多個,輸出則可以是0個、1個或者多個。下面我們一一介紹這些Transformations。

2.2.3.1 Map

輸入1個元素,輸出一個元素,Java API如下:

inputStream.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return 5 * value;
}
});

2.2.3.2 FlatMap

輸入1個元素,輸出0個、1個或多個元素,Java API如下:

inputStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out)
throws Exception {
for(String word: value.split(" ")){
out.collect(word);
}
}
});

2.2.3.3 Filter

條件過濾時使用,當結果為true時,輸出記錄;

inputStream.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value != 1;
}
});

2.2.3.4 keyBy

邏輯上按照key分組,內部使用hash函式進行分組,返回keyedDataStream:

inputStream.keyBy("someKey");

2.2.3.5 Reduce

keyedStream流上,將上一次reduce的結果和本次的進行操作,例如sum reduce的例子:

keyedInputStream. reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer value1, Integer value2)
throws Exception {
return value1 + value2;
}
});

2.2.3.6 Fold

在keyedStream流上的記錄進行連線操作,例如:

keyedInputStream keyedStream.fold("Start", new FoldFunction<Integer,
String>() {
@Override
public String fold(String current, Integer value) {
return current + "=" + value;
}
});

假如是一個(1,2,3,4,5)的流,那麼結果將是:Start=1=2=3=4=5

2.2.3.7 Aggregation

在keyedStream上應用類似min、max等聚合操作:

keyedInputStream.sum(0)
keyedInputStream.sum("key")
keyedInputStream.min(0)
keyedInputStream.min("key")
keyedInputStream.max(0)
keyedInputStream.max("key")
keyedInputStream.minBy(0)
keyedInputStream.minBy("key")
keyedInputStream.maxBy(0)
keyedInputStream.maxBy("key")

2.2.3.8 Window

視窗功能允許在keyedStream上應用時間或者其他條件(count或session),根據key分組做聚合操作。

流是無界的,為了處理無界的流,我們可以將流切分到有界的視窗中去處理,根據指定的key,切分為不同的視窗。我們可以使用Flink預定義的視窗分配器。當然你也可以通過繼承WindowAssginer自定義分配器。

下面看看有哪些預定義的分配器。

2.2.3.8.1 Global windows

Global window的範圍是無限的,你需要指定觸發器來觸發視窗。通常來講,每個資料按照指定的key分配到不同的視窗中,如果不指定觸發器,則視窗永遠不會觸發。

2.2.3.8.2 Tumbling Windows

Tumbling視窗是基於特定時間建立的,他們的大小固定,視窗間不會發生重合。例如你想基於event timen每隔10分鐘計算一次,這個視窗就很適合。

2.2.3.8.3 Sliding Windows

Sliding視窗的大小也是固定的,但視窗之間會發生重合,例如你想基於event time每隔1分鐘,統一過去10分鐘的資料時,這個視窗就很適合。

2.2.3.8.4 Session Windows

Session視窗允許我們設定一個gap時間,來決定在關閉一個session之前,我們要等待多長時間,是衡量使用者活躍與否的標誌。

2.2.3.9 WindowAll

WindowAll操作不是基於key的,是對全域性資料進行的計算。由於不基於key,因此是非並行的,即並行度是1.在使用時效能會受些影響。

inputStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(10)));

2.2.3.10 Union

Union功能就是在2個或多個DataStream上進行連線,成為一個新的DataStream。

inputStream. union(inputStream1, inputStream2, ...)

2.2.3.11 Join

Join允許在2個DataStream上基於相同的key進行連線操作,計算的範圍也是要基於一個window進行。

inputStream. join(inputStream1)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply (new JoinFunction () {...})

2.2.3.12 Split

Split的功能是根據某些條件將一個流切分為2個或多個流。例如你有一個混合資料的流,根據資料自身的某些特徵,將其劃分到多個不同的流單獨處理。

SplitStream<Integer> split = inputStream.split(new
OutputSelector<Integer>() {
@Override
public Iterable<String> select(Integer value) {
List<String> output = new ArrayList<String>();
if (value % 2 == 0) {
output.add("even");
}else {
output.add("odd");}
return output;
}
})

2.2.3.13 select

DataStream根據選擇的欄位,將流轉換為新的流。

SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");

2.2.3.14 project

Project功能允許你選擇流中的一部分元素作為新的資料流中的欄位,相當於做個對映。

DataStream<Tuple4<Integer, Double, String, String>> in = // [...]
DataStream<Tuple2<String, String>> out = in.project(3,2);

2.2.4 物理分片

Flink允許我們在流上執行物理分片,當然你可以選擇自定義partitioning。

2.2.4.1 自定義partitioning

根據某個具體的key,將DataStream中的元素按照key重新進行分片,將相同key的元素聚合到一個執行緒中執行。

inputStream.partitionCustom(partitioner, "someKey");
inputStream.partitionCustom(partitioner, 0);

2.2.4.2 隨機partitioning

不根據具體的key,而是隨機將資料打散。

inputStream.shuffle();

2.2.4.3 Rebalancing partitioning

內部使用round robin方法將資料均勻打散。這對於資料傾斜時是很好的選擇。

inputStream.rebalance();

2.2.4.4 Rescaling

Rescaling是通過執行oepration運算元來實現的。由於這種方式僅發生在一個單一的節點,因此沒有跨網路的資料傳輸。

inputStream.rescale();

2.2.4.5 廣播

廣播用於將dataStream所有資料發到每一個partition。

inputStream.broadcast();

2.2.5 資料Sink

我們最終需要將結果儲存在某個地方,Flink提供了一些選項:

1)writeAsText():將結果以字串的形式一行一行寫到文字檔案中。

(2)writeAsCsV():儲存為csv格式。

(3print()/printErr():標準輸出或錯誤輸出。輸出到Terminal或者out檔案。

(4)writeUsingOutputFormat():自定義輸出格式,要考慮序列化與反序列化。

(5)writeUsingOutputFormat():也可以輸出到socket,但是你需要定義SerializationSchema。

對於Flink中的connector以及自定義輸出,後續的章節會講到。

2.2.6 Event Time和watermark

Flink Streaming API受到了Google DataFlow模型的啟發,支援3種不同型別的時間概念:

1) Event Time2) Processing Time3) Ingestion Time

(1)Event Time
事件發生的時間,一般資料中自帶時間戳。這就可能導致亂序的發生。

(2)Processing Time
Processing Time是機器的時間,這種時間跟資料本身沒有關係,完全依賴於機器的時間。

(3)Ingestion Time
是資料進入到Flink的時間。注入時間比processing time更加昂貴(多了一個assign timestamp的步驟),但是其準確性相比processing time的處理更好。由於是進入Flink才分配時間戳,因此無法處理亂序。

我們通過在env中設定時間屬性來選擇不同的時間概念:

final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
//or
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
//or
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Flink提供了預定義的時間戳抽取器和水位線生成器。參考:

2.2.7 connectors聯結器

2.2.7.1 Kafka connector

kafka是一個基於釋出、訂閱的分散式訊息系統。Flink定義了kafka consumer作為資料來源。我們只需要引入特定的依賴即可(這裡以kafka 0.9為例):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.11/artifactId>
    <version>1.1.4</version>
</dependency

在使用時,我們需要指定topic name以及反序列化器:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
DataStream<String> input = env.addSource(new
FlinkKafkaConsumer09<String>("mytopic", new SimpleStringSchema(),
properties));

Flink預設支援String和Json的反序列化。

Kafka consumer在實現時實現了檢查點功能,因此失敗恢復時可以重發。

Kafka除了consumer外,我們也可以將結果輸出到kafka。即kafka producer。例如:

stream.addSink(new FlinkKafkaProducer09[String]("localhost:9092",
"mytopic", new SimpleStringSchema()))

2.2.7.2 Twitter connector

用twitter作為資料來源,首先你需要用於twitter賬號。之後你需要建立twitter應用並認證。

Pom中新增依賴:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-twitter_2.11/artifactId>
    <version>1.1.4</version>
</dependency>

API:

Properties props = new Properties();
props.setProperty(TwitterSource.CONSUMER_KEY, "");
props.setProperty(TwitterSource.CONSUMER_SECRET, "");
props.setProperty(TwitterSource.TOKEN, "");
props.setProperty(TwitterSource.TOKEN_SECRET, "");
DataStream<String> streamSource = env.addSource(new
TwitterSource(props));

2.2.7.3 RabbitMQ connector

2.2.7.4 ElasticSearch connector

2.2.7.5 Cassandra connector

這3個connetor略過,殼參考官方文件:

2.2.8 例子

這裡可以參考OSCON的例子:

2.2.9 總結

本章介紹了Flink的DataStream API,下一章將介紹DataSet API。