1. 程式人生 > >Hadoop 學習研究(四):MapReduce shuffle過程剖詳解及引數配置調優

Hadoop 學習研究(四):MapReduce shuffle過程剖詳解及引數配置調優

MapReduce簡介

   在Hadoop  MapReduce中,框架會確保reduce收到的輸入資料是根據key排序過的。資料從Mapper輸出到Reducer接收,是一個很複雜的過程,框架處理了所有問題,並提供了很多配置項及擴充套件點。一個MapReduce的大致資料流如下圖:

這裡寫圖片描述

更詳細的MapReduce介紹參考Hadoop MapReduce技術內幕

Mapper的輸出排序、然後傳送到Reducer的過程,稱為shuffle

深入理解這個過程對於MapReduce調優至關重要,某種程度上說,shuffle過程是MapReduce的核心內容。


Map端:

       當map函式通過context.write()

開始輸出資料時,不是單純地將資料寫入到磁碟。為了效能,map輸出的資料會寫入到緩衝區,並進行預排序的一些工作,整個過程如下圖:

這裡寫圖片描述

環形Buffer資料結構

       每一個map任務有一個環形Buffer,map將輸出寫入到這個Buffer。環形Buffer是記憶體中的一種首尾相連的資料結構,專門用來儲存Key-Value格式的資料:

這裡寫圖片描述

Hadoop中,環形緩衝其實就是一個位元組陣列:

// MapTask.java
private byte[] kvbuffer;  // main output buffer

kvbuffer = new byte[maxMemUsage - recordCapacity]; 
  • 1
  • 2
  • 3
  • 4
  • 1
  • 2
  • 3
  • 4

kvbuffer包含資料區和索引區,這兩個區是相鄰不重疊的區域,用一個分界點來標識。分界點不是永恆不變的,每次Spill之後都會更新一次。初始分界點為0,資料儲存方向為向上增長,索引儲存方向向下:

這裡寫圖片描述

bufferindex一直往上增長,例如最初為0,寫入一個int型別的key之後變為4,寫入一個int型別的value之後變成8。

索引是對key-value在kvbuffer中的索引,是個四元組,佔用四個Int長度,包括:

  • value的起始位置
  • key的起始位置
  • partition值
  • value的長度
private static final int VALSTART = 0;    // val offset in acct
private static final int KEYSTART = 1; // key offset in acct private static final int PARTITION = 2; // partition offset in acct private static final int VALLEN = 3; // length of value private static final int NMETA = 4; // num meta ints private static final int METASIZE = NMETA * 4; // size in bytes // write accounting info kvmeta.put(kvindex + PARTITION, partition); kvmeta.put(kvindex + KEYSTART, keystart); kvmeta.put(kvindex + VALSTART, valstart); kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

kvmeta的存放指標kvindex每次都是向下跳四個“格子”,然後再向上一個格子一個格子地填充四元組的資料。比如kvindex初始位置是-4,當第一個key-value寫完之後,(kvindex+0)的位置存放value的起始位置、(kvindex+1)的位置存放key的起始位置、(kvindex+2)的位置存放partition的值、(kvindex+3)的位置存放value的長度,然後kvindex跳到-8位置。

緩衝區的大小預設為100M,但是可以通過mapreduce.task.io.sort.mb這個屬性來配置。

Spill溢寫過程:

        map將輸出不斷寫入到這個緩衝區中,當緩衝區使用量達到一定比例之後,一個後臺執行緒開始把緩衝區的資料寫入磁碟,這個寫入的過程叫spill。開始spill的Buffer比例預設為0.80,可以通過mapreduce.map.sort.spill.percent配置。在後臺執行緒寫入的同時,map繼續將輸出寫入這個環形緩衝,如果緩衝池寫滿了,map會阻塞直到spill過程完成,而不會覆蓋緩衝池中的已有的資料。

       在寫入之前,後臺執行緒把資料按照他們將送往的reducer進行劃分,通過呼叫PartitionergetPartition()方法就能知道該輸出要送往哪個Reducer。預設的Partitioner使用Hash演算法來分割槽,即通過key.hashCode() mode R來計算,R為Reducer的個數。getPartition返回Partition事實上是個整數,例如有10個Reducer,則返回0-9的整數,每個Reducer會對應到一個Partition。map輸出的鍵值對,與partition一起存在緩衝中(即前面提到的kvmeta中)。假設作業有2個reduce任務,則資料在記憶體中被劃分為reduce1和reduce2:

這裡寫圖片描述

並且針對每部分資料,使用快速排序演算法(QuickSort)對key排序。

如果設定了Combiner,則在排序的結果上執行combine。

排序後的資料被寫入到mapreduce.cluster.local.dir配置的目錄中的其中一個,使用round robin fashion的方式輪流。注意寫入的是本地檔案目錄,而不是HDFS。Spill檔名像sipll0.out,spill1.out等。

不同Partition的資料都放在同一個檔案,通過索引來區分partition的邊界和起始位置。索引是一個三元組結構,包括起始位置、資料長度、壓縮後的資料長度,對應IndexRecord類:

public class IndexRecord {
  public long startOffset;
  public long rawLength;
  public long partLength;

  public IndexRecord() { }

  public IndexRecord(long startOffset, long rawLength, long partLength) {
    this.startOffset = startOffset;
    this.rawLength = rawLength;
    this.partLength = partLength;
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

       每個mapper也有對應的一個索引環形Buffer,預設為1KB,可以通過mapreduce.task.index.cache.limit.bytes來配置,索引如果足夠小則存在記憶體中,如果記憶體放不下,需要寫入磁碟。 
       Spill檔案索引名稱類似這樣 spill110.out.index, spill111.out.index。

       Spill檔案的索引事實上是 org.apache.hadoop.mapred.SpillRecord的一個數組,每個Map任務(原始碼中的MapTask.Java類)維護一個這樣的列表:

final ArrayList<SpillRecord> indexCacheList = new ArrayList<SpillRecord>();
  • 1
  • 1

建立一個SpillRecord時,會分配(Number_Of_Reducers * 24)Bytes緩衝:

public SpillRecord(int numPartitions) {
  buf = ByteBuffer.allocate(
      numPartitions * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
  entries = buf.asLongBuffer();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 1
  • 2
  • 3
  • 4
  • 5

numPartitions是Partition的個數,其實也就是Reducer的個數:

public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;

partitions = jobContext.getNumReduceTasks();
final SpillRecord spillRec = new SpillRecord(partitions);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

預設的索引緩衝為1KB,即1024*1024 Bytes,假設有2個Reducer,則每個Spill檔案的索引大小為2*24=48 Bytes,當Spill檔案超過21845.3時,索引檔案就需要寫入磁碟。

索引及spill檔案如下圖示意:

這裡寫圖片描述

Spill的過程至少需要執行一次,因為Mapper的輸出結果必須要寫入磁碟,供Reducer進一步處理。

合併Spill檔案

       在整個map任務中,一旦緩衝達到設定的閾值,就會觸發spill操作,寫入spill檔案到磁碟,因此最後可能有多個spill檔案。在map任務結束之前,這些檔案會根據情況合併到一個大的分割槽的、排序的檔案中,排序是在記憶體排序的基礎上進行全域性排序。下圖是合併過程的簡單示意:

這裡寫圖片描述

      相對應的索引檔案也會被合併,以便在Reducer請求對應Partition的資料的時候能夠快速讀取。

      另外,如果spill檔案數量大於mapreduce.map.combiner.minspills配置的數,則在合併檔案寫入之前,會再次執行combiner。如果spill檔案數量太少,執行combiner的收益可能小於呼叫的代價。

      mapreduce.task.io.sort.factor屬性配置每次最多合併多少個檔案,預設為10,即一次最多合併10個spill檔案。最後,多輪合併之後,所有的輸出檔案被合併為唯一一個大檔案,以及相應的索引檔案(可能只在記憶體中存在)。

壓縮:

         在資料量大的時候,對map輸出進行壓縮通常是個好主意。要啟用壓縮,將mapreduce.map.output.compress設為true,並使用mapreduce.map.output.compress.codec設定使用的壓縮演算法。

通過HTTP拷貝Map輸出結果到Reduce中:

       map輸出資料完成之後,通過執行一個HTTP Server暴露出來,供reduce端獲取。用來相應reduce資料請求的執行緒數量可以配置,預設情況下為機器核心數量的兩倍,如需自己配置,通過mapreduce.shuffle.max.threads屬性來配置,注意該配置是針對NodeManager配置的,而不是每個作業配置。

      同時,Map任務完成後,也會通知Application Master,以便Reducer能夠及時來拉取資料。

      通過緩衝、劃分(partition)、排序、combiner、合併、壓縮等過程之後,map端的工作就算完畢:

這裡寫圖片描述

Reduce端:

各個map任務執行完之後,輸出寫入執行任務的機器磁碟中。Reducer需要從各map任務中提取自己的那一部分資料(對應的partition)。每個map任務的完成時間可能是不一樣的,reduce任務在map任務結束之後會盡快取走輸出結果,這個階段叫copy。 

Reducer是如何知道要去哪些機器去資料呢?一旦map任務完成之後,就會通過常規心跳通知應用程式的Application Master。reduce的一個執行緒會週期性地向master詢問,直到提取完所有資料(如何知道提取完?)。

        資料被reduce提走之後,map機器不會立刻刪除資料,這是為了預防reduce任務失敗需要重做。因此map輸出資料是在整個作業完成之後才被刪除掉的。

reduce維護幾個copier執行緒,並行地從map任務機器提取資料。預設情況下有5個copy執行緒,可以通過mapreduce.reduce.shuffle.parallelcopies配置。

如果map輸出的資料足夠小,則會被拷貝到reduce任務的JVM記憶體中。mapreduce.reduce.shuffle.input.buffer.percent配置JVM堆記憶體的多少比例可以用於存放map任務的輸出結果。如果資料太大容不下,則被拷貝到reduce的機器磁碟上。

Reduce中的資料Merge:

記憶體中合併:

當緩衝中資料達到配置的閾值時,這些資料在記憶體中被合併、寫入機器磁碟。閾值有2種配置方式:

  • 配置記憶體比例: 前面提到reduce JVM堆記憶體的一部分用於存放來自map任務的輸入,在這基礎之上配置一個開始合併資料的比例。假設用於存放map輸出的記憶體為500M,mapreduce.reduce.shuffle.merger.percent配置為0.80,則當記憶體中的資料達到400M的時候,會觸發合併寫入。
  • 配置map輸出數量: 通過mapreduce.reduce.merge.inmem.threshold配置。

在合併的過程中,會對被合併的檔案做全域性的排序。如果作業配置了Combiner,則會執行combine函式,減少寫入磁碟的資料量。

Copy過程中磁碟合併:

在copy過來的資料不斷寫入磁碟的過程中,一個後臺執行緒會把這些檔案合併為更大的、有序的檔案。如果map的輸出結果進行了壓縮,則在合併過程中,需要在記憶體中解壓後才能給進行合併。這裡的合併只是為了減少最終合併的工作量,也就是在map輸出還在拷貝時,就開始進行一部分合並工作。合併的過程一樣會進行全域性排序。

最終磁碟中合併:

當所有map輸出都拷貝完畢之後,所有資料被最後合併成一個排序的檔案,作為reduce任務的輸入。這個合併過程是一輪一輪進行的,最後一輪的合併結果直接推送給reduce作為輸入,節省了磁碟操作的一個來回。最後(所以map輸出都拷貝到reduce之後)進行合併的map輸出可能來自合併後寫入磁碟的檔案,也可能來及記憶體緩衝,在最後寫入記憶體的map輸出可能沒有達到閾值觸發合併,所以還留在記憶體中。

每一輪合併並不一定合併平均數量的檔案數,指導原則是使用整個合併過程中寫入磁碟的資料量最小,為了達到這個目的,則需要最終的一輪合併中合併儘可能多的資料,因為最後一輪的資料直接作為reduce的輸入,無需寫入磁碟再讀出。因此我們讓最終的一輪合併的檔案數達到最大,即合併因子的值,通過mapreduce.task.io.sort.factor來配置。

假設現在有50個map輸出檔案,合併因子配置為10,則需要5輪的合併。最終的一輪確保合併10個檔案,其中包括4個來自前4輪的合併結果,因此原始的50箇中,再留出6個給最終一輪。所以最後的5輪合併可能情況如下:

這裡寫圖片描述

前4輪合併後的資料都是寫入到磁碟中的,注意到最後的2格顏色不一樣,是為了標明這些資料可能直接來自於記憶體

MemToMem合併:

除了記憶體中合併和磁碟中合併外,Hadoop還定義了一種MemToMem合併,這種合併將記憶體中的map輸出合併,然後再寫入記憶體。這種合併預設關閉,可以通過reduce.merge.memtomem.enabled開啟,當map輸出檔案達到reduce.merge.memtomem.threshold時,觸發這種合併。

最後一次合併後傳遞給reduce方法:

合併後的檔案作為輸入傳遞給Reducer,Reducer針對每個key及其排序的資料呼叫reduce函式。產生的reduce輸出一般寫入到HDFS,reduce輸出的檔案第一個副本寫入到當前執行reduce的機器,其他副本選址原則按照常規的HDFS資料寫入原則來進行。

效能調優:

如果能夠根據情況對shuffle過程進行調優,對於提供MapReduce效能很有幫助。相關的引數配置列在後面的表格中。

一個通用的原則是給shuffle過程分配儘可能大的記憶體,當然你需要確保map和reduce有足夠的記憶體來執行業務邏輯。因此在實現Mapper和Reducer時,應該儘量減少記憶體的使用,例如避免在Map中不斷地疊加。

執行map和reduce任務的JVM,記憶體通過mapred.child.java.opts屬性來設定,儘可能設大記憶體。容器的記憶體大小通過mapreduce.map.memory.mbmapreduce.reduce.memory.mb來設定,預設都是1024M。

map優化:

在map端,避免寫入多個spill檔案可能達到最好的效能,一個spill檔案是最好的。通過估計map的輸出大小,設定合理的mapreduce.task.io.sort.*屬性,使得spill檔案數量最小。例如儘可能調大mapreduce.task.io.sort.mb

map端相關的屬性如下表:

屬性名 值型別 預設值 說明
mapreduce.task.io.sort.mb int 100 用於map輸出排序的記憶體大小
mapreduce.map.sort.spill.percent float 0.80 開始spill的緩衝池閾值
mapreduce.task.io.sort.factor int 10 合併檔案數最大值,與reduce共用
mapreduce.map.combine.minspills int 3 執行combiner的最低spill檔案數
mapreduce.map.out.compress boolean false 輸出是否壓縮
mapreduce.map.out.compress 類名 DefaultCodec 壓縮演算法
mapreduce.shuffle.max.threads int 0 服務於reduce提取結果的執行緒數量

Reduce優化:

在reduce端,如果能夠讓所有資料都儲存在記憶體中,可以達到最佳的效能。通常情況下,記憶體都保留給reduce函式,但是如果reduce函式對記憶體需求不是很高,將mapreduce.reduce.merge.inmem.threshold(觸發合併的map輸出檔案數)設為0,mapreduce.reduce.input.buffer.percent(用於儲存map輸出檔案的堆記憶體比例)設為1.0,可以達到很好的效能提升。

屬性名 值型別 預設值 說明
mapreduce.reduce.shuffle.parallelcopies int 5 提取map輸出的copier執行緒數
mapreduce.reduce.shuffle.maxfetchfailures int 10 提取map輸出最大嘗試次數,超出後報錯
mapreduce.task.io.sort.factor int 10 合併檔案數最大值,與map共用
mapreduce.reduce.shuffle.input.buffer.percent float 0.70 copy階段用於儲存map輸出的堆記憶體比例
mapreduce.reduce.shuffle.merge.percent float 0.66 開始spill的緩衝池比例閾值
mapreduce.reduce.shuffle.inmem.threshold int 1000 開始spill的map輸出檔案數閾值,小於等於0表示沒有閾值,此時只由緩衝池比例來控制
mapreduce.reduce.input.buffer.percent float 0.0 reduce函式開始執行時,記憶體中的map輸出所佔的堆記憶體比例不得高於這個值,預設情況記憶體都用於reduce函式,也就是map輸出都寫入到磁碟


通用優化:

Hadoop預設使用4KB作為緩衝,這個算是很小的,可以通過io.file.buffer.size來調高緩衝池大小。