Hadoop 學習研究(四):MapReduce shuffle過程剖詳解及引數配置調優
MapReduce簡介
在Hadoop MapReduce中,框架會確保reduce收到的輸入資料是根據key排序過的。資料從Mapper輸出到Reducer接收,是一個很複雜的過程,框架處理了所有問題,並提供了很多配置項及擴充套件點。一個MapReduce的大致資料流如下圖:
更詳細的MapReduce介紹參考Hadoop MapReduce技術內幕
Mapper的輸出排序、然後傳送到Reducer的過程,稱為shuffle。
深入理解這個過程對於MapReduce調優至關重要,某種程度上說,shuffle過程是MapReduce的核心內容。
Map端:
當map函式通過context.write()
環形Buffer資料結構
每一個map任務有一個環形Buffer,map將輸出寫入到這個Buffer。環形Buffer是記憶體中的一種首尾相連的資料結構,專門用來儲存Key-Value格式的資料:
Hadoop中,環形緩衝其實就是一個位元組陣列:
// MapTask.java
private byte[] kvbuffer; // main output buffer
kvbuffer = new byte[maxMemUsage - recordCapacity];
- 1
- 2
- 3
- 4
- 1
- 2
- 3
- 4
kvbuffer包含資料區和索引區,這兩個區是相鄰不重疊的區域,用一個分界點來標識。分界點不是永恆不變的,每次Spill之後都會更新一次。初始分界點為0,資料儲存方向為向上增長,索引儲存方向向下:
bufferindex一直往上增長,例如最初為0,寫入一個int型別的key之後變為4,寫入一個int型別的value之後變成8。
索引是對key-value在kvbuffer中的索引,是個四元組,佔用四個Int長度,包括:
- value的起始位置
- key的起始位置
- partition值
- value的長度
private static final int VALSTART = 0; // val offset in acct
private static final int KEYSTART = 1; // key offset in acct
private static final int PARTITION = 2; // partition offset in acct
private static final int VALLEN = 3; // length of value
private static final int NMETA = 4; // num meta ints
private static final int METASIZE = NMETA * 4; // size in bytes
// write accounting info
kvmeta.put(kvindex + PARTITION, partition);
kvmeta.put(kvindex + KEYSTART, keystart);
kvmeta.put(kvindex + VALSTART, valstart);
kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
kvmeta的存放指標kvindex每次都是向下跳四個“格子”,然後再向上一個格子一個格子地填充四元組的資料。比如kvindex初始位置是-4,當第一個key-value寫完之後,(kvindex+0)的位置存放value的起始位置、(kvindex+1)的位置存放key的起始位置、(kvindex+2)的位置存放partition的值、(kvindex+3)的位置存放value的長度,然後kvindex跳到-8位置。
緩衝區的大小預設為100M,但是可以通過mapreduce.task.io.sort.mb
這個屬性來配置。
Spill溢寫過程:
map將輸出不斷寫入到這個緩衝區中,當緩衝區使用量達到一定比例之後,一個後臺執行緒開始把緩衝區的資料寫入磁碟,這個寫入的過程叫spill。開始spill的Buffer比例預設為0.80,可以通過mapreduce.map.sort.spill.percent
配置。在後臺執行緒寫入的同時,map繼續將輸出寫入這個環形緩衝,如果緩衝池寫滿了,map會阻塞直到spill過程完成,而不會覆蓋緩衝池中的已有的資料。
在寫入之前,後臺執行緒把資料按照他們將送往的reducer進行劃分,通過呼叫Partitioner
的getPartition()
方法就能知道該輸出要送往哪個Reducer。預設的Partitioner使用Hash演算法來分割槽,即通過key.hashCode()
mode R
來計算,R為Reducer的個數。getPartition
返回Partition事實上是個整數,例如有10個Reducer,則返回0-9的整數,每個Reducer會對應到一個Partition。map輸出的鍵值對,與partition一起存在緩衝中(即前面提到的kvmeta中)。假設作業有2個reduce任務,則資料在記憶體中被劃分為reduce1和reduce2:
並且針對每部分資料,使用快速排序演算法(QuickSort)對key排序。
如果設定了Combiner,則在排序的結果上執行combine。
排序後的資料被寫入到mapreduce.cluster.local.dir
配置的目錄中的其中一個,使用round robin fashion的方式輪流。注意寫入的是本地檔案目錄,而不是HDFS。Spill檔名像sipll0.out,spill1.out等。
不同Partition的資料都放在同一個檔案,通過索引來區分partition的邊界和起始位置。索引是一個三元組結構,包括起始位置、資料長度、壓縮後的資料長度,對應IndexRecord類:
public class IndexRecord {
public long startOffset;
public long rawLength;
public long partLength;
public IndexRecord() { }
public IndexRecord(long startOffset, long rawLength, long partLength) {
this.startOffset = startOffset;
this.rawLength = rawLength;
this.partLength = partLength;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
每個mapper也有對應的一個索引環形Buffer,預設為1KB,可以通過mapreduce.task.index.cache.limit.bytes
來配置,索引如果足夠小則存在記憶體中,如果記憶體放不下,需要寫入磁碟。
Spill檔案索引名稱類似這樣 spill110.out.index, spill111.out.index。
Spill檔案的索引事實上是 org.apache.hadoop.mapred.SpillRecord的一個數組,每個Map任務(原始碼中的MapTask.Java類)維護一個這樣的列表:
final ArrayList<SpillRecord> indexCacheList = new ArrayList<SpillRecord>();
- 1
- 1
建立一個SpillRecord時,會分配(Number_Of_Reducers * 24)Bytes緩衝:
public SpillRecord(int numPartitions) {
buf = ByteBuffer.allocate(
numPartitions * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
entries = buf.asLongBuffer();
}
- 1
- 2
- 3
- 4
- 5
- 1
- 2
- 3
- 4
- 5
numPartitions是Partition的個數,其實也就是Reducer的個數:
public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
partitions = jobContext.getNumReduceTasks();
final SpillRecord spillRec = new SpillRecord(partitions);
- 1
- 2
- 3
- 4
- 5
- 6
- 1
- 2
- 3
- 4
- 5
- 6
預設的索引緩衝為1KB,即1024*1024 Bytes,假設有2個Reducer,則每個Spill檔案的索引大小為2*24=48 Bytes,當Spill檔案超過21845.3時,索引檔案就需要寫入磁碟。
索引及spill檔案如下圖示意:
Spill的過程至少需要執行一次,因為Mapper的輸出結果必須要寫入磁碟,供Reducer進一步處理。
合併Spill檔案
在整個map任務中,一旦緩衝達到設定的閾值,就會觸發spill操作,寫入spill檔案到磁碟,因此最後可能有多個spill檔案。在map任務結束之前,這些檔案會根據情況合併到一個大的分割槽的、排序的檔案中,排序是在記憶體排序的基礎上進行全域性排序。下圖是合併過程的簡單示意:
相對應的索引檔案也會被合併,以便在Reducer請求對應Partition的資料的時候能夠快速讀取。
另外,如果spill檔案數量大於mapreduce.map.combiner.minspills配置的數,則在合併檔案寫入之前,會再次執行combiner。如果spill檔案數量太少,執行combiner的收益可能小於呼叫的代價。
mapreduce.task.io.sort.factor屬性配置每次最多合併多少個檔案,預設為10,即一次最多合併10個spill檔案。最後,多輪合併之後,所有的輸出檔案被合併為唯一一個大檔案,以及相應的索引檔案(可能只在記憶體中存在)。
壓縮:
在資料量大的時候,對map輸出進行壓縮通常是個好主意。要啟用壓縮,將mapreduce.map.output.compress
設為true,並使用mapreduce.map.output.compress.codec
設定使用的壓縮演算法。
通過HTTP拷貝Map輸出結果到Reduce中:
map輸出資料完成之後,通過執行一個HTTP Server暴露出來,供reduce端獲取。用來相應reduce資料請求的執行緒數量可以配置,預設情況下為機器核心數量的兩倍,如需自己配置,通過mapreduce.shuffle.max.threads
屬性來配置,注意該配置是針對NodeManager配置的,而不是每個作業配置。
同時,Map任務完成後,也會通知Application Master,以便Reducer能夠及時來拉取資料。
通過緩衝、劃分(partition)、排序、combiner、合併、壓縮等過程之後,map端的工作就算完畢:
Reduce端:
各個map任務執行完之後,輸出寫入執行任務的機器磁碟中。Reducer需要從各map任務中提取自己的那一部分資料(對應的partition)。每個map任務的完成時間可能是不一樣的,reduce任務在map任務結束之後會盡快取走輸出結果,這個階段叫copy。
Reducer是如何知道要去哪些機器去資料呢?一旦map任務完成之後,就會通過常規心跳通知應用程式的Application Master。reduce的一個執行緒會週期性地向master詢問,直到提取完所有資料(如何知道提取完?)。
資料被reduce提走之後,map機器不會立刻刪除資料,這是為了預防reduce任務失敗需要重做。因此map輸出資料是在整個作業完成之後才被刪除掉的。
reduce維護幾個copier執行緒,並行地從map任務機器提取資料。預設情況下有5個copy執行緒,可以通過mapreduce.reduce.shuffle.parallelcopies
配置。
如果map輸出的資料足夠小,則會被拷貝到reduce任務的JVM記憶體中。mapreduce.reduce.shuffle.input.buffer.percent
配置JVM堆記憶體的多少比例可以用於存放map任務的輸出結果。如果資料太大容不下,則被拷貝到reduce的機器磁碟上。
Reduce中的資料Merge:
記憶體中合併:
當緩衝中資料達到配置的閾值時,這些資料在記憶體中被合併、寫入機器磁碟。閾值有2種配置方式:
- 配置記憶體比例: 前面提到reduce JVM堆記憶體的一部分用於存放來自map任務的輸入,在這基礎之上配置一個開始合併資料的比例。假設用於存放map輸出的記憶體為500M,
mapreduce.reduce.shuffle.merger.percent
配置為0.80,則當記憶體中的資料達到400M的時候,會觸發合併寫入。 - 配置map輸出數量: 通過
mapreduce.reduce.merge.inmem.threshold
配置。
在合併的過程中,會對被合併的檔案做全域性的排序。如果作業配置了Combiner,則會執行combine函式,減少寫入磁碟的資料量。
Copy過程中磁碟合併:
在copy過來的資料不斷寫入磁碟的過程中,一個後臺執行緒會把這些檔案合併為更大的、有序的檔案。如果map的輸出結果進行了壓縮,則在合併過程中,需要在記憶體中解壓後才能給進行合併。這裡的合併只是為了減少最終合併的工作量,也就是在map輸出還在拷貝時,就開始進行一部分合並工作。合併的過程一樣會進行全域性排序。
最終磁碟中合併:
當所有map輸出都拷貝完畢之後,所有資料被最後合併成一個排序的檔案,作為reduce任務的輸入。這個合併過程是一輪一輪進行的,最後一輪的合併結果直接推送給reduce作為輸入,節省了磁碟操作的一個來回。最後(所以map輸出都拷貝到reduce之後)進行合併的map輸出可能來自合併後寫入磁碟的檔案,也可能來及記憶體緩衝,在最後寫入記憶體的map輸出可能沒有達到閾值觸發合併,所以還留在記憶體中。
每一輪合併並不一定合併平均數量的檔案數,指導原則是使用整個合併過程中寫入磁碟的資料量最小,為了達到這個目的,則需要最終的一輪合併中合併儘可能多的資料,因為最後一輪的資料直接作為reduce的輸入,無需寫入磁碟再讀出。因此我們讓最終的一輪合併的檔案數達到最大,即合併因子的值,通過mapreduce.task.io.sort.factor
來配置。
假設現在有50個map輸出檔案,合併因子配置為10,則需要5輪的合併。最終的一輪確保合併10個檔案,其中包括4個來自前4輪的合併結果,因此原始的50箇中,再留出6個給最終一輪。所以最後的5輪合併可能情況如下:
前4輪合併後的資料都是寫入到磁碟中的,注意到最後的2格顏色不一樣,是為了標明這些資料可能直接來自於記憶體
MemToMem合併:
除了記憶體中合併和磁碟中合併外,Hadoop還定義了一種MemToMem合併,這種合併將記憶體中的map輸出合併,然後再寫入記憶體。這種合併預設關閉,可以通過reduce.merge.memtomem.enabled
開啟,當map輸出檔案達到reduce.merge.memtomem.threshold
時,觸發這種合併。
最後一次合併後傳遞給reduce方法:
合併後的檔案作為輸入傳遞給Reducer,Reducer針對每個key及其排序的資料呼叫reduce函式。產生的reduce輸出一般寫入到HDFS,reduce輸出的檔案第一個副本寫入到當前執行reduce的機器,其他副本選址原則按照常規的HDFS資料寫入原則來進行。
效能調優:
如果能夠根據情況對shuffle過程進行調優,對於提供MapReduce效能很有幫助。相關的引數配置列在後面的表格中。
一個通用的原則是給shuffle過程分配儘可能大的記憶體,當然你需要確保map和reduce有足夠的記憶體來執行業務邏輯。因此在實現Mapper和Reducer時,應該儘量減少記憶體的使用,例如避免在Map中不斷地疊加。
執行map和reduce任務的JVM,記憶體通過mapred.child.java.opts
屬性來設定,儘可能設大記憶體。容器的記憶體大小通過mapreduce.map.memory.mb
和mapreduce.reduce.memory.mb
來設定,預設都是1024M。
map優化:
在map端,避免寫入多個spill檔案可能達到最好的效能,一個spill檔案是最好的。通過估計map的輸出大小,設定合理的mapreduce.task.io.sort.*
屬性,使得spill檔案數量最小。例如儘可能調大mapreduce.task.io.sort.mb
。
map端相關的屬性如下表:
屬性名 | 值型別 | 預設值 | 說明 |
---|---|---|---|
mapreduce.task.io.sort.mb | int | 100 | 用於map輸出排序的記憶體大小 |
mapreduce.map.sort.spill.percent | float | 0.80 | 開始spill的緩衝池閾值 |
mapreduce.task.io.sort.factor | int | 10 | 合併檔案數最大值,與reduce共用 |
mapreduce.map.combine.minspills | int | 3 | 執行combiner的最低spill檔案數 |
mapreduce.map.out.compress | boolean | false | 輸出是否壓縮 |
mapreduce.map.out.compress | 類名 | DefaultCodec | 壓縮演算法 |
mapreduce.shuffle.max.threads | int | 0 | 服務於reduce提取結果的執行緒數量 |
Reduce優化:
在reduce端,如果能夠讓所有資料都儲存在記憶體中,可以達到最佳的效能。通常情況下,記憶體都保留給reduce函式,但是如果reduce函式對記憶體需求不是很高,將mapreduce.reduce.merge.inmem.threshold
(觸發合併的map輸出檔案數)設為0,mapreduce.reduce.input.buffer.percent
(用於儲存map輸出檔案的堆記憶體比例)設為1.0,可以達到很好的效能提升。
屬性名 | 值型別 | 預設值 | 說明 |
---|---|---|---|
mapreduce.reduce.shuffle.parallelcopies | int | 5 | 提取map輸出的copier執行緒數 |
mapreduce.reduce.shuffle.maxfetchfailures | int | 10 | 提取map輸出最大嘗試次數,超出後報錯 |
mapreduce.task.io.sort.factor | int | 10 | 合併檔案數最大值,與map共用 |
mapreduce.reduce.shuffle.input.buffer.percent | float | 0.70 | copy階段用於儲存map輸出的堆記憶體比例 |
mapreduce.reduce.shuffle.merge.percent | float | 0.66 | 開始spill的緩衝池比例閾值 |
mapreduce.reduce.shuffle.inmem.threshold | int | 1000 | 開始spill的map輸出檔案數閾值,小於等於0表示沒有閾值,此時只由緩衝池比例來控制 |
mapreduce.reduce.input.buffer.percent | float | 0.0 | reduce函式開始執行時,記憶體中的map輸出所佔的堆記憶體比例不得高於這個值,預設情況記憶體都用於reduce函式,也就是map輸出都寫入到磁碟 |
通用優化:
Hadoop預設使用4KB作為緩衝,這個算是很小的,可以通過io.file.buffer.size
來調高緩衝池大小。