1. 程式人生 > >Hadoop Mapreduce分割槽、分組、二次排序過程詳解

Hadoop Mapreduce分割槽、分組、二次排序過程詳解

這篇文章分析的特別好,耐心看下去。。

1、MapReduce中資料流動

   (1)最簡單的過程:  map - reduce
   (2)定製了partitioner以將map的結果送往指定reducer的過程: map - partition - reduce
   (3)增加了在本地先進性一次reduce(優化)過程: map - combin(本地reduce) - partition -reduce
2、Mapreduce中Partition的概念以及使用。
(1)Partition的原理和作用
        得到map給的記錄後,他們該分配給哪些reducer來處理呢?hadoop採用的預設的派發方式是根據雜湊值來派發的,但是實際中,這並不能很高效或者按照我們要求的去執行任務。例如,經過partition處理後,一個節點的reducer分配到了20條記錄,另一個卻分配道了10W萬條,試想,這種情況效率如何。又或者,我們想要處理後得到的檔案按照一定的規律進行輸出,假設有兩個reducer,我們想要最終結果中part-00000中儲存的是"h"開頭的記錄的結果,part-00001中儲存其他開頭的結果,這些預設的partitioner是做不到的。所以需要我們自己定製partition來根據自己的要求,選擇記錄的reducer。自定義partitioner很簡單,只要自定義一個類,並且繼承Partitioner類,重寫其getPartition方法就好了,在使用的時候通過呼叫Job的setPartitionerClass指定一下即可


        Map的結果,會通過partition分發到Reducer上。Mapper的結果,可能送到Combiner做合併,Combiner在系統中並沒有自己的基類,而是用Reducer作為Combiner的基類,他們對外的功能是一樣的,只是使用的位置和使用時的上下文不太一樣而已。Mapper最終處理的鍵值對<key, value>,是需要送到Reducer去合併的,合併的時候,有相同key的鍵/值對會送到同一個Reducer那。哪個key到哪個Reducer的分配過程,是由Partitioner規定的。它只有一個方法,


        getPartition(Text key, Text value, int numPartitions)


輸入是Map的結果對<key, value>和Reducer的數目,輸出則是分配的Reducer(整數編號)。就是指定Mappr輸出的鍵值對到哪一個reducer上去。系統預設的Partitioner是HashPartitioner,它以key的Hash值對Reducer的數目取模,得到對應的Reducer。這樣保證如果有相同的key值,肯定被分配到同一個reducre上。如果有N個reducer,編號就為0,1,2,3……(N-1)。


(2)Partition的使用
        分割槽出現的必要性,如何使用Hadoop產生一個全域性排序的檔案?最簡單的方法就是使用一個分割槽,但是該方法在處理大型檔案時效率極低,因為一臺機器必須處理所有輸出檔案,從而完全喪失了MapReduce所提供的並行架構的優勢。事實上我們可以這樣做,首先建立一系列排好序的檔案;其次,串聯這些檔案(類似於歸併排序);最後得到一個全域性有序的檔案。主要的思路是使用一個partitioner來描述全域性排序的輸出。比方說我們有1000個1-10000的資料,跑10個ruduce任務, 如果我們執行進行partition的時候,能夠將在1-1000中資料的分配到第一個reduce中,1001-2000的資料分配到第二個reduce中,以此類推。即第n個reduce所分配到的資料全部大於第n-1個reduce中的資料。這樣,每個reduce出來之後都是有序的了,我們只要cat所有的輸出檔案,變成一個大的檔案,就都是有序的了


基本思路就是這樣,但是現在有一個問題,就是資料的區間如何劃分,在資料量大,還有我們並不清楚資料分佈的情況下。一個比較簡單的方法就是取樣,假如有一億的資料,我們可以對資料進行取樣,如取10000個數據取樣,然後對取樣資料分割槽間。在Hadoop中,patition我們可以用TotalOrderPartitioner替換預設的分割槽。然後將取樣的結果傳給他,就可以實現我們想要的分割槽。在取樣時,我們可以使用hadoop的幾種取樣工具,RandomSampler,InputSampler,IntervalSampler。


       這樣,我們就可以對利用分散式檔案系統進行大資料量的排序了,我們也可以重寫Partitioner類中的compare函式,來定義比較的規則,從而可以實現字串或其他非數字型別的排序,也可以實現二次排序乃至多次排序。


