Spark DataFrame 的 groupBy vs groupByKey
在使用 Spark+SQL/">Spark SQL 的過程中,經常會用到 groupBy 這個函式進行一些統計工作。但是會發現除了 groupBy 外,還有一個 groupByKey(注意RDD 也有一個 groupByKey,而這裡的 groupByKey 是 DataFrame 的 ) 。這個 groupByKey 引起了我的好奇,那我們就到原始碼裡面一探究竟吧。
所用 spark 版本:spark 2.1.0
先從使用的角度來說,
groupBy:groupBy類似於傳統SQL語言中的group by子語句,但比較不同的是groupBy()可以帶多個列名,對多個列進行group。比如想根據 "id" 和 "name" 進行 groupBy 的話可以
df.goupBy("id","name")
groupBy返回的型別是RelationalGroupedDataset。
groupByKey:groupByKey則更加靈活,可以根據使用者自己對列的組合來進行groupBy,比如上面的那個例子,根據 "id" 和 "name" 進行 groupBy,使用groupByKey可以這樣。
//同前面的goupBy效果是一樣的,但返回的型別是不一樣的 df..toDF("id","name").goupByKey(row =>{ row.getString(0) + row.getString(1) })
但和groupBy不同的是groupByKey返回的型別是KeyValueGroupedDataset。
下面來看看這兩個方法的實現有何區別。
groupBy
def groupBy(cols: Column*): RelationalGroupedDataset = { RelationalGroupedDataset(toDF(), cols.map(_.expr), RelationalGroupedDataset.GroupByType) }
最終會去新建一個RelationalGroupedDataset,而這個方法提供count(),max(),agg(),等方法。值得一提的是,這個類在spark1.x的時候類名為“GroupedData”。看看類中的註釋吧
/** * A set of methods for aggregations on a `DataFrame`, created by `Dataset.groupBy`. * * The main method is the agg function, which has multiple variants. This class also contains * convenience some first order statistics such as mean, sum for convenience. * * This class was named `GroupedData` in Spark 1.x. * * @since 2.0.0 */ @InterfaceStability.Stable class RelationalGroupedDataset protected[sql](
groupByKey
@Experimental @InterfaceStability.Evolving def groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T] = { val inputPlan = logicalPlan val withGroupingKey = AppendColumns(func, inputPlan) val executed = sparkSession.sessionState.executePlan(withGroupingKey) new KeyValueGroupedDataset( encoderFor[K], encoderFor[T], executed, inputPlan.output, withGroupingKey.newColumns) }
可以發現最後生成和返回的類是KeyValueGroupedDataset。這是dataset的子類,表示聚合過之後的dataset。
我們再看看這個類中的註釋吧
/** * :: Experimental :: * A [[Dataset]] has been logically grouped by a user specified grouping key.Users should not * construct a [[KeyValueGroupedDataset]] directly, but should instead call `groupByKey` on * an existing [[Dataset]]. * * @since 2.0.0 */ @Experimental @InterfaceStability.Evolving class KeyValueGroupedDataset[K, V] private[sql](
可以發現 groupByKey 還處於實驗階段。它是希望可以由使用者自己來實現 groupBy 的規則,而不像 groupBy() 一樣,需要被列屬性所束縛。
通過 groupByKey 使用者可以按照自己的需求來進行 grouping 。
總而言之,groupByKey雖然提供了更加靈活的處理 grouping 的方式,但 groupByKey 後返回的類是 KeyValueGroupedDataset ,它裡面所提供的操作介面也不如 groupBy 返回的 RelationalGroupedDataset 所提供的介面豐富。除非真的有一些特殊的 grouping 操作,否則還是使用 groupBy 吧。