1. 程式人生 > >大資料-Hadoop生態(17)-MapReduce框架原理-MapReduce流程,Shuffle機制,Partition分割槽

大資料-Hadoop生態(17)-MapReduce框架原理-MapReduce流程,Shuffle機制,Partition分割槽

 MapReduce工作流程

1.準備待處理檔案

2.job提交前生成一個處理規劃

3.將切片資訊job.split,配置資訊job.xml和我們自己寫的jar包交給yarn

4.yarn根據切片規劃計算出MapTask的數量

(以一個MapTask為例)

5.Maptask呼叫inputFormat生成RecordReader,將自己處理的切片檔案內容打散成K,V值

6.MapTask將打散好的K,V值交給Mapper,Mapper經過一系列的處理將KV值寫出

7.寫出的KV值被outputCollector收集起來,寫入一個在記憶體的環形緩衝區

8,9.當環形緩衝區被寫入的空間等於80%時,會觸發溢寫.此時資料是在記憶體中,所以在溢寫之前,會對資料進行排序,是一個二次排序的快排(先根據分割槽排序再根據key排序).然後將資料有序的寫入到磁碟上.

緩衝區為什麼是環形的?這樣做是為了可以在緩衝區的任何地方進行資料的寫入.

當第一次溢寫時,資料會從餘下的20%空間中的中間位置,再分左右繼續寫入,也就是從第一次是從上往下寫資料變成了從下往上寫資料

 

10,11.當多次溢寫產生多個有序的檔案後,會觸發歸併排序,將多個有序的檔案合併成一個有序的大檔案.當檔案數>=10個時,會觸發歸併排序,取檔案的一小部分放入記憶體的緩衝區,再生成一個小檔案部分大小x檔案數的緩衝區,逐個比較放入大檔案緩衝區,依次比較下去,再將大檔案緩衝寫入到磁碟,歸併結束後將大檔案放在檔案列表的末尾,繼續重複此動作,直到合併成一個大檔案.此次歸併排序的時間複雜度要求較低.

12.當所有的MapTask執行完任務後,啟動相應數量的ReduceTask,並告知每一個ReduceTask應該處理的資料分割槽

13.ReduceTask將指定分割槽的檔案下載到本地,如有多個分割槽檔案的話,ReduceTask上將會有多個大檔案,再一次歸併排序,形成一個大檔案.

14.15,如果有分組要求的話,ReduceTask會將資料一組一組的交給Reduce,處理完後準備將資料寫出

16.Reduce呼叫output生成RecordWrite將資料寫入到指定路徑中

 

 shuffle機制

上圖中,資料從Mapper寫出來之後到資料進入到Reduce之前,這一階段就叫做Shuffle

 

Shuffle時,會有三次排序,第一次是資料從環形緩衝區寫入到磁碟時,會有一次快排,第二次是在MapTask中,將多個分割槽且內部有序的小檔案歸併成一個分割槽且內部有序的大檔案,第三次是在ReduceTask中,從多個MapTask中獲取指定分割槽的大檔案,再進行一個歸併排序,合併成一個大檔案.

以WordCount為例,試想一下,在第一次從環形緩衝區寫入到磁碟時,排好序的資料為(w1,1),(w1,1),(w1,1),(w2,1),(w2,1),(w3,1),這樣的資料會增加網路傳輸量,所以在這裡可以使用Combiner進行資料合併.最後形成的資料是(w1,3),(w2,2),(w3,1),後續會詳細講解~

partition分割槽

將Mapper想象成一個水池,資料是池裡的水.預設分一個去,只有一根水管.如果只有一個ReduceTask,則水會全部順著唯一的水管流入到ReduceTask中.如果此時有3根水管,則水會被分成三股水流流入到3個ReduceTask中,而且哪些水進哪個水管,並不受我們主管控制,也就是資料處理速度加快了~~Partition分割槽就決定了分幾根水管.試想一下,如果有4根水管,末端只有3個ReduceTask,那麼有一股水流會丟失.也就是造成資料丟失,程式會報錯.如果只有2根水管,那麼則有一個ReduceTask無事可做,最後生成的是一個空檔案,浪費資源

所以,一般來說,有幾個ReduceTask就要分幾個區,至於partition和ReduceTask設定為幾,要看叢集效能,資料集,業務,經驗等等~

對應流程圖上,也就是從環形緩衝區寫入到磁碟時,會分割槽

 

 

collector出現了,除了將key,value收集到緩衝區中之外,還收集了partition分割槽

 

 

key.hashCode() & Integer.MAX_VALUE,保證取餘前的數為正數

比如,numReduceTasks = 3, 一個數n對3取餘,結果會有0,1,2三種可能,也就是分三個區,再一次印證了要 reduceTask number = partition number 

預設分割槽是根據key的hashcode和reduceTasks的個數取模得到的,使用者無法控制哪個key儲存到哪個分割槽上

案例演練

以12小章的統計流量案例為例,大資料-Hadoop生態(12)-Hadoop序列化和原始碼追蹤

將手機號136、137、138、139開頭都分別放到一個獨立的4個檔案中,其他開頭的放到一個檔案中

 自定義Partition類

package com.atguigu.partitioner;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartitioner extends Partitioner<Text, FlowBean> {
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
        //1. 擷取手機前三位
        String start = text.toString().substring(0, 3);

        //2. 按照手機號前三位返回分割槽號
        switch (start) {
            case "136":
                return 0;
            case "137":
                return 1;
            case "138":
                return 2;
            case "139":
                return 3;
            default:
                return 4;
        }


    }
}

Driver類的main()中增加以下程式碼

job.setPartitionerClass(MyPartitioner.class);

job.setNumReduceTasks(5);

輸出結果,5個檔案 

如果job.setNumReduceTasks(10),會生成10個檔案,其中5個是空檔案

 如果job.setNumReduceTasks(2),程式會直接執行失敗報異常

如果job.setNumReduceTasks(1),程式會執行成功,因為如果numReduceTasks=1時,根本就不會執行分割槽的過程

 

 如果是以下情況,也會執行失敗.MapReduce會認為你分了41個區,所以分割槽號必須從0開始,逐一累加.

job.setNumReduceTasks(5)

switch (start) { case "136": return 0; case "137": return 1; case "138": return 2; case "139": return 3; default: return 40; }