
Spark: pitfalls of the aggregate function first

We have the following table:

val df = spark.createDataset(Seq(
            (1, "a", 66),
            (2, "a", 22),
            (3, "a", 11),
            (4, "b", 22),
            (5, "b", 66),
            (6, "b", 11))).toDF("a", "b", "c")
df.show
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  a| 66|
|  2|  a| 22|
|  3|  a| 11|
|  4|  b| 22|
|  5|  b| 66|
|  6|  b| 11|
+---+---+---+

We want to group by column b and then aggregate to compute sum(c), max(c), and the value of a in the row where c is the maximum:

df.orderBy(desc("c"))
            .groupBy("b")
            .agg(sum("c"), max("c"), first("a"))
            .show
+---+------+------+---------------+
|  b|sum(c)|max(c)|first(a, false)|
+---+------+------+---------------+
|  b|    99|    66|              4|
|  a|    99|    66|              2|
+---+------+------+---------------+

Looking at the output, the last column is wrong! Run it a few more times and observe the results:

+---+------+------+---------------+
|  b|sum(c)|max(c)|first(a, false)|
+---+------+------+---------------+
|  b|    99|    66|              5|
|  a|    99|    66|              2|
+---+------+------+---------------+

+---+------+------+---------------+
|  b|sum(c)|max(c)|first(a, false)|
+---+------+------+---------------+
|  b|    99|    66|              6|
|  a|    99|    66|              2|
+---+------+------+---------------+

+---+------+------+---------------+
|  b|sum(c)|max(c)|first(a, false)|
+---+------+------+---------------+
|  b|    99|    66|              4|
|  a|    99|    66|              1|
+---+------+------+---------------+

The result returned by first is not stable! Checking the documentation in the Spark source code, we find:

/**
* Returns the first value of child for a group of rows. If the first value of child
* is null, it returns null (respecting nulls). Even if [[First]] is used on an already
* sorted column, if we do partial aggregation and final aggregation (when mergeExpression
* is used) its result will not be deterministic (unless the input table is sorted and has
* a single partition, and we use a single reducer to do the aggregation.).
*/

So the result of first on a sorted Dataset is not deterministic (unless the Dataset has only a single partition and the aggregation is done by a single reducer!).

Let's check the number of partitions of df:

df.rdd.getNumPartitions
res9: Int = 6  // indeed, not a single partition

Now that we know the cause, let's modify the code to merge the partitions and try again:

df.coalesce(1).orderBy(desc("c"))
            .groupBy("b")
            .agg(sum("c"), max("c"), first("a"))
            .show
+---+------+------+---------------+
|  b|sum(c)|max(c)|first(a, false)|
+---+------+------+---------------+
|  a|    99|    66|              1|
|  b|    99|    66|              5|
+---+------+------+---------------+

This time the result is correct.

The aggregate function last suffers from the same problem.
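
For illustration, here is a minimal sketch under the same assumptions as above (the same spark-shell session with the Spark SQL functions in scope, and the single-partition workaround): after sorting by c descending, last("a") returns the a of the row holding the smallest c in each group, and it is only deterministic here because of coalesce(1):

df.coalesce(1).orderBy(desc("c"))
            .groupBy("b")
            .agg(min("c"), last("a"))  // last("a") pairs with min(c); same caveat as first
            .show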

When the data volume is large, merging all partitions is clearly not the best way to solve this problem. We can also achieve the same result by aggregating and then joining:

val df = spark.createDataset(Seq(
    (1, "a", 77),
    (2, "a", 22),
    (3, "a", 11),
    (4, "b", 22),
    (5, "b", 77),
    (6, "b", 77))).toDF("a", "b", "c")
df.show
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  a| 77|
|  2|  a| 22|
|  3|  a| 11|
|  4|  b| 22|
|  5|  b| 77|
|  6|  b| 77|
+---+---+---+

val df1 = df.dropDuplicates("b", "c").withColumnRenamed("c", "max")
df.groupBy("b")
    .agg(sum("c").as("sum"), max("c").as("max"))
    .join(df1, Seq("b", "max"), "left")
    .show
+---+---+---+---+
|  b|max|sum|  a|
+---+---+---+---+
|  b| 77|176|  6|
|  a| 77|110|  1|
+---+---+---+---+

Of course, we could also achieve the same result with a SQL window function, as sketched below.
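
For reference, a minimal sketch of one such window-function variant (the names byB, rn, sum and max are illustrative, not fixed by any API): rank the rows of each b group by c descending with row_number, keep the top-ranked row, and compute the group sum over the same partition window. Ties in c are broken arbitrarily, just as with dropDuplicates above.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Rank rows within each b group by c descending; the row with rn = 1 holds max(c).
val byB = Window.partitionBy("b")
df.withColumn("rn", row_number().over(byB.orderBy(desc("c"))))
  .withColumn("sum", sum("c").over(byB))
  .where(col("rn") === 1)
  .select(col("b"), col("sum"), col("c").as("max"), col("a"))
  .show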