7個例項全面掌握Hadoop MapReduce
作者介紹
杜亦舒,創業中,技術合夥人,喜歡研究分享技術。個人訂閱號:效能與架構。
本文旨在幫您快速瞭解 MapReduce 的工作機制和開發方法,解決以下幾個問題:
-
MapReduce 基本原理是什麼?
-
MapReduce 的執行過程是怎麼樣的?
-
MapReduce 的核心流程細節
-
如何進行 MapReduce 程式開發?(通過7個例項逐漸掌握)
文章中提供了程式例項中涉及到的測試資料檔案,可以直接下載使用。
關於實踐環境,如果您不喜歡自己搭建Hadoop環境,可以下載使用本教程提供的環境,實踐部分內容中會介紹具體使用方法。
通過學習並實踐完成後,可以對 MapReduce 工作原理有比較清晰的認識,並掌握 MapReduce 的程式設計思路。
大綱:
一、MapReduce 基本原理
二、MapReduce 入門示例 - WordCount 單詞統計
三、MapReduce 執行過程分析
-
例項1 - 自定義物件序列化
-
例項2 - 自定義分割槽
-
例項3 - 計算出每組訂單中金額最大的記錄
-
例項4 - 合併多個小檔案
-
例項5 - 分組輸出到多個檔案
四、MapReduce 核心流程梳理
-
例項6 - join 操作
-
例項7 - 計算出使用者間的共同好友
五、下載方式
一、MapReduce基本原理
MapReduce是一種程式設計模型,用於大規模資料集的分散式運算。
1、MapReduce通俗解釋
圖書館要清點圖書數量,有10個書架,管理員為了加快統計速度,找來了10個同學,每個同學負責統計一個書架的圖書數量。
張同學統計 書架1
王同學統計 書架2
劉同學統計 書架3
……
過了一會兒,10個同學陸續到管理員這彙報自己的統計數字,管理員把各個數字加起來,就得到了圖書總數。
這個過程就可以理解為MapReduce的工作過程。
2
(1)map
管理員分配哪個同學統計哪個書架,每個同學都進行相同的“統計”操作,這個過程就是map。
(2)reduce
每個同學的結果進行彙總,這個過程是reduce。
3、MapReduce工作過程拆解
下面通過一個景點案例(單詞統計)看MapReduce是如何工作的。
有一個文字檔案,被分成了4份,分別放到了4臺伺服器中儲存
Text1:the weather is good
Text2:today is good
Text3:good weather is good
Text4:today has good weather
現在要統計出每個單詞的出現次數。
處理過程
(1)拆分單詞
-
map節點1
輸入:“the weather is good”
輸出:(the,1),(weather,1),(is,1),(good,1)
-
map節點2
輸入:“today is good”
輸出:(today,1),(is,1),(good,1)
-
map節點3
輸入:“good weather is good”
輸出:(good,1),(weather,1),(is,1),(good,1)
-
map節點4
輸入:“today has good weather”
輸出:(today,1),(has,1),(good,1),(weather,1)
(2)排序
-
map節點1
-
map節點2
-
map節點3
-
map節點4
(3)合併
-
map節點1
-
map節點2
-
map節點3
-
map節點4
(4)彙總統計
每個map節點都完成以後,就要進入reduce階段了。
例如使用了3個reduce節點,需要對上面4個map節點的結果進行重新組合,比如按照26個字母分成3段,分配給3個reduce節點。
Reduce節點進行統計,計算出最終結果。
這就是最基本的MapReduce處理流程。
4、MapReduce程式設計思路
瞭解了MapReduce的工作過程,我們思考一下用程式碼實現時需要做哪些工作?
-
在4個伺服器中啟動4個map任務
-
每個map任務讀取目標檔案,每讀一行就拆分一下單詞,並記下來次單詞出現了一次
-
目標檔案的每一行都處理完成後,需要把單詞進行排序
-
在3個伺服器上啟動reduce任務
-
每個reduce獲取一部分map的處理結果
-
reduce任務進行彙總統計,輸出最終的結果資料
但不用擔心,MapReduce是一個非常優秀的程式設計模型,已經把絕大多數的工作做完了,我們只需要關心2個部分:
-
map處理邏輯——對傳進來的一行資料如何處理?輸出什麼資訊?
-
reduce處理邏輯——對傳進來的map處理結果如何處理?輸出什麼資訊?
編寫好這兩個核心業務邏輯之後,只需要幾行簡單的程式碼把map和reduce裝配成一個job,然後提交給Hadoop叢集就可以了。
至於其它的複雜細節,例如如何啟動map任務和reduce任務、如何讀取檔案、如對map結果排序、如何把map結果資料分配給reduce、reduce如何把最終結果儲存到檔案等等,MapReduce框架都幫我們做好了,而且還支援很多自定義擴充套件配置,例如如何讀檔案、如何組織map或者reduce的輸出結果等等,後面的示例中會有介紹。
二、MapReduce入門示例:WordCount單詞統計
WordCount是非常好的入門示例,相當於helloword,下面就開發一個WordCount的MapReduce程式,體驗實際開發方式。
1、安裝Hadoop實踐環境
您可以選擇自己搭建環境,也可以使用打包好的Hadoop環境(版本2.7.3)。
這個Hadoop環境實際上是一個虛機映象,所以需要安裝virtualbox虛擬機器、vagrant映象管理工具,和我的Hadoop映象,然後用這個映象啟動虛機就可以了,下面是具體操作步驟:
(1)安裝virtualbox
下載地址:https://www.virtualbox.org/wiki/Downloads
(2)安裝vagrant
因為官網下載較慢,我上傳到了雲盤
Windows版
連結: https://pan.baidu.com/s/1pKKQGHl
密碼: eykr
Mac版
連結: https://pan.baidu.com/s/1slts9yt
密碼: aig4
安裝完成後,在命令列終端下就可以使用vagrant命令。
(3)下載Hadoop映象
連結: https://pan.baidu.com/s/1bpaisnd
密碼: pn6c
(4)啟動
載入Hadoop映象
vagrant box add{自定義映象名稱} {映象所在路徑}
例如您想命名為Hadoop,映象下載後的路徑為d:hadoop.box,載入命令就是這樣:
vagrant box addhadoop d:hadoop.box
建立工作目錄,例如d:hdfstest。
進入此目錄,初始化
cd d:hdfstest
vagrant init hadoop
啟動虛機
vagrant up
啟動完成後,就可以使用SSH客戶端登入虛機了
IP 127.0.0.1
埠 2222
使用者名稱 root
密碼 vagrant
在Hadoop伺服器中啟動HDFS和Yarn,之後就可以執行MapReduce程式了
start-dfs.sh
start-yarn.sh
2、建立專案
注:流程是在本機開發,然後打包,上傳到Hadoop伺服器上執行。
新建專案目錄wordcount,其中新建檔案pom.xml,內容:
然後建立原始碼目錄src/main/java
現在的目錄結構
3、程式碼
mapper程式:src/main/java/WordcountMapper.java
內容:
這裡定義了一個mapper類,其中有一個map方法。MapReduce框架每讀到一行資料,就會呼叫一次這個map方法。
map的處理流程就是接收一個key value對兒,然後進行業務邏輯處理,最後輸出一個key value對兒。
Mapper<LongWritable, Text, Text, IntWritable>
其中的4個型別分別是:輸入key型別、輸入value型別、輸出key型別、輸出value型別。
MapReduce框架讀到一行資料侯以key value形式傳進來,key預設情況下是mr礦機所讀到一行文字的起始偏移量(Long型別),value預設情況下是mr框架所讀到的一行的資料內容(String型別)。
輸出也是key value形式的,是使用者自定義邏輯處理完成後定義的key,使用者自己決定用什麼作為key,value是使用者自定義邏輯處理完成後的value,內容和型別也是使用者自己決定。
此例中,輸出key就是word(字串型別),輸出value就是單詞數量(整型)。
這裡的資料型別和我們常用的不一樣,因為MapReduce程式的輸出資料需要在不同機器間傳輸,所以必須是可序列化的,例如Long型別,Hadoop中定義了自己的可序列化型別LongWritable,String對應的是Text,int對應的是IntWritable。
reduce程式:src/main/java/WordCountReducer.java
這裡定義了一個Reducer類和一個reduce方法。
當傳給reduce方法時,就變為:
Reducer<Text, IntWritable, Text, IntWritable>
4個型別分別指:輸入key的型別、輸入value的型別、輸出key的型別、輸出value的型別。
需要注意,reduce方法接收的是:一個字串型別的key、一個可迭代的資料集。因為reduce任務讀取到map任務處理結果是這樣的:
(good,1)(good,1)(good,1)(good,1)
當傳給reduce方法時,就變為:
key:good
value:(1,1,1,1)
所以,reduce方法接收到的是同一個key的一組value。
主程式:src/main/java/WordCountMapReduce.java
這個main方法就是用來組裝一個job並提交執行
4、編譯打包
在pom.xml所在目錄下執行打包命令:
mvn package
執行完成後,會自動生成target目錄,其中有打包好的jar檔案。
現在專案檔案結構:
5、執行
先把target中的jar上傳到Hadoop伺服器,然後在Hadoop伺服器的HDFS中準備測試檔案(把Hadoop所在目錄下的txt檔案都上傳到HDFS)
cd $HADOOP_HOME
hdfs dfs -mkdir -p /wordcount/input
hdfs dfs -put *.txt /wordcount/input
執行wordcount jar
hadoop jar mapreduce-wordcount-0.0.1-SNAPSHOT.jar WordCountMapR
educe /wordcount/input /wordcount/output
執行完成後驗證
hdfs dfs -cat /wordcount/output/*
可以看到單詞數量統計結果。
三、MapReduce執行過程分析
下面看一下從job提交到執行完成這個過程是怎樣。
(1)客戶端提交任務
Client提交任務時會先到HDFS中檢視目標檔案的大小,瞭解要獲取的資料的規模,然後形成任務分配的規劃,例如:
a.txt 0-128M交給一個task,128-256M 交給一個task,b.txt 0-128M交給一個task,128-256M交給一個task ...,形成規劃檔案job.split。
然後把規劃檔案job.split、jar、配置檔案xml提交給yarn(Hadoop叢集資源管理器,負責為任務分配合適的伺服器資源)
(2)啟動appmaster
注:appmaster是本次job的主管,負責maptask和reducetask的啟動、監控、協調管理工作。
yarn找一個合適的伺服器來啟動appmaster,並把job.split、jar、xml交給它。
(3)啟動maptask
Appmaster啟動後,根據固化檔案job.split中的分片資訊啟動maptask,一個分片對應一個maptask。
分配maptask時,會盡量讓maptask在目標資料所在的datanode上執行。