1. 程式人生 > >MapReduce shuffle過程詳解

MapReduce shuffle過程詳解

一、MapReduce計算模型

我們知道MapReduce計算模型主要由三個階段構成:Map、shuffle、Reduce。

Map是對映,負責資料的過濾分法,將原始資料轉化為鍵值對;Reduce是合併,將具有相同key值的value進行處理後再輸出新的鍵值對作為最終結果。為了讓Reduce可以並行處理Map的結果,必須對Map的輸出進行一定的排序與分割,然後再交給對應的Reduce,而這個將Map輸出進行進一步整理並交給Reduce的過程就是Shuffle。整個MR的大致過程如下:

這裡寫圖片描述

Map和Reduce操作需要我們自己定義相應Map類和Reduce類,以完成我們所需要的化簡、合併操作,而shuffle則是系統自動幫我們實現的,瞭解shuffle的具體流程能幫助我們編寫出更加高效的Mapreduce程式。

Shuffle過程包含在Map和Reduce兩端,即Map shuffleReduce shuffle

二、Map shuffle

在Map端的shuffle過程是對Map的結果進行分割槽、排序、分割,然後將屬於同一劃分(分割槽)的輸出合併在一起並寫在磁碟上,最終得到一個分割槽有序的檔案,分割槽有序的含義是map輸出的鍵值對按分割槽進行排列,具有相同partition值的鍵值對儲存在一起,每個分割槽裡面的鍵值對又按key值進行升序排列(預設),其流程大致如下:

這裡寫圖片描述

Partition

對於map輸出的每一個鍵值對,系統都會給定一個partition,partition值預設是通過計算key的hash值後對Reduce task的數量取模獲得。如果一個鍵值對的partition值為1,意味著這個鍵值對會交給第一個Reducer處理。

我們知道每一個Reduce的輸出都是有序的,但是將所有Reduce的輸出合併到一起卻並非是全域性有序的,如果要做到全域性有序,我們該怎麼做呢?最簡單的方式,只設置一個Reduce task,但是這樣完全發揮不出叢集的優勢,而且能應對的資料量也很受限。最佳的方式是自己定義一個Partitioner,用輸入資料的最大值除以系統Reduce task數量的商作為分割邊界,也就是說分割資料的邊界為此商的1倍、2倍至numPartitions-1倍,這樣就能保證執行partition後的資料是整體有序的。

另一種需要我們自己定義一個Partitioner的情況是各個Reduce task處理的鍵值對數量極不平衡。對於某些資料集,由於很多不同的key的hash值都一樣,導致這些鍵值對都被分給同一個Reducer處理,而其他的Reducer處理的鍵值對很少,從而拖延整個任務的進度。當然,編寫自己的Partitioner必須要保證具有相同key值的鍵值對分發到同一個Reducer。

Collector

Map的輸出結果是由collector處理的,每個Map任務不斷地將鍵值對輸出到在記憶體中構造的一個環形資料結構中。使用環形資料結構是為了更有效地使用記憶體空間,在記憶體中放置儘可能多的資料。

這個資料結構其實就是個位元組陣列,叫Kvbuffer,名如其義,但是這裡面不光放置了資料,還放置了一些索引資料,給放置索引資料的區域起了一個Kvmeta的別名,在Kvbuffer的一塊區域上穿了一個IntBuffer(位元組序採用的是平臺自身的位元組序)的馬甲。資料區域和索引資料區域在Kvbuffer中是相鄰不重疊的兩個區域,用一個分界點來劃分兩者,分界點不是亙古不變的,而是每次Spill之後都會更新一次。初始的分界點是0,資料的儲存方向是向上增長,索引資料的儲存方向是向下增長,如圖所示:

這裡寫圖片描述

Kvbuffer的存放指標bufindex是一直悶著頭地向上增長,比如bufindex初始值為0,一個Int型的key寫完之後,bufindex增長為4,一個Int型的value寫完之後,bufindex增長為8。

