1. 程式人生 > >谷歌技術"三寶"之MapReduce

谷歌技術"三寶"之MapReduce

江湖傳說永流傳:谷歌技術有"三寶",GFS、MapReduce和大表(BigTable)!

谷歌在03到06年間連續發表了三篇很有影響力的文章,分別是03年SOSP的GFS,04年OSDI的MapReduce,和06年OSDI的BigTable。SOSP和OSDI都是作業系統領域的頂級會議,在計算機學會推薦會議裡屬於A類。SOSP在單數年舉辦,而OSDI在雙數年舉辦。

那麼這篇部落格就來介紹一下MapReduce。

1. MapReduce是幹啥的

因為沒找到谷歌的示意圖,所以我想借用一張Hadoop專案的結構圖來說明下MapReduce所處的位置,如下圖。
Hadoop實際上就是谷歌三寶的開源實現,Hadoop MapReduce對應Google MapReduce,HBase對應BigTable,HDFS對應GFS。HDFS(或GFS)為上層提供高效的非結構化儲存服務,HBase(或BigTable)是提供結構化資料服務的分散式資料庫,Hadoop MapReduce(或Google MapReduce)是一種平行計算的程式設計模型,用於作業排程。

GFS和BigTable已經為我們提供了高效能、高併發的服務,但是並行程式設計可不是所有程式設計師都玩得轉的活兒,如果我們的應用本身不能併發,那GFS、BigTable也都是沒有意義的。MapReduce的偉大之處就在於讓不熟悉並行程式設計的程式設計師也能充分發揮分散式系統的威力。

簡單概括的說,MapReduce是將一個大作業拆分為多個小作業的框架(大作業和小作業應該本質是一樣的,只是規模不同),使用者需要做的就是決定拆成多少份,以及定義作業本身。

下面用一個貫穿全文的例子來解釋MapReduce是如何工作的。

2. 例子:統計詞頻

如果我想統計下過去10年計算機論文出現最多的幾個單詞,看看大家都在研究些什麼,那我收集好論文後,該怎麼辦呢?

方法一:我可以寫一個小程式,把所有論文按順序遍歷一遍,統計每一個遇到的單詞的出現次數,最後就可以知道哪幾個單詞最熱門了。

這種方法在資料集比較小時,是非常有效的,而且實現最簡單,用來解決這個問題很合適。

方法二:寫一個多執行緒程式,併發遍歷論文。

這個問題理論上是可以高度併發的,因為統計一個檔案時不會影響統計另一個檔案。當我們的機器是多核或者多處理器,方法二肯定比方法一高效。但是寫一個多執行緒程式要比方法一困難多了,我們必須自己同步共享資料,比如要防止兩個執行緒重複統計檔案。

方法三:把作業交給多個計算機去完成。

我們可以使用方法一的程式,部署到N臺機器上去,然後把論文集分成N份,一臺機器跑一個作業。這個方法跑得足夠快,但是部署起來很麻煩,我們要人工把程式copy到別的機器,要人工把論文集分開,最痛苦的是還要把N個執行結果進行整合(當然我們也可以再寫一個程式)。

方法四:讓MapReduce來幫幫我們吧!

MapReduce本質上就是方法三,但是如何拆分檔案集,如何copy程式,如何整合結果這些都是框架定義好的。我們只要定義好這個任務(使用者程式),其它都交給MapReduce。

在介紹MapReduce如何工作之前,先講講兩個核心函式map和reduce以及MapReduce的虛擬碼。

3. map函式和reduce函式

map函式和reduce函式是交給使用者實現的,這兩個函式定義了任務本身。

  • map函式:接受一個鍵值對(key-value pair),產生一組中間鍵值對。MapReduce框架會將map函式產生的中間鍵值對裡鍵相同的值傳遞給一個reduce函式。
  • reduce函式:接受一個鍵,以及相關的一組值,將這組值進行合併產生一組規模更小的值(通常只有一個或零個值)。

統計詞頻的MapReduce函式的核心程式碼非常簡短,主要就是實現這兩個函式。

[plain] view plaincopyprint?
  1. map(String key, String value):  
  2.     // key: document name  
  3.     // value: document contents  
  4.     for each word w in value:  
  5.         EmitIntermediate(w, "1");  
  6. reduce(String key, Iterator values):  
  7.     // key: a word  
  8.     // values: a list of counts  
  9.     int result = 0;  
  10.     for each v in values:  
  11.         result += ParseInt(v);  
  12.         Emit(AsString(result));  
在統計詞頻的例子裡,map函式接受的鍵是檔名,值是檔案的內容,map逐個遍歷單詞,每遇到一個單詞w,就產生一箇中間鍵值對<w, "1">,這表示單詞w咱又找到了一個;MapReduce將鍵相同(都是單詞w)的鍵值對傳給reduce函式,這樣reduce函式接受的鍵就是單詞w,值是一串"1"(最基本的實現是這樣,但可以優化),個數等於鍵為w的鍵值對的個數,然後將這些“1”累加就得到單詞w的出現次數。最後這些單詞的出現次數會被寫到使用者定義的位置,儲存在底層的分散式儲存系統(GFS或HDFS)。

4. MapReduce是如何工作的


上圖是論文裡給出的流程圖。一切都是從最上方的user program開始的,user program連結了MapReduce庫,實現了最基本的Map函式和Reduce函式。圖中執行的順序都用數字標記了。

  1. MapReduce庫先把user program的輸入檔案劃分為M份(M為使用者定義),每一份通常有16MB到64MB,如圖左方所示分成了split0~4;然後使用fork將使用者程序拷貝到叢集內其它機器上。
  2. user program的副本中有一個稱為master,其餘稱為worker,master是負責排程的,為空閒worker分配作業(Map作業或者Reduce作業),worker的數量也是可以由使用者指定的。
  3. 被分配了Map作業的worker,開始讀取對應分片的輸入資料,Map作業數量是由M決定的,和split一一對應;Map作業從輸入資料中抽取出鍵值對,每一個鍵值對都作為引數傳遞給map函式,map函式產生的中間鍵值對被快取在記憶體中。
  4. 快取的中間鍵值對會被定期寫入本地磁碟,而且被分為R個區,R的大小是由使用者定義的,將來每個區會對應一個Reduce作業;這些中間鍵值對的位置會被通報給master,master負責將資訊轉發給Reduce worker。
  5. master通知分配了Reduce作業的worker它負責的分割槽在什麼位置(肯定不止一個地方,每個Map作業產生的中間鍵值對都可能對映到所有R個不同分割槽),當Reduce worker把所有它負責的中間鍵值對都讀過來後,先對它們進行排序,使得相同鍵的鍵值對聚集在一起。因為不同的鍵可能會對映到同一個分割槽也就是同一個Reduce作業(誰讓分割槽少呢),所以排序是必須的。
  6. reduce worker遍歷排序後的中間鍵值對,對於每個唯一的鍵,都將鍵與關聯的值傳遞給reduce函式,reduce函式產生的輸出會新增到這個分割槽的輸出檔案中。
  7. 當所有的Map和Reduce作業都完成了,master喚醒正版的user program,MapReduce函式呼叫返回user program的程式碼。

所有執行完畢後,MapReduce輸出放在了R個分割槽的輸出檔案中(分別對應一個Reduce作業)。使用者通常並不需要合併這R個檔案,而是將其作為輸入交給另一個MapReduce程式處理。整個過程中,輸入資料是來自底層分散式檔案系統(GFS)的,中間資料是放在本地檔案系統的,最終輸出資料是寫入底層分散式檔案系統(GFS)的。而且我們要注意Map/Reduce作業和map/reduce函式的區別:Map作業處理一個輸入資料的分片,可能需要呼叫多次map函式來處理每個輸入鍵值對;Reduce作業處理一個分割槽的中間鍵值對,期間要對每個不同的鍵呼叫一次reduce函式,Reduce作業最終也對應一個輸出檔案。

我更喜歡把流程分為三個階段。第一階段是準備階段,包括1、2,主角是MapReduce庫,完成拆分作業和拷貝使用者程式等任務;第二階段是執行階段,包括3、4、5、6,主角是使用者定義的map和reduce函式,每個小作業都獨立執行著;第三階段是掃尾階段,這時作業已經完成,作業結果被放在輸出檔案裡,就看使用者想怎麼處理這些輸出了。

5. 詞頻是怎麼統計出來的

結合第四節,我們就可以知道第三節的程式碼是如何工作的了。假設咱們定義M=5,R=3,並且有6臺機器,一臺master。


這幅圖描述了MapReduce如何處理詞頻統計。由於map worker數量不夠,首先處理了分片1、3、4,併產生中間鍵值對;當所有中間值都準備好了,Reduce作業就開始讀取對應分割槽,並輸出統計結果。

6. 使用者的權利

使用者最主要的任務是實現map和reduce介面,但還有一些有用的介面是向用戶開放的。
  • an input reader。這個函式會將輸入分為M個部分,並且定義瞭如何從資料中抽取最初的鍵值對,比如詞頻的例子中定義檔名和檔案內容是鍵值對。
  • a partition function。這個函式用於將map函式產生的中間鍵值對對映到一個分割槽裡去,最簡單的實現就是將鍵求雜湊再對R取模。
  • a compare function。這個函式用於Reduce作業排序,這個函式定義了鍵的大小關係。
  • an output writer。負責將結果寫入底層分散式檔案系統。
  • a combiner function。實際就是reduce函式,這是用於前面提到的優化的,比如統計詞頻時,如果每個<w, "1">要讀一次,因為reduce和map通常不在一臺機器,非常浪費時間,所以可以在map執行的地方先執行一次combiner,這樣reduce只需要讀一次<w, "n">了。
  • map和reduce函式就不多說了。

7. MapReduce的實現

目前MapReduce已經有多種實現,除了谷歌自己的實現外,還有著名的hadoop,區別是谷歌是c++,而hadoop是用java。另外斯坦福大學實現了一個在多核/多處理器、共享記憶體環境內執行的MapReduce,稱為Phoenix(介紹),相關的論文發表在07年的HPCA,是當年的最佳論文哦!

參考文獻

[1] MapReduce : Simplified Data Processing on Large Clusters. In proceedings of OSDI'04. [4] Evaluating MapReduce for Multi-core and Multiprocessor Systems. In proceedings of HPCA'07.