1. 程式人生 > >[原始碼解析] GroupReduce,GroupCombine 和 Flink SQL group by

[原始碼解析] GroupReduce,GroupCombine 和 Flink SQL group by

# [原始碼解析] GroupReduce,GroupCombine和Flink SQL group by [TOC] ## 0x00 摘要 本文從原始碼和例項入手,為大家解析 Flink 中 GroupReduce 和 GroupCombine 的用途。也涉及到了 Flink SQL group by 的內部實現。 ## 0x01 緣由 在前文[[原始碼解析] Flink的Groupby和reduce究竟做了什麼](https://www.cnblogs.com/rossiXYZ/p/13080429.html)中,我們剖析了Group和reduce都做了些什麼,也對combine有了一些瞭解。但是總感覺意猶未盡,因為在Flink還提出了若干新運算元,比如GroupReduce和GroupCombine。這幾個運算元不搞定,總覺得如鯁在喉,但沒有找到一個良好的例子來進行剖析說明。 本文是筆者在探究Flink SQL UDF問題的一個副產品。起初是為了除錯一段sql程式碼,結果發現Flink本身給出了一個GroupReduce和GroupCombine使用的完美例子。於是就拿出來和大家共享,一起分析看看究竟如何使用這兩個運算元。 請注意:這個例子是Flink SQL,所以本文中將涉及Flink SQL goup by內部實現的知識。 ## 0x02 概念 Flink官方對於這兩個運算元的使用說明如下: ### 2.1 GroupReduce GroupReduce運算元應用在一個已經分組了的DataSet上,其會對每個分組都呼叫到使用者定義的group-reduce函式。它與Reduce的區別在於使用者定義的函式會立即獲得整個組。 Flink將在組的所有元素上使用Iterable呼叫使用者自定義函式,並且可以返回任意數量的結果元素。 ### 2.2 GroupCombine GroupCombine轉換是可組合GroupReduceFunction中組合步驟的通用形式。它在某種意義上被概括為允許將輸入型別 I 組合到任意輸出型別O。與此相對的是,GroupReduce中的組合步驟僅允許從輸入型別 I 到輸出型別 I 的組合。這是因為GroupReduceFunction的 "reduce步驟" 期望自己的輸入型別為 I。 在一些應用中,我們期望在執行附加變換(例如,減小資料大小)之前將DataSet組合成中間格式。這可以通過CombineGroup轉換能以非常低的成本實現。 注意:分組資料集上的GroupCombine在記憶體中使用貪婪策略執行,該策略可能不會一次處理所有資料,而是以多個步驟處理。它也可以在各個分割槽上執行,而無需像GroupReduce轉換那樣進行資料交換。這可能會導致輸出的是部分結果,所以GroupCombine是不能替代GroupReduce操作的,儘管它們的操作內容可能看起來都一樣。 ### 2.3 例子 是不是有點暈?還是直接讓程式碼來說話吧。以下官方示例演示瞭如何將CombineGroup和GroupReduce轉換用於WordCount實現。即通過combine操作先對單詞數目進行初步排序,然後通過reduceGroup對combine產生的結果進行最終排序。因為combine進行了初步排序,所以在運算元之間傳輸的資料量就少多了。 ```java