一文讀懂MapReduce
Hadoop解決大規模資料分散式計算的方案是MapReduce。MapReduce既是一個程式設計模型,又是一個計算框架。也就是說,開發人員必須基於MapReduce程式設計模型進行程式設計開發,然後將程式通過MapReduce計算框架分發到Hadoop叢集中執行。我們先看一下作為程式設計模型的MapReduce。
MapReduce程式設計模型
MapReduce是一種非常簡單又非常強大的程式設計模型。
簡單在於其程式設計模型只包含map和reduce兩個過程,map的主要輸入是一對<key , value>值,經過map計算後輸出一對<key , value>值;然後將相同key合併,形成<key , value集合>;再將這個<key , value集合>輸入reduce,經過計算輸出零個或多個<key , value>對。
但是MapReduce同時又是非常強大的,不管是關係代數運算(SQL計算),還是矩陣運算(圖計算),大資料領域幾乎所有的計算需求都可以通過MapReduce程式設計來實現。
我們以WordCount程式為例。WordCount主要解決文字處理中的詞頻統計問題,就是統計文字中每一個單詞出現的次數。如果只是統計一篇文章的詞頻,幾十K到幾M的資料,那麼寫一個程式,將資料讀入記憶體,建一個Hash表記錄每個詞出現的次數就可以了,如下圖。

小資料量的詞頻統計
但是如果想統計全世界網際網路所有網頁(數萬億計)的詞頻數(這正是google這樣的搜尋引擎典型需求),你不可能寫一個程式把全世界的網頁都讀入記憶體,這時候就需要用MapReduce程式設計來解決。
WordCount的MapReduce程式如下。
public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } }複製程式碼
其核心是一個map函式,一個reduce函式。
map函式的輸入主要是一個<key , value>對,在這個例子裡,value是要統計的所有文字中的一行資料,key在這裡不重要,我們忽略。
public void map(Object key, Text value, Context context)複製程式碼
map函式的計算過程就是,將這行文字中的單詞提取出來,針對每個單詞輸出一個<word , 1>這樣的<key , value>對。
MapReduce計算框架會將這些<word , 1>收集起來,將相同的word放在一起,形成<word , <1,1,1,1,1,1,1.....>>這樣的<key , value集合>資料,然後將其輸入給reduce函式。
public void reduce(Text key, Iterable<IntWritable> values,Context context)複製程式碼
這裡的reduce的輸入引數values就是由很多個1組成的集合,而key就是具體的單詞word。
reduce函式的計算過程就是,將這個集合裡的1求和,再將單詞(word)和這個和(sum)組成一個<key , value>(<word , sum>)輸出。每一個輸出就是一個單詞和它的詞頻統計總和。
假設有兩個block的文字資料需要進行詞頻統計,MapReduce計算過程如下圖。