2、MapReduce中分組的概念和使用
    分割槽的目的是根據Key值決定Mapper的輸出記錄被送到哪一個Reducer上去處理。而分組的就比較好理解了。筆者認為,分組就是與記錄的Key相關。在同一個分割槽裡面,具有相同Key值的記錄是屬於同一個分組的。


3、MapReduce中Combiner的使用
        很多MapReduce程式受限於叢集上可用的頻寬,所以它會盡力最小化需要在map和reduce任務之間傳輸的中間資料。Hadoop允許使用者宣告一個combiner function來處理map的輸出,同時把自己對map的處理結果作為reduce的輸入。因為combiner function本身只是一種優化,hadoop並不保證對於某個map輸出,這個方法會被呼叫多少次。換句話說,不管combiner function被呼叫多少次,對應的reduce輸出結果都應該是一樣的。


  下面我們以《權威指南》的例子來加以說明,假設1950年的天氣資料讀取是由兩個map完成的,其中第一個map的輸出如下:
  (1950, 0)
  (1950, 20)
  (1950, 10)


第二個map的輸出為:
       (1950, 25)
       (1950, 15)


而reduce得到的輸入為:(1950, [0, 20, 10, 25, 15]), 輸出為:(1950, 25)


  由於25是集合中的最大值,我們可以使用一個類似於reduce function的combiner function來找出每個map輸出中的最大值,這樣的話,reduce的輸入就變成了:
  (1950, [20, 25])


  各個funciton 對溫度值的處理過程可以表示如下:max(0, 20, 10, 25, 15) =max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25


  注意:並不是所有的函式都擁有這個屬性的(有這個屬性的函式我們稱之為commutative和associative),例如,如果我們要計算平均溫度,就不能這樣使用combiner function,因為mean(0, 20, 10, 25, 15) =14,而mean(mean(0, 20, 10),mean(25, 15)) = mean(10, 20) = 15


  combiner function並不能取代reduce function(因為仍然需要reduce function處理來自不同map的帶有相同key的記錄)。但是他可以幫助減少需要在map和reduce之間傳輸的資料,就為這一點combiner function就值得考慮使用。


4、Shuffle階段排序流程詳解
        我們首先看一下MapReduce中的排序的總體流程。


        MapReduce框架會確保每一個Reducer的輸入都是按Key進行排序的。一般,將排序以及Map的輸出傳輸到Reduce的過程稱為混洗(shuffle)。每一個Map都包含一個環形的快取,預設100M,Map首先將輸出寫到快取當中。當快取的內容達到“閾值”時(閾值預設的大小是快取的80%),一個後臺執行緒負責將結果寫到硬碟,這個過程稱為“spill”。Spill過程中,Map仍可以向快取寫入結果,如果快取已經寫滿,那麼Map進行等待。


