1. 程式人生 > >MapReduce內部shuffle過程詳解(Combiner的使用)

MapReduce內部shuffle過程詳解(Combiner的使用)

Maptask呼叫一個元件FileInputFormat
FileInputFormat有一個最高層的介面 --> InputFormat
我們不需要去寫自己的實現類,使用的就是內部預設的元件:TextInputFormat
在這裡插入圖片描述

maptask先呼叫TextInputFormat, 但是實質讀資料是TextInputFormat呼叫RecordReader。 RecordReader 是一個介面,這個介面的實現類呼叫read方法去讀取資料。

InputFormat和RecordReader:

org.apache.hadoop.mapreduce包裡的InputFormat抽象類提供瞭如下列程式碼所示的兩個方法:

public abstract class InputFormat<K, V> {
    public abstract List<InputSplit> getSplits(JobContext context);
    RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context);
}

這兩個方法展示了InputFormat類的兩個功能:

  • 將輸入檔案切分為map處理所需的split
  • 建立RecordReader類,它將從一個split生成鍵值對序列

RecordReader類同樣也是org.apache.hadoop.mapreduce包裡的抽象類

public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {

  /*
  * 初始化RecordReader,只能被呼叫一次。
  */
  public abstract void initialize(InputSplit split,
                                  TaskAttemptContext context
                                  ) throws IOException, InterruptedException;

  /**
   * 獲取下一個資料的鍵值對
   */
  public abstract 
  boolean nextKeyValue() throws IOException, InterruptedException;

  /**
   * 獲取當前資料的 key
   */
  public abstract
  KEYIN getCurrentKey() throws IOException, InterruptedException;
  
  /**
   * 獲取當前資料的value
   */
  public abstract 
  VALUEIN getCurrentValue() throws IOException, InterruptedException;
  
  /**
   *  進度
   */
  public abstract float getProgress() throws IOException, InterruptedException;
  
  /**
   * 關閉RecordReader
   */
  public abstract void close() throws IOException;
}

組合使用InputFormat和RecordReader可以將任何型別的輸入資料轉換為MapReduce所需的鍵值對。

下面有一篇文章可以參考:
https://www.cnblogs.com/xuepei/p/3664698.html

InputFormat:
通過使用InputFormat,MapReduce框架可以做到:

  • 1、驗證作業的輸入的正確性

  • 2、將輸入檔案切分成邏輯的InputSplits,一個InputSplit將被分配給一個單獨的Mapper task

  • 3、提供RecordReader的實現,這個RecordReader會從InputSplit中正確讀出一條一條的K-V對供Mapper使用。

maptask會將返回的 key - value 交給自定義的Mapper。如果沒有自定義的Mapper就是使用預設的Mapper。這個預設的Mapper的map處理方法,就是進來什麼就輸出什麼。
在這裡插入圖片描述

map方法的引數有 key - value ,所以maptask會將InputFormat返回的key - value交給Mapper的map方法去使用。
在這裡插入圖片描述

內部還有一個元件 OutputCollector:
OutputCollector 由hadoop框架提供,複製收集Mapper和Reducer的輸出資料,實現map或者reduce函式時,只需要簡單地將其輸出的<key,value>對往OutputCollector中一丟即可,剩餘的框架會幫你做好。

這個收集器會將資料輸出到一個環形緩衝區,其實就是一個數組,(一邊寫資料,一邊會對資料進行回收,通過索引來控制,相當於一個環一樣)

環形緩衝區其實就是一個數組,後端不斷接收資料的同時,前端資料不斷溢位,長度用完之後讀取的新資料再從前端開始覆蓋。這個緩衝區的預設大小是100M – 這個環形緩衝區是記憶體裡面的,速度很快,但是容量有限。-- 這個100M是不會全部被使用的,因為還需要記憶體進行排序等操作。 這裡面有一個保留區
操作80%就會溢位

spiller元件會從環形緩衝區溢位檔案,這過程會按照定義的partitioner分割槽(預設是hashpartition),並且按照key.compareTo()進行排序(底層主要是快排和外部排序)
spiller會在環形緩衝區溢位的時候,對資料進行分割槽和排序, – (spiller會會環形緩衝區這段記憶體進行操作)
分割槽是根據key,這個分割槽需要呼叫一個元件 Partitioner - 預設的實現是 HashPartitioner.
排序的時候需要呼叫被排序物件的compareTo()方法。

這個過程就是將資料分好區,每個區都是排好序的。
分割槽排序之後,就可以將檔案往外寫。一個區一個區往外寫。 記憶體中的資料寫到檔案中。這個檔案就在maptask的工作目錄裡面。就在本地。
每一次溢位就會產生一個檔案。
在這裡插入圖片描述

如果記憶體裡面還剩下最後的一點點資料,那也需要溢位。

這些溢位的檔案不會一個一個地交給reduce去處理,還會先進行一次合併。合併完之後會形成一個大檔案。這個merge使用的是歸併排序。

在這裡插入圖片描述

merge之後就會將資料全部交給reduce來進行處理:

在這裡插入圖片描述

reduce task會去下載資料(通過網路從傳輸)

