1. 程式人生 > >MapReduce:大型叢集上的簡單資料處理

MapReduce:大型叢集上的簡單資料處理

MapReduce:大型叢集上的簡單資料處理

摘要

MapReduce是一個程式設計模型和一個處理和生成大資料集的相關實現。使用者指定一個map函式處理一個key-value對來生成一組中間key-value對;指定一個reduce函式合併所有和同一中間key值相聯絡的中間value值。許多現實世界中的任務以這個模型展現,就像文中展示的那樣。

以這種函式型別編寫的程式在一群日常機器上自動並行化並執行。執行時系統關心劃分輸入資料的細節,在一組機器間排程程式的執行,處理機器失效,管理內部機器需要的通訊。這使得那些沒有任何並行和分散式系統經驗的程式設計師可以容易地使用大型分散式系統的資源。

我們MapReduce

的實現執行在一大群日常機器上並且高度可擴張:一個典型的MapReduce計算在成千上萬臺機器上處理數TB的資料。程式設計師發現系統很易用:成百上千的MapReduce程式已經被實現,每天都有多餘1000MapReduce作業在Google叢集上執行。

1.介紹

在過去的5年裡,作者以及Google裡的其他程式設計師已經實現了數以百計的,特殊目的的計算。這些計算處理海量原始資料,比如,文件抓取(shijin:類似網路爬蟲的程式)、web請求日誌等;或者計算各種各樣的派生資料,比如倒排索引、web文件的圖結構的各種表示形勢、每臺主機上網路爬蟲抓取的頁面數量的彙總、列舉一天中一組最頻繁的查詢等。大多數這種計算概念上直截了當。然而輸入的資料量巨大,並且為了在合理的時間內完成,計算不得不分佈到數百或數千臺機器上。如何平行計算、分發資料、處理失效的問題湊在一起使原本簡單的計算晦澀難懂,需要大量複雜的程式碼來處理這些問題。

作為對上述複雜性的應對,我們設計一個新的抽象模型,其使我們可以表達我們試圖執行的簡單運算,但是將並行、容錯、資料分佈和負載均衡等散亂的細節隱藏在了一個庫裡面。我們抽象模型的靈感來自Lisp和許多其他函式式語言的mapreduce原語。我們意識到大多數我們的計算都涉及這樣的操作:在我們輸入中的每個邏輯“記錄”上應用map操作,以便計算出一組中間key/value對,然後在所有享有相同key值的value值應用reduce操作,以便合適地合併派生的資料。我們使用帶有使用者指定mapreduce操作的函式模型,就可以輕易地並行化大規模計算;並且可以使用“再次執行”(re-execution)作為基礎的容錯機制。

這項工作的主要貢獻是一個簡單強大的介面,該介面使自動地並行化和分佈大規模計算成為了可能,介面連同該介面的實現,實現了在大群日常PC機上的高效能。

第二節描述基本的程式設計模型給出一些例子。第三節描述了為我們基於叢集的計算環境定做的MapReduce介面的實現。第四節描述一些我們發現有用的程式設計模型優化。第五節對我們各種不同任務實現的效能進行了測量。第六節探索了MapReduceGoogle內部的使用,包含我們在使用其作為我們生產索引系統的重寫操作的基礎時的經驗。第七節討論相關的和未來的工作。

2.程式設計模型

計算取出一組輸入key/value對,產生一組輸出key/value對。使用MapReduce庫的使用者用兩個函式表達這個計算:MapReduce

使用者自己編寫的Map接受一個輸入對,然後產生一組中間key/value對。MapReduce庫把所有和相同中間keyI關聯的中間value值聚集在一起後傳遞給Reduce函式。

也是由使用者編寫的Reduce函式接受一箇中間keyI和一組那個key值的value值。Reduce函式將這些value值合併在一起,形成一個可能更小的value值的集合。通常每次Reduce呼叫僅產生01個輸出value值。中間value值通過一個迭代器提供給使用者的Reduce函式,這樣我們就可以處理因太大而無法適應記憶體的value值列表。

2.1 例子

