1. 程式人生 > >MapReduce框架學習(2)——Map/Reduce及 Shuffle前後

MapReduce框架學習(2)——Map/Reduce及 Shuffle前後

參考: JeffreyZhou的部落格園
《Hadoop權威指南》第四版

0 Map/Reduce大致流程

  1. 輸入(input): 將輸入資料分成一個個split,並將spilt進一步拆成<key,value>形式;
  2. 對映(map):根據輸入的<key,value>進行處理,輸出list<key,value>;
  3. 合併(combiner):合併(單個節點上)中間相同的key值;
  4. 分割槽(partition):將<key,value>分成N分,分別送到下一環節;
  5. 化簡(reduce):將中間結果合併,得到最終結果;
  6. 輸出(output):指定輸出最終結果格式。

本博文重點在於將map/reduce中的各個小環節進行理解和應用,屬於細節方面,而不是講解MR的大致執行流程,這類知識應該作為前提了解,本文在此放置一個整體的執行例圖,方便對比本博文的章節內容。

MR

圖片來源:圖解mapreduce原理和執行過程

2.1 Java Map/Reduce

在明白MR程式的工作原理之後,要知道一個MR作業,包括三點:

  • 輸入資料
  • MR程式
  • Job配置資訊

上篇博文大概講解了輸入資料的分片、格式等知識,這篇來講講MR過程,後面再將配置資訊。

2.2 map處理

map函式由Mapper類來表示,如果不指定map函式,則系統自動指定一個Null,這意味著將輸入的<key,value>對,不做任何修改,直接送到下一個環節。要自定義map函式,就繼承Mapper類,複寫其map函式。如wordcount程式中的這句:

public static class TokenizerMapper 
extends Mapper<Object, Text, Text, IntWritable> 
throws IOException, InterruptedException
{
public void map
(Object key, Text value, Context context) throws IOException, InterruptedException { // your map code context.write(key, value); } }

Mapper是一個泛型型別,有四個形參,分別指定map函式的輸入鍵、輸入值、輸出鍵和輸出值的型別。可以看到,這幾個形參基本都是java中沒見過的型別,但是text 和 int都比較熟悉,沒錯,這是Hadoop自身提供的一套基本型別,它可以優化網路序列化傳輸,至於這個是什麼後面再將,總之就是進行優化了,更適合Hadoop的使用。常見對應關係如下:

Hadoop Java
Text String
IntWritable Integer
。。。 依次類似

在程式中,使用下列語句指定map處理類:

job.setMapperClass(TokenizerMapper.class))

2.3 reduce處理

這一塊如圖所示,接受上一環節的輸出,進行處理,並形成最終的輸出結果,上一環節可能是Partitioner的輸出,也可能就是map的輸出,這個要視情況來決定是使用低配方案還是頂配,待會兒分析中間這幾個環節,先來看看哼哈二將的哈大頭。同樣,要自定義reduce函式,就繼承Reducer類,複寫reduce函式:

public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
  public void reduce(Text key, Iterable<IntWritable> values,Context context) 
  throws IOException, InterruptedException {
  // your reduce code
    context.write(key, result);
  }
}

reduce和map很像,同樣的四個形參,可以想到的是,這裡的輸入引數型別應該是和map(或者說上一個任意環節)的輸出型別一樣的,不然暗號都不一樣,怎麼可能成功接頭。所以,這是程式設計中要注意到的一點。
在程式中,使用下列語句指定map處理類:

job.setReducerClass(IntSumReducer.class))

