1. 程式人生 > >SparkShuffle的分類和執行過程的一些總結

SparkShuffle的分類和執行過程的一些總結

什麼是Spark Shuffle?

reduceByKey會將上一個RDD中的每一個key對應的所有value聚合成一個value,然後生成一個新的RDD,元素型別是<key,value>對的形式,這樣每一個key對應一個聚合起來的value。
在聚合之前,相同的key可能在不同的分割槽中,這些分割槽也可能子不同的節點上,RDD是彈性的分散式資料集,RDD的partitiion很可能在不同的節點上

如何聚合
–Shuffle Write:上一個stage的每個map task就必須保證將自己處理的當前分割槽的資料相同的key寫入一個分割槽檔案中,可能會寫入多個不同的分割槽檔案中。

– Shuffle Read:reduce task就會從上一個stage的所有task所在的機器上尋找屬於己的那些分割槽檔案,這樣就可以保證每一個key所對應的value都會匯聚到同一個節點上去處理和聚合。

Spark中有三種Shuffle型別,HashShuffle、SortShuffle、鎢絲SortShuffle,Spark1.2之前是HashShuffle預設的分割槽器是HashPartitioner,Spark1.2引入SortShuffle預設的分割槽器是RangePartitioner,鎢絲SortShuffle是後來引入的,也是預設排序的,是對SortShuffle的優化

HashShuffle的執行過程

1、注意: 這種是沒有加consolidation優化的HashShuffle

在這裡插入圖片描述

執行流程
a) 每一個map task將不同結果寫到不同的buffer中,每個buffer的大小為32K。buffer起到資料快取的作用。
b) 每個buffer檔案最後對應一個磁碟小檔案。
c) reduce task來拉取對應的磁碟小檔案。

總結
① .map task的計算結果會根據分割槽器(預設是hashPartitioner)來決定寫入到哪一個磁碟小檔案中去。ReduceTask會去Map端拉取相應的磁碟小檔案。
② .產生的磁碟小檔案的個數:
M(map task的個數)* R(reducetask的個數)

存在的問題

產生的磁碟小檔案過多,會導致以下問題:
a) 在Shuffle Write過程中會產生很多寫磁碟小檔案的物件。
b) 在Shuffle Read過程中會產生很多讀取磁碟小檔案的物件。
c) 在JVM堆記憶體中物件過多會造成頻繁的gc,gc還無法解決執行所需要的記憶體 的話,就會OOM。
d) 在資料傳輸過程中會有頻繁的網路通訊,頻繁的網路通訊出現通訊故障的可能性大大增加,一旦網路通訊出現了故障會導致shuffle file cannot find 由於這個錯誤導致的task失敗,TaskScheduler不負責重試,由DAGScheduler負責重試Stage。

2、注意:這種是開啟consolidation優化機制的HashShuffle

在這裡插入圖片描述

總結
產生磁碟小檔案的個數:C(core的個數)*R(reduce的個數)

SortShuffle的執行過程

  1. 普通機制執行流程

a) map task 的計算結果會寫入到一個記憶體資料結構裡面,記憶體資料結構預設是5M

b) 在shuffle的時候會有一個定時器,不定期的去估算這個記憶體結構的大小,當記憶體結構中的資料超過5M時,比如現在記憶體結構中的資料為5.01M,那麼他會申請5.01*2-5=5.02M記憶體給記憶體資料結構。

c) 如果申請成功不會進行溢寫,如果申請不成功,這時候會發生溢寫磁碟。

d) 在溢寫之前記憶體結構中的資料會進行排序分割槽

e) 然後開始溢寫磁碟,寫磁碟是以batch的形式去寫,一個batch是1萬條資料,

f) map task執行完成後,會將這些磁碟小檔案合併成一個大的磁碟檔案,同時生成一個索引檔案。

g) reduce task去map端拉取資料的時候,首先解析索引檔案,根據索引檔案再去拉取對應的資料。

總結

產生磁碟小檔案的個數: 2*M(map task的個數)
2) bypass機制

bypass機制示意圖

在這裡插入圖片描述

總結

① .bypass執行機制的觸發條件如下:
shuffle reduce task的數量小於spark.shuffle.sort.bypassMergeThreshold的引數值。這個值預設是200。
② .產生的磁碟小檔案為:2*M(map task的個數)

Shuffle檔案定址

  1. MapOutputTracker

MapOutputTracker是Spark架構中的一個模組,是一個主從架構。管理磁碟小檔案的地址。

Ø MapOutputTrackerMaster是主物件,存在於Driver中。

Ø MapOutputTrackerWorker是從物件,存在於Excutor中。

  1. BlockManager

BlockManager塊管理者,是Spark架構中的一個模組,也是一個主從架構。

Ø BlockManagerMaster,主物件,存在於Driver中。

BlockManagerMaster會在叢集中有用到廣播變數和快取資料或者刪除快取資料的時候,通知BlockManagerSlave傳輸或者刪除資料。

Ø BlockManagerWorker,從物件,存在於Excutor中。

BlockManagerWorker會與BlockManagerWorker之間通訊。

¬ 無論在Driver端的BlockManager還是在Excutor端的BlockManager都含有四個物件:

① DiskStore:負責磁碟的管理。

② MemoryStore:負責記憶體的管理。

③ ConnectionManager:負責連線其他的BlockManagerWorker。

④ BlockTransferService:負責資料的傳輸。

Shuffle檔案定址圖
在這裡插入圖片描述

Shuffle檔案定址流程

a) 當map task執行完成後,會將task的執行情況和磁碟小檔案的地址封裝到MpStatus物件中,通過MapOutputTrackerWorker物件向Driver中的MapOutputTrackerMaster彙報。

b) 在所有的map task執行完畢後,Driver中就掌握了所有的磁碟小檔案的地址。

c) 在reduce task執行之前,會通過Excutor中MapOutPutTrackerWorker向Driver端的MapOutputTrackerMaster獲取磁碟小檔案的地址。

d) 獲取到磁碟小檔案的地址後,會通過BlockManager中的ConnectionManager連線資料所在節點上的ConnectionManager,然後通過BlockTransferService進行資料的傳輸。

e) BlockTransferService預設啟動5個task去節點拉取資料。預設情況下,5個task拉取資料量不能超過48M。