1. 程式人生 > >什麼是寬窄依賴,及特殊join運算元,join時何時產生shuffle,何時不產生shuffle

什麼是寬窄依賴,及特殊join運算元,join時何時產生shuffle,何時不產生shuffle

1、 什麼是寬窄依賴,
寬依賴: 發生shuffle時,一定會產生寬依賴,寬依賴是一個RDD中的一個Partition被多個子Partition所依賴(一個父親多有兒子),也就是說每一個父RDD的Partition中的資料,都可能傳輸一部分到下一個RDD的多個partition中,此時一定會發生shuffle

窄依賴: 一個RDD中的一個 Partition最多 被一個 子 Partition所依賴(一個父親有一個兒子)

2、 Spark中產生寬窄依賴的依據是shuffle,當發生shuffle時,會產生寬依賴,基本上shuffle運算元都會產生寬依賴,但是join除外,在執行join運算元之前如果先執行groupByKey,執行groupByKey之後,會把相同的key分到同一個分割槽,再執行join運算元,join運算元是把key相同的進行join(只是對於k v形式的資料可以使用),不一定會產生shuffle ,有可能發生shuffle,也有可能不發生

最後返回的結果是(Key,(rdd1的v,rdd2的v)),如下平行化建立,兩個RDD,對其進行Join。這中情況下就不一定會產生shuffle,根據具體情況而言

 #第一種情況, 不使用groupByKey直接進行join
 val rdd1 = sc.parallelize(List("aa"->5,"bb"->5,"dd"->6,"aa"->3,"tt"->2))	   
 val rdd2 = sc.parallelize(List("bb"->1,"aa"->3,"dd"->3,"ss"->6))
 val joind:
RDD[(String, (Int, Int))] = rdd1.join(rdd2)

對於join,第一種情況,在join之前不是groupByKey,發生shuffle

val rdd1 = sc.parallelize(List("aa"->5,"bb"->5,"dd"->6,"aa"->3"tt"->2)1)	   
val rdd2 = sc.parallelize(List("bb"->1,"aa"->3,"dd"->3,"ss"->6)1)
//設定兩個rdd的分割槽都是1,相同,
val joind: RDD[
(String, (Int, Int))] = rdd1.join(rdd2)

通過UI介面可以觀察到,一共有三個stage,在join 時進行了shuffle
兩個RDD直接進行Join

#第二種情況,分割槽相同,join之前進行groupByKey
 val rdd1 = sc.parallelize(List("aa"->5,"bb"->5,"dd"->6,"aa"->3"tt"->2)2)	   
 val rdd2 = sc.parallelize(List("bb"->1,"aa"->3,"dd"->3,"ss"->6)2)
 //設定兩個rdd的分割槽相同,在進行join之前使用groupBykey()
 val gb1 = rdd1.groupByKey()
 val gb2 = rdd2.groupByKey()
 val joind: RDD[(String, (Int, Int))] = gb1.join(gb2)

通過UI介面觀察到,一個共有3個stage,雖然使用了兩個shuffle運算元,但是和使用一個join的stage相同
從平行化到groupByKey是一個stage,這樣就是兩個stage,groupByKey到saveAsTextFile是一個stage
rdd1經過groupByKey後,假設兩個分割槽分別是P1:(aa,5)(aa,3) ,P2:(bb,5),(dd,6),(tt,2)
rdd2經過groupByKey後,假設兩個分割槽分別是P1:(aa,3), P2:(bb,1),(dd,3).(ss,6)
在進行join時按key 進行join,不會發生shuffle
在這裡插入圖片描述

#第三種情況,兩個rdd的分割槽數不同,在join之前進行groupByKey
 val rdd1 = sc.parallelize(List("aa"->5,"bb"->5,"dd"->6,"aa"->3"tt"->2)1)	   
 val rdd2 = sc.parallelize(List("bb"->1,"aa"->3,"dd"->3,"ss"->6)5)
 //設定兩個rdd的分割槽不同相同,在進行join之前使用groupBykey()
 //注:在分割槽不同時進行join,最後在join時的分割槽,和rdd分割槽的數相同
 //這種情況下一共 會 產生1+1+5+5個task
 val gb1 = rdd1.groupByKey()
 val gb2 = rdd2.groupByKey()
 val joind: RDD[(String, (Int, Int))] = gb1.join(gb2)

有stage的圖可以看出來,分割槽數小的rdd,在join時發生shuffle,
分割槽數大的rdd,在join時沒有發生shuffle
在這裡插入圖片描述

#第四種情況,兩個rdd的分割槽數不同,在join之前進行groupByKey,在join時從新指定分割槽
val rdd1 = sc.parallelize(List("aa"->5,"bb"->5,"dd"->6,"aa"->3"tt"->2)1)	   
 val rdd2 = sc.parallelize(List("bb"->1,"aa"->3,"dd"->3,"ss"->6)5)
 //設定兩個rdd的分割槽不同相同,在進行join之前使用groupBykey()
 //注:在分割槽不同時進行join,最後在join時的分割槽,和rdd分割槽的數相同

 val gb1 = rdd1.groupByKey()
 val gb2 = rdd2.groupByKey()
 val joind: RDD[(String, (Int, Int))] = gb1.join(gb2,6)
// val joind: RDD[(String, (Int, Int))] = gb1.join(gb2,5)
 //val joind: RDD[(String, (Int, Int))] = gb1.join(gb2,6)

在這裡插入圖片描述

在這裡插入圖片描述

在這裡插入圖片描述