2.4 map和reduce之間的糾纏

  1. map任務將其輸出寫入本地磁碟,而非HDFS
      map的輸出是中間結果,該中間結果由reduce任務處理後才產生最終輸出結果,而且一旦作業(job)完成,map的輸出結果就可以刪除。map的輸出是中間結果,該中間結果由reduce任務處理後才產生最終輸出結果,而且一旦作業(job)完成,map的輸出結果就可以刪除。

  2. 預設只有一個reduce任務
      預設情況下,所有Map任務的輸出,排序後需通過網路傳輸傳送到執行reduce任務的節點,資料在reduce端合併,然後才能由我們自定義的reduce函式處理,此時的輸出儲存到HDFS中。

  3. 太多map和多個reduce任務
      當然,一般都是設定多個reduce任務,不然人家map群挑一個reduce,那對reduce多不公平,但reduce,你看她名字就知道了,化簡,肯定是比map少,這才能顯示出其地位。
      那麼,在多對多的情況下,怎麼分配呢,這就是partiton的作用了,分好區,劃地封侯,保證領土完整,互不侵犯。

  4. map的輸出傳送到reduce的輸入
      前面說了,map具有資料本地優勢化,但reduce沒有啊,在叢集中,頻寬應該算是最重要的資源了,沒辦法,要致富,先修路,沒出路,再大的能耐也沒轍。那麼為了減少map輸出產生的資料傳送,我們可以現在map本地進行一下“reduce”,沒錯,就是本地化的reduce。這裡似乎有點亂,只要記住一點,combiner屬於優化方案,每個combiner只作用於一個map上面,而reduce的作用在所有map上面。combiner函式是為了減少mapper和reducer之間的資料傳輸量,是否使用還需要斟酌一下。

  5. 上面的邏輯看起來比較簡單,但真實執行情況要比這個複雜的多,簡單的做一下對比就知道了:

WordCount舉例 真實情況
一個任務 多個任務併發
現有的小規模輸入資料,txt格式 海量,不同資料來源,多種格式,實時
幾個mapper,一個reducer 好多reducer,超多mapper
無軟、硬體故障 硬體故障是一種常態

2.5 Combiner函式和Partitioner函式

  1. 前面講了,combiner就是本地化的reducer,自定義:
public static class MyCombiner 
extends Reducer<Text,IntWritable,Text,IntWritable> {
   public void reduce(Text key, Iterable<IntWritable> values,Context context) 
   throws IOException, InterruptedException {
     context.write(key, new IntWritable(1));
   }
 }

在程式中,使用下列語句指定map處理類:

job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class)

值得注意的一點是:combiner可以不止一次的呼叫。。。如果至少存在3個溢位檔案,則combiner就會在輸出檔案寫到磁碟之再次執行。

  1. Partitioner決定著Map節點的輸出將被分割槽到哪個Reduce節點。而預設的Partitioner是HashPartitioner,它根據每條資料記錄的主健值進行Hash操作,獲得一個非負整數的Hash碼,然後用當前作業的Reduce節點數取模運算,這個取模運算只是為了平均reduce的處理能力,咱可以自定義。有N個結點的話,就會平均分配置到N個節點上,一個隔一個依次。大多情況下這個平均分配是夠用了,但也會有一些特殊情況,以後補充。
    自定義:
public static class MyPartitioner extends HashPartitioner<K,V> {
   public void getPartition(K key, V value,int numReduceTasks) {
     super.getPartition(key,value,numReduceTasks);
   }
 }

2.6 到底什麼是shuffle

簡單說,就是將map輸出作為輸入,傳給reducer的過程,成為shuffle。它屬於不斷被優化和改進的程式碼庫的一部分,

此處劃重點,優化!!!

也就是說,按照上面分析的理想狀態,不就是map後輸出,然後經過combiner,再將所有的輸出集合到partitioner進行劃分,到不同的reducer進行處理,輸出到HDFS不就行啦。
  但是,再強大的理論框架,也擋不住現實的殘酷。(以下3點沒有查到具體的依據,都是自己的想法):
  (1)首先,直接map輸出如果直接寫入磁碟,一般機械硬碟讀寫很慢,這嚴重影響效率,理所當然想到利用記憶體(緩衝區),但問題又來了,內容的容量遠遠小於硬碟容量,經過記憶體就可能產生很多小檔案(spill to disk溢位檔案),但最終一個map的輸出檔案就只有一個,這時,就需要將多個溢位檔案合併(merge on disk)。;
  (2)既然在這過程中利用了記憶體,記憶體速度快,那麼在這轉手的過程中,還可以順便做一些其它的優化措施,比如,sortcombiner。排序是MR的預設行為,再進行一下combine,就可以減少spill檔案。
  (3)對照下圖,似乎還有一個partitions沒有解釋,因為上述兩點,有一個前提條件:只有一個map和一個reducer。前面講了,對於多個reducer,要將map分割槽送給不同的reducer,那麼在哪決定分給哪個reducer呢?其實在寫磁碟之前,執行緒首先根據資料最終要傳的reducer把資料劃分成相應的分割槽(partition)。在每個分割槽中,後臺執行緒再按鍵進行記憶體中排序,再執行combiner(如果有的話)。

