1. 程式人生 > >7個例項全面掌握Hadoop MapReduce

7個例項全面掌握Hadoop MapReduce

作者介紹

杜亦舒創業中,技術合夥人,喜歡研究分享技術。個人訂閱號:效能與架構。

本文旨在幫您快速瞭解 MapReduce 的工作機制和開發方法,解決以下幾個問題:

  • MapReduce 基本原理是什麼?

  • MapReduce 的執行過程是怎麼樣的?

  • MapReduce 的核心流程細節

  • 如何進行 MapReduce 程式開發?(通過7個例項逐漸掌握)

文章中提供了程式例項中涉及到的測試資料檔案,可以直接下載使用。

關於實踐環境,如果您不喜歡自己搭建Hadoop環境,可以下載使用本教程提供的環境,實踐部分內容中會介紹具體使用方法。

通過學習並實踐完成後,可以對 MapReduce 工作原理有比較清晰的認識,並掌握 MapReduce 的程式設計思路。

大綱:

一、MapReduce 基本原理

二、MapReduce 入門示例 - WordCount 單詞統計

三、MapReduce 執行過程分析

  • 例項1 - 自定義物件序列化

  • 例項2 - 自定義分割槽

  • 例項3 - 計算出每組訂單中金額最大的記錄

  • 例項4 - 合併多個小檔案

  • 例項5 - 分組輸出到多個檔案

四、MapReduce 核心流程梳理

  • 例項6 - join 操作

  • 例項7 - 計算出使用者間的共同好友

五、下載方式

一、MapReduce基本原理

MapReduce是一種程式設計模型,用於大規模資料集的分散式運算。

1、MapReduce通俗解釋

圖書館要清點圖書數量,有10個書架,管理員為了加快統計速度,找來了10個同學,每個同學負責統計一個書架的圖書數量。

張同學統計 書架1

王同學統計 書架2

劉同學統計 書架3

……

過了一會兒,10個同學陸續到管理員這彙報自己的統計數字,管理員把各個數字加起來,就得到了圖書總數。

這個過程就可以理解為MapReduce的工作過程。

2

、MapReduce中有兩個核心操作

(1)map

管理員分配哪個同學統計哪個書架,每個同學都進行相同的“統計”操作,這個過程就是map。

(2)reduce

每個同學的結果進行彙總,這個過程是reduce。

3、MapReduce工作過程拆解

下面通過一個景點案例(單詞統計)看MapReduce是如何工作的。

有一個文字檔案,被分成了4份,分別放到了4臺伺服器中儲存

Text1:the weather is good

Text2:today is good

Text3:good weather is good

Text4:today has good weather

現在要統計出每個單詞的出現次數。

處理過程

(1)拆分單詞

  • map節點1

輸入:“the weather is good”

輸出:(the,1),(weather,1),(is,1),(good,1)

  • map節點2

輸入:“today is good”

輸出:(today,1),(is,1),(good,1)

  • map節點3

輸入:“good weather is good”

輸出:(good,1),(weather,1),(is,1),(good,1)

  • map節點4

輸入:“today has good weather”

輸出:(today,1),(has,1),(good,1),(weather,1)

(2)排序

  • map節點1

  • map節點2

  • map節點3

  • map節點4

(3)合併

  • map節點1

  • map節點2

  • map節點3

  • map節點4

(4)彙總統計

每個map節點都完成以後,就要進入reduce階段了。

例如使用了3個reduce節點,需要對上面4個map節點的結果進行重新組合,比如按照26個字母分成3段,分配給3個reduce節點。

Reduce節點進行統計,計算出最終結果。

這就是最基本的MapReduce處理流程。

4、MapReduce程式設計思路

瞭解了MapReduce的工作過程,我們思考一下用程式碼實現時需要做哪些工作?

  1. 在4個伺服器中啟動4個map任務

  2. 每個map任務讀取目標檔案,每讀一行就拆分一下單詞,並記下來次單詞出現了一次

  3. 目標檔案的每一行都處理完成後,需要把單詞進行排序

  4. 在3個伺服器上啟動reduce任務

  5. 每個reduce獲取一部分map的處理結果

  6. reduce任務進行彙總統計,輸出最終的結果資料

但不用擔心,MapReduce是一個非常優秀的程式設計模型,已經把絕大多數的工作做完了,我們只需要關心2個部分:

  1. map處理邏輯——對傳進來的一行資料如何處理?輸出什麼資訊?

  2. reduce處理邏輯——對傳進來的map處理結果如何處理?輸出什麼資訊?

編寫好這兩個核心業務邏輯之後,只需要幾行簡單的程式碼把map和reduce裝配成一個job,然後提交給Hadoop叢集就可以了。

至於其它的複雜細節,例如如何啟動map任務和reduce任務、如何讀取檔案、如對map結果排序、如何把map結果資料分配給reduce、reduce如何把最終結果儲存到檔案等等,MapReduce框架都幫我們做好了,而且還支援很多自定義擴充套件配置,例如如何讀檔案、如何組織map或者reduce的輸出結果等等,後面的示例中會有介紹。

二、MapReduce入門示例:WordCount單詞統計

WordCount是非常好的入門示例,相當於helloword,下面就開發一個WordCount的MapReduce程式,體驗實際開發方式。

1、安裝Hadoop實踐環境

您可以選擇自己搭建環境,也可以使用打包好的Hadoop環境(版本2.7.3)。