索引是對在kvbuffer中的鍵值對的索引,是個四元組,包括:value的起始位置、key的起始位置、partition值、value的長度,佔用四個Int長度,Kvmeta的存放指標Kvindex每次都是向下跳四個“格子”,然後再向上一個格子一個格子地填充四元組的資料。比如Kvindex初始位置是-4,當第一個鍵值對寫完之後,(Kvindex+0)的位置存放value的起始位置、(Kvindex+1)的位置存放key的起始位置、(Kvindex+2)的位置存放partition的值、(Kvindex+3)的位置存放value的長度,然後Kvindex跳到-8位置,等第二個鍵值對和索引寫完之後,Kvindex跳到-12位置。

Kvbuffer的大小可以通過io.sort.mb設定,預設大小為100M。但不管怎麼設定,Kvbuffer的容量都是有限的,鍵值對和索引不斷地增加,加著加著,Kvbuffer總有不夠用的那天,那怎麼辦?把資料從記憶體刷到磁碟上再接著往記憶體寫資料,把Kvbuffer中的資料刷到磁碟上的過程就叫Spill,多麼明瞭的叫法,記憶體中的資料滿了就自動地spill到具有更大空間的磁碟。

關於Spill觸發的條件,也就是Kvbuffer用到什麼程度開始Spill,還是要講究一下的。如果把Kvbuffer用得死死得,一點縫都不剩的時候再開始Spill,那Map任務就需要等Spill完成騰出空間之後才能繼續寫資料;如果Kvbuffer只是滿到一定程度,比如80%的時候就開始Spill,那在Spill的同時,Map任務還能繼續寫資料,如果Spill夠快,Map可能都不需要為空閒空間而發愁。兩利相衡取其大,一般選擇後者。Spill的門限可以通過io.sort.spill.percent,預設是0.8。

Spill這個重要的過程是由Spill執行緒承擔,Spill執行緒從Map任務接到“命令”之後就開始正式幹活,乾的活叫SortAndSpill,原來不僅僅是Spill,在Spill之前還有個頗具爭議性的Sort。

Sort

當Spill觸發後,SortAndSpill先把Kvbuffer中的資料按照partition值和key兩個關鍵字升序排序,移動的只是索引資料,排序結果是Kvmeta中資料按照partition為單位聚集在一起,同一partition內的按照key有序。

Spill

Spill執行緒為這次Spill過程建立一個磁碟檔案:從所有的本地目錄中輪訓查詢能儲存這麼大空間的目錄,找到之後在其中建立一個類似於“spill12.out”的檔案。Spill執行緒根據排過序的Kvmeta挨個partition的把資料吐到這個檔案中,一個partition對應的資料吐完之後順序地吐下個partition,直到把所有的partition遍歷完。一個partition在檔案中對應的資料也叫段(segment)。在這個過程中如果使用者配置了combiner類,那麼在寫之前會先呼叫combineAndSpill(),對結果進行進一步合併後再寫出。Combiner會優化MapReduce的中間結果,所以它在整個模型中會多次使用。那哪些場景才能使用Combiner呢?Combiner的輸出是Reducer的輸入,Combiner絕不能改變最終的計算結果。所以從我的想法來看,Combiner只應該用於那種Reduce的輸入key/value與輸出key/value型別完全一致,且不影響最終結果的場景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它對job執行效率有幫助,反之會影響reduce的最終結果。

所有的partition對應的資料都放在這個檔案裡,雖然是順序存放的,但是怎麼直接知道某個partition在這個檔案中存放的起始位置呢?強大的索引又出場了。有一個三元組記錄某個partition對應的資料在這個檔案中的索引:起始位置、原始資料長度、壓縮之後的資料長度,一個partition對應一個三元組。然後把這些索引資訊存放在記憶體中,如果記憶體中放不下了,後續的索引資訊就需要寫到磁碟檔案中了:從所有的本地目錄中輪訓查詢能儲存這麼大空間的目錄,找到之後在其中建立一個類似於“spill12.out.index”的檔案,檔案中不光儲存了索引資料,還儲存了crc32的校驗資料。spill12.out.index不一定在磁碟上建立,如果記憶體(預設1M空間)中能放得下就放在記憶體中,即使在磁碟上建立了,和spill12.out檔案也不一定在同一個目錄下。每一次Spill過程就會最少生成一個out檔案,有時還會生成index檔案,Spill的次數也烙印在檔名中。索引檔案和資料檔案的對應關係如下圖所示:

這裡寫圖片描述

在Spill執行緒如火如荼的進行SortAndSpill工作的同時,Map任務不會因此而停歇,而是一無既往地進行著資料輸出。Map還是把資料寫到kvbuffer中,那問題就來了:只顧著悶頭按照bufindex指標向上增長,kvmeta只顧著按照Kvindex向下增長,是保持指標起始位置不變繼續跑呢,還是另謀它路?如果保持指標起始位置不變,很快bufindex和Kvindex就碰頭了,碰頭之後再重新開始或者移動記憶體都比較麻煩,不可取。Map取kvbuffer中剩餘空間的中間位置,用這個位置設定為新的分界點,bufindex指標移動到這個分界點,Kvindex移動到這個分界點的-16位置,然後兩者就可以和諧地按照自己既定的軌跡放置資料了,當Spill完成,空間騰出之後,不需要做任何改動繼續前進。分界點的轉換如下圖所示:

這裡寫圖片描述

Map任務總要把輸出的資料寫到磁碟上,即使輸出資料量很小在記憶體中全部能裝得下,在最後也會把資料刷到磁碟上。

Merge

Map任務如果輸出資料量很大,可能會進行好幾次Spill,out檔案和Index檔案會產生很多,分佈在不同的磁碟上。最後把這些檔案進行合併的merge過程閃亮登場。

Merge過程怎麼知道產生的Spill檔案都在哪了呢?從所有的本地目錄上掃描得到產生的Spill檔案,然後把路徑儲存在一個數組裡。Merge過程又怎麼知道Spill的索引資訊呢?沒錯,也是從所有的本地目錄上掃描得到Index檔案,然後把索引資訊儲存在一個列表裡。到這裡,又遇到了一個值得納悶的地方。在之前Spill過程中的時候為什麼不直接把這些資訊儲存在記憶體中呢,何必又多了這步掃描的操作?特別是Spill的索引資料,之前當記憶體超限之後就把資料寫到磁碟,現在又要從磁碟把這些資料讀出來,還是需要裝到更多的記憶體中。之所以多此一舉,是因為這時kvbuffer這個記憶體大戶已經不再使用可以回收,有記憶體空間來裝這些資料了。(對於記憶體空間較大的土豪來說,用記憶體來省卻這兩個io步驟還是值得考慮的。)

這裡寫圖片描述

然後為merge過程建立一個叫file.out的檔案和一個叫file.out.Index的檔案用來儲存最終的輸出和索引,一個partition一個partition的進行合併輸出。對於某個partition來說,從索引列表中查詢這個partition對應的所有索引資訊,每個對應一個段插入到段列表中。也就是這個partition對應一個段列表,記錄所有的Spill檔案中對應的這個partition那段資料的檔名、起始位置、長度等等。

然後對這個partition對應的所有的segment進行合併,目標是合併成一個segment。當這個partition對應很多個segment時,會分批地進行合併:先從segment列表中把第一批取出來,以key為關鍵字放置成最小堆,然後從最小堆中每次取出最小的輸出到一個臨時檔案中,這樣就把這一批段合併成一個臨時的段,把它加回到segment列表中;再從segment列表中把第二批取出來合併輸出到一個臨時segment,把其加入到列表中;這樣往復執行,直到剩下的段是一批,輸出到最終的檔案中。最終的索引資料仍然輸出到Index檔案中。

三、Reduce shuffle

在Reduce端,shuffle主要分為複製Map輸出、排序合併兩個階段。

Copy