shuffle過程圖解

上面是按照優化的目的,來解釋shuffle中各個環節的意義,已經將整個過程講了一遍,接下來就將這個過程分為map端和reduce端來說一下。

下面這段的參考來源: MapReduce原理分析記錄

2.6.1 map端

shuffle_map端
  map函式開始輸出時,並不是簡單地將它寫到磁碟,這個過程比想象中更復雜,它利用緩衝的方式寫到記憶體,並出於效率的考慮進行預排序。
  記憶體緩衝區預設大小是100MB,一旦緩衝內容達到閾值(預設為0.8,即80%),一個後臺執行緒便開始把內容溢位(spill)到磁碟,在這過程中,map輸出繼續寫到緩衝區,互不影響。
  在任務完成之前,溢位檔案被合併成一個已分割槽且已排序的輸出檔案。最終生成的檔案存放在Task Tracker(即DataNode)夠得著的某個本地目錄內。

關於Namenode、Datanode、Jobtracker、Tasktracker區別見本系列另外一篇文章:Hadoop學習中的一些概念區分

2.6.2 reduce端

shuffle_reduce端

每個reduce task不斷地通過RPC從Job Tracker(即namenode)那裡獲取map task是否完成的資訊,如果reduce得到通知,獲知某臺Task Tracker上的map task執行完成,就可以開始啟動shuffle的後半段過程了。
  reduce task在執行前,不斷拉取當前job裡,每個map的最終結果,然後對從不同地方拉取過來的資料不斷的做merge,最終也形成一個檔案,最為reduce task的輸入。
  - copy。每個map任務的完成時間可能不同,因此在每個map任務完成時,reduce任務就開始複製其輸出,從而能夠並行取得map輸出,預設是5個copy執行緒。
  - Merge。這和map階段的merge動作一樣,至少合併的是不同map端的數值。
  - “最終檔案”,merge後生成的最終檔案,可能存在於磁碟,也可能存在於記憶體中,預設是在磁碟中。

關於MR流程的一些補充

1. map任務個數

  • 等於輸入檔案被劃分成的分塊數(每個split對應一個map過程);
  • 如果檔案在HDFS中,這又取決於輸入檔案的大小以及檔案塊的大小(即Block size)等於輸入檔案被劃分成的分塊數(每個split對應一個map過程),如果檔案在HDFS中,這又取決於輸入檔案的大小以及檔案塊的大小(即Block size)。
  • 對於大多數作業來說,一個合理的split大小趨向於HDFS的Block size,預設是128MB。

map任務個數

  • 原因:
    這就是map的資料本地化的優勢,即著名的“計算向資料靠近”的思想。因為叢集的頻寬資源很寶貴,Hadoop在儲存有輸入資料的節點上執行map任務,可以獲得最佳效能。
    (1)如果分片過大,跨越了兩個資料塊,那麼對於任意一個HDFS節點,基本都不可能同時儲存這兩個資料塊(根據HDFS的儲存特點),就需要通過網路傳輸資料;
    (2)如果分片太小,那麼關鍵時間反而會卡在管理分片和構建map任務的總時間,而不是計算本身,有點“得不償失”。

2. reduce任務個數

