1. 程式人生 > >Flink 原理與實現:資料流上的型別和操作

Flink 原理與實現:資料流上的型別和操作

轉載來源:http://wuchong.me/blog/2016/05/20/flink-internals-streams-and-operations-on-streams/

Flink 為流處理和批處理分別提供了 DataStream API 和 DataSet API。正是這種高層的抽象和 flunent API 極大地便利了使用者編寫大資料應用。不過很多初學者在看到官方 Streaming 文件中那一大坨的轉換時,常常會蒙了圈,文件中那些隻言片語也很難講清它們之間的關係。所以本文將介紹幾種關鍵的資料流型別,它們之間是如何通過轉換關聯起來的。下圖展示了 Flink 中目前支援的主要幾種流的型別,以及它們之間的轉換關係。

DataStream

DataStream 是 Flink 流處理 API 中最核心的資料結構。它代表了一個執行在多個分割槽上的並行流。一個 DataStream 可以從 StreamExecutionEnvironment 通過env.addSource(SourceFunction) 獲得。

DataStream 上的轉換操作都是逐條的,比如 map()flatMap()filter()。DataStream 也可以執行 rebalance(再平衡,用來減輕資料傾斜)和 broadcaseted

(廣播)等分割槽轉換。

 

val stream: DataStream[MyType] = env.addSource(new FlinkKafkaConsumer08[String](...))

val str1: DataStream[(String, MyType)] = stream.flatMap { ... }

val str2: DataStream[(String, MyType)] = stream.rebalance()

val str3: DataStream[AnotherType] = stream.map { ... }

上述 DataStream 上的轉換在執行時會轉換成如下的執行圖:

如上圖的執行圖所示,DataStream 各個運算元會並行執行,運算元之間是資料流分割槽。如 Source 的第一個並行例項(S1)和 flatMap() 的第一個並行例項(m1)之間就是一個數據流分割槽。而在 flatMap() 和 map() 之間由於加了 rebalance(),它們之間的資料流分割槽就有3個子分割槽(m1的資料流向3個map()例項)。這與 Apache Kafka 是很類似的,把流想象成 Kafka Topic,而一個流分割槽就表示一個 Topic Partition,流的目標並行運算元例項就是 Kafka Consumers。

KeyedStream

KeyedStream用來表示根據指定的key進行分組的資料流。一個KeyedStream可以通過呼叫DataStream.keyBy()來獲得。而在KeyedStream上進行任何transformation都將轉變回DataStream。在實現中,KeyedStream是把key的資訊寫入到了transformation中。每條記錄只能訪問所屬key的狀態,其上的聚合函式可以方便地操作和儲存對應key的狀態。

WindowedStream & AllWindowedStream

WindowedStream代表了根據key分組,並且基於WindowAssigner切分視窗的資料流。所以WindowedStream都是從KeyedStream衍生而來的。而在WindowedStream上進行任何transformation也都將轉變回DataStream

 

val stream: DataStream[MyType] = ...

val windowed: WindowedDataStream[MyType] = stream

.keyBy("userId")

.window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data

val result: DataStream[ResultType] = windowed.reduce(myReducer)

上述 WindowedStream 的樣例程式碼在執行時會轉換成如下的執行圖:

Flink 的視窗實現中會將到達的資料快取在對應的視窗buffer中(一個數據可能會對應多個視窗)。當到達視窗傳送的條件時(由Trigger控制),Flink 會對整個視窗中的資料進行處理。Flink 在聚合類視窗有一定的優化,即不會儲存視窗中的所有值,而是每到一個元素執行一次聚合函式,最終只儲存一份資料即可。

在key分組的流上進行視窗切分是比較常用的場景,也能夠很好地並行化(不同的key上的視窗聚合可以分配到不同的task去處理)。不過有時候我們也需要在普通流上進行視窗的操作,這就是 AllWindowedStreamAllWindowedStream是直接在DataStream上進行windowAll(...)操作。AllWindowedStream 的實現是基於 WindowedStream 的(Flink 1.1.x 開始)。Flink 不推薦使用AllWindowedStream,因為在普通流上進行視窗操作,就勢必需要將所有分割槽的流都彙集到單個的Task中,而這個單個的Task很顯然就會成為整個Job的瓶頸。