MapReduce計算過程
一個map函式可以針對一部分資料進行運算,這樣就可以將一個大資料切分成很多塊(這也正是HDFS所做的),MapReduce計算框架為每個塊分配一個map函式去計算,從而實現大資料的分散式計算。
上面提到MapReduce程式設計模型將大資料計算過程切分為map和reduce兩個階段,在map階段為每個資料塊分配一個map計算任務,然後將所有map輸出的key進行合併,相同的key及其對應的value傳送給同一個reduce任務去處理。
這個過程有兩個關鍵問題需要處理
-
如何為每個資料塊分配一個map計算任務,程式碼是如何傳送資料塊所在伺服器的,傳送過去是如何啟動的,啟動以後又如何知道自己需要計算的資料在檔案什麼位置(資料塊id是什麼)
-
處於不同伺服器的map輸出的<key , value> ,如何把相同的key聚合在一起傳送給reduce任務
這兩個關鍵問題正好對應文章中“MapReduce計算過程”一圖中兩處“MapReduce框架處理”。
MapReduce計算過程中兩處MapReduce框架處理
我們先看下MapReduce是如何啟動處理一個大資料計算應用作業的。
MapReduce作業啟動和執行機制
我們以Hadoop1為例,MapReduce執行過程涉及以下幾類關鍵程序:
-
大資料應用程序:啟動使用者MapReduce程式的主入口,主要指定Map和Reduce類、輸入輸出檔案路徑等,並提交作業給Hadoop叢集。
-
JobTracker程序:根據要處理的輸入資料量啟動相應數量的map和reduce程序任務,並管理整個作業生命週期的任務排程和監控。JobTracker程序在整個Hadoop叢集全域性唯一。
-
TaskTracker程序:負責啟動和管理map程序以及reduce程序。因為需要每個資料塊都有對應的map函式,TaskTracker程序通常和HDFS的DataNode程序啟動在同一個伺服器,也就是說,Hadoop叢集中絕大多數伺服器同時執行DataNode程序和TaskTacker程序。
如下圖所示。
MapReduce作業啟動和執行機制
具體作業啟動和計算過程如下:
-
應用程序將使用者作業jar包儲存在HDFS中,將來這些jar包會分發給Hadoop叢集中的伺服器執行MapReduce計算。
-
應用程式提交job作業給JobTracker。
-
JobTacker根據作業排程策略建立JobInProcess樹,每個作業都會有一個自己的JobInProcess樹。
-
JobInProcess根據輸入資料分片數目(通常情況就是資料塊的數目)和設定的reduce數目建立相應數量的TaskInProcess。
-
TaskTracker程序和JobTracker程序進行定時通訊。
-
如果TaskTracker有空閒的計算資源(空閒CPU核),JobTracker就會給他分配任務。分配任務的時候會根據TaskTracker的伺服器名字匹配在同一臺機器上的資料塊計算任務給它, 使啟動的計算任務正好處理本機上的資料。
-
TaskRunner收到任務後根據任務型別(map還是reduce),任務引數(作業jar包路徑,輸入資料檔案路徑,要處理的資料在檔案中的起始位置和偏移量,資料塊多個備份的DataNode主機名等)啟動相應的map或者reduce程序。
-
map或者reduce程式啟動後,檢查本地是否有要執行任務的jar包檔案,如果沒有,就去HDFS上下載,然後載入map或者reduce程式碼開始執行。
-
如果是map程序,從HDFS讀取資料(通常要讀取的資料塊正好儲存在本機)。如果是reduce程序,將結果資料寫出到HDFS。
通過以上過程,MapReduce可以將大資料作業計算任務分佈在整個Hadoop叢集中執行,每個map計算任務要處理的資料通常都能從本地磁碟上讀取到。而使用者要做的僅僅是編寫一個map函式和一個reduce函式就可以了,根本不用關心這兩個函式是如何被分佈啟動到叢集上的,資料塊又是如何分配給計算任務的。這一切都由MapReduce計算框架完成。
MapReduce資料合併與連線機制
在WordCount例子中,要統計相同單詞在所有輸入資料中出現的次數,而一個map只能處理一部分資料,一個熱門單詞幾乎會出現在所有的map中,這些單詞必須要合併到一起進行統計才能得到正確的結果。
事實上,幾乎所有的大資料計算場景都需要處理資料關聯的問題,簡單如WordCount只要對key進行合併就可以了,複雜如資料庫的join操作,需要對兩種型別(或者更多型別)的資料根據key進行連線。
MapReduce計算框架處理資料合併與連線的操作就在map輸出與reduce輸入之間,這個過程有個專門的詞彙來描述,叫做shuffle。
MapReduce shuffle過程
每個map任務的計算結果都會寫入到本地檔案系統,等map任務快要計算完成的時候,MapReduce計算框架會啟動shuffle過程,在map端呼叫一個Partitioner介面,對map產生的每個<key , value>進行reduce分割槽選擇,然後通過http通訊傳送給對應的reduce程序。這樣不管map位於哪個伺服器節點,相同的key一定會被髮送給相同的reduce程序。reduce端對收到的<key , value>進行排序和合並,相同的key放在一起,組成一個<key , value集合>傳遞給reduce執行。
MapReduce框架預設的Partitioner用key的雜湊值對reduce任務數量取模,相同的key一定會落在相同的reduce任務id上,實現上,這樣的Partitioner程式碼只需要一行,如下所示。
/** Use {@link Object#hashCode()} to partition. */ public int getPartition(K2 key, V2 value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; }複製程式碼
shuffle是大資料計算過程中發生奇蹟的地方,不管是MapReduce還是Spark,只要是大資料批處理計算,一定會有shuffle過程,讓資料關聯起來,資料的內在關係和價值才會呈現出來。不理解shuffle,就會在map和reduce程式設計中產生困惑,不知道該如何正確設計map的輸出和reduce的輸入。shuffle也是整個MapReduce過程中最難最消耗效能的地方,在MapReduce早期程式碼中,一半程式碼都是關於shuffle處理的。