1. 程式人生 > >Hadoop(十四)MapReduce原理分析

Hadoop(十四)MapReduce原理分析

資源 並行處理 ons 描述 並發數 span col 數據分析 sub

前言

  上一篇我們分析了一個MapReduce在執行中的一些細節問題,這一篇分享的是MapReduce並行處理的基本過程和原理。

  Mapreduce是一個分布式運算程序的編程框架,是用戶開發“基於hadoop的數據分析應用”的核心框架。
  Mapreduce核心功能是將用戶編寫的業務邏輯代碼和自帶默認組件整合成一個完整的分布式運算程序,並發運行在一個hadoop集群上

一、MapReduce並行處理的基本過程

  首先要說明的是Hadoop2.0之前和Hadoop2.0之後的區別: 

   2.0之前只有MapReduce的運行框架,那麽它裏面有只有兩種節點,一個是master,一個是worker。master既做資源調度又做程序調度,worker只是用來參與計算的


   但是在2.0之後加入了YARN集群,Yarn集群的主節點承擔了資源調度,Yarn集群的從節點中會選出一個節點(這個由redourcemanager決定)

   用作類似於2.0之前的master的工作,來進行應用程序的調度

   資源調度: 處理程序所需要的cpu、內存資源,以及存儲數據所需要的硬盤資源都是resourcemanager去分配的

  技術分享

  一切都是從最上方的user program開始的,user program鏈接了MapReduce庫,實現了最基本的Map函數和Reduce函數
  圖中執行的順序都用數字標記了。
  1)MapReduce庫先把user program的輸入文件劃分為M份(M為用戶定義),如圖左方所示分成了split0~4;然後使用fork將用戶進程拷貝到集群內其它機器上。

  2)user program的副本中有一個稱為master,其余稱為worker,master是負責調度的,為空閑worker分配作業(Map作業或者Reduce作業),worker的數量也是

    可以由用戶指定的。

  3)被分配了Map作業的worker,開始讀取對應分片的輸入數據,Map作業數量是由M決定的,和split一一對應;Map作業從輸入數據中抽取出鍵值對,每一個鍵值對

    都作為參數傳遞給map函數,map函數產生的中間鍵值對被緩存在內存中。

  4)緩存的中間鍵值對會被定期寫入本地磁盤,而且被分為R個區,R的大小是由用戶定義的,將來每個區會對應一個Reduce作業;這些中間鍵值對的位置會被通報

    給master,master負責將信息轉發給Reduce worker。

  5)master通知分配了Reduce作業的worker它負責的分區在什麽位置(肯定不止一個地方,每個Map作業產生的中間鍵值對都可能映射到所有R個不同分區),當

    Reduce worker把所有它負責的中間鍵值對都讀過來後,先對它們進行排序,使得相同鍵的鍵值對聚集在一起。因為不同的鍵可能會映射到同一個分區也就是

    同一個Reduce作業(誰讓分區少呢),所以排序是必須的。

  6)reduce worker遍歷排序後的中間鍵值對,對於每個唯一的鍵,都將鍵與關聯的值傳遞給reduce函數,reduce函數產生的輸出會添加到這個分區的輸出文件中。

  7)當所有的Map和Reduce作業都完成了,master喚醒正版的user program,MapReduce函數調用返回user program的代碼。

  8)所有執行完畢後,MapReduce輸出放在了R個分區的輸出文件中(分別對應一個Reduce作業)。用戶通常並不需要合並這R個文件,而是將其作為輸入交給另一

    個MapReduce程序處理。整個過程中,輸入數據是來自底層分布式文件系統(GFS)的,中間數據是放在本地文件系統的,最終輸出數據是寫入底層分布式文件

    系統(GFFS)的。而且我們要註意Map/Reduce作業和map/reduce函數的區別:Map作業處理一個輸入數據的分片,可能需要調用多次map函數來處理每個輸入

    鍵值對;Reduce作業處理一個分區的中間鍵值對,期間要對每個不同的鍵調用一次reduce函數,Reduce作業最終也對應一個輸出文件。

    技術分享

二、MapRrduce輸入與輸出問題

  Map/Reduce框架運轉在<key, value>鍵值對上,也就是說,框架把作業的輸入看為是一組<key, value>鍵值對,同樣也產出一組 <key, value>鍵值對做為作業的輸出,這兩組鍵

    值對的類型可能不同。
  框架需要對key和value的類(classes)進行序列化操作,因此,這些類需要實現Writable接口。另外,為了方便框架執行排序操作,key類必須實現 WritableComparable接口

  註意:不管是哪裏的序列化,最主要的作用就是持久化存儲或者是用於網絡傳輸

  一個Map/Reduce作業的輸入和輸出類型如下所示:
  (input) <k1, v1> -> map -> <k2, v2>-> combine -> <k2, v2> -> reduce -> <k3, v3> (output)。

  其實在前面講解Hadoop IO的時候已經知道了解了Writale接口:  

    Writable接口是一個實現了序列化協議的序列化對象。
    在Hadoop中定義一個結構化對象都要實現Writable接口,使得該結構化對象可以序列化為字節流,字節流也可以反序列化為結構化對象。

    技術分享

三、MapReduce實際處理流程

  mapreduce 其實是分治算法的一種現,所謂分治算法就是“就是分而治之 ,將大的問題分解為相同類型的子問題(最好具有相同的規模),對子問題進行求解,然後合並成大問題的解

  mapreduce就是分治法的一種,將輸入進行分片,然後交給不同的task進行處理,然後合並成最終的解
  mapreduce實際的處理過程可以理解為Input->Map->Sort->Combine->Partition->Reduce->Output

  1)Input階段

    數據以一定的格式傳遞給Mapper,有TextInputFormat,DBInputFormat,SequenceFileFormat等可以使用,在Job.setInputFormat可以設置,也可以自定義分片函數。

  2)map階段

    對輸入的(key,value)進行處理,即map(k1,v1)->list(k2,v2),使用Job.setMapperClass進行設置。

  3)Sort階段

    對於Mapper的輸出進行排序,使用Job.setOutputKeyComparatorClass進行設置,然後定義排序規則

  4)Combine階段

    這個階段對於Sort之後又相同key的結果進行合並,使用Job.setCombinerClass進行設置,也可以自定義Combine Class類。

  5)Partition階段

    將Mapper的中間結果按照key的範圍劃分為R份(Reduce作業的個數),默認使用HashPartioner(key.hashCode()&Integer.MAX_VALUE%numPartitions),也可以自定義劃分的函數。

    使用Job.setPartitionClass設置。

  6)Reduce階段

    對於Mapper階段的結果進行進一步處理,Job.setReducerClass進行設置自定義的Reduce類。

  7)Output階段

    Reducer輸出數據的格式。

四、一個job的運行流程

  一個mapreduce作業的執行流程是:作業提交->作業初始化->任務分配->任務執行->更新任務執行進度和狀態->作業完成

  技術分享

  一個完整的mapreduce作業流程,包括4個獨立的實體:
    客戶端:client,編寫mapreduce程序,配置作業,提交作業。
    JobTracker:協調這個作業的運行,分配作業,初始化作業,與TaskTracker進行通信
    TaskTracker:負責運行作業,保持與JobTracker進行通信
    HDFS:分布式文件系統,保持作業的數據和結果。

4.1、提交作業

  JobClient使用runjob方法創建一個JobClient實例,然後調用submitJob()方法進行作業的提交,提交作業的具體過程如下:

    1)通過調用JobTracker對象的getNewJobId()方法從JobTracker處獲得一個作業ID。
    2)檢查作業的相關路徑。如果輸出路徑存在,作業將不會被提交(保護上一個作業運行結果)。
    3)計算作業的輸入分片,如果無法計算,例如輸入路徑不存在,作業將不被提交,錯誤返回給mapreduce程序。
    4)將運行作業所需資源(作業jar文件,配置文件和計算得到的分片)復制到HDFS上。
    5)告知JobTracker作業準備執行(使用JobTracker對象的submitJob()方法來真正提交作業)。

4.2、作業初始化

  當JobTracker收到Job提交的請求後,將Job保存在一個內部隊列,並讓Job Scheduler(作業調度器)處理並初始化。初始化涉及到創建一個封裝了其tasks的job對象,

  並保持對task的狀態和進度的跟蹤(step 5)。當創建要運行的一系列task對象後,Job Scheduler首先開始從文件系統中獲取由JobClient計算的input splits(step 6),然後

  再為每個split創建map task。

4.3、任務的分配

  TaskTracker和JobTracker之間的通信和任務分配是通過心跳機制完成的。TaskTracker作為一個單獨的JVM,它執行一個簡單的循環,主要實現每隔一段時間向JobTracker

  發送心跳,告訴JobTracker此TaskTracker是否存活,是否準備執行新的任務。如果有待分配的任務,它就會為TaskTracker分配一個任務。

4.4、任務的執行

  TaskTracker申請到新的任務之後,就要在本地運行了。首先,是將任務本地化(包括運行任務所需的數據、配置信息、代碼等),即從HDFS復制到本地。調用localizeJob()完成的。

  對於使用Streaming和Pipes創建Map或者Reduce程序的任務,Java會把key/value傳遞給外部進程,然後通過用戶自定義的Map或者Reduce進行處理,然後把key/value傳回到java中。

  其中就好像是TaskTracker的子進程在處理Map和Reduce代碼一樣。

4.5、更新任務的執行進度和狀態

  進度和狀態是通過heartbeat(心跳機制)來更新和維護的。對於Map Task,進度就是已處理數據和所有輸入數據的比例。對於Reduce Task,情況就郵電復雜,包括3部分,

  拷貝中間結果文件、排序、reduce調用,每部分占1/3。

4.6、任務完成

  當Job完成後,JobTracker會收一個Job Complete的通知,並將當前的Job狀態更新為successful,同時JobClient也會輪循獲知提交的Job已經完成,將信息顯示給用戶。

  最後,JobTracker會清理和回收該Job的相關資源,並通知TaskTracker進行相同的操作(比如刪除中間結果文件)

五、MapReduce框架結構及核心運行機制

5.1、結構

  一個完整的mapreduce程序在分布式運行時有三類實例進程

    MRAppMaster:負責整個程序的過程調度及狀態協調(Hadoop2.0之後就不一樣了)
    mapTask:負責map階段的整個數據處理流程
    ReduceTask:負責reduce階段的整個數據處理流程

5.2、MapReduce運行流程解析

  技術分享

  流程分析:

  1) 一個mr程序啟動的時候,最先啟動的是MRAppMaster,MRAppMaster啟動後根據本次job的描述信息,計算出需要的maptask實例數量,然後向集群申請機器啟動

    相應數量的maptask進程。

  2)maptask進程啟動之後,根據給定的數據切片範圍進行數據處理,主體流程為:
    利用客戶指定的inputformat來獲取RecordReader讀取數據,形成輸入KV對
    將輸入KV對傳遞給客戶定義的map()方法,做邏輯運算,並將map()方法輸出的KV對收集到緩存
    將緩存中的KV對按照K分區排序後不斷溢寫到磁盤文件

  3) MRAppMaster監控到所有maptask進程任務完成之後,會根據客戶指定的參數啟動相應數量的reducetask進程,並告知reducetask進程要處理的數據範圍(數據分區)

  4)Reducetask進程啟動之後,根據MRAppMaster告知的待處理數據所在位置,從若幹臺maptask運行所在機器上獲取到若幹個maptask輸出結果文件,並在本地進行重新歸並排序,

    然後按照相同key的KV為一個組,調用客戶定義的reduce()方法進行邏輯運算,並收集運算輸出的結果KV,然後調用客戶指定的outputformat將結果數據輸出到外部存儲。

5.3、MapTask並行度決定機制

  maptask的並行度決定map階段的任務處理並發度,進而影響到整個job的處理速度
  那麽,mapTask並行實例是否越多越好呢?其並行度又是如何決定呢?

5.3.1、mapTask並行度的決定機制

  一個job的map階段並行度由客戶端在提交job時決定而客戶端對map階段並行度的規劃的基本邏輯為:
    將待處理數據執行邏輯切片(即按照一個特定切片大小,將待處理數據劃分成邏輯上的多個split),然後每一個split分配一個mapTask並行實例處理
    這段邏輯及形成的切片規劃描述文件,由FileInputFormat實現類的getSplits()方法完成,其過程如下圖:

  技術分享

5.3.2、FileInputFormat切片機制

  1)FileInputFormat切片機制切片定義在InputFormat類中的getSplit()方法
  2)FileInputFormat中默認的切片機制:
    簡單地按照文件的內容長度進行切片
    切片大小,默認等於block大小
    切片時不考慮數據集整體,而是逐個針對每一個文件單獨切片
  比如待處理數據有兩個文件:

    file1.txt 320M
    file2.txt 10M

  經過FileInputFormat的切片機制運算後,形成的切片信息如下:

    file1.txt.split1-- 0~128
    file1.txt.split2-- 128~256
    file1.txt.split3-- 256~320
    file2.txt.split1-- 0~10M
  3)FileInputFormat中切片的大小的參數配置
    通過分析源碼,在FileInputFormat中,計算切片大小的邏輯:Math.max(minSize, Math.min(maxSize, blockSize)); 切片主要由這幾個值來運算決定

    minsize:默認值:1
    配置參數: mapreduce.input.fileinputformat.split.minsize
    maxsize:默認值:Long.MAXValue
    配置參數:mapreduce.input.fileinputformat.split.maxsize
    blocksize

  因此,默認情況下,切片大小=blocksize
  maxsize(切片最大值):
  參數如果調得比blocksize小,則會讓切片變小,而且就等於配置的這個參數的值
  minsize (切片最小值):
  參數調的比blockSize大,則可以讓切片變得比blocksize還大

 

  選擇並發數的影響因素:

    運算節點的硬件配置
    運算任務的類型:CPU密集型還是IO密集型
    運算任務的數據量

5.3.3、ReduceTask並行度的決定

  reducetask的並行度同樣影響整個job的執行並發度和執行效率,但與maptask的並發數由切片數決定不同,Reducetask數量的決定是可以直接手動設置:

  //默認值是1,手動設置為4
  job.setNumReduceTasks(4);

  如果數據分布不均勻,就有可能在reduce階段產生數據傾斜
  註意: reducetask數量並不是任意設置,還要考慮業務邏輯需求,有些情況下,需要計算全局匯總結果,就只能有1個reducetask
  盡量不要運行太多的reduce task。對大多數job來說,最好rduce的個數最多和集群中的reduce持平,或者比集群的 reduce slots小。這個對於小集群而言,尤其重要。

5.4、mapreduce的shuffle機制

  1)概述

    mapreduce中,map階段處理的數據如何傳遞給reduce階段,是mapreduce框架中最關鍵的一個流程,這個流程就叫shuffle
    shuffle: 洗牌、發牌——(核心機制:數據分區,排序,緩存)。
    具體來說:就是將maptask輸出的處理結果數據,分發給reducetask,並在分發的過程中,對數據按key進行了分區和排序。

技術分享

    分區partition(確定哪個數據進入哪個reduce)
    Sort根據key排序
    Combiner進行局部value的合並

  2)詳細流程  

    1、 maptask收集我們的map()方法輸出的kv對,放到內存緩沖區中
    2、 從內存緩沖區不斷溢出本地磁盤文件,可能會溢出多個文件
    3、 多個溢出文件會被合並成大的溢出文件
    4、 在溢出過程中,及合並的過程中,都要調用partitoner進行分組和針對key進行排序
    5、 reducetask根據自己的分區號,去各個maptask機器上取相應的結果分區數據
    6、 reducetask會取到同一個分區的來自不同maptask的結果文件,reducetask會將這些文件再進行合並(歸並排序)
    7、 合並成大文件後,shuffle的過程也就結束了,後面進入reducetask的邏輯運算過程(從文件中取出一個一個的鍵值對group,調用用戶自定義的reduce()方法)

  Shuffle中的緩沖區大小會影響到mapreduce程序的執行效率,原則上說,緩沖區越大,磁盤io的次數越少,執行速度就越快
  緩沖區的大小可以通過參數調整, 參數:io.sort.mb 默認100M

六、MapReduce與YARN

6.1、YARN概述

  Yarn是一個資源調度平臺負責為運算程序提供服務器運算資源,相當於一個分布式的操作系統平臺,而mapreduce等運算程序則相當於運行於操作系統之上的應用程序

6.2、YARN中的重要概念

  1) yarn並不清楚用戶提交的程序的運行機制
  2) yarn只提供運算資源的調度(用戶程序向yarn申請資源,yarn就負責分配資源)
  3) yarn中的主管角色叫ResourceManager
  4) yarn中具體提供運算資源的角色叫NodeManager
  5) 這樣一來,yarn其實就與運行的用戶程序完全解耦,就意味著yarn上可以運行各種類型的分布式運算程序(mapreduce只是其中的一種),比如mapreduce、storm程序,spark程序,tez ……
  6) 所以,spark、storm等運算框架都可以整合在yarn上運行,只要他們各自的框架中有符合yarn規範的資源請求機制即可
  7) Yarn就成為一個通用的資源調度平臺,從此,企業中以前存在的各種運算集群都可以整合在一個物理集群上,提高資源利用率,方便數據共享

6.3、YARN中運行運算程序實例

  mapreduce程序的調度過程,如下圖:

技術分享

喜歡就點個“推薦”!

Hadoop(十四)MapReduce原理分析