1. 程式人生 > >Spark原始碼系列:RDD repartition、coalesce 對比

Spark原始碼系列:RDD repartition、coalesce 對比

在上一篇文章中 Spark原始碼系列:DataFrame repartition、coalesce 對比 對DataFrame的repartition、coalesce進行了對比,在這篇文章中,將會對RDD的repartition、coalesce進行對比。

RDD重新分割槽的手段與DataFrame類似,有repartition、coalesce兩個方法

repartition

  • def repartition(numPartitions: Int): JavaRDD[T]
 1   /**
 2    * Return a new RDD that has exactly numPartitions partitions.
3 * 4 * Can increase or decrease the level of parallelism in this RDD. Internally, this uses 5 * a shuffle to redistribute data. 6 * 7 * If you are decreasing the number of partitions in this RDD, consider using `coalesce`, 8 * which can avoid performing a shuffle. 9 */ 10 def repartition(numPartitions: Int): JavaRDD[T] = rdd.repartition(numPartitions) 
 

返回一個新的RDD,該RDD恰好具有numPartitions分割槽。

repartition這個方法可以增加或減少此RDD中的並行度。在內部,這使用shuffle來重新分配資料。

如果要減少RDD中的分割槽數量,請考慮使用“coalesce”,這樣可以避免執行shuffle。 

這個方法在org.apache.spark.api.java.JavaRDD裡面

真正呼叫的是org.apache.spark.rdd.RDD裡面的repartition 

 1   /**
 2    * Return a new RDD that has exactly numPartitions partitions.
3 * 4 * Can increase or decrease the level of parallelism in this RDD. Internally, this uses 5 * a shuffle to redistribute data. 6 * 7 * If you are decreasing the number of partitions in this RDD, consider using `coalesce`, 8 * which can avoid performing a shuffle. 9 */ 10 def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { 11 coalesce(numPartitions, shuffle = true) 12 } 

從上面可以看出,在此處還不是方法最終的,還呼叫了coalesce(numPartitions, shuffle = true) 這個方法,這個方法實現如下:

 1   /**
 2    * Return a new RDD that is reduced into `numPartitions` partitions.
 3    *
 4    * This results in a narrow dependency, e.g. if you go from 1000 partitions
 5    * to 100 partitions, there will not be a shuffle, instead each of the 100
 6    * new partitions will claim 10 of the current partitions.
 7    *
 8    * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
 9    * this may result in your computation taking place on fewer nodes than
10    * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
11    * you can pass shuffle = true. This will add a shuffle step, but means the
12    * current upstream partitions will be executed in parallel (per whatever
13    * the current partitioning is).
14    *
15    * Note: With shuffle = true, you can actually coalesce to a larger number
16    * of partitions. This is useful if you have a small number of partitions,
17    * say 100, potentially with a few partitions being abnormally large. Calling
18    * coalesce(1000, shuffle = true) will result in 1000 partitions with the
19    * data distributed using a hash partitioner.
20    */
21   def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
22       : RDD[T] = withScope {
23     if (shuffle) {
24       /** Distributes elements evenly across output partitions, starting from a random partition. 注意,鍵的雜湊程式碼就是鍵本身。HashPartitioner將用分割槽的總數對它進行修改。*/
25       val distributePartition = (index: Int, items: Iterator[T]) => {
26         var position = (new Random(index)).nextInt(numPartitions)
27         items.map { t =>
28           // Note that the hash code of the key will just be the key itself. The HashPartitioner
29           // will mod it with the number of total partitions.
30           position = position + 1
31           (position, t)
32         }
33       } : Iterator[(Int, T)]
34 
35       // include a shuffle step so that our upstream tasks are still distributed 包含一個shuffle步驟,以便我們的上游任務仍然是分散式的。
36       new CoalescedRDD(
37         new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
38         new HashPartitioner(numPartitions)),
39         numPartitions).values
40     } else {
41       new CoalescedRDD(this, numPartitions)
42     }
43   } 

這個方法返回一個新的RDD,它被簡化為"numpartition"分割槽。

這導致了一個狹窄的依賴關係,例如,如果從1000個分割槽到100個分割槽,將不會有一個shuffle,而是100個新分割槽中的每一個都會宣告10個當前分割槽。

然而,如果你正在做一個劇烈的合併,例如當numPartitions = 1時,這可能導致您的計算髮生在比您期待的更少的節點上(例如numpartition=1的情況下只有一個節點),即可能導致並行度下降,無法充分利用分散式環境的優勢。