考慮這麼一個問題,在一個大的文件集合中對每個單詞出現的次數進行計數,使用者可能要類似下面虛擬碼的程式碼:

map(String key, String value):

    // key: document name

    // value: documen t contents

    for each word w in value:

        EmitIntermediate(w, “1”);

reduce(String key, Iterator values):

    // key: a word

    // values: a list of counts

    int result = 0;

    for each v in value s:

        result += Parse Int(v);

    Emit(AsString(result));

Map函式emit每個詞加上一個相關的出現計數(在這個簡單的例子裡就是1)Reduce函式把為一個特定的詞emit的所有計數加起來。

另外,使用者編寫程式碼用輸入和輸出檔案的名字以及可選的調節引數填充一個mapreduce說明物件,使用者之後呼叫MapReduce函式,並把這個說明物件傳遞給它。使用者的程式碼和MapReduce庫連結在一起(C++實現)。附錄A包含了這個例子的全部程式文字。

2.2 型別

儘管在前面的虛擬碼按照字串輸入輸出書寫,但是在概念上,使用者提供的mapreduce函式有相關的型別:

map       (k1,v1)          ->list(k2,v2)

reduce   (k2,list(v2))   ->list(v2)

比如,輸入的key值和value值與輸出的key值和value值從不同的型別域得到。並且,中間key值和value值與輸出key值和value值來自同一個型別域。(alex注:原文中這個domain的含義不是很清楚,我參考HadoopKFS等實現,mapreduce都使用了泛型,因此,我把domain翻譯成型別域)。

我們的C++實現使用字串型別作為使用者自定義函式的輸入輸出,並將字串與適當型別的轉換工作交給了客戶程式碼。

2.3 更多的例子

這裡有一些有趣程式的簡單例子,可以很容易地作為MapReduce計算來表示:

分散式的Grep:如果與提供的模式串匹配,Map函式emit一行,Reduce函式是一個恆等函式,即僅僅把提供的中間資料複製到輸出。

URL訪問頻率計數:Map函式處理web頁面請求的日誌,然後輸出(URL,1)Reduce函式把相同URLvalue值加在一起,emit一個(URL, 總數)對。

倒轉網路連結圖:Map函式為每個連結輸出(target,source)對,每個連結連線到在名字叫源的頁面中發現的目標URLReduce函式把與給定目標URL相關的所有源URL連線成列表,emit(target,list(source))對。

每個主機的檢索詞向量:檢索詞向量將一個文件或者一組文件中出現的嘴重要的詞彙總為一個(詞,頻)對列表。Map函式為每個輸入文件emit(主機名, 檢索詞向量),其中主機名來自文件的URLReduce函式為一個給定主機接收所有每文件檢索詞向量,其將這些檢索詞向量加在一起,丟棄低頻的檢索詞,然後emit一個最終的(主機名, 檢索詞向量)對。

倒排索引:Map函式解析每個文件,emit一系列(, 文件號)列表,Reduce函式接受一個給定詞的所有(, 文件號)對,排序相關的文件號,emit(,list(文件號))。所有的輸出對集合形成一個簡單的倒排索引,增加計算來跟蹤詞在文件中的位置很簡單。

分散式排序:Map函式從每個記錄提取key值,emit(key,record)對。Reduce 函式原封不動地emit所有對。這個運算依賴4.1描述的分割槽裝置和4.2節描述的排序屬性。

3.實現

MapReduce有多種不同的可能實現。正確的選擇取決於環境。例如,一種實現可能適用於小型共享記憶體的機器,另外一種則適用於大型NUMA多處理器,然而還有的適合大型的網路叢集。

本章節描述一個針對Google內部廣泛使用的運算環境的實現:大群日常機器用交換乙太網連線在一起。在我們環境中:

1. 機器通常是x86雙核處理器、執行Linux系統、每臺機器2-4GB記憶體。