在這裡插入圖片描述

reduce task 從 map task 機器上下載下來的資料是沒有排序的,還需要進行再一次地合併。

在這裡插入圖片描述

問題: reduce task 拿到這個檔案是怎麼處理的?
reducer 的 reduce方法是處理業務邏輯的,會根據key來處理,相同的key就呼叫一次reduce() 方法。呼叫 GroupingComparator()方法

在這裡插入圖片描述


在這裡插入圖片描述


完整的圖:

在這裡插入圖片描述


Combainer

Hadoop礦建使用Mapper將資料處理成一個個<key,value>鍵值對,在網路節點間對其進行整理(shuffle),然後使用Reduce處理資料並進行最後的輸出。

這時會出現效能上的瓶頸:
(1)如果我們有10億個資料,Mapper會產生10億個鍵值對在網路間進行傳輸,但是如果我們對資料求最大值,那麼很明顯的Mapper值需要輸出它所知道的最大值即可。這樣不僅可以減少輕網路壓力,同樣也可以大幅度提高程式效率。
總結:網路頻寬嚴重被佔降低效率。
(2)有時候資料遠遠不是一致性的或者說是平衡分佈的,這樣不僅Mapper中的鍵值對,中間階段(shuffle)的鍵值對等,大多數的鍵值對最終會聚集於單一的Reducer上,壓倒這個Reducer,從而大大降低程式的效能。
總結:單一節點承載過重降低程式效能。


在MapperReducer程式設計模型中,在Mapper和Reducer之間有一個非常重要的元件,它解決了上述的效能瓶頸問題,它就是Combiner.

注意點:

	(1)與mapper和reducer不同的是,combiner沒有預設的實現,需要顯式設定早conf中才有作用,
	(2)並不是所有的job都適用combiner,只有操作滿足結合律的才可設定combiner。combine操作類似於:
	opt(opt(1, 2, 3), opt(4, 5, 6))。如果opt為求和、求最大值的話,可以使用,但是如果是求中值的話,不適用。

每一個mapper都可能產生大量的本地輸出,Combiner額作用就是對map端的輸出先做一次合併,減少在map和reduce節點之間的資料傳輸,以提高網路IO效能,是MapReduce的一種優化手段之一,其具體作用如下所述:

自定義的Combiner其實就是跟Reducer一樣的程式碼:

package com.thp.bigdata.wcdemo;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * 自定義的Combiner
 * @author tommy
 *
 */
public class MyCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
		//
		System.out.println("Combiner輸入分組<" + key.toString() + ",N(N>=1)>");
		
		int count = 0;
		for(IntWritable value : values) {
			count += value.get();  // 這個count 最後就是某一個單詞的彙總的值
			// 顯示次數表示輸入的k2,v2的鍵值對數量
			System.out.println("Combiner輸入鍵值對<" + key.toString() + "," + value.get() + ">");
		}
		context.write(key, new IntWritable(count));
		System.out.println("Combiner輸出鍵值對<" + key.toString() + "," + count+ ">");
	}
	
}

還需要將這個自定義的Combiner新增進去:

// 設定Map規約Combiner
		job.setCombinerClass(MyCombiner.class);

我每一個檔案中就有8個hello,這個8個hello如果使用了combiner會先進行一次合併:

沒使用之前:

Reducer輸入分組<hello,N(N>=1)>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>

reduce會有56次輸入,但是如果使用了combiner的話,每一次map之後,就會進行換一次合併:
在這裡插入圖片描述

接下來會有一個combiner進行一次合併:
在這裡插入圖片描述

第一個檔案中的8個hello就進行了一次合併:
每一個檔案都會有這個combiner的過程,最後會有reduce的過程進行彙總:
在這裡插入圖片描述
這個reduce過程對hello進行統計只有7次輸入,每一次輸入的資料都是8次,是combiner先進行了一次合併。

總結:
從控制檯的資訊我們可以發現,其實combiner只是把相同的hello進行規約,由此輸入給reduce的就變成<hello,8>。在實際的Hadoop叢集操作中,我們是由多臺主機一起進行MapReduce的,如果加入規約操作,每一臺主機在reduce之前先進行一次對本機資料的規約,然後再通過叢集進行reduce操作,這樣就大大節省reduce時間,從而加快了MapReduce的處理速度。

但是:特別值得注意的一點是,一個combiner只是處理一個節點中的輸出,而不能享受向reduce一樣的輸入(經過了shuffle階段的資料),這點非常關鍵

combiner:

前面展示的流水線忽略了一個可以優化MapReduce作業所使用的頻寬的步驟,這個過程叫Combiner,它在Mapper之後,Reducer之前執行。combienr是可選的,如果這個過程適合於你的作業,Combiner例項會在每一個執行map任務的節點上執行。Combiner會接收特定節點上的Mapper例項的輸出作為輸入,接著Combiner的輸出會被送到Reducer那裡,而不是傳送Mapper的輸出。Combiner是一個“迷你reduce過程”,他只處理單臺機器生成的資料。

參考別的大神的部落格:
https://blog.csdn.net/guoery/article/details/8529004