這個Hadoop環境實際上是一個虛機映象,所以需要安裝virtualbox虛擬機器、vagrant映象管理工具,和我的Hadoop映象,然後用這個映象啟動虛機就可以了,下面是具體操作步驟:

(1)安裝virtualbox

下載地址:https://www.virtualbox.org/wiki/Downloads

(2)安裝vagrant

因為官網下載較慢,我上傳到了雲盤

Windows版

連結: https://pan.baidu.com/s/1pKKQGHl

密碼: eykr

Mac版

連結: https://pan.baidu.com/s/1slts9yt

密碼: aig4

安裝完成後,在命令列終端下就可以使用vagrant命令。

(3)下載Hadoop映象

連結: https://pan.baidu.com/s/1bpaisnd

密碼: pn6c

(4)啟動

載入Hadoop映象

vagrant box add{自定義映象名稱} {映象所在路徑}

例如您想命名為Hadoop,映象下載後的路徑為d:hadoop.box,載入命令就是這樣:

vagrant box addhadoop d:hadoop.box

建立工作目錄,例如d:hdfstest。

進入此目錄,初始化

cd d:hdfstest

vagrant init hadoop

啟動虛機

vagrant up

啟動完成後,就可以使用SSH客戶端登入虛機了

IP 127.0.0.1

埠 2222

使用者名稱 root

密碼 vagrant

在Hadoop伺服器中啟動HDFS和Yarn,之後就可以執行MapReduce程式了

start-dfs.sh

start-yarn.sh

2、建立專案

注:流程是在本機開發,然後打包,上傳到Hadoop伺服器上執行。

新建專案目錄wordcount,其中新建檔案pom.xml,內容:

然後建立原始碼目錄src/main/java

現在的目錄結構

3、程式碼

mapper程式:src/main/java/WordcountMapper.java

內容:

這裡定義了一個mapper類,其中有一個map方法。MapReduce框架每讀到一行資料,就會呼叫一次這個map方法。

map的處理流程就是接收一個key value對兒,然後進行業務邏輯處理,最後輸出一個key value對兒。

Mapper<LongWritable, Text, Text, IntWritable>

其中的4個型別分別是:輸入key型別、輸入value型別、輸出key型別、輸出value型別。

MapReduce框架讀到一行資料侯以key value形式傳進來,key預設情況下是mr礦機所讀到一行文字的起始偏移量(Long型別),value預設情況下是mr框架所讀到的一行的資料內容(String型別)。

輸出也是key value形式的,是使用者自定義邏輯處理完成後定義的key,使用者自己決定用什麼作為key,value是使用者自定義邏輯處理完成後的value,內容和型別也是使用者自己決定。

此例中,輸出key就是word(字串型別),輸出value就是單詞數量(整型)。

這裡的資料型別和我們常用的不一樣,因為MapReduce程式的輸出資料需要在不同機器間傳輸,所以必須是可序列化的,例如Long型別,Hadoop中定義了自己的可序列化型別LongWritable,String對應的是Text,int對應的是IntWritable。

reduce程式:src/main/java/WordCountReducer.java

這裡定義了一個Reducer類和一個reduce方法。

當傳給reduce方法時,就變為:

Reducer<Text, IntWritable, Text, IntWritable>

4個型別分別指:輸入key的型別、輸入value的型別、輸出key的型別、輸出value的型別。

需要注意,reduce方法接收的是:一個字串型別的key、一個可迭代的資料集。因為reduce任務讀取到map任務處理結果是這樣的:

(good,1)(good,1)(good,1)(good,1)

當傳給reduce方法時,就變為:

key:good

value:(1,1,1,1)

所以,reduce方法接收到的是同一個key的一組value。

主程式:src/main/java/WordCountMapReduce.java

這個main方法就是用來組裝一個job並提交執行

4、編譯打包

在pom.xml所在目錄下執行打包命令:

mvn package

執行完成後,會自動生成target目錄,其中有打包好的jar檔案。

現在專案檔案結構:

5、執行

先把target中的jar上傳到Hadoop伺服器,然後在Hadoop伺服器的HDFS中準備測試檔案(把Hadoop所在目錄下的txt檔案都上傳到HDFS)

cd $HADOOP_HOME

hdfs dfs -mkdir -p /wordcount/input

hdfs dfs -put *.txt /wordcount/input

執行wordcount jar

hadoop jar mapreduce-wordcount-0.0.1-SNAPSHOT.jar WordCountMapR

educe /wordcount/input /wordcount/output

執行完成後驗證

hdfs dfs -cat /wordcount/output/*

可以看到單詞數量統計結果。

三、MapReduce執行過程分析

下面看一下從job提交到執行完成這個過程是怎樣。

(1)客戶端提交任務

Client提交任務時會先到HDFS中檢視目標檔案的大小,瞭解要獲取的資料的規模,然後形成任務分配的規劃,例如:

a.txt 0-128M交給一個task,128-256M 交給一個task,b.txt 0-128M交給一個task,128-256M交給一個task ...,形成規劃檔案job.split。

然後把規劃檔案job.split、jar、配置檔案xml提交給yarn(Hadoop叢集資源管理器,負責為任務分配合適的伺服器資源)

(2)啟動appmaster

注:appmaster是本次job的主管,負責maptask和reducetask的啟動、監控、協調管理工作。

yarn找一個合適的伺服器來啟動appmaster,並把job.split、jar、xml交給它。

(3)啟動maptask

Appmaster啟動後,根據固化檔案job.split中的分片資訊啟動maptask,一個分片對應一個maptask。

分配maptask時,會盡量讓maptask在目標資料所在的datanode上執行。