Reduce任務通過HTTP向各個Map任務拖取它所需要的資料。Map任務成功完成後,會通知父TaskTracker狀態已經更新,TaskTracker進而通知JobTracker(這些通知在心跳機制中進行)。所以,對於指定作業來說,JobTracker能記錄Map輸出和TaskTracker的對映關係。Reduce會定期向JobTracker獲取Map的輸出位置,一旦拿到輸出位置,Reduce任務就會從此輸出對應的TaskTracker上覆制輸出到本地,而不會等到所有的Map任務結束。

Merge Sort

Copy過來的資料會先放入記憶體緩衝區中,如果記憶體緩衝區中能放得下這次資料的話就直接把資料寫到記憶體中,即記憶體到記憶體merge。Reduce要向每個Map去拖取資料,在記憶體中每個Map對應一塊資料,當記憶體快取區中儲存的Map資料佔用空間達到一定程度的時候,開始啟動記憶體中merge,把記憶體中的資料merge輸出到磁碟上一個檔案中,即記憶體到磁碟merge。在將buffer中多個map輸出合併寫入磁碟之前,如果設定了Combiner,則會化簡壓縮合並的map輸出。Reduce的記憶體緩衝區可通過mapred.job.shuffle.input.buffer.percent配置,預設是JVM的heap size的70%。記憶體到磁碟merge的啟動門限可以通過mapred.job.shuffle.merge.percent配置,預設是66%。

當屬於該reducer的map輸出全部拷貝完成,則會在reducer上生成多個檔案(如果拖取的所有map資料總量都沒有記憶體緩衝區,則資料就只存在於記憶體中),這時開始執行合併操作,即磁碟到磁碟merge,Map的輸出資料已經是有序的,Merge進行一次合併排序,所謂Reduce端的sort過程就是這個合併的過程。一般Reduce是一邊copy一邊sort,即copy和sort兩個階段是重疊而不是完全分開的。最終Reduce shuffle過程會輸出一個整體有序的資料塊。

相關推薦

MapReduce-shuffle過程

等待 通知 10個 線程數 硬盤 res .sh 現在 溢出 Shuffle map端 map函數開始產生輸出時,並不是簡單地將它寫到磁盤。這個過程很復雜,它利用緩沖的方式寫到內存並出於效率的考慮進行預排序。每個map任務都有一個環形內存緩沖區用於存儲任務輸出。在默認

MapReduce shuffle過程

一、MapReduce計算模型 我們知道MapReduce計算模型主要由三個階段構成:Map、shuffle、Reduce。 Map是對映,負責資料的過濾分法,將原始資料轉化為鍵值對;Reduce是合併,將具有相同key值的value進行處理後再輸出新的鍵值

MapReduce和spark的shuffle過程

存在 位置 方式 傳遞 第一個 2個 過濾 之前 第三方 面試常見問題,必備答案。 參考:https://blog.csdn.net/u010697988/article/details/70173104 mapReducehe和Spark之間的最大區別是前者較偏向於離

Hadoop Mapreduceshuffle過程

1、map task讀取資料時預設呼叫TextInputFormat的成員RecoreReader,RecoreReader呼叫自己的read()方法,進行逐行讀取,返回一個key、value; 2、返回的key、value交給自定義的map方法,輸出的context.write(key,value),再交

MapReduce內部shuffle過程(Combiner的使用)

Maptask呼叫一個元件FileInputFormat FileInputFormat有一個最高層的介面 --> InputFormat 我們不需要去寫自己的實現類,使用的就是內部預設的元件:TextInputFormat maptask先呼叫TextInputFormat,

hadoop概念-MapReduce各個執行階段及Shuffle過程

MapReduce各個執行階段 (1)MapReduce框架使用InputFormat模組做Map前的預處理,比如驗證輸入的格式是否符合輸入定義;然後,將輸入檔案切分為邏輯上的多個InputSplit,InputSplit是MapReduce對檔案進行處理和運算的輸入單位

MapReduce階段原始碼分析以及shuffle過程

