1. 程式人生 > >Shuffle過程詳解及優化

Shuffle過程詳解及優化

1.MapReduce Shuffle

Map是對映,負責資料的過濾分 發;Reduce是規約,負責資料的計算歸併。Reduce的資料來源於Map,Map的輸出即是Reduce的輸入,Reduce需要通過 Shuffle來獲取資料。

從Map輸出到Reduce輸入的整個過程可以廣義地稱為Shuffle。Shuffle橫跨Map端和Reduce端,在Map端包括Spill過程,在Reduce端包括copy和sort過程,如圖所示:

Map的shuffle過程

Spill過程包括輸出、排序、溢寫、合併等步驟,如圖所示:

Collect:

每個Map任務不斷地以對的形式把資料輸出到在記憶體中構造的一個環形資料結構中。使用環形資料結構是為了更有效地使用記憶體空間,在記憶體中放置儘可能多的資料。這個資料結構其實就是個位元組陣列,叫Kvbuffer

Sort:

先把Kvbuffer中的資料按照partition值和key兩個關鍵字升序排序,移動的只是索引資料,排序結果是Kvmeta中資料按照partition為單位聚集在一起,同一partition內的按照key有序。

Spill:

Spill執行緒為這次Spill過程建立一個磁碟檔案:從所有的本地目錄中輪訓查詢能儲存這麼大空間的目錄,找到之後在其中建立一個類似於 “spill12.out”的檔案。Spill執行緒根據排過序的Kvmeta挨個partition的把資料吐到這個檔案中,一個partition對應的資料吐完之後順序地吐下個partition,直到把所有的partition遍歷 完。一個partition在檔案中對應的資料也叫段(segment)。

記憶體緩衝區是有大小限制的,預設是100MB。當map task的輸出結果很多時,就可能會撐爆記憶體,所以需要在一定條件下將緩衝區中的資料臨時寫入磁碟,然後重新利用這塊緩衝區。這個從記憶體往磁碟寫資料的過程被稱為Spill,中文可譯為溢寫。比例預設是0.8,也就是當緩衝區的資料已經達到閾值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢寫執行緒啟動,鎖定這80MB的記憶體,執行溢寫過程。Map task的輸出結果還可以往剩下的20MB記憶體中寫,互不影響。

Merge

         Map任務如果輸出資料量很大,可能會進行好幾次Spill,out檔案和Index檔案會產生很多,分佈在不同的磁碟上。最後把這些檔案進行合併的merge過程閃亮登場。如前面的例子,“aaa”從某個map task讀取過來時值是5,從另外一個map 讀取時值是8,因為它們有相同的key,所以得merge成group。什麼是group。對於“aaa”就是像這樣的:{“aaa”, [5, 8, 2, …]}

Reduce的shuffle過程

Copy

Reduce 任務通過HTTP向各個Map任務拖取它所需要的資料。每個節點都會啟動一個常駐的HTTP server,其中一項服務就是響應Reduce拖取Map資料。當有MapOutput的HTTP請求過來的時候,HTTP server就讀取相應的Map輸出檔案中對應這個Reduce部分的資料通過網路流輸出給Reduce。

Merge SORT

這裡使用的Merge和Map端使用的Merge過程一樣。Map的輸出資料已經是有序的,Merge進行一次合併排序,所謂Reduce端的 sort過程就是這個合併的過程。一般Reduce是一邊copy一邊sort,即copy和sort兩個階段是重疊而不是完全分開的。

Reducer的輸入檔案已定,整個Shuffle才最終結束

2.Spark的Shuffle過程介紹

Shuffle Writer

Spark豐富了任務型別,有些任務之間資料流轉不需要通過Shuffle,但是有些任務之間還是需要通過Shuffle來傳遞資料,比如wide dependency的group by key。

Spark中需要Shuffle 輸出的Map任務會為每個Reduce建立對應的bucket,Map產生的結果會根據設定的partitioner得到對應的bucketId,然後填 充到相應的bucket中去。每個Map的輸出結果可能包含所有的Reduce所需要的資料,所以每個Map會建立R個bucket(R是reduce的 個數),M個Map總共會建立M*R個bucket。

Map建立的bucket其實對應磁碟上的一個文 件,Map的結果寫到每個bucket中其實就是寫到那個磁碟檔案中,這個檔案也被稱為blockFile。每個Map要在節點上建立R個磁碟檔案用於結果輸出,Map的結果是直接輸 出到磁碟檔案上的,100KB的記憶體緩衝是用來建立Fast Buffered OutputStream輸出流。這種方式一個問題就是Shuffle檔案過多。

