Spark Source Code Series: RDD repartition vs. coalesce
The previous article in this series, Spark Source Code Series: DataFrame repartition vs. coalesce, compared repartition and coalesce on DataFrames. This article makes the same comparison for RDDs.
As with DataFrames, RDDs offer two methods for changing the number of partitions: repartition and coalesce.
repartition
- def repartition(numPartitions: Int): JavaRDD[T]
/**
 * Return a new RDD that has exactly numPartitions partitions.
 *
 * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
 * a shuffle to redistribute data.
 *
 * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
 * which can avoid performing a shuffle.
 */
def repartition(numPartitions: Int): JavaRDD[T] = rdd.repartition(numPartitions)
This returns a new RDD that has exactly numPartitions partitions.
repartition can increase or decrease the level of parallelism of this RDD. Internally, it uses a shuffle to redistribute the data.
If you are decreasing the number of partitions, consider using `coalesce`, which can avoid performing a shuffle.
This method lives in org.apache.spark.api.java.JavaRDD,
and what it really calls is repartition in org.apache.spark.rdd.RDD:
/**
 * Return a new RDD that has exactly numPartitions partitions.
 *
 * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
 * a shuffle to redistribute data.
 *
 * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
 * which can avoid performing a shuffle.
 */
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}
As you can see, this is still not the final implementation: it delegates to coalesce(numPartitions, shuffle = true), which is implemented as follows:
/**
 * Return a new RDD that is reduced into `numPartitions` partitions.
 *
 * This results in a narrow dependency, e.g. if you go from 1000 partitions
 * to 100 partitions, there will not be a shuffle, instead each of the 100
 * new partitions will claim 10 of the current partitions.
 *
 * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
 * this may result in your computation taking place on fewer nodes than
 * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
 * you can pass shuffle = true. This will add a shuffle step, but means the
 * current upstream partitions will be executed in parallel (per whatever
 * the current partitioning is).
 *
 * Note: With shuffle = true, you can actually coalesce to a larger number
 * of partitions. This is useful if you have a small number of partitions,
 * say 100, potentially with a few partitions being abnormally large. Calling
 * coalesce(1000, shuffle = true) will result in 1000 partitions with the
 * data distributed using a hash partitioner.
 */
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
    : RDD[T] = withScope {
  if (shuffle) {
    /** Distributes elements evenly across output partitions, starting from a random partition. */
    val distributePartition = (index: Int, items: Iterator[T]) => {
      var position = (new Random(index)).nextInt(numPartitions)
      items.map { t =>
        // Note that the hash code of the key will just be the key itself. The HashPartitioner
        // will mod it with the number of total partitions.
        position = position + 1
        (position, t)
      }
    } : Iterator[(Int, T)]

    // include a shuffle step so that our upstream tasks are still distributed
    new CoalescedRDD(
      new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
        new HashPartitioner(numPartitions)),
      numPartitions).values
  } else {
    new CoalescedRDD(this, numPartitions)
  }
}
This method returns a new RDD that is reduced into `numPartitions` partitions.
This results in a narrow dependency: for example, going from 1000 partitions to 100, there is no shuffle; instead, each of the 100 new partitions claims 10 of the current partitions.
However, if you are doing a drastic coalesce, e.g. to numPartitions = 1, the computation may take place on fewer nodes than you would like (a single node when numPartitions = 1). In other words, parallelism may drop so far that the advantages of the distributed environment go unused.
To avoid this, pass shuffle = true. This adds a shuffle step, but it means the current upstream partitions are executed in parallel (per whatever the current partitioning is).
Note: with shuffle = true, you can actually coalesce to a larger number of partitions.
This is useful if you have a small number of partitions, say 100, possibly with a few abnormally large ones: calling coalesce(1000, shuffle = true) produces 1000 partitions, with the data distributed by a hash partitioner.
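The narrow-dependency case can be illustrated with a small simulation. The sketch below is plain Python, not Spark code, and the function name and grouping rule are simplified assumptions: going from 10 parent partitions to 2, each output partition simply claims a contiguous group of 5 parents, so no data needs to move across the network.

```python
def narrow_coalesce(parent_partitions, num_partitions):
    """Hypothetical sketch of a narrow coalesce: each output partition
    claims a contiguous group of parent partitions (no shuffle).
    Spark's real CoalescedRDD also considers data locality, which this
    simplified grouping ignores."""
    n = len(parent_partitions)
    groups = [[] for _ in range(num_partitions)]
    for i, part in enumerate(parent_partitions):
        # Parent i is claimed by output partition i * num_partitions // n.
        groups[i * num_partitions // n].extend(part)
    return groups

parents = [[i] for i in range(10)]  # 10 parent partitions, one element each
print(narrow_coalesce(parents, 2))  # [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
```

Each of the 2 output partitions simply holds the data of 5 parents, which is why a shuffle-free coalesce can only merge partitions, never split them.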
From the source above, def repartition(numPartitions: Int): JavaRDD[T] actually calls coalesce(numPartitions, shuffle = true); it always performs a shuffle, and the partitioning rule is hash partitioning.
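The shuffle path can also be sketched in plain Python (a simulation of distributePartition's logic, not the Spark API; function names are hypothetical): each input partition starts at a seeded pseudo-random output index and assigns its elements round-robin, and the hash partitioner then just takes the key modulo numPartitions.

```python
import random

def distribute_partition(index, items, num_partitions):
    """Mirror of distributePartition: seed a PRNG with the partition index,
    pick a starting position, then hand out consecutive Int keys."""
    position = random.Random(index).randrange(num_partitions)
    for t in items:
        position += 1
        yield (position, t)

def hash_partition(key, num_partitions):
    # For an Int key the hash code is the key itself; the HashPartitioner
    # mods it with the total partition count.
    return key % num_partitions

num_partitions = 4
pairs = list(distribute_partition(0, ["a", "b", "c", "d"], num_partitions))
targets = [hash_partition(k, num_partitions) for k, _ in pairs]
print(sorted(targets))  # [0, 1, 2, 3]
```

Because the keys are consecutive integers, four elements land in four distinct output partitions regardless of the random starting point; this is how the shuffle path spreads data evenly.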
coalesce
- def coalesce(numPartitions: Int): JavaRDD[T]
/**
 * Return a new RDD that is reduced into `numPartitions` partitions.
 */
def coalesce(numPartitions: Int): JavaRDD[T] = rdd.coalesce(numPartitions)
This in turn calls def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T] in org.apache.spark.rdd.RDD.
That is the same method repartition ends up in, except that here the shuffle parameter keeps its default value of false.
So the call reduces to new CoalescedRDD(this, numPartitions), which does not trigger a shuffle.
- def coalesce(numPartitions: Int, shuffle: Boolean): JavaRDD[T]
/**
 * Return a new RDD that is reduced into `numPartitions` partitions.
 */
def coalesce(numPartitions: Int, shuffle: Boolean): JavaRDD[T] =
  rdd.coalesce(numPartitions, shuffle)
This is like coalesce(numPartitions: Int) above, except that shuffle is no longer the default false but is specified by the caller: when shuffle is true a shuffle is triggered, otherwise it is not.
Demo
scala> var rdd1 = sc.textFile("hdfs://file.txt")
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://file.txt MapPartitionsRDD[20] at textFile at <console>:27

// the default number of partitions is 177
scala> rdd1.partitions.size
res12: Int = 177

// call coalesce(10) to reduce the partition count
scala> var rdd2 = rdd1.coalesce(10)
rdd2: org.apache.spark.rdd.RDD[String] = CoalescedRDD[21] at coalesce at <console>:29

// the partition count drops to 10
scala> rdd2.partitions.size
res13: Int = 10

// try to increase the partition count to 200 directly
scala> var rdd2 = rdd1.coalesce(200)
rdd2: org.apache.spark.rdd.RDD[String] = CoalescedRDD[22] at coalesce at <console>:29

// the call has no effect
scala> rdd2.partitions.size
res14: Int = 177

// with shuffle set to true, increase the partition count to 200
scala> var rdd2 = rdd1.coalesce(200, true)
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[26] at coalesce at <console>:29

// the repartitioning takes effect
scala> rdd2.partitions.size
res15: Int = 200

------------------------------------------------------------------------------------------------
// repartition: increase the partition count to 200
scala> var rdd2 = rdd1.repartition(200)
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[30] at repartition at <console>:29

// the increase takes effect
scala> rdd2.partitions.size
res16: Int = 200

// repartition: decrease the partition count to 10
scala> var rdd2 = rdd1.repartition(10)
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[34] at repartition at <console>:29

// the decrease takes effect
scala> rdd2.partitions.size
res17: Int = 10
Summary
- coalesce(numPartitions: Int)
When the new partition count is smaller than the current one, the repartitioning takes effect and no shuffle is triggered;
when the new count is larger, the call has no effect and the original count is kept.
- coalesce(numPartitions: Int, shuffle: Boolean)
When shuffle is true, the repartitioning takes effect whether the new count is larger or smaller, and a shuffle is triggered; this is equivalent to repartition(numPartitions: Int);
when shuffle is false, it behaves the same as coalesce(numPartitions: Int).
- def repartition(numPartitions: Int)
The repartitioning takes effect whether the new count is larger or smaller, and a shuffle is always triggered;
clearly, repartition is just coalesce(numPartitions: Int, shuffle: Boolean) with shuffle fixed to true.
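The rules above can be captured in a tiny model (plain Python, hypothetical helper names, not Spark API): given the current and requested partition counts, it returns the resulting count and whether a shuffle occurs.

```python
def coalesce_effect(current, target, shuffle=False):
    """Model of the summary rules: returns (resulting_partitions, shuffles)."""
    if shuffle:
        # Equivalent to repartition(target): always takes effect, always shuffles.
        return target, True
    # Without a shuffle, only decreasing the partition count takes effect.
    return (target, False) if target < current else (current, False)

def repartition_effect(current, target):
    # repartition is just coalesce with shuffle = true.
    return coalesce_effect(current, target, shuffle=True)

print(coalesce_effect(177, 10))      # (10, False)  -- narrow, no shuffle
print(coalesce_effect(177, 200))     # (177, False) -- no effect
print(repartition_effect(177, 200))  # (200, True)  -- takes effect, shuffles
```

This mirrors the spark-shell session in the demo section: coalesce(200) left 177 partitions untouched, while coalesce(200, true) and repartition(200) both produced 200.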
This is a summary of my own study and work; please credit the source when reposting!