1. 程式人生 > >MapReduce分散式計算和程式設計原理總結

MapReduce分散式計算和程式設計原理總結

在MapReduce程式的開發過程中,往往需要用到FileInputFormat與TextInputFormat,TextInputFormat這個類繼FileInputFormat,FileInputFormat這個類繼承自InputFormat,InputFormat這個類會將檔案file按照邏輯進行劃分,劃分成的每一個split切片將會被分配給一個Mapper任務,檔案先被切分成split塊,而後每一個split切片對應一個Mapper任務。

FileInputFormat的劃分機制:

 A. 簡單地按照檔案的內容長度進行切片。

B. 切片大小,預設等於 block 大小。

C. 切片時不考慮資料集整體,而是逐個針對每一個檔案單獨切片。預設情況下, split size =block size,在 hadoop 2.x 中為 128M。

注意:bytesRemaining/splitSize > 1.1 不滿足的話,那麼最後所有剩餘的會作為一個切片。從而不會形成例如 129M 檔案規劃成兩個切片的局面。

input File 通過 split 被邏輯切分為多個 split 檔案,通過 Record 按行讀取內容給 map(使用者自己實現的)進行處理,資料被 map 處理結束之後交給 OutputCollector 收集器,對其結果 key 進行分割槽(預設使用 hash 分割槽),然後寫入 buffer,每個 map task 都有一個記憶體緩衝區,儲存著 map 的輸出結果,當緩衝區快滿的時候需要將緩衝區的資料以一個臨時檔案的方式存放到磁碟,當整個 map task 結束後再對磁碟中這個 map task 產生的所有臨時檔案做合併,生成最終的正式輸出檔案,然後等待reduce task 來拉資料。 Map端的輸入的(k,v)分別是該行的起始偏移量,以及每一行的資料內容,map端的輸出(k,v)可以根據需求進行自定義,但是如果輸出的是javabean物件,需要對javabean繼承writable

shuffle的過程是:Map產生輸出開始到Reduc取得資料作為輸入之前的過程稱作shuffle.

1).Collect 階段:將 MapTask 的結果輸出到預設大小為 100M 的環形緩衝區,儲存的是 key/value,Partition 分割槽資訊等。

2).Spill 階段:當記憶體中的資料量達到一定的閥值的時候,就會將資料寫入本地磁碟,在將資料寫入磁碟之前需要對資料進行一次排序的操作,如果配置了 combiner,還會將有相同分割槽號和 key 的資料進行排序。

3).Merge 階段:把所有溢位的臨時檔案進行一次合併操作,以確保一個MapTask 最終只產生一箇中間資料檔案。

4).Copy 階段: ReduceTask 啟動 Fetcher 執行緒到已經完成 MapTask 的節點上覆制一份屬於自己的資料,這些資料預設會儲存在記憶體的緩衝區中,當記憶體的緩衝區達到一定的閥值的時候,就會將資料寫到磁碟之上。

5).Merge 階段:在 ReduceTask 遠端複製資料的同時,會在後臺開啟兩個執行緒對記憶體到本地的資料檔案進行合併操作。

6).Sort 階段:在對資料進行合併的同時,會進行排序操作,由於 MapTask階段已經對資料進行了區域性的排序,ReduceTask 只需保證 Copy 的資料的最終整體有效性即可。Shuffle 中的緩衝區大小會影響到 mapreduce 程式的執行效率,原則上說,緩衝區越大,磁碟 io 的次數越少,執行速度就越快緩衝區的大小可以通過引數調整, 引數:io.sort.mb 預設 100M

 

        reducer將已經分好組的資料作為輸入,並依次為每個鍵對應分組執行reduce函式。reduce函式的輸入是鍵以及包含與該鍵對應的所有值的迭代器。reduce端的輸入是map端的輸出,它的輸出的(k,v)根據需求進行自定義reducetask 並行度同樣影響整個 job 的執行併發度和執行效率,與 maptask的併發數由切片數決定不同,Reducetask 數量的決定是可以直接手動設定:

job.setNumReduceTasks(4);如果資料分佈不均勻,就有可能在 reduce 階段產生資料傾斜。預設的reduceTask的是1。

*Task平行計算總結:

最好每個 task 的執行時間至少一分鐘。如果 job 的每個 map 或者 reduce task 的執行時間都只有 30-40 秒鐘,那麼就減少該 job 的 map 或者 reduce 數,每一個 task(map|reduce)的 setup 和加入到排程器中進行排程,這個中間的過程可能都要花費幾秒鐘,所以如果每個task 都非常快就跑完了,就會在 task 的開始和結束的時候浪費太多的時間。 預設情況下,每一個 task 都是一個新的 JVM 例項,都需要開啟和銷燬的開銷。在一些情況下,JVM 開啟和銷燬的時間可能會比實際處理資料的時間要消耗的長,配置 task 的 M JVM  重用可以改善該問題:(mapred.job.reuse.jvm.num.tasks,預設是 1,表示一個 JVM 上最多可以順序執行的 task 數目(屬於同一個 Job)是 1。也就是說一個 task 啟一個 JVM)如果 input 的檔案非常的大,比如 1TB,可以考慮將 hdfs 上的每個 blocksize 設大,比如設成 256MB 或者 512MB。

OutputFormat主要用於描述輸出資料的格式,它能夠將使用者提供的key/value對寫入特定格式的檔案中。Hadoop 自帶了很多 OutputFormat 的實現,它們與InputFormat實現相對應,足夠滿足我們業務的需要。 OutputFormat 類的層次結構如下圖所示

OutputFormat是MapReduce輸出的基類,所有MapReduce輸出都實現了 OutputFormat 介面,主要有:

TextInputFormat 、SequenceFileOutputFormat、MultipleOutputs、DBOutputFormat等。