Hadoop之 MapReduce 的核心知識點
mapreduce
什麼是mapreduce ?
MapReduce是一種程式設計模型,用於大規模資料集的並行運算。概念”Map(對映)”和”Reduce(歸約)”,是它們的主要思想,。它極大地方便了程式設計人員在不會分散式並行程式設計的情況下,將自己的程式執行在分散式系統上。 當前的軟體實現是指定一個Map(對映)函式,用來把一組鍵值對(Mapper的輸入鍵值)對映成一組新的鍵值對(Mapper的輸出鍵值),指定併發的Reduce(歸約)函式,用來保證所有對映的鍵值對中的每一個共享相同的鍵組(即相同的鍵的資料傳送到同一個reduce上,並進行合併處理)。
對映和化簡
簡單說來,一個對映函式就是對一些獨立元素組成的概念上的列表。事實上,每個元素都是被獨立操作的。這就是說,Map操作是可以高度並行的,這對高效能要求的應用以及平行計算領域的需求非常有用。 而化簡操作指的是對一個列表的元素進行適當的合併(key 相同的合併).
分佈可靠
MapReduce通過把對資料集的大規模操作分發給網路上的每個節點實現可靠性;每個節點會週期性的返回它所完成的工作和最新的狀態。如果一個節點保持沉默超過一個預設的時間間隔,主節點(類同Google File System中的主伺服器)記錄下這個節點狀態為死亡,並把分配給這個節點的資料發到別的節點。每個操作使用命名檔案的原子操作以確保不會發生並行執行緒間的衝突;當檔案被改名的時候,系統可能會把他們複製到任務名以外的另一個名字上去。(避免副作用)。
化簡操作工作方式與之類似,但是由於化簡操作的可並行性相對較差,主節點會盡量把化簡操作只分配在一個節點上,或者離需要操作的資料儘可能近的節點上。
Mapreduce的工作原理是什麼?
一切都是從最上方的user program開始的,user program連結了MapReduce庫,實現了最基本的Map函式和Reduce函式。圖中執行的順序都用數字標記了。
1.MapReduce庫先把user program的輸入檔案劃分為M份(M為使用者定義),每一份通常有16MB到64MB,如圖左方所示分成了split0~4;然後使用fork將使用者程序拷貝到叢集內其它機器上。
2.user program的副本中有一個稱為master,其餘稱為worker,master是負責排程的,為空閒worker分配作業(Map作業或者Reduce作業),worker的數量也是可以由使用者指定的。
3.被分配了Map作業的worker,開始讀取對應分片的輸入資料,Map作業數量是由M決定的,和split一一對應;Map作業從輸入資料中抽取出鍵值對,每一個鍵值對都作為引數傳遞給map函式,map函式產生的中間鍵值對被快取在記憶體中。
4.快取的中間鍵值對會被定期寫入本地磁碟,而且被分為R個區,R的大小是由使用者定義的,將來每個區會對應一個Reduce作業;這些中間鍵值對的位置會被通報給master,master負責將資訊轉發給Reduce worker。
5.master通知分配了Reduce作業的worker它負責的分割槽在什麼位置(肯定不止一個地方,每個Map作業產生的中間鍵值對都可能對映到所有R個不同分割槽),當Reduce worker把所有它負責的中間鍵值對都讀過來後,先對它們進行排序,使得相同鍵的鍵值對聚集在一起。因為不同的鍵可能會對映到同一個分割槽也就是同一個Reduce作業(誰讓分割槽少呢),所以排序是必須的。
6.reduce worker遍歷排序後的中間鍵值對,對於每個唯一的鍵,都將鍵與關聯的值傳遞給reduce函式,reduce函式產生的輸出會新增到這個分割槽的輸出檔案中。
7.當所有的Map和Reduce作業都完成了,master喚醒正版的user program,MapReduce函式呼叫返回user program的程式碼。
所有執行完畢後,MapReduce輸出放在了R個分割槽的輸出檔案中(分別對應一個Reduce作業)。使用者通常並不需要合併這R個檔案,而是將其作為輸入交給另一個MapReduce程式處理。整個過程中,輸入資料是來自底層分散式檔案系統(GFS)的,中間資料是放在本地檔案系統的,最終輸出資料是寫入底層分散式檔案系統(GFS)的。而且我們要注意Map/Reduce作業和map/reduce函式的區別:Map作業處理一個輸入資料的分片,可能需要呼叫多次map函式來處理每個輸入鍵值對;Reduce作業處理一個分割槽的中間鍵值對,期間要對每個不同的鍵呼叫一次reduce函式,Reduce作業最終也對應一個輸出檔案。
Mapreduce的工作流程是什麼?
Map -> Reduce
MapReduce其實是分治演算法的一種實現,所謂分治演算法就是“就是分而治之”,將大的問題分解為相同型別的子問題(最好具有相同的規模),對子問題進行求解,然後合併成大問題的解。MapReduce就是分治法的一種,將輸入進行分片,然後交給不同的task進行處理,然後合併成最終的解。具體流程圖如下:
MapReduce實際的處理過程可以理解為Input->Map->Sort->Combine->Partition->Reduce->Output。
(1)Input階段 資料以一定的格式傳遞給Mapper,有TextInputFormat,DBInputFormat,SequenceFileFormat等可以使用,在Job.setInputFormat可以設定,也可以自定義分分片函式。
(2)Map階段 對輸入的key,value進行處理,即map(k1,v1) -> list(k2,v2),使用Job.setMapperClass進行設定。
(3) Sort階段 對於Mapper的輸出進行排序,使用Job.setOutputKeyComparatorClass進行設定,然後定義排序規則
(4) Combine階段 這個階段對於Sort之後有相同key的結果進行合併,使用Job.setCombinerClass進行設定,也可以自定義Combine Class類。
(5) Partition階段 將Mapper的中間結果按照Key的範圍劃分為R份(Reduce作業的個數),預設使用HashPatitioner(key.hashCode() & Integer.MAX_VALUE) % numPartitions),也可以自定義劃分的函式。使用Job.setPartitionClass進行設定。
(6) Reduce階段 對於Mapper的結果進一步進行處理,Job.setReducerClass進行設定自定義的Reduce類。
(7) Output階段 Reducer輸出資料的格式。
Mapreduce的程式設計模型是什麼?
MapReduce將作業的整個執行過程分為兩個階段:Map階段和Reduce階段
Map階段由一定數量的Map Task組成
輸入資料格式解析:InputFormat
輸入資料處理:Mapper
資料分組:Partitioner
Reduce階段由一定數量的Reduce Task組成
資料遠端拷貝
資料按照key排序
資料處理:Reducer
資料輸出格式:OutputFormat
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
InputFormat
InputFormat 負責處理MR的輸入部分.
有三個作用:
1驗證作業的輸入是否規範.
2把輸入檔案切分成InputSplit. (處理跨行問題)
3提供RecordReader 的實現類,把InputSplit讀到Mapper中進行處理.
- 1
- 2
- 3
- 4
- 5
- 6
InputFormat類的層次結構:
Split與Block
Split與Block簡介
Block: HDFS中最小的資料儲存單位,預設是128MB
Split: MapReduce中最小的計算單元,預設與Block一一對應
Block與Split: Split與Block是對應關係是任意的,可由使用者控制.
- 1
- 2
- 3
- 4
- 5
InputSplit 在執行mapreduce之前,原始資料被分割成若干split,每個split作為一個map任務的輸入,在map執行過程中split會被分解成一個個記錄(key-value對),map會依次處理每一個記錄。
FileInputFormat只劃分比HDFS block大的檔案,所以FileInputFormat劃分的結果是這個檔案或者是這個檔案中的一部分.
如果一個檔案的大小比block小,將不會被劃分,這也是Hadoop處理大檔案的效率要比處理很多小檔案的效率高的原因。
當Hadoop處理很多小檔案(檔案大小小於hdfs block大小)的時候,由於FileInputFormat不會對小檔案進行劃分,所以每一個小檔案都會被當做一個split並分配一個map任務,導致效率底下。例如:一個1G的檔案,會被劃分成16個64MB的split,並分配16個map任務處理,而10000個100kb的檔案會被10000個map任務處理。
TextInputFormat
1.TextInputformat是預設的處理類,處理普通文字檔案。
2.檔案中每一行作為一個記錄,他將每一行在檔案中的起始偏移量作為key,每一行的內容作為value。
3.預設以\n或回車鍵作為一行記錄。
4.TextInputFormat繼承了FileInputFormat。
- 1
- 2
- 3
- 4
- 5
其他輸入類
- CombineFileInputFormat 相對於大量的小檔案來說,hadoop更合適處理少量的大檔案。CombineFileInputFormat可以緩解這個問題,它是針對小檔案而設計的。
- KeyValueTextInputFormat 當輸入資料的每一行是兩列,並用tab分離的形式的時候, KeyValueTextInputformat處理這種格式的檔案非常適合。
- NLineInputformat NLineInputformat可以控制在每個split中資料的行數。
- SequenceFileInputformat 當輸入檔案格式是sequencefile的時候,要使用SequenceFileInputformat作為輸入。
- 等等。。。具體可檢視InputFormat 介面的實現類。
Combiner
每一個map可能會產生大量的輸出,combiner的作用就是在map端對輸出先做一次合併,以減少傳輸到reducer的資料量,因為從map端到reduce端會涉及到網路IO(檔案傳輸)。
combiner最基本是實現本地key的歸併,combiner具有類似本地的reduce功能,合併相同的key對應的value(wordcount例子),通常與Reducer邏輯一樣。
如果不用combiner,那麼,所有的結果都是reduce完成,效率會相對低下。使用combiner,先完成的map會在本地聚合,提升速度。
好處:①減少Map Task輸出資料量(磁碟IO)②減少Reduce-Map網路傳輸資料量(網路IO) 【注意:Combiner的輸出是Reducer的輸入,如果Combiner是可插拔(可有可無)的,新增Combiner絕不能改變最終的計算結果。所以Combiner只應該用於那種Reduce的輸入key/value與輸出key/value型別完全一致,且不影響最終結果的場景。比如累加,最大值等。】
Partitioner
Partitioner決定了Map Task輸出的每條資料交給哪個Reduce Task處理
預設實現:HashPartitioner是mapreduce的預設partitioner。計算方法是 reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks,得到當前的目的reducer。(hash(key) mod R 其中R是Reduce Task數目)
允許使用者自定義 很多情況需自定義Partitioner比如“hash(hostname(URL)) mod R”確保相同域名的網頁交給同一個Reduce Task處理
Reduce的輸出
TextOutputformat 預設的輸出格式,key和value中間值用tab隔開的。
SequenceFileOutputformat 將key和value以sequencefile格式輸出。
SequenceFileAsOutputFormat 將key和value以原始二進位制的格式輸出。
MapFileOutputFormat 將key和value寫入MapFile中。由於MapFile中的key是有序的,所以寫入的時候必須保證記錄是按key值順序寫入的。
MultipleOutputFormat 預設情況下一個reducer會產生一個輸出,但是有些時候我們想一個reducer產生多個輸出,MultipleOutputFormat和MultipleOutputs可以實現這個功能。(還可以自定義輸出格式,序列化會說到)
MapReduce程式設計模型總結
1.Map階段
InputFormat(預設TextInputFormat)
Mapper
Combiner(local reducer)
Partitioner
2.Reduce階段
Reducer
OutputFormat(預設TextOutputFormat)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
shuffle是什麼?
系統執行排序的過程(即將map輸出作為輸入傳給reduce),稱為shuffle.即這張圖是官方對Shuffle過程的描述,hadoop的核心思想是MapReduce,但shuffle又是MapReduce的核心(心臟)。shuffle的主要工作是從Map結束到Reduce開始之間的過程。首先看下這張圖,就能瞭解shuffle所處的位置。圖中的partitions、copy phase、sort phase所代表的就是shuffle的不同階段(大致範圍)。也可以這樣理解, Shuffle描述著資料從map task輸出到reduce task輸入的這段過程。
map 端的Shuffle細節: 1. 在map task執行時,它的輸入資料來源於HDFS的block。(map函式產生輸出時,利用緩衝的方式寫入記憶體,並出於效率考慮的方式就行預排序。如上圖。此處預設的記憶體大小為100M,可通過io.sort.mr 來設定,當此緩衝區程式設計 %80的時候 ,一個後臺執行緒就會將內容寫入磁碟。在寫入磁碟之前,執行緒首先根據最終要傳的reduce把這些資料劃分成相應的分割槽(partition),在每個分割槽中,後臺執行緒進行內排序,如果有combine,就會在排序後的分割槽內執行。)
在經過mapper的執行後,我們得知mapper的輸出是這樣一個key/value對:相同的key 到底應該交由哪個reduce去做,是現在決定的,也就是partition 作用。MapReduce提供Partitioner介面,它的作用就是根據key或value及reduce的數量來決定當前的這對輸出資料最終應該交由哪個reduce task處理。預設對key hash後再以reduce task數量取模。預設的取模方式只是為了平均reduce的處理能力,如果使用者自己對Partitioner有需求,可以訂製並設定到job上。
接下來,需要將資料寫入記憶體緩衝區中,緩衝區的作用是批量收集map結果,減少磁碟IO的影響。key/value對以及Partition的結果都會被寫入緩衝區,當然寫入之前,key與value值都會被序列化成位元組陣列。
reduce task在執行之前的工作就是不斷地拉取當前job裡每個map task的最終結果,然後對從不同地方拉取過來的資料不斷地做merge(合併),也最終形成一個檔案作為reduce task的輸入檔案。
在將壓縮的map輸出進行壓縮是個好主意,預設情況下,是不壓縮的,若啟用壓縮,需要手動配置將mapred.compress.map.output 設定為true,並配置mapred.compress.map.codec 。
reduce 端的Shuffle細節:
- Copy過程,簡單地拉取資料。reduce 通過http的方式得到輸出檔案的分割槽。
- Merge階段
- reduce task在執行之前的工作就是不斷地拉取當前job裡每個map task的最終結果,然後對從不同地方拉取過來的資料不斷地做merge(合併),也最終形成一個檔案作為reduce task的輸入檔案。
partition是什麼?
Map的結果,會通過partition分發到Reducer上,Reducer做完Reduce操作後,通過OutputFormat,進行輸出.partition是分割map每個節點的結果,按照key分別對映給不同的reduce,也是可以自定義的。這裡其實可以理解歸類。
Mapper的結果,可能送到Combiner做合併,Combiner在系統中並沒有自己的基類,而是用Reducer作為Combiner的基類,他們對外的功能是一樣的,只是使用的位置和使用時的上下文不太一樣而已。Mapper最終處理的鍵值對
int getPartition(Text key, Text value, int numPartitions) ;
- 1
就是指定Mappr輸出的鍵值對到哪一個reducer上去。系統預設的Partitioner是HashPartitioner,它以key的Hash值對Reducer的數目取模,得到對應的Reducer。這樣保證如果有相同的key值,肯定被分配到同一個reducre上。如果有N個reducer,編號就為0,1,2,3……(N-1)。
combiner是什麼?
combine分為map端和reduce端,作用是把同一個key的鍵值對合並在一起,可以自定義的。combine函式把一個map函式產生的<\key,value>對(多個key,value)合併成一個新的<\key2,value2>.將新的<\key2,value2>作為輸入到reduce函式中這個value2亦可稱之為values,因為有多個。這個合併的目的是為了減少網路傳輸。具體實現是由Combine類。
他們三者之間的關係是什麼?
map的個數由誰來決定,如何計算?
在map階段讀取資料前,FileInputFormat會將輸入檔案分割成split。split的個數決定了map的個數。塊大小、檔案大小、檔案數、以及splitsize大小有關。
影響map個數,即split個數的因素主要有:
1)HDFS塊的大小,即HDFS中dfs.block.size的值。如果有一個輸入檔案為1024m,當塊為256m時,會被劃分為4個split;當塊為128m時,會被劃分為8個split。
2)檔案的大小。當塊為128m時,如果輸入檔案為128m,會被劃分為1個split;當塊為256m,會被劃分為2個split。
3)檔案的個數。FileInputFormat按照檔案分割split,並且只會分割大檔案,即那些大小超過HDFS塊的大小的檔案。如果HDFS中dfs.block.size設定為128m,而輸入的目錄中檔案有100個,則劃分後的split個數至少為100個。
4)splitsize的大小。分片是按照splitszie的大小進行分割的,一個split的大小在沒有設定的情況下,預設等於hdfs block的大小。但應用程式可以通過兩個引數來對splitsize進行調節。
map個數的計算公式如下:
splitsize=max(minimumsize,min(maximumsize,blocksize))。如果沒有設定minimumsize和maximumsize,splitsize的大小預設等於blocksize
reduce個數由誰來決定,如何計算?
當setNumReduceTasks( int a) a=1(即預設值),不管Partitioner返回不同值的個數b為多少,只啟動1個reduce,這種情況下自定義的Partitioner類沒有起到任何作用。
若a!=1: 當setNumReduceTasks( int a)裡 a設定小於Partitioner返回不同值的個數b的話:比如說自定義的partitioner 返回10,但setNumReduceTasks(2) 這樣就會丟擲異常
java.io.IOException: Illegal partition
- 1
某些key沒有找到所對應的reduce去處。原因是隻啟動了a個reduce。 3. 同樣,若partitioner 返回1(比如key設定為 NullWritable的時候),而設定了10個reduce,setNumReduceTasks(10),那麼此時就會出現 結果檔案為10個,只有一個有資料,其餘九個為空檔案。
所以,reduce的個數可以自定義,但是計算的時候最好是和partitioner時一樣,最好相差不要太多。