從上面可以看到,我們並沒有設定map任務的數量,因為這是根據Block size來自動劃分的。
但reduce任務的數量是單獨指定的,預設情況下,只有一個reducer,這當然不行,選擇reducer的數量是個技術活,由於並行化程度提高,增加reducer的數量能縮短reduce過程,然而,太多的話,小檔案將會更多,這又不夠優化。

按照《Hadoop權威指南》中一條經驗法則:
目標reducer保持在每個執行5分鐘左右,且產生至少一個HDFS塊(大小)的輸出比較合適。

3. MR過程中的資料格式問題

在上一篇博文中,我們先從外層瞭解了整個MR作業的輸入和輸出,把MR作業當作一個黑盒子,先知道進去啥東西,出來啥結果。然後這一篇博文打開了這個黑盒子,瞭解了中間過程的大體框架,那麼,在輸入資料進入這個黑盒子以後,到輸出之前,不管資料內容,先想想這中間的資料格式及形成又是如何變化的呢?
其實在前面提到過,不管是啥格式,至少這個環節的輸出格式,得和下一環節的輸入格式一致吧?不然口徑都不一樣,怎麼對接?
上面只提到了setMapperClass()setReducerClass()來指定要用的map型別和reduce型別,這兩個類中定義了接收的輸入資料型別和最終的輸出資料型別。但其實可以用下面兩個函式控制reduce函式的輸出型別,並且必須和reduce類產生的相匹配:

setOutputKeyClass()
setOutputValueClass()

map函式的輸出型別預設情況下和reduce函式是相同的,但是,如果不同,則通過以下方法來設定map函式的輸出型別:

setMapOutputKeyClass()
setMapOutputValueClass()

4. WordCount示例程式

這一大套講下來,自己都有點迷糊了,放一個程式程式碼,來捋一捋思路,清醒一下:

// Map過程
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{
	private final static IntWritable one = new IntWritable(1);
	private Text word = new Text();
	public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
	// key 是這一行資料的起始偏移量,value是這一行的文字內容
	System.out.println("key=" +key.toString());
	System.out.println("Value=" + value.toString());
// 切分單詞,可以使用split()方法
	StringTokenizer itr = new StringTokenizer(value.toString());
//遍歷單詞陣列,輸出key-value的格式
	while (itr.hasMoreTokens()) {
		word.set(itr.nextToken());
		context.write(word, one);
	}
	}
}
// Reduce過程
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
	private IntWritable result = new IntWritable();
	public void reduce(Text key, Iterable<IntWritable> values,Context context) 
	throws IOException, InterruptedException {
		int sum = 0;
		for (IntWritable val : values) {
			sum += val.get();
		}
		result.set(sum);
		context.write(key, result);
	}
}

// 主程式
public static void main(String[] args) throws Exception {
	Configuration conf = new Configuration();   // 配置檔案
	// System.out.println("url:" + conf.get("fs.default.name"));  // deprecated
	System.out.println("url:" + conf.get("fs.defaultFS"));
	String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
	if (otherArgs.length != 2) {
		System.err.println("Usage: wordcount <in> <out>");
		System.exit(2);
	}
	// 獲取一個作業
	// Job job = new Job(conf, "word count");  // deprecated
	Job job = Job.getInstance(conf,"wordcount");  // 用job的靜態方法
	// 設定job所用的那些類(class)檔案在哪個jar包
	job.setJarByClass(WordCount.class);
	// 設定所用的map reduce類
	job.setMapperClass(TokenizerMapper.class);
	job.setCombinerClass(IntSumReducer.class);  // 對每個節點的map輸出進行combine
	job.setReducerClass(IntSumReducer.class);  // 對所有節點的輸出進行reduce

// 設定資料輸出型別
	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(IntWritable.class);
// 指定要處理的輸入、輸出路徑,
	//此處為輸入引數
	FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
	FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
	//此處為固定檔案目錄
	// FileInputFormat.addInputPath(job, "input");
	// FileOutputFormat.setOutputPath(job, "output");
// 將job提交給叢集執行
	System.exit(job.waitForCompletion(true) ? 0 : 1);
}