為了避免這種情況,可以傳遞shuffle = true。這將新增一個shuffle步驟,但意味著當前的上游分割槽將並行執行(無論當前分割槽是什麼)。

注意:使用shuffle = true,您實際上可以合併到更多的分割槽。

如果您有少量的分割槽(比如100個),可能有一些分割槽非常大,那麼這是非常有用的,呼叫coalesce(1000, shuffle = true)將產生1000個分割槽,使用雜湊分割槽器分發資料。

從上面的原始碼可以看到,def repartition(numPartitions: Int): JavaRDD[T] 其實呼叫的是coalesce(numPartitions, shuffle = true)這個方法,而且這個方法產生shuffle操作,分割槽的規則採用的個是雜湊分割槽。

coalesce

  • def coalesce(numPartitions: Int): JavaRDD[T]
1  
2  /**
3    * Return a new RDD that is reduced into `numPartitions` partitions.
4    */
5   def coalesce(numPartitions: Int): JavaRDD[T] = rdd.coalesce(numPartitions) 

而這個方法呼叫的是org.apache.spark.rdd.RDD裡面的def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null) : RDD[T]。

這個方法和上面repartitions的是一樣的,只不過此處的shuffle引數是預設的false。

真正呼叫的是new CoalescedRDD(this, numPartitions)此時不會觸發shuffle。

  • def coalesce(numPartitions: Int, shuffle: Boolean): JavaRDD[T]
1 /**
2    * Return a new RDD that is reduced into `numPartitions` partitions.
3    */
4   def coalesce(numPartitions: Int, shuffle: Boolean): JavaRDD[T] =
5     rdd.coalesce(numPartitions, shuffle) 

這個和上面的coalesce(numPartitions: Int)類似,只是此處的shuffle引數不再是預設的false,而是自己指定的了,當shuffletrue是會觸發shuffle,反之不會。

演示

 1 scala> var rdd1=sc.textFile("hdfs://file.txt")
 2 rdd1: org.apache.spark.rdd.RDD[String] = hdfs://file.txt MapPartitionsRDD[20] at textFile at <console>:27
 3 
 4 //預設分割槽數量為177
 5 scala> rdd1.partitions.size
 6 res12: Int = 177
 7 
 8 //呼叫coalesce(10) 減少分割槽數量
 9 scala> var rdd2 = rdd1.coalesce(10)
10 rdd2: org.apache.spark.rdd.RDD[String] = CoalescedRDD[21] at coalesce at <console>:29
11 
12 //分割槽數量減少到10個
13 scala> rdd2.partitions.size
14 res13: Int = 10
15 
16 //直接增加分割槽數量到200
17 scala> var rdd2 = rdd1.coalesce(200)
18 rdd2: org.apache.spark.rdd.RDD[String] = CoalescedRDD[22] at coalesce at <console>:29
19 
20 //方法沒有生效
21 scala> rdd2.partitions.size
22 res14: Int = 177
23 
24 //將shuffle設定為true,增加分割槽到200
25 scala> var rdd2 = rdd1.coalesce(200,true)
26 rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[26] at coalesce at <console>:29
27 
28 //重新分割槽生效
29 scala> rdd2.partitions.size
30 res15: Int = 200
31 
32 ------------------------------------------------------------------------------------------------
33 //對於repartition增加分割槽到200
34 scala> var rdd2 = rdd1.repartition 直接增加o(200)
35 rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[30] at repartition at <console>:29
36 
37 //增加分割槽生效
38 scala> rdd2.partitions.size
39 res16: Int = 200
40 
41 //對於repartition減少分割槽到10
42 scala> var rdd2 = rdd1.repartition(10)
43 rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[34] at repartition at <console>:29
44 
45 //減少分割槽生效
46 scala> rdd2.partitions.size
47 res17: Int = 10 

總結

  • coalesce(numPartitions: Int)

當新的分割槽數小於原來的分割槽時,分割槽生效切並且不會觸發shuffle

當新的分割槽數大於原來的分割槽時,分割槽無效還是原來的數量。

  • coalesce(numPartitions: Int, shuffle: Boolean)

shuffletrue時候,無論新的分割槽比原來的大還是小,分割槽均生效,並且觸發shuffle操作,此時等同於repartition(numPartitions: Int)

shufflefalse時候,等同於coalesce(numPartitions: Int)

  • def repartition(numPartitions: Int)

無論新的分割槽比原來的大還是小,分割槽均生效,並且觸發shuffle操作;

很明顯repartition就是當shuffletrue時候的coalesce(numPartitions: Int, shuffle: Boolean)方法。

 

此為本人學習工作總結,轉載請註明出處!!!!