1. 程式人生 > >大話Spark(4)-一文理解MapReduce Shuffle和Spark Shuffle

大話Spark(4)-一文理解MapReduce Shuffle和Spark Shuffle

Shuffle本意是 混洗, 洗牌的意思, 在MapReduce過程中需要各節點上同一類資料彙集到某一節點進行計算,把這些分佈在不同節點的資料按照一定的規則聚集到一起的過程成為Shuffle.

在Hadoop的MapReduce框架中, Shuffle是連線Map和Reduce之間的橋樑, Map的資料要用到Reduce中必須經過Shuffle這個環節. 由於Shuffle涉及到磁碟的讀寫和網路的傳輸, 所以Shuffle的效能高低直接影響到整個程式的效能和吞吐量.

MapReduce中的Shuffle

 

這張圖是官網對Shuffle過程的描述,我們來分別看下map端和reduce端做了什麼, 如何做的.

Map端

  1. map執行task時, 輸入資料來源於HDFS的block, 在MapReduce概念中, map的task只讀取split. split與block的對應關係可能是多對一, 預設是一對一. 
  2. map在寫磁碟之前, 會根據最終要傳給的reduce把資料劃分成相應的分割槽(partition). 每個分割槽中,後臺執行緒按鍵進行排序,如果有combiner,它在排序後的輸出上執行.(combiner可以使map的結果更緊湊,減少寫磁碟的資料和傳遞給reduce的資料[省空間和io])
  3. map產生檔案時, 並不是簡單地將它寫到磁碟. 它利用緩衝的方式把資料寫到記憶體並處於效率的考慮進行與排序.(如圖中 buffer in memory). 每一個map都有一個環形記憶體緩衝區用於儲存任務輸出.緩衝區大小預設100MB, 一旦達到閾值(預設80%), 一個後臺執行緒便開始把內容溢位(split)到磁碟.(如果在此期間[split期間]緩衝區被填滿,map會被阻塞,直到寫磁碟過程完成. 
  4. 每次記憶體緩衝區達到閾值移出,就會新建一個溢位檔案(split file)(上圖 partition,sort and split to disk). 因此在map任務最後一個記錄輸出之後,任務完成之前會把一出的多個檔案合併成一個已分割槽且已排序的輸出檔案.(上圖 merge on task)

Reduce端

  1. map的輸出檔案在map執行的機器的本地磁碟(reduce一般不寫本地), map的輸出檔案包括多個分割槽需要的資料, reduce的輸入需要叢集上多個map的輸出. 每個map的完成時間可能不同, 因此只要有一個map任務完成, reduce就開始複製其輸出.(上圖 fetch階段) reduce有少量複製執行緒(預設5個),因此能夠並行取得map輸出(頻寬瓶頸).
  2. reduce如何知道從哪臺機器獲取map結果? map執行完會通知master, reduce總有一個執行緒定期輪詢master(心跳)可以獲得map輸出的位置. master會等到所有reduce完成之後再通知map刪除其輸出.
  3. 如果map的輸出很小,會被複制到reduce任務jvm的記憶體.否則map輸出會被複制到磁碟(又寫磁碟)
  4. 複製完所有map輸出後,reduce任務進入排序合併階段(其實是合併階段,因為map的輸出上有序的).這個維持順序的合併過程是迴圈進行的.(比如50個map輸出,合併因子是10(預設值), 則合併將進行5次, 每次合併10個檔案, 最終有5箇中間檔案)
  5. 在最後reduce階段,直接把資料輸入reduce函式(上面的5箇中間檔案不會再合併成一個已排序的中間檔案). 輸出直接寫到檔案系統, 一般為HDFS. 

map輸出為什麼要排序?

  1. key存在combine操作,排序之後相同的key在一起方便合併.
  2. reduce按照key讀資料時, 按照key的順序去讀, 遇到不一樣的 key時即可知道之前的key的資料是否讀取完畢. 如果沒排序,則需要把全部資料都做處理. 

上面就是MapReduce的Shuffle過程, 其實Spark2.0之後的Shuffle過程與MapReduce的基本一致,都是基於排序的,早期spark版本中的shuffle是基於hash的,讓我們來一起看下.

Spark中的Shuffle

Spark有兩種Shuffle機制. 一種是基於Hash的Shuffle, 還有一種是基於Sort的Shuffle.在Shuffle機制轉變的過程中, 主要的一個優化點就是產生的小檔案個數.

 


以上圖為例,在Spark的運算元reduceByKey(_ + _, 2)產生的shuffle中,我們先看Shuffle Write階段.

Shuffle Write (Hash-based)

 


如圖所示, hash-based的Shuffle, 每個map會根據reduce的個數建立對應的bucket, 那麼bucket的總數量是: M * R (map的個數 * reduce的個數).
(假如分別有1k個map和reduce,將產生1百萬的小檔案!)
如上圖所示,2個core, 4個map task, 3個reduce task 產生了4*3 = 12個小檔案.(每個檔案中是不排序的)

Shuffle Write (Hash-based) 優化!

由於hash-based產生的小檔案太多, 對檔案系統的壓力很大, 後來做了優化. 
把同一個core上的多個map輸出到同一個檔案. 這樣檔案數就變成了 core * R個.如下圖:

 

2個core, 4個map task, 3個 reduce task, 產生了2*3 = 6個檔案.
(每個檔案中仍然不是排序的)

Shuffle Write (Sort-based)

由於優化後的hash-based Shuffle的檔案數為: core * R, 產生的小檔案仍然過大, 所以引入了 sort-based Shuffle

 


sort-based Shuffle中, 一個map task 輸出一個檔案.
檔案在一些到磁碟之前, 會根據key進行排序. 排序後, 分批寫入磁碟. task完成之後會將多次溢寫的檔案合併成一個檔案. 由於一個task只對應一個磁碟檔案, 因此還會單獨寫一份索引檔案, 標識下游各個task的資料對應在檔案中的起始和結束offset.

Shuffle Read

 


目前,hash-based 和 sort-based寫方式公用相同的shuffle read. 
如下圖所示: 

 

 


shuffle read task從多個map的輸出檔案中fetch自己需要的已排序好的資料. 
read task 會先從索引檔案中獲取自己需要的資料對應的索引, 在讀檔案的時候跳過對應的Block資料區, 只讀當前自己這個task需要的資料. 

什麼時候開始fetch資料?

當 parent stage 的所有ShuffleMapTasks結束後再fetch(這裡和MapReduce不同). 理論上講, 一個ShuffleMapTask結束後就可以fetch, 但是為了迎合 stage 的概念(即一個stage如果其parent stages沒有執行完,自己是不能被提交執行的),還是選擇全部ShuffleMapTasks執行完再去 etch.因為fetch來的 FileSegments要先在記憶體做緩衝(預設48MB緩衝界限), 所以一次fetch的 FileSegments總大小不能太大. 一個 softBuffer裡面一般包含多個 FileSegment,但如果某個FileSegment特別大的話, 這一個就可以填滿甚至超過 softBuffer 的界限.

邊 fetch 邊處理還是一次性 fetch 完再處理?

邊 fetch 邊處理.本質上,MapReduce shuffle階段就是邊fetch邊使用 combine()進行處理,只是combine()處理的是部分資料. MapReduce為了讓進入 reduce()的records有序, 必須等到全部資料都shuffle-sort後再開始 reduce(). 因為Spark不要求shuffle後的資料全域性有序,因此沒必要等到全部資料 shuffle完成後再處理. 
那麼如何實現邊shuffle邊處理, 而且流入的records是無序的?答案是使用可以 aggregate 的資料結構, 比如 HashMap. 每從shuffle得到(從緩衝的 FileSegment中deserialize出來)一個 <key, value="">record, 直接將其放進 HashMap 裡面.如果該HashMap已經存在相應的 Key. 那麼直接進行 aggregate 也就是 func(hashMap.get(Key), Value).

Shuffle aggregate

shuffle read task拿到多個map產生的相同的key的資料後,需要對資料進行聚合,把相同key的資料放到一起,這個過程叫做aggregate.

 


大致過程如下圖: 

 


task把讀來的 records 被逐個 aggreagte 到 HashMap 中,等到所有 records 都進入 HashMap,就得到最後的處理結果。

fetch 來的資料存放到哪裡?

剛 fetch 來的 FileSegment 存放在 softBuffer 緩衝區,經過處理後的資料放在記憶體 + 磁碟上。

小結:

其實MapReduce Suffle 和 Spark的Shuffle在主要方面還是基本一致的, 比如:都是基於sort的. 
細節上有一些區別, 比如 mapreduce完成一個map,就開始reduce, 而spark由於stage的概念,需要等所有ShuffleMap完成再ShuffleReduce.