針對上述Shuffle 過程產生的檔案過多問題,Spark有另外一種改進的Shuffle過程:consolidation Shuffle,以期顯著減少Shuffle檔案的數量。在consolidation Shuffle中每個bucket並非對應一個檔案,而是對應檔案中的一個segment部分。Job的map在某個節點上第一次執行,為每個 reduce建立bucket對應的輸出檔案,把這些檔案組織成ShuffleFileGroup,當這次map執行完之後,這個 ShuffleFileGroup可以釋放為下次迴圈利用;當又有map在這個節點上執行時,不需要建立新的bucket檔案,而是在上次的 ShuffleFileGroup中取得已經建立的檔案繼續追加寫一個segment;當前次map還沒執行完,ShuffleFileGroup還沒有 釋放,這時如果有新的map在這個節點上執行,無法迴圈利用這個ShuffleFileGroup,而是隻能建立新的bucket檔案組成新的 ShuffleFileGroup來寫輸出。

Shuffle Fetcher

Reduce去拖Map的輸出資料,Spark提供了兩套不同的拉取資料框架:通過socket連線去取資料;使用netty框架去取資料。Spark Map輸出的資料沒有經過排序,Spark Shuffle過來的資料也不會進行排序,Spark認為Shuffle過程中的排序不是必須的,並不是所有型別的Reduce需要的資料都需要排序,強 制地進行排序只會增加Shuffle的負擔。educe拖過來的資料會放在一個HashMap中,HashMap中儲存的也是對,key是Map輸出的key,Map輸出對應這個key的所有value組成HashMap的value。Spark將 Shuffle取過來的每一個對插入或者更新到HashMap中,來一個處理一個。HashMap全部放在記憶體中。

3.MapReduce Shuffle後續優化方向

壓縮:對資料進行壓縮,減少寫讀資料量;

減少不必要的排序:並不是所有型別的Reduce需要的資料都是需要排序的,排序這個nb的過程如果不需要最好還是不要的好;

記憶體化:Shuffle的資料不放在磁碟而是儘量放在記憶體中,除非逼不得已往磁碟上放;當然瞭如果有效能和記憶體相當的第三方儲存系統,那放在第三方儲存系統上也是很好的;這個是個大招;

網路框架:netty的效能據說要佔優了;

本節點上的資料不走網路框架:對於本節點上的Map輸出,Reduce直接去讀吧,不需要繞道網路框架。

4.Spark Shuffle後續優化方向

Spark作為MapReduce的進階架構,對於Shuffle過程已經是優化了的,特別是對於那些具有爭議的步驟已經做了優化,但是Spark的Shuffle對於我們來說在一些方面還是需要優化的。

壓縮:對資料進行壓縮,減少寫讀資料量;

記憶體化:Spark歷史版本中是有這樣設計的:Map寫資料先把資料全部寫到記憶體中,寫完之後再把資料刷到磁碟上;考慮記憶體是緊缺資源,後來修改成把資料直接寫到磁碟了;對於具有較大記憶體的叢集來講,還是儘量地往記憶體上寫吧,記憶體放不下了再放磁碟。

相關推薦

Shuffle過程優化

1.MapReduce Shuffle Map是對映,負責資料的過濾分 發;Reduce是規約,負責資料的計算歸併。Reduce的資料來源於Map,Map的輸出即是Reduce的輸入,Reduce需要通過 Shuffle來獲取資料。 從Map輸出到Reduce輸入的整

大資料教程(8.8)MR內部的shuffle過程&combiner的執行機制程式碼實現

        之前的文章已經簡單介紹過mapreduce的運作流程,不過其內部的shuffle過程並未深入講解;本篇部落格將分享shuffle的全過程。       

大資料教程(8.8)MR內部的shuffle過程&combiner的執行機制程式碼實現

        之前的文章已經簡單介紹過mapreduce的運作流程,不過其內部的shuffle過程並未深入講解;本篇部落格將分享shuffle的全過程。      

hadoop概念-MapReduce各個執行階段Shuffle過程

MapReduce各個執行階段 (1)MapReduce框架使用InputFormat模組做Map前的預處理,比如驗證輸入的格式是否符合輸入定義;然後,將輸入檔案切分為邏輯上的多個InputSplit,InputSplit是MapReduce對檔案進行處理和運算的輸入單位

(轉)MyISAM Key Cache優化

磁盤 詳解 update 根據 進行 緩沖 write 隊列 但是 原文:http://huanghualiang.blog.51cto.com/6782683/1372721 一、MyISAM Key Cache詳解: 為了最小化磁盤I/O,MyISAM將最頻繁訪問的索引

MapReduce-shuffle過程

