[原始碼解析] GroupReduce,GroupCombine 和 Flink SQL group by
阿新 • • 發佈:2020-06-17
# [原始碼解析] GroupReduce,GroupCombine和Flink SQL group by
[TOC]
## 0x00 摘要
本文從原始碼和例項入手,為大家解析 Flink 中 GroupReduce 和 GroupCombine 的用途。也涉及到了 Flink SQL group by 的內部實現。
## 0x01 緣由
在前文[[原始碼解析] Flink的Groupby和reduce究竟做了什麼](https://www.cnblogs.com/rossiXYZ/p/13080429.html)中,我們剖析了Group和reduce都做了些什麼,也對combine有了一些瞭解。但是總感覺意猶未盡,因為在Flink還提出了若干新運算元,比如GroupReduce和GroupCombine。這幾個運算元不搞定,總覺得如鯁在喉,但沒有找到一個良好的例子來進行剖析說明。
本文是筆者在探究Flink SQL UDF問題的一個副產品。起初是為了除錯一段sql程式碼,結果發現Flink本身給出了一個GroupReduce和GroupCombine使用的完美例子。於是就拿出來和大家共享,一起分析看看究竟如何使用這兩個運算元。
請注意:這個例子是Flink SQL,所以本文中將涉及Flink SQL goup by內部實現的知識。
## 0x02 概念
Flink官方對於這兩個運算元的使用說明如下:
### 2.1 GroupReduce
GroupReduce運算元應用在一個已經分組了的DataSet上,其會對每個分組都呼叫到使用者定義的group-reduce函式。它與Reduce的區別在於使用者定義的函式會立即獲得整個組。
Flink將在組的所有元素上使用Iterable呼叫使用者自定義函式,並且可以返回任意數量的結果元素。
### 2.2 GroupCombine
GroupCombine轉換是可組合GroupReduceFunction中組合步驟的通用形式。它在某種意義上被概括為允許將輸入型別 I 組合到任意輸出型別O。與此相對的是,GroupReduce中的組合步驟僅允許從輸入型別 I 到輸出型別 I 的組合。這是因為GroupReduceFunction的 "reduce步驟" 期望自己的輸入型別為 I。
在一些應用中,我們期望在執行附加變換(例如,減小資料大小)之前將DataSet組合成中間格式。這可以通過CombineGroup轉換能以非常低的成本實現。
注意:分組資料集上的GroupCombine在記憶體中使用貪婪策略執行,該策略可能不會一次處理所有資料,而是以多個步驟處理。它也可以在各個分割槽上執行,而無需像GroupReduce轉換那樣進行資料交換。這可能會導致輸出的是部分結果,所以GroupCombine是不能替代GroupReduce操作的,儘管它們的操作內容可能看起來都一樣。
### 2.3 例子
是不是有點暈?還是直接讓程式碼來說話吧。以下官方示例演示瞭如何將CombineGroup和GroupReduce轉換用於WordCount實現。即通過combine操作先對單詞數目進行初步排序,然後通過reduceGroup對combine產生的結果進行最終排序。因為combine進行了初步排序,所以在運算元之間傳輸的資料量就少多了。
```java