MapReducer工作流程圖: 1. MapReduce階段原始碼分析 1)客戶端提交原始碼分析 解釋:   - 判斷是否列印日誌   - 判斷是否使用新的API,檢查連線   - 在檢查連線時,檢查輸入輸出路徑,計算切片,將jar、配置檔案複製到HDFS   - 計算切片時,計算最小切片數(預設為1

MapReduce階段源碼分析以及shuffle過程

不同 小文件 需要 因此 輸入輸出 map 定義 shu mas MapReducer工作流程圖: 1. MapReduce階段源碼分析 1)客戶端提交源碼分析 解釋:   - 判斷是否打印日誌   - 判斷是否使用新的API,檢

大資料教程(8.8)MR內部的shuffle過程&combiner的執行機制及程式碼實現

        之前的文章已經簡單介紹過mapreduce的運作流程,不過其內部的shuffle過程並未深入講解;本篇部落格將分享shuffle的全過程。       

大資料教程(8.8)MR內部的shuffle過程&combiner的執行機制及程式碼實現

        之前的文章已經簡單介紹過mapreduce的運作流程,不過其內部的shuffle過程並未深入講解;本篇部落格將分享shuffle的全過程。      

大資料基礎課之Hadoop MapReduce執行過程

述一下mapreduce的流程(shuffle的sort,partitions,group) 首先是 Mapreduce經過SplitInput 輸入分片 決定map的個數在用Record記錄 key value。然後分為以下三個流程: Map: 輸入 key

Shuffle過程及優化

1.MapReduce Shuffle Map是對映,負責資料的過濾分 發;Reduce是規約,負責資料的計算歸併。Reduce的資料來源於Map,Map的輸出即是Reduce的輸入,Reduce需要通過 Shuffle來獲取資料。 從Map輸出到Reduce輸入的整

Mapreduec流程之Shuffle過程

作為整個Mapreduce中最為神祕,複雜的部分,恰恰是平時業務中最經常接觸的地方。僅僅依靠map和reduce階段的業務程式碼編輯,是不能滿足平時的業務需要的。真正的業務處理中,經常會涉及到自定義partition,sort,groupcomparator等情況。而只有瞭

Hadoop MapReduce執行過程(帶hadoop例子)

問題導讀1.MapReduce是如何執行任務的? 2.Mapper任務是怎樣的一個過程? 3.Reduce是如何執行任務的? 4.鍵值對是如何編號的? 5.例項,如何計算沒見最高氣溫? 分析MapReduce執行過程    MapReduce執行的時候,會通過Mapper執

Hadoop 學習研究(四):MapReduce shuffle過程及引數配置調優

MapReduce簡介    在Hadoop  MapReduce中,框架會確保reduce收到的輸入資料是根據key排序過的。資料從Mapper輸出到Reducer接收,是一個很複雜的過程,框架

大資料技術學習筆記之Hadoop框架基礎3-網站日誌分析及MapReduce過程

一、回顧     -》Hadoop啟動方式         -》單個程序             sbin/h

MapReduce過程及其效能優化

 1、這裡的merge和map端的merge動作類似,只是陣列中存放的是不同map端copy來的數值。Copy過來的資料會先放入記憶體緩衝區中,然後當使用記憶體達到一定量的時候才spill磁碟。這裡的緩衝區大小要比map端的更為靈活,它基於JVM的heap size設定。這個記憶體大小的控制就不像map一樣可

MapReduce On yarn執行過程

老的MapReduce主要包括Job Tracker和Task Tracker,YARN中主要是三個元件:Resource Manager、Node Manager和Application Master。Resource Manager負責全域性資源分配,Applicatio

Hadoop Mapreduce分割槽、分組、二次排序過程[轉]

徐海蛟 教學用途 1、MapReduce中資料流動    (1)最簡單的過程:  map - reduce    (2)定製了partitioner以將map的結果送往指定reducer的過程: map - partition - reduce    (3)增加了在本地先進性一次reduce(優化)過程: 

Hadoop Mapreduce分割槽、分組、連線以及輔助排序(也叫二次排序)過程

package com.hadoop; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import or