Spill的具體過程如下:首先,後臺執行緒根據Reducer的個數將輸出結果進行分組,每一個分組對應一個Reducer。其次,對於每一個分組後臺執行緒對輸出結果的Key進行排序。在排序過程中,如果有Combiner函式,則對排序結果進行Combiner函式進行呼叫。每一次spill都會在硬碟產生一個spill檔案。因此,一個Map task有可能會產生多個spill檔案,當Map寫出最後一個輸出時,會將所有的spill檔案進行合併與排序,輸出最終的結果檔案。在這個過程中Combiner函式仍然會被呼叫。從整個過程來看,Combiner函式的呼叫次數是不確定的。下面我們重點分析下Shuffle階段的排序過程:


        Shuffle階段的排序可以理解成兩部分,一個是對spill進行分割槽時,由於一個分割槽包含多個key值,所以要對分割槽內的<key,value>按照key進行排序,即key值相同的一串<key,value>存放在一起,這樣一個partition內按照key值整體有序了。


        第二部分並不是排序,而是進行merge,merge有兩次,一次是map端將多個spill 按照分割槽和分割槽內的key進行merge,形成一個大的檔案。第二次merge是在reduce端,進入同一個reduce的多個map的輸出 merge在一起,該merge理解起來有點複雜,最終不是形成一個大檔案,而且期間資料在記憶體和磁碟上都有。所以shuffle階段的merge並不是嚴格的排序意義,只是將多個整體有序的檔案merge成一個大的檔案,由於不同的task執行map的輸出會有所不同,所以merge後的結果不是每次都相同,不過還是嚴格要求按照分區劃分,同時每個分割槽內的具有相同key的<key,value>對挨在一起。


        Shuffle排序綜述:如果只定義了map函式,沒有定義reduce函式,那麼輸入資料經過shuffle的排序後,結果為key值相同的輸出挨在一起,且key值小的一定在前面,這樣整體來看key值有序(巨集觀意義的,不一定是按從大到小,因為如果採用預設的HashPartitioner,則key 的hash值相等的在一個分割槽,如果key為IntWritable的話,每個分割槽內的key會排序好的),而每個key對應的value不是有序的。


5、MapReduce中輔助排序的原理與實現
(1)任務
我們需要把內容如下的sample.txt檔案處理為下面檔案:


原始檔:Sample.txt


bbb 654


ccc 534


ddd 423


aaa 754


bbb 842


ccc 120


ddd 219


aaa 344


bbb 214


ccc 547


ddd 654


aaa 122


bbb 102


ccc 479


ddd 742


aaa 146


目標:part-r-00000


aaa 122


bbb 102


ccc 120


ddd 219


(2)工作原理
   過程導引:
   1、定義包含記錄值和自然值的組合鍵,本例中為MyPariWritable.


   2、自定義鍵的比較器(comparator)來根據組合鍵對記錄進行排序,即同時利用自然鍵和自然值進行排序。(aaa 122組合為一個鍵)。


   3、針對組合鍵的Partitioner(本示例使用預設的hashPartitioner)和分組comparator在進行分割槽和分組時均只考慮自然鍵。


   詳細過程:
首先在map階段,使用job.setInputFormatClass定義的InputFormat將輸入的資料集分割成小資料塊splites,同時InputFormat提供一個RecordReder的實現。本例子中使用的是TextInputFormat,他提供的RecordReder會將文字的一行的行號作為key,這一行的文字作為value。這就是自定義Map的輸入是<LongWritable, Text>的原因。然後呼叫自定義Map的map方法,將一個個<LongWritable, Text>對輸入給Map的map方法。注意輸出應該符合自定義Map中定義的輸出< MyPariWritable, NullWritable>。最終是生成一個List< MyPariWritable, NullWritable>。在map階段的最後,會先呼叫job.setPartitionerClass對這個List進行分割槽,每個分割槽對映到一個reducer。每個分割槽內又呼叫job.setSortComparatorClass設定的key比較函式類排序。可以看到,這本身就是一個二次排序。在reduce階段,reducer接收到所有對映到這個reducer的map輸出後,也是會呼叫job.setSortComparatorClass設定的key比較函式類對所有資料對排序。然後開始構造一個key對應的value迭代器。這時就要用到分組,使用jobjob.setGroupingComparatorClass設定的分組函式類。只要這個比較器比較的兩個key相同,他們就屬於同一個組(本例中由於要求得每一個分割槽內的最小值,因此比較MyPariWritable型別的Key時,只需要比較自然鍵,這樣就能保證只要兩個MyPariWritable的自然鍵相同,則它們被送到Reduce端時候的Key就認為在相同的分組,由於該分組的Key只取分組中的第一個,而這些資料已經按照自定義MyPariWritable比較器排好序,則第一個Key正好包含了每一個自然鍵對應的最小值),它們的value放在一個value迭代器,而這個迭代器的key使用屬於同一個組的所有key的第一個key。最後就是進入Reducer的reduce方法,reduce方法的輸入是所有的key和它的value迭代器。同樣注意輸入與輸出的型別必須與自定義的Reducer中宣告的一致。