等待 通知 10個 線程數 硬盤 res .sh 現在 溢出 Shuffle map端 map函數開始產生輸出時,並不是簡單地將它寫到磁盤。這個過程很復雜,它利用緩沖的方式寫到內存並出於效率的考慮進行預排序。每個map任務都有一個環形內存緩沖區用於存儲任務輸出。在默認

MapReduce和spark的shuffle過程

存在 位置 方式 傳遞 第一個 2個 過濾 之前 第三方 面試常見問題,必備答案。 參考:https://blog.csdn.net/u010697988/article/details/70173104 mapReducehe和Spark之間的最大區別是前者較偏向於離

Hadoop Mapreduce的shuffle過程

1、map task讀取資料時預設呼叫TextInputFormat的成員RecoreReader,RecoreReader呼叫自己的read()方法,進行逐行讀取,返回一個key、value; 2、返回的key、value交給自定義的map方法,輸出的context.write(key,value),再交

MapReduce內部shuffle過程(Combiner的使用)

Maptask呼叫一個元件FileInputFormat FileInputFormat有一個最高層的介面 --> InputFormat 我們不需要去寫自己的實現類,使用的就是內部預設的元件:TextInputFormat maptask先呼叫TextInputFormat,

大資料技術學習筆記之Hadoop框架基礎4-MapReduceshuffer過程zookeeper框架學習

一、MapReduce Shuffle     -》MapReduce執行五個階段         input           

MfC開啟過程應用

本文主要介紹:在MFC中,選單開啟命令的響應過程。 一、MFC開啟命令的響應過程: File->Open 對應的ID為ID_FILE_OPEN,其響應過程如下: 注:如果自己已將ID_FLIE_OPEN在MFC中過載了,則會直接響應過載函式,不會按以下過程響應。 1.

MapReduce shuffle過程

一、MapReduce計算模型 我們知道MapReduce計算模型主要由三個階段構成:Map、shuffle、Reduce。 Map是對映,負責資料的過濾分法,將原始資料轉化為鍵值對;Reduce是合併,將具有相同key值的value進行處理後再輸出新的鍵值

MapReduce階段原始碼分析以及shuffle過程

MapReducer工作流程圖: 1. MapReduce階段原始碼分析 1)客戶端提交原始碼分析 解釋:   - 判斷是否列印日誌   - 判斷是否使用新的API,檢查連線   - 在檢查連線時,檢查輸入輸出路徑,計算切片,將jar、配置檔案複製到HDFS   - 計算切片時,計算最小切片數(預設為1

MapReduce階段源碼分析以及shuffle過程

不同 小文件 需要 因此 輸入輸出 map 定義 shu mas MapReducer工作流程圖: 1. MapReduce階段源碼分析 1)客戶端提交源碼分析 解釋:   - 判斷是否打印日誌   - 判斷是否使用新的API,檢

Mysql索引優化(key和index區別)

MySQL索引的概念    索引是一種特殊的檔案(InnoDB資料表上的索引是表空間的一個組成部分),它們包含著對資料表裡所有記錄的引用指標。更通俗的說,資料庫索引好比是一本書前面的目錄,能加快資料庫的查詢速度。    索引分為聚簇索引和非聚簇索引兩種,聚簇索引是按照資料存放

JVM優化

1. JVM堆記憶體劃分 這兩天看到下面這篇文章的圖不錯。 1.1 JDK7及以前的版本 其中最上一層是Nursery記憶體,一個物件被建立以後首先被放到Nursery中的Eden內  存中,如果存活期超兩個Survivor之後

mysql distinct 用法優化

本事例實驗用表task,結構如下  mysql> desc task; +-------------+------------+------+-----+-------------------+-------+ | Field       | Type    

springboot使用Redis完整過程常見問題總結

一.背景:專案中需要使用到Redis做快取 (ide:IDEA  redis伺服器:騰訊centos7) 二.步驟: 1.伺服器上安裝redis a.執行指令:$ wget http://download.redis.io/releases/redis-4.0.5.ta

Mapreduec流程之Shuffle過程

作為整個Mapreduce中最為神祕,複雜的部分,恰恰是平時業務中最經常接觸的地方。僅僅依靠map和reduce階段的業務程式碼編輯,是不能滿足平時的業務需要的。真正的業務處理中,經常會涉及到自定義partition,sort,groupcomparator等情況。而只有瞭

Tomcat Service.xml配置優化

Service.xml Server.xml配置檔案用於對整個容器進行相關的配置。 <Server>元素:是整個配置檔案的根元素。表示整個Catalina容器。 屬性: className:實現了org.apache.catalina.Server介面