JoinedStreams & CoGroupedStreams

雙流 Join 也是一個非常常見的應用場景。深入原始碼你可以發現,JoinedStreams 和 CoGroupedStreams 的程式碼實現有80%是一模一樣的,JoinedStreams 在底層又呼叫了 CoGroupedStreams 來實現 Join 功能。除了名字不一樣,一開始很難將它們區分開來,而且為什麼要提供兩個功能類似的介面呢??

實際上這兩者還是很點區別的。首先 co-group 側重的是group,是對同一個key上的兩組集合進行操作,而 join 側重的是pair,是對同一個key上的每對元素進行操作。co-group 比 join 更通用一些,因為 join 只是 co-group 的一個特例,所以 join 是可以基於 co-group 來實現的(當然有優化的空間)。而在 co-group 之外又提供了 join 介面是因為使用者更熟悉 join(源於資料庫吧),而且能夠跟 DataSet API 保持一致,降低使用者的學習成本。

JoinedStreams 和 CoGroupedStreams 是基於 Window 上實現的,所以 CoGroupedStreams 最終又呼叫了 WindowedStream 來實現。

 

val firstInput: DataStream[MyType] = ...

val secondInput: DataStream[AnotherType] = ...

val result: DataStream[(MyType, AnotherType)] = firstInput.join(secondInput)

.where("userId").equalTo("id")

.window(TumblingEventTimeWindows.of(Time.seconds(3)))

.apply (new JoinFunction () {...})

上述 JoinedStreams 的樣例程式碼在執行時會轉換成如下的執行圖:

雙流上的資料在同一個key的會被分別分配到同一個window視窗的左右兩個籃子裡,當window結束的時候,會對左右籃子進行笛卡爾積從而得到每一對pair,對每一對pair應用 JoinFunction。不過目前(Flink 1.1.x)JoinedStreams 只是簡單地實現了流上的join操作而已,距離真正的生產使用還是有些距離。因為目前 join 視窗的雙流資料都是被快取在記憶體中的,也就是說如果某個key上的視窗資料太多就會導致 JVM OOM(然而資料傾斜是常態)。雙流join的難點也正是在這裡,這也是社群後面對 join 操作的優化方向,例如可以借鑑Flink在批處理join中的優化方案,也可以用ManagedMemory來管理視窗中的資料,並當資料超過閾值時能spill到硬碟。

ConnectedStreams

在 DataStream 上有一個 union 的轉換 dataStream.union(otherStream1, otherStream2, ...),用來合併多個流,新的流會包含所有流中的資料。union 有一個限制,就是所有合併的流的型別必須是一致的。ConnectedStreams 提供了和 union 類似的功能,用來連線兩個流,但是與 union 轉換有以下幾個區別:

  1. ConnectedStreams 只能連線兩個流,而 union 可以連線多於兩個流。
  2. ConnectedStreams 連線的兩個流型別可以不一致,而 union 連線的流的型別必須一致。
  3. ConnectedStreams 會對兩個流的資料應用不同的處理方法,並且雙流之間可以共享狀態。這在第一個流的輸入會影響第二個流時, 會非常有用。

如下 ConnectedStreams 的樣例,連線 input 和 other 流,並在input流上應用map1方法,在other上應用map2方法,雙流可以共享狀態(比如計數)。

 

val input: DataStream[MyType] = ...

val other: DataStream[AnotherType] = ...

val connected: ConnectedStreams[MyType, AnotherType] = input.connect(other)

val result: DataStream[ResultType] =

connected.map(new CoMapFunction[MyType, AnotherType, ResultType]() {

override def map1(value: MyType): ResultType = { ... }

override def map2(value: AnotherType): ResultType = { ... }

})

當並行度為2時,其執行圖如下所示:

總結

本文介紹通過不同資料流型別的轉換圖來解釋每一種資料流的含義、轉換關係。後面的文章會深入講解 Window 機制的實現,雙流 Join 的實現等。