1. 程式人生 > >MapReduce原始碼解析之Mapper

MapReduce原始碼解析之Mapper

MapReduce原始碼解析之Mapper

北京易觀智庫網路科技有限公司
作者:賀斌
在這裡插入圖片描述

摘要:詳解MapReduce中Map(對映)的實現者Mapper。

導語:

說起MapReduce,只要是大資料領域的小夥伴,相信都不陌生。它作為Hadoop生態系統中的一部分,最早是由Google公司研究提出的一種面向大規模資料處理的平行計算模型。MapReduce主要由"Map(對映)"和"Reduce(歸約)"組成,主要思想是用Map函式用來把一組鍵值對對映成一組新的鍵值對,然後指定併發的Reduce函式進行合併輸出。然而就是這個簡單的分散式思想,卻敲開了大資料時代的大門,很多大資料工作者都把學習MapReduce作為基礎的一課。 不過對於大多數小夥伴來說,雖然學習或寫了很久的MapReduce,卻不一定真正研究過它的原始碼。這一期,我就先給大家介紹一下MapReduce中Map(對映)的實現者Mapper。

類圖

如圖為Mapper以及它的一些子類的類圖(Mapper一共有九個子類。我們挑了其中的4個子類來做分析)
在這裡插入圖片描述
Mapper原始碼
在這裡插入圖片描述
從原始碼中我們可以看出,Mapper類裡總共包含四個方法,一個抽象類。
1.setup()方法—一般作為map()方法的準備工作,如進行相關配置檔案的讀取、引數的傳遞;
2.cleanup()方法—是用來做一些收尾工作,如關閉檔案、Key-Value的分發;
3.map()方法—是真正的程式邏輯部分,如對一行文字的split、filter處理之後,將資料以key-Value的形式寫入context;
4.run()方法—是驅動整個Mapper執行的一個方法,按照run()>>setup()>>map()>>cleanup()順序執行;
5.Context抽象類

—是Mapper裡的一個內部抽象類,主要是為了在Map任務或者Reduce任務中跟蹤task的相關狀態和資料的存放。如Context可以儲存一些jobConf有關的資訊,在setup()方法中,就可以用context讀取相關的配置資訊,以及作為key-Value資料的載體。(Context比較複雜,以後可以單獨介紹)

Mapper子類

InverseMapper
在這裡插入圖片描述
這個類很簡單,只是將Key-Value進行反轉,然後直接分發,如麵包-生產商,我們既可以統計某一種麵包來自多少個生產商,也可以統計一個生產商生產多少種麵包。不同的需求,利用InverseMapper可以達到不同的效果
TokenCounterMapper


在這裡插入圖片描述
這個類使用StringTokenizer來獲取value中的tokens(在StringTokenizer的建構函式中將value按照“\t\n\t\f”進行分割),然後對每一個token,分發出一個<token,one>對,這將在Reduce端被收集,同一個token對應的Key-Value對都會被收集到同一個Reducer上,計算出所有Mapper分發出來的以某個token為key的<token,one>的數量,然後加起來,就得到了token的計數。在我們學習的wordcount程式中,其實只需要在main方法中將job.setMapperClass(TokenCounterMapper.class)進行設定,便可以統計單詞的個數。
RegexMapper
在這裡插入圖片描述
這個類其實就是將wordcount進行了正則化,匹配自己需要格式的word進行統計。
MultithreadedMapper
在這裡插入圖片描述
在這裡插入圖片描述
在這裡插入圖片描述
這個類使用多執行緒來執行Mapper(它由 mapreduce.mapper.multithreadedmapper.mapclass設定)。該類重寫了run()方法,首先設定執行上下文context和workMapper,然後啟動多個MapRunner(內部類)子執行緒(由mapred.map.multithreadedrunner.threads 設定 ),最後使用join()等待子執行緒執行完畢。MapRunner(內部類)繼承了Thread,擁有獨立的Context去執行Mapper,並進行異常處理。從MapRunner的Constructor中我們看見,它使用了獨立的SubMapRecordReader、SubMapRecordWriter和SubMapStatusReporter。SubMapRecordReader在讀Key-Value和SubMapRecordWriter在寫Key-Value的時候都要同步。這是通過互斥訪問MultithreadedMapper的上下文outer來實現的。MultithreadedMapper可以充分利用CPU,採用多個執行緒處理後,一個執行緒可以同時在另一個執行緒執行的時候讀取資料並執行,這樣就使用了更多的CPU週期來執行任務,從而提高吞吐率(注意讀寫操作都是執行緒安全的)。但對於IO密集型的作業,採用MultithreadedMapper會適得其反,因為會有多個執行緒等待IO,IO便成為限制吞吐率的關鍵,這時候我們可以通過增多task數量的方法來解決,因為這樣在IO上就是並行的。
以上即是對MapReduce中Mapper以及部分子類原始碼的解析,後續將繼續討論MapReduce中的其他關鍵類原始碼。

北京易觀智庫網路科技有限公司
在這裡插入圖片描述