Spark: DataFrame repartition vs. coalesce
In Spark development, repartitioning the data can improve execution efficiency, especially when join operations are involved (the speedup often far outweighs the cost of the repartition itself, so it is usually well worth doing).
In Spark SQL there are two main methods for repartitioning data, repartition and coalesce; the two are compared below.
repartition
repartition has three overloads:
- def repartition(numPartitions: Int): DataFrame
```scala
/**
 * Returns a new [[DataFrame]] that has exactly `numPartitions` partitions.
 * @group dfops
 * @since 1.3.0
 */
def repartition(numPartitions: Int): DataFrame = withPlan {
  Repartition(numPartitions, shuffle = true, logicalPlan)
}
```
This method returns a new [[DataFrame]] that has exactly `numPartitions` partitions.
- def repartition(partitionExprs: Column*): DataFrame
```scala
/**
 * Returns a new [[DataFrame]] partitioned by the given partitioning expressions preserving
 * the existing number of partitions. The resulting DataFrame is hash partitioned.
 *
 * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
 *
 * @group dfops
 * @since 1.6.0
 */
@scala.annotation.varargs
def repartition(partitionExprs: Column*): DataFrame = withPlan {
  RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, numPartitions = None)
}
```
This method returns a new [[DataFrame]] partitioned by the given partitioning expressions, preserving the existing number of partitions. The resulting DataFrame is hash partitioned.
This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
- def repartition(numPartitions: Int, partitionExprs: Column*): DataFrame
```scala
/**
 * Returns a new [[DataFrame]] partitioned by the given partitioning expressions into
 * `numPartitions`. The resulting DataFrame is hash partitioned.
 *
 * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
 *
 * @group dfops
 * @since 1.6.0
 */
@scala.annotation.varargs
def repartition(numPartitions: Int, partitionExprs: Column*): DataFrame = withPlan {
  RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, Some(numPartitions))
}
```
This method returns a new [[DataFrame]] partitioned by the given partitioning expressions into `numPartitions` partitions. The resulting DataFrame is hash partitioned.
This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL), as the sketch below shows.
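For reference, a minimal sketch of the DISTRIBUTE BY equivalence (the table name `logs` is hypothetical, chosen just for illustration):

```scala
import sqlContext.implicits._ // for the $"col" syntax

// Both calls hash-partition the rows by the `user` column.
val byApi = sqlContext.table("logs").repartition($"user")
val bySql = sqlContext.sql("SELECT * FROM logs DISTRIBUTE BY user")
```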
coalesce
- def coalesce(numPartitions: Int): DataFrame
```scala
/**
 * Returns a new [[DataFrame]] that has exactly `numPartitions` partitions.
 * Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g.
 * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
 * the 100 new partitions will claim 10 of the current partitions.
 * @group rdd
 * @since 1.4.0
 */
def coalesce(numPartitions: Int): DataFrame = withPlan {
  Repartition(numPartitions, shuffle = false, logicalPlan)
}
```
This returns a new [[DataFrame]] that has exactly `numPartitions` partitions. Similar to coalesce defined on an [[RDD]], the operation results in a narrow dependency: e.g. going from 1000 partitions to 100, there is no shuffle; each of the 100 new partitions simply claims 10 of the current partitions.
Going the other way, from 100 partitions to 1000, there is no shuffle either: because `shuffle = false`, the partition count just stays at 100. To actually increase the number of partitions you have to use repartition.
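A quick sanity check of both behaviors (a minimal sketch using a throwaway DataFrame):

```scala
val df = sqlContext.range(0, 1000).repartition(100)

println(df.coalesce(10).rdd.getNumPartitions)      // 10: narrow dependency, no shuffle
println(df.coalesce(1000).rdd.getNumPartitions)    // still 100: coalesce cannot add partitions
println(df.repartition(1000).rdd.getNumPartitions) // 1000: repartition always shuffles
```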
Note: coalesce(numPartitions: Int): DataFrame and repartition(numPartitions: Int): DataFrame are both implemented on top of the same logical plan node, case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan):
```scala
/**
 * Returns a new RDD that has exactly `numPartitions` partitions. Differs from
 * [[RepartitionByExpression]] as this method is called directly by DataFrame's, because the user
 * asked for `coalesce` or `repartition`. [[RepartitionByExpression]] is used when the consumer
 * of the output requires some specific ordering or distribution of the data.
 */
case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
  extends UnaryNode {
  override def output: Seq[Attribute] = child.output
}
```
This returns a new RDD that has exactly `numPartitions` partitions. It differs from [[RepartitionByExpression]] in that it is called directly by the DataFrame API when the user asks for `coalesce` or `repartition`. [[RepartitionByExpression]] is used when the consumer of the output requires a specific ordering or distribution of the data. (The source comment says RDD even though the return type is a DataFrame; in practice the distinction doesn't matter here.)
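You can see this shared node by inspecting the logical plan yourself (a minimal sketch; `queryExecution` is a developer API, but handy for this kind of check):

```scala
val df = sqlContext.range(0, 1000)

// Both print a Repartition node; only the shuffle flag differs.
println(df.repartition(10).queryExecution.logical) // Repartition node with shuffle = true
println(df.coalesce(10).queryExecution.logical)    // Repartition node with shuffle = false
```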
repartition(partitionExprs: Column*): DataFrame and repartition(numPartitions: Int, partitionExprs: Column*): DataFrame, on the other hand, are both backed by
case class RepartitionByExpression(partitionExpressions: Seq[Expression], child: LogicalPlan, numPartitions: Option[Int] = None) extends RedistributeData
```scala
/**
 * This method repartitions data using [[Expression]]s into `numPartitions`, and receives
 * information about the number of partitions during execution. Used when a specific ordering or
 * distribution is expected by the consumer of the query result. Use [[Repartition]] for RDD-like
 * `coalesce` and `repartition`.
 * If `numPartitions` is not specified, the number of partitions will be the number set by
 * `spark.sql.shuffle.partitions`.
 */
case class RepartitionByExpression(
    partitionExpressions: Seq[Expression],
    child: LogicalPlan,
    numPartitions: Option[Int] = None) extends RedistributeData {
  numPartitions match {
    case Some(n) => require(n > 0, "numPartitions must be greater than 0.")
    case None => // Ok
  }
}
```
This method repartitions the data into `numPartitions` partitions using [[Expression]]s, and receives information about the number of partitions during execution. It is used when the consumer of the query result expects a specific ordering or distribution. Use [[Repartition]] for the RDD-like `coalesce` and `repartition`.
If `numPartitions` is not specified, the number of partitions falls back to the value of `spark.sql.shuffle.partitions`.
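A minimal sketch showing that the default really comes from the config:

```scala
// The default is 200; set it lower so the effect is easy to observe.
sqlContext.setConf("spark.sql.shuffle.partitions", "50")

import sqlContext.implicits._
val df = sqlContext.range(0, 1000)
// No explicit numPartitions, so the partition count falls back to the config value.
println(df.repartition($"id").rdd.getNumPartitions) // 50
```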
Usage examples
- def repartition(numPartitions: Int): DataFrame
```scala
// Get a test DataFrame that contains a `user` column
val testDataFrame: DataFrame = readMysqlTable(sqlContext, "MYSQLTABLE", proPath)
// Get a DataFrame with 10 partitions
testDataFrame.repartition(10)
```
- def repartition(partitionExprs: Column*): DataFrame
```scala
// Get a test DataFrame that contains a `user` column
val testDataFrame: DataFrame = readMysqlTable(sqlContext, "MYSQLTABLE", proPath)
// Partition by the `user` column; the number of partitions is decided by spark.sql.shuffle.partitions
testDataFrame.repartition($"user")
```
- def repartition(numPartitions: Int, partitionExprs: Column*): DataFrame
```scala
// Get a test DataFrame that contains a `user` column
val testDataFrame: DataFrame = readMysqlTable(sqlContext, "MYSQLTABLE", proPath)
// Partition by the `user` column into 10 partitions. This can greatly speed up
// a subsequent join, but beware of data skew
testDataFrame.repartition(10, $"user")
```
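Because the rows are hash-partitioned on `user`, a hot key can make some partitions far larger than others. One way to check for skew (a sketch reusing the testDataFrame above):

```scala
val repartitioned = testDataFrame.repartition(10, $"user")

// Count rows per partition; wildly uneven counts indicate skew on `user`.
repartitioned.rdd
  .mapPartitionsWithIndex { case (idx, rows) => Iterator((idx, rows.size)) }
  .collect()
  .foreach { case (idx, count) => println(s"partition $idx: $count rows") }
```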
- def coalesce(numPartitions: Int): DataFrame
```scala
val testDataFrame1: DataFrame = readMysqlTable(sqlContext, "MYSQLTABLE", proPath)
val testDataFrame2 = testDataFrame1.repartition(10)
// Does not trigger a shuffle (10 -> 5 is a narrow dependency)
testDataFrame2.coalesce(5)
// Also no shuffle: asking for more partitions than exist leaves the count at 10;
// use repartition(100) if you really need 100 partitions
testDataFrame2.coalesce(100)
```
As for how many partitions to use, that has to be tuned to your actual data and cluster: too many partitions waste resources, too few degrade performance.
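There is no universal number; a common rule of thumb (an assumption from general Spark tuning advice, not something prescribed by the source code above) is a small multiple of the cluster's total cores:

```scala
// Rule-of-thumb sketch: 2-3 tasks per core keeps executors busy
// without flooding the scheduler with tiny tasks.
val numPartitions = sqlContext.sparkContext.defaultParallelism * 3
testDataFrame.repartition(numPartitions, $"user")
```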
My current knowledge doesn't stretch to a deeper discussion; I plan to study the concrete low-level implementation in a follow-up.
These are my personal notes from work and study. Please credit the source when reposting!