2. 使用日常網路硬體,通常在機器級別頻寬為百兆每分或者千兆每分,但是平均遠小於網路整體頻寬的一半。(averaging considerably less in overall bisection bandwidth

3. 叢集包含數百或數千臺機器,因此機器失效是常態。

4. 儲存由直接附屬在個體機器上的廉價IDE硬碟提供。一個內部開發的分散式檔案系統用來管理儲存在這些磁碟上的資料。檔案系統使用副本來提供不可靠的硬體上的可用性和可靠性。

5. 使用者向一個排程系統提交作業。每個作業包含一組任務,排程系統將這些任務對映到一組叢集內部可用的機器上。

3.1 執行概覽

Map呼叫通過將輸入資料自動分為M個片段(a set of M splits)的方式被分佈到多臺機器上。輸入片段能夠被不同的機器並行處理。Reduce呼叫使用分割槽函式將中間key值空間分成R份(例如,hash(key) mod R),繼而被分佈到多臺機器上執行。分割槽數量(R)和分割槽函式由使用者指定。

1展示了我們的實現中MapReduce操作的整體流程。當用戶呼叫MapReduce函式時,下列動作發生(圖一中的數字標籤對應下面列表中的序號):

1. 使用者程式中的MapReduce庫首先將輸入檔案分成M份,每份通常在16MB64MB之間(可以通過可選引數由使用者控制)。然後在機器叢集中啟動許多程式副本。

2. 程式副本中有一個是特殊-master。其它的是worker,由master分配工作。有Mmap任務和Rreduce任務將被分配,master選擇空閒的worker然後為每一個worker分配map任務或reduce任務。

3. 被分配了map任務的worker讀取相關輸入片段的內容,它從輸入的資料片段中解析出key/value對,然後把key/value對傳遞給使用者自定義的Map函式,由Map函式生成的中間key/value對快取在記憶體中。

4. 快取對被定期地寫入本地磁碟,被分割槽函式分成R個域。快取對在本地磁碟上的位置被傳回mastermaster負責將這些位置轉寄給reduce worker

5. 當一個reduce workermaster告知位置資訊後,它使用遠端過程呼叫從map worker的本地磁碟讀取快取資料。當一個reduce worker讀取了所有的中間資料後,它通過中間key值對緩衝資料排序,以便相同key值的出現組織在一起。由於通常許多不同的key值對映到同一reduce任務上,因此排序是需要的。如果中間資料量太大而無法適應記憶體,那麼就使用外部排序。

6.Reduce worker迭代排序後的中間資料,對於每一個遇到的唯一的中間key 值,Reduce worker將這個key值和與它相關的中間value值的集合傳遞給使用者的Reduce函式。Reduce函式的輸出被追加到這個reduce分割槽的一個最終輸出檔案。

7. 當所有的mapreduce任務完成之後,master喚醒使用者程式。此時此刻,使用者程式裡的對MapReduce呼叫返回使用者程式碼。

成功完成之後,mapreduce執行的輸出可以在R個輸出檔案中得到(每個檔案對應一個reduce任務,檔名由使用者指定)。通常,使用者不需要將這R個輸出檔案合併成一個檔案-他們經常把這些檔案作為輸入傳遞給另外一個MapReduce呼叫,或者在另外一個分散式應用中使用它們,這種分散式應用能夠處理分成多個檔案的輸入。

3.2 Master資料結構

Master保持一些資料結構,對每一個mapreduce任務,它儲存其狀態(空閒、進行中或已完成),以及Worker機器(對於非空閒任務)的身份(identity)。

Master是一個管道,通過它中間檔案域的位置資訊從map任務傳播到reduce任務。因此,對於每個已經完成的map任務,master儲存了map任務產生的R箇中間檔案域的位置和大小。當map任務完成時,接收到了位置和大小資訊的更新,這些資訊被遞進地推送給那些正在執行的reduce任務。

3.3 容錯

因為MapReduce庫是設計用來協助使用數百數千的機器處理超大規模資料的,這個庫必須優雅地處理機器故障。

worker故障

master週期性地ping每個worker。如果在一個確定的時間段內沒有收到worke的迴應,master將這個worker標記為失效。任何由這個worker完成的map任務被重置回它們初始的空閒狀態,因此變得可以被排程到其它worker。同樣,在一個失效的worker上正在執行的mapreduce任務被重置為空閒狀態,變得可被重新排程。

故障時已完成的map任務必須重新執行是因為它們的輸出被儲存在失效機器的本地磁碟上,因此不可訪問了。已經完成的reduce任務不需要再次執行,因為它們的輸出儲存在全域性檔案系統。

當一個map任務首先被worker A執行,之後被worker B執行(因為A失效),所有執行reduce任務的worker會接到重新執行的通知。還沒有從worker A讀取資料的任何reduce任務將從worker B讀取資料。

MapReduce對規模worker失效很有彈性。例如,在一次MapReduce操作執行期間,在正在執行的叢集上進行的網路維護一次造成一組80臺機器在幾分鐘內無法訪問,MapReduce master只需簡單德再次執行那些不可訪問的worker完成的工作,然後繼續執行,直終完成這個MapReduce操作。

master失敗

master定期對上面描述的master資料結構作檢查點很簡單。如果這個master任務失效了,一個新的備份可以從上一個檢查點狀態啟動。然而,考慮到只有一個單獨mastermaster失效是不太可能的,因此我們現在的實現是如果master失效,就中止MapReduce運算。客戶可以檢查這個情況,並且如果需要可以重試MapReduce操作。

在失效面前的語義

semantics in the presence of failures

當用戶提供的mapreduce操作是輸入值的確定性函式,我們的分散式實現產生相同的輸出,就像沒有錯誤、順序執行整個程式產生的一樣。

我們依賴對mapreduce任務的輸出是原子提交來完成這個特性。每個工作中的任務把它的輸出寫到私有的臨時檔案中。一個reduce任務生成一個這樣的檔案,並且一個map任務生成R個這樣的檔案(一個reduce任務對應一個)。當一個map任務完成時,workermaster傳送一個訊息,訊息中包含R個臨時檔名。如果master收到一個已經完成的map任務的完成訊息,它將忽略這個訊息;否則,master將這R個檔案的名字記錄在一個master資料結構裡。

當一個reduce任務完成時,reduce worker原子性地將臨時檔案重新命名為最終的輸出檔案。如果同一個reduce任務在多臺機器上執行,針對同一個最終輸出檔案將有多個重新命名呼叫執行。我們依賴底層檔案系統提供的原子重新命名操作來保證最終的檔案系統狀態僅僅包含reduce任務一次執行產生的資料。

絕大多數map和人reduce操作是確定的,而且存在這樣的一個事實:我們的語義等同於順序的執行,在這種情況下,程式設計師可以很容易推斷他們程式的行為。當map/reduce操作是不確定性的時候,我們提供較弱但是依然合理的語義。在非確定性操作面前,一個特定reduce任務R1的輸出等價於一個非確定性程式順序執行產生的R1的輸出。然而,一個不同reduce任務R2的輸出可能相當於一個不同的非確定行程式順序執行產生的R2的輸出。

考慮map任務Mreduce任務R1R2。設e(Ri)Ri提交的執行過程(只有一個這樣的執行過程)。由於e(R1)可能讀取了由M一次執行產生的輸出,而e(R2)可能讀取了由M的不同執行產生的輸出,較弱的語義隨之而來。

3.4 儲存位置

在我們的計算環境中,網路頻寬是一個相當稀缺的資源。我們通過充分利用輸入資料(GFS管理)儲存組成叢集的機器的本地磁碟上這樣一個事實來節省網路頻寬。GFS 把每個檔案分成64MB的塊,並且在不同機器上儲存每個塊的一些拷貝(通常是3個拷貝)。考慮到輸入檔案的位置資訊,MapReducemaster試圖將一個map任務排程到包含相關輸入資料拷貝的機器上;嘗試失敗的話,它將嘗試排程map任務到靠近任務輸入資料副本的機器上(例如,一個包含資料並在同一閘道器上的worker機器)。當在一個叢集的大部分worker上執行大型MapReduce操作的時候,大部分輸入資料從本地機器讀取,並且不消耗頻寬。

3.5 任務粒度

如上所述,我們把map階段細分成M個片段、把reduce 階段細分成R個片段。理想情況下,MR應當比worker機器的數量要多得多。在每臺worker上執行許多不同的任務提高動態負載平衡,並且在worker故障時加速恢復:失效機器上完成的很多map任務可以分佈到所有其他的worker機器。

但是在我們的實現中對MR的取值有實際的限制,因為master必須制定O(M+R)排程策略,並且如上所述在記憶體中儲存O(M*R)個狀態(然而記憶體使用的常數因子還是比較小的:O(M*R)片狀態大約包含每對map任務/reduce任務對1個位元組)。

此外,R值通常是被使用者制約的,因為每個reduce任務的輸出最終在一個獨立的輸出檔案中。實際上,我們傾向於選擇M值以使每個獨立任務有大約16M64M的輸入資料(這樣如上所述的本地最優化最有效),我們把R值設定為我們想使用的worker機器數量的一個小的倍數。我們通常用,使用2000worker機器,以M=200000R=5000來執行MapReduce計算。

3.6 備用任務

延長一個MapReduce操作花費的總時間的常見因素之一是“落伍者”:一臺機器花費了不同尋常的長時間才完成計算中最後幾個mapreduce任務之一,出現“落伍者”的原因非常多。比如:一個有壞磁碟的機器可能經歷頻繁的糾錯以致將其讀效能從30M/s降低到1M/s。叢集排程系統可能已經降其他任務排程到這臺機器上,由於CPU、記憶體、本地磁碟和網路頻寬的競爭導致執行MapReduce程式碼更慢。我們最近遇到的一個問題是機器初始化程式碼中的bug,導致處理器快取失效:受影響機器上的計算減慢了超過百倍。

我們有一個通用的機制來減輕“落伍者”的問題。當一個MapReduce操作接近完成的時候,master排程剩餘正在執行任務的備份執行、無論主執行還是備份執行完成,任務被標記為已完成。我們調優了這個機制,以便其通常增加操作不多於幾個百分點的計算資源。作為示例,5.3節描述的排序程式在關掉備用任務機制時要多花44%的時間完成。

4.精細化

儘管簡單地書寫MapReduce函式提供的基本功能能夠滿足大多數需求,我們還是發現了一些有用的擴充套件。在本節做了描述。

4.1 分割槽函式

MapReduce的使用者指定他們需要的reduce任務/輸出檔案的數量(R)。我們在中間key值上使用分割槽函式將資料在這些任務間分開。一個預設的分割槽函式是使用雜湊(比如,hash(key) mod R)提供的。它傾向於導致相當均衡的分割槽。然而,在某些情況下,通過key值的其它函式分割資料是有用的。比如,輸出的key值是URLs,我們希望單個主機的所有條目以結束在同一個輸出檔案中。為了支援類似的情況,MapReduce庫的使用者可以提供一個專門的分割槽函式。例如,使用“hash(Hostname(urlkey)) mod R”作為分割槽函式就可以把所有來自同一個主機的URL結束在同一個輸出檔案中。

4.2 順序保證

我們保證在給定的分割槽中,中間key/value對是以key值遞增的順序處理的。這個順序保證每個分成生成一個有序的輸出檔案很容易,這在下列情況時很有用:輸出檔案的格式需要支援按key值高效地隨機訪問查詢,或者輸出的使用者有序的資料很方便。

4.3 合併函式

某些情況下,每個map任務產生的中間key值有顯著的重複,並且使用者指定的Reduce函式滿足結合律和交換律。這個情況很好的例子是2.1節中的詞數統計示例。由於詞頻傾向於滿足zipf分佈,每個map任務將產生數百數千形如<the,1>的記錄。所有這些記錄將通過網路被髮送到一個單獨的reduce任務,然後被Reduce函式累加起來產生一個數。我們允許使用者指定一個可選的合併函式,其在通過網路傳送資料之前對資料進行部分合並。

合併函式在每臺執行map任務的機器上執行。通常使用相同的程式碼實現合併函式和reduce函式。合併函式和reduce函式唯一的區別就是MapReduce庫如何處理函式的輸出。Reduce函式的輸出被寫入在最終的輸出檔案,合併函式的輸出被寫到中間檔案裡,該檔案被髮送給reduce任務。

部分合並顯著加速了MapReduce操作中的某些類。附錄A包含一個使用合併函式的例子。

4.4 輸入輸出型別

MapReduce庫支援讀取一些不同的格式的輸入資料。比如,文字模式輸入將每一行視為一個key/value對。key是檔案中的偏移,value是那一行的內容。另外一種通常支援的格式以key值排序儲存了一系列key/value對。每種輸入型別的實現都懂得如何把自己分割成有意義的範圍,以便作為獨立的map處理(例如,文字模式的範圍分割確保分割只在行邊界發生)。雖然大多數使用者僅僅使用少量預定義輸入型別之一,但是使用者可以通過提供一個簡單的Reader介面的實現支援一個新的輸入型別。

需要提供資料的reader不必從檔案中讀取,比如,我們可以容易地定義一個從資料庫裡讀記錄的reader,或者從對映在記憶體中的資料結構讀。

類似的,我們為生產不同格式的資料提供一組輸出型別,使用者程式碼可以容易地為新的輸出型別新增支援。

4.5 副作用

某些情況下,MapReduce的使用者發現從map/reduce操作中產生作為附加輸出的輔助檔案比較方便。我們依賴程式writer把這種“副作用”變成原子的和冪等的(alex注:冪等的指一個總是產生相同結果的數學運算)。通常應用程式首先寫到一個臨時檔案,在全部生成之後,原子地將這個檔案重新命名。

我們不為單個任務產生的多個輸出檔案的原子兩步提交提供支援。因此,產生多個輸出檔案、並且具有跨檔案一致性要求的任務,必須是確定性的。實際這個限制還沒有成為問題。

4.6 跳過受損記錄

有時候,使用者程式碼中的bug導致map或者reduce函式在某些記錄上確定性地崩潰。這樣的bug阻止MapReduce操作完成。應對的通常過程是修復bug,但是有時不可行;可能這個bug在第三方庫裡,其原始碼是得不到的。而且有時忽略一些記錄是可以接受的,比如在一個大資料集上進行統計分析。我們提供了一種可選的執行模式,這種模式下,為了保證繼續進行,MapReduce庫探測哪些記錄導致確定性的崩潰,並且跳過這些記錄。

每個worker程序都安裝了一個捕獲段例外和匯流排錯誤的訊號控制代碼。在呼叫使用者的MapReduce操作之前,MapReduce庫在一個全域性變數中儲存了引數的序號。如果使用者程式碼產生了一個訊號,訊號控制代碼向MapReducemaster傳送包含序列號的“奄奄一息”的UDP包。當master在特定記錄上看到多餘一次失敗時,當master釋出相關Map或者Reduce任務的下次重新執行時,它就指出這條記錄應該跳過。

4.7 本地執行

除錯MapReduce函式的是非常棘手的,因為實際的執行發生在分散式系統中,通常是在數千臺機器上,由master動態地制定工作分配策略。為了促進除錯、效能剖析和小規模測試,我們開發了一套可選的MapReduce庫的實現, MapReduce操作在本地計算機上順序地執行所有工作。為使用者提供了控制以便把計算限制到特定的map任務上。使用者通過特殊的標誌來呼叫他們的程式,然後可以容易地使用他們覺得有用的除錯和測試工具(比如gdb)。

4.8 狀態資訊

Master執行著內部的HTTP伺服器並且輸出一組供人消費的狀態頁面。狀態頁面顯示包括計算的進展,比如已經完成了多少任務、有多少任務正在進行、輸入的位元組數、中間資料的位元組數、輸出的位元組數、進行的百分比等等。頁面還包含指向每個任務產生的stderrstdout檔案的連結。使用者可以使用這些資料預測計算需要執行多長時間、是否需要增加向計算增加更多的資源。這些頁面也可以用來搞清楚什麼時候計算比預期的要慢。

另外,最頂層的狀態頁面顯示了哪些worker失效了,以及他們失效的時候正在執行的mapreduce任務是哪些。這些資訊在嘗試診斷使用者程式碼中bug的時候很有用。

4.9 計數器

MapReduce庫提供了一個計數器促進統計各種事件發生次數。比如,使用者程式碼可能想統計處理的總詞數、或者已經索引的德文文件數等等。

為了使用這個附加功能,使用者程式碼建立一個名叫計數器的物件,然後在在MapReduce函式中適當地增加計數。例如:

Counter* uppercase;

uppercase = GetCounter(“uppercase”);

map(String name, String contents):

       for each word w in c ontents:

              if (IsCapitalized(w)):

                     uppercase->Increment();

       EmitIntermediate(w , 1);

這些來自單個worker機器的計數值週期性傳播到master(附加在ping的應答包中)。master把來自成功執行的mapreduce任務的計數器值進行累加,當MapReduce操作完成後,返回給使用者程式碼。當前的計數器值也會顯示在master的狀態頁面上,這樣人們可以觀看當前計算的進度。當累加計數器值的時候,master排除同一map或者reduce任務重複執行的影響,避免重複計數(重複執行額可以起因於我們使用備用任務以及失效後任務的重新執行)。

有些計數器的值由MapReduce庫自動維護,比如已經處理的輸入key/value對的數量、已經產生的輸出key/value對的數量。

使用者發現計數器附加功能對MapReduce操作行為的完整性檢查有用。比如,在一些MapReduce操作中,使用者程式碼可能需要確保產生的輸出對的數量精確的等於處理的輸入對的數量,或者處理的German文件的比例在處理文件的整體數量中在可以容忍的比例。

5.效能

本節我們用在一大群機器上執行的兩個計算來測量MapReduce的效能。一個計算搜尋大約1TB的資料查詢特定的模式匹配,另一個計算排序大約1TB的資料。

這兩個程式是由MapReduce使用者書寫的實際程式子集的代表-一類程式是將從一種表現形式打亂為另外一種表現形式;另一類是從大資料集中抽取少量使用者感興趣的資料。

5.1 叢集配置

所有程式都執行在一個大約由1800臺機器組成的叢集上。每臺機器有兩個2G主頻、支援超執行緒的Intel Xeon處理器,4GB的記憶體,兩個160GBIDE硬碟和一個千兆乙太網。這些機器被安排在一個兩層樹形交換網路中,在root節點大約支援100-200GBPS的合計頻寬。所有機器使用相同主機裝置,因此任意對之間的往返時間小於1毫秒。

4GB的記憶體中,大約1-1.5G被執行在叢集上的其他任務預訂。程式在週末下午執行,這時CPU、磁碟和網路大多數空閒。

5.2 查詢

這個grep程式從頭到尾掃描1010100位元組的記錄,查詢相當稀少的3個字元的模式(這個模式出現在92337個記錄中)。輸入資料被分割成大約64M的塊(M=15000shijin1010*100/(64*1024*1024)),整個輸出被放在一個檔案中(R=1)。

2 顯示了運算隨時間的進展。Y軸表示輸入資料的掃描速度。這個速度隨著更多的機器被分配到MapReduce計算中而增加,當1764worker被分配,速度達到超過30GB/s的峰值。當Map任務結束時,速度開始降低並在計算到80秒時達到0。整個計算從開始到結束大約花了150秒。這包括大約一分鐘的啟動開銷。開銷起因於程式傳播到所有worker機器、與GFS互動開啟1000個輸入檔案集合的延遲、獲取檔案優化所需資訊的時間。

5.3 排序

排序程式對1010100位元組的記錄(大約1TB的資料)排序。這個程式模仿TeraSort基準測試程式[10]

排序程式由不到50行程式碼組成。一個三行Map函式從一個文字行中提取出10位元組的排序key值,並且將key值和原始文字行作為中間key/valueemit。我們使用了一個內建的恆等函式作為Reduce操作。這個函式將中間key/value對作為輸出輸出key/value對不加修改地傳送。最終已排序的輸出寫入到一組雙備份(2-way replicated)GFS檔案(也就是說,作為程式輸出,2TB 的資料被寫入)。

像以前一樣,輸入資料被分割成64MB