Flink 原理與實現:資料流上的型別和操作
轉載來源:http://wuchong.me/blog/2016/05/20/flink-internals-streams-and-operations-on-streams/
Flink 為流處理和批處理分別提供了 DataStream API 和 DataSet API。正是這種高層的抽象和 flunent API 極大地便利了使用者編寫大資料應用。不過很多初學者在看到官方 Streaming 文件中那一大坨的轉換時,常常會蒙了圈,文件中那些隻言片語也很難講清它們之間的關係。所以本文將介紹幾種關鍵的資料流型別,它們之間是如何通過轉換關聯起來的。下圖展示了 Flink 中目前支援的主要幾種流的型別,以及它們之間的轉換關係。
DataStream
DataStream
是 Flink 流處理 API 中最核心的資料結構。它代表了一個執行在多個分割槽上的並行流。一個 DataStream
可以從 StreamExecutionEnvironment
通過env.addSource(SourceFunction)
獲得。
DataStream 上的轉換操作都是逐條的,比如 map()
,flatMap()
,filter()
。DataStream 也可以執行 rebalance
(再平衡,用來減輕資料傾斜)和 broadcaseted
val stream: DataStream[MyType] = env.addSource(new FlinkKafkaConsumer08[String](...)) val str1: DataStream[(String, MyType)] = stream.flatMap { ... } val str2: DataStream[(String, MyType)] = stream.rebalance() val str3: DataStream[AnotherType] = stream.map { ... } |
上述 DataStream 上的轉換在執行時會轉換成如下的執行圖:
如上圖的執行圖所示,DataStream 各個運算元會並行執行,運算元之間是資料流分割槽。如 Source 的第一個並行例項(S1)和 flatMap() 的第一個並行例項(m1)之間就是一個數據流分割槽。而在 flatMap() 和 map() 之間由於加了 rebalance(),它們之間的資料流分割槽就有3個子分割槽(m1的資料流向3個map()例項)。這與 Apache Kafka 是很類似的,把流想象成 Kafka Topic,而一個流分割槽就表示一個 Topic Partition,流的目標並行運算元例項就是 Kafka Consumers。
KeyedStream
KeyedStream
用來表示根據指定的key進行分組的資料流。一個KeyedStream
可以通過呼叫DataStream.keyBy()
來獲得。而在KeyedStream
上進行任何transformation都將轉變回DataStream
。在實現中,KeyedStream
是把key的資訊寫入到了transformation中。每條記錄只能訪問所屬key的狀態,其上的聚合函式可以方便地操作和儲存對應key的狀態。
WindowedStream & AllWindowedStream
WindowedStream
代表了根據key分組,並且基於WindowAssigner
切分視窗的資料流。所以WindowedStream
都是從KeyedStream
衍生而來的。而在WindowedStream
上進行任何transformation也都將轉變回DataStream
。
val stream: DataStream[MyType] = ... val windowed: WindowedDataStream[MyType] = stream .keyBy("userId") .window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data val result: DataStream[ResultType] = windowed.reduce(myReducer) |
上述 WindowedStream 的樣例程式碼在執行時會轉換成如下的執行圖:
Flink 的視窗實現中會將到達的資料快取在對應的視窗buffer中(一個數據可能會對應多個視窗)。當到達視窗傳送的條件時(由Trigger控制),Flink 會對整個視窗中的資料進行處理。Flink 在聚合類視窗有一定的優化,即不會儲存視窗中的所有值,而是每到一個元素執行一次聚合函式,最終只儲存一份資料即可。
在key分組的流上進行視窗切分是比較常用的場景,也能夠很好地並行化(不同的key上的視窗聚合可以分配到不同的task去處理)。不過有時候我們也需要在普通流上進行視窗的操作,這就是 AllWindowedStream
。AllWindowedStream
是直接在DataStream
上進行windowAll(...)
操作。AllWindowedStream 的實現是基於 WindowedStream 的(Flink 1.1.x 開始)。Flink 不推薦使用AllWindowedStream
,因為在普通流上進行視窗操作,就勢必需要將所有分割槽的流都彙集到單個的Task中,而這個單個的Task很顯然就會成為整個Job的瓶頸。
JoinedStreams & CoGroupedStreams
雙流 Join 也是一個非常常見的應用場景。深入原始碼你可以發現,JoinedStreams 和 CoGroupedStreams 的程式碼實現有80%是一模一樣的,JoinedStreams 在底層又呼叫了 CoGroupedStreams 來實現 Join 功能。除了名字不一樣,一開始很難將它們區分開來,而且為什麼要提供兩個功能類似的介面呢??
實際上這兩者還是很點區別的。首先 co-group 側重的是group,是對同一個key上的兩組集合進行操作,而 join 側重的是pair,是對同一個key上的每對元素進行操作。co-group 比 join 更通用一些,因為 join 只是 co-group 的一個特例,所以 join 是可以基於 co-group 來實現的(當然有優化的空間)。而在 co-group 之外又提供了 join 介面是因為使用者更熟悉 join(源於資料庫吧),而且能夠跟 DataSet API 保持一致,降低使用者的學習成本。
JoinedStreams 和 CoGroupedStreams 是基於 Window 上實現的,所以 CoGroupedStreams 最終又呼叫了 WindowedStream 來實現。
val firstInput: DataStream[MyType] = ... val secondInput: DataStream[AnotherType] = ... val result: DataStream[(MyType, AnotherType)] = firstInput.join(secondInput) .where("userId").equalTo("id") .window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply (new JoinFunction () {...}) |
上述 JoinedStreams 的樣例程式碼在執行時會轉換成如下的執行圖:
雙流上的資料在同一個key的會被分別分配到同一個window視窗的左右兩個籃子裡,當window結束的時候,會對左右籃子進行笛卡爾積從而得到每一對pair,對每一對pair應用 JoinFunction。不過目前(Flink 1.1.x)JoinedStreams 只是簡單地實現了流上的join操作而已,距離真正的生產使用還是有些距離。因為目前 join 視窗的雙流資料都是被快取在記憶體中的,也就是說如果某個key上的視窗資料太多就會導致 JVM OOM(然而資料傾斜是常態)。雙流join的難點也正是在這裡,這也是社群後面對 join 操作的優化方向,例如可以借鑑Flink在批處理join中的優化方案,也可以用ManagedMemory來管理視窗中的資料,並當資料超過閾值時能spill到硬碟。
ConnectedStreams
在 DataStream 上有一個 union 的轉換 dataStream.union(otherStream1, otherStream2, ...)
,用來合併多個流,新的流會包含所有流中的資料。union 有一個限制,就是所有合併的流的型別必須是一致的。ConnectedStreams
提供了和 union 類似的功能,用來連線兩個流,但是與 union 轉換有以下幾個區別:
- ConnectedStreams 只能連線兩個流,而 union 可以連線多於兩個流。
- ConnectedStreams 連線的兩個流型別可以不一致,而 union 連線的流的型別必須一致。
- ConnectedStreams 會對兩個流的資料應用不同的處理方法,並且雙流之間可以共享狀態。這在第一個流的輸入會影響第二個流時, 會非常有用。
如下 ConnectedStreams 的樣例,連線 input
和 other
流,並在input
流上應用map1
方法,在other
上應用map2
方法,雙流可以共享狀態(比如計數)。
val input: DataStream[MyType] = ... val other: DataStream[AnotherType] = ... val connected: ConnectedStreams[MyType, AnotherType] = input.connect(other) val result: DataStream[ResultType] = connected.map(new CoMapFunction[MyType, AnotherType, ResultType]() { override def map1(value: MyType): ResultType = { ... } override def map2(value: AnotherType): ResultType = { ... } }) |
當並行度為2時,其執行圖如下所示:
總結
本文介紹通過不同資料流型別的轉換圖來解釋每一種資料流的含義、轉換關係。後面的文章會深入講解 Window 機制的實現,雙流 Join 的實現等。