# [原始碼解析] Flink的groupBy和reduce究竟做了什麼 [TOC] ## 0x00 摘要 Groupby和reduce是大資料領域常見的運算元,但是很多同學應該對其背後機制不甚瞭解。本文將從原始碼入手,為大家解析Flink中Groupby和reduce的原理,看看他們在背後做了什麼。 ## 0x01 問題和概括 ### 1.1 問題 探究的原因是想到了幾個問題 : - groupby的運算元會對資料進行排序嘛。 - groupby和reduce過程中究竟有幾次排序。 - 如果有多個groupby task,什麼機制保證所有這些grouby task的輸出中,同樣的key都分配給同一個reducer。 - groupby和reduce時候,有沒有Rebalance 重新分配。 - reduce運算元會不會重新劃分task。 - reduce運算元有沒有可能和前後的其他運算元組成Operator Chain。 ### 1.2 概括 為了便於大家理解,我們先總結下,對於一個Groupby + Reduce的操作,Flink做了如下處理: - Group其實沒有真實對應的運算元,它只是在在reduce過程之前的一箇中間步驟或者輔助步驟。 - 在Flink生成批處理執行計劃後,有意義的結果是Reduce運算元。 - 為了更好的reduce,Flink在reduce之前大量使用了Combine操作。Combine可以理解為是在map端的reduce的操作,對單個map任務的輸出結果資料進行合併的操作。 - 在Flink生成批處理優化計劃(Optimized Plan)之後,會把reduce分割成兩段,一段是SORTED_PARTIAL_REDUCE,一段是SORTED_REDUCE。 - SORTED_PARTIAL_REDUCE就是Combine。 - Flink生成JobGraph之後,Flink形成了一個Operator Chain:Reduce(SORTED_PARTIAL_REDUCE)和其上游合併在一起。 - Flink用Partitioner來保證多個 grouby task 的輸出中同樣的key都分配給同一個reducer。 - groupby和reduce過程中至少有三次排序: - combine - sort + merge - reduce 這樣之前的疑問就基本得到了解釋。 ## 0x02 背景概念 ### 2.1 MapReduce細分 MapReduce是一種程式設計模型,用於大規模資料集的並行運算。概念 "Map(對映)"和"Reduce(歸約)" 是它們的主要思想,其是從函數語言程式設計語言,向量程式語言裡借來的特性。 我們目前使用的Flink,Spark都出自於MapReduce,所以我們有必有追根溯源,看看MapReduce是如何區分各個階段的。 ### 2.2 MapReduce細分 如果把MapReduce細分,可以分為一下幾大過程: - Input-Split(輸入分片):此過程是將從HDFS上讀取的檔案分片,然後送給Map端。有多少分片就有多少Mapper,一般分片的大小和HDFS中的塊大小一致。 - Shuffle-Spill(溢寫):每個Map任務都有一個環形緩衝區。一旦緩衝區達到閾值80%,一個後臺執行緒便開始把內容“溢寫”-“spill”到磁碟。在溢寫過程中,map將繼續輸出到剩餘的20%空間中,互不影響,如果緩衝區被填滿map會被堵塞直到寫磁碟完成。 - Shuffle-Partition(分割槽):由於每個Map可能處理的資料量不同,所以到達reduce有可能會導致資料傾斜。分割槽可以幫助我們解決這一問題,在shuffle過程中會按照預設key的雜湊碼對分割槽數量取餘,reduce便根據分割槽號來拉取對應的資料,達到資料均衡。分割槽數量對應Reduce個數。 - Shuffle-Sort(排序):在分割槽後,會對此分割槽的資料進行內排序,排序過程會穿插在整個MapReduce中,在很多地方都存在。 - Shuffle-Group(分組):分組過程會把key相同的value分配到一個組中,wordcount程式就利用了分組這一過程。 - Shuffle-Combiner(組合):這一過程我們可以理解為一個小的Reduce階段,當資料量大的時候可以在map過程中執行一次combine,這樣就相當於在map階段執行了一次reduce。由於reduce和map在不同的節點上執行,所以reduce需要遠端拉取資料,combine就可以有效降低reduce拉取資料的量,減少網路負荷(這一過程預設是不開啟的,在如求平均值的mapreduce程式中不要使用combine,因為會影響結果)。 - Compress(壓縮):在緩衝區溢寫磁碟的時候,可以對資料進行壓縮,節約磁碟空間,同樣減少給reducer傳遞的資料量。 - Reduce-Merge(合併):reduce端會拉取各個map輸出結果對應的分割槽檔案,這樣reduce端就會有很多檔案,所以在此階段,reduce再次將它們合併/排序再送入reduce執行。 - Output(輸出):在reduce階段,對已排序輸出中的每個鍵呼叫reduce函式。此階段的輸出直接寫到輸出檔案系統,一般為HDFS。 ### 2.3 Combine Combine是我們需要特殊注意的。在mapreduce中,map多,reduce少。在reduce中由於資料量比較多,所以我們乾脆在map階段中先把自己map裡面的資料歸類,這樣到了reduce的時候就減輕了壓力。 Combine可以理解為是在map端的reduce的操作,對單個map任務的輸出結果資料進行合併的操作。combine是對一個map的,而reduce合併的物件是對於多個map。 map函式操作所產生的鍵值對會作為combine函式的輸入,經combine函式處理後再送到reduce函式進行處理,減少了寫入磁碟的資料量,同時也減少了網路中鍵值對的傳輸量。在Map端,使用者自定義實現的Combine優化機制類Combiner在執行Map端任務的節點本身執行,相當於對map函式的輸出做了一次reduce。 叢集上的可用頻寬往往是有限的,產生的中間臨時資料量很大時就會出現效能瓶頸,因此應該儘量避免Map端任務和Reduce端任務之間大量的資料傳輸。使用Combine機制的意義就在於使Map端輸出更緊湊,使得寫到本地磁碟和傳給Reduce端的資料更少。 ### 2.4 Partition Partition是分割map每個節點的結果,按照key分別對映給不同的reduce,mapreduce使用雜湊HashPartitioner幫我們歸類了。這個我們也可以自定義。 這裡其實可以理解歸類。我們對於錯綜複雜的資料歸類。比如在動物園裡有牛羊雞鴨鵝,他們都是混在一起的,但是到了晚上他們就各自牛回牛棚,羊回羊圈,雞回雞窩。partition的作用就是把這些資料歸類。只不過是在寫程式的時候, 在經過mapper的執行後,我們得知mapper的輸出是這樣一個key/value對: key是“aaa”, value是數值1。因為當前map端只做加1的操作,在reduce task裡才去合併結果集。假如我們知道這個job有3個reduce task,到底當前的“aaa”應該交由哪個reduce task去做呢,是需要立刻決定的。 MapReduce提供Partitioner介面,它的作用就是根據key或value及reduce task的數量來決定當前的這對輸出資料最終應該交由哪個reduce task處理。預設對key hash後再以reduce task數量取模。預設的取模方式只是為了平均reduce的處理能力,如果使用者自己對Partitioner有需求,可以訂製並設定到job上。 在我們的例子中,假定 “aaa”經過Partitioner後返回0,也就是這對值應當交由第一個reducer來處理。 ### 2.5 Shuffle shuffle就是map和reduce之間的過程,包含了兩端的combine和partition。它比較難以理解,因為我們摸不著,看不到它。它屬於mapreduce的框架,程式設計的時候,我們用不到它。 Shuffle的大致範圍就是:怎樣把map task的輸出結果有效地傳送到reduce端。也可以這樣理解, Shuffle描述著資料從map task輸出到reduce task輸入的這段過程。 ### 2.6 Reducer 簡單地說,reduce task在執行之前的工作就是不斷地拉取當前job裡每個map task的最終結果,然後對從不同地方拉取過來的資料不斷地做merge,最終形成一個檔案作為reduce task的輸入檔案。 ## 0x03 程式碼 我們以Flink的KMeans演算法作為樣例,具體摘要如下: ```java public class WordCountExampleReduce { DataStream ds; public static void main(String[] args) throws Exception { //構建環境 final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //通過字串構建資料集