1. 程式人生 > >hadoop之mapreduce詳解(優化篇)

hadoop之mapreduce詳解(優化篇)

一、概述

     優化前我們需要知道hadoop適合幹什麼活,適合什麼場景,在工作中,我們要知道業務是怎樣的,能才結合平臺資源達到最有優化。除了這些我們當然還要知道mapreduce的執行過程,比如從檔案的讀取,map處理,shuffle過程,reduce處理,檔案的輸出或者儲存。在工作中,往往平臺的引數都是固定的,不可能為了某一個作業去修改整個平臺的引數,所以在作業的執行過程中,需要對作業進行單獨的設定,這樣既不會對其他作業產生影響,也能很好的提高作業的效能,提高優化的靈活性。

現在回顧下hadoop的優勢(適用場景):
1、可構建在廉價機器上,裝置成本相對低

2、高容錯性,HDFS將資料自動儲存多個副本,副本丟失後,自動恢復,防止資料丟失或損壞
3、適合批處理,HDFS適合一次寫入、多次查詢(讀取)的情況,適合在已有的資料進行多次分析,穩定性好
4、適合儲存大檔案,其中的大表示可以儲存單個大檔案,因為是分塊儲存,以及表示儲存大量的資料

二、小檔案優化

從概述中我們知道,很明顯hadoop適合大檔案的處理和儲存,那為什麼不適合小檔案呢?

1、從儲存方面來說:hadoop的儲存每個檔案都會在NameNode上記錄元資料,如果同樣大小的檔案,檔案很小的話,就會產生很多檔案,造成NameNode的壓力。
2、從讀取方面來說:同樣大小的檔案分為很多小檔案的話,會增加磁碟定址次數,降低效能

3、從計算方面來說:我們知道一個map預設處理一個分片或者一個小檔案,如果map的啟動時間都比資料處理的時間還要長,那麼就會造成效能低,而且在map端溢寫磁碟的時候每一個map最終會產生reduce數量個數的中間結果,如果map數量特別多,就會造成臨時檔案很多,而且在reduce拉取資料的時候增加磁碟的IO。

好,我們明白小檔案造成的弊端之後,那我們應該怎麼處理這些小檔案呢?

1、從源頭幹掉,也就是在hdfs上我們不儲存小檔案,也就是資料上傳hdfs的時候我們就合併小檔案
2、在FileInputFormat讀取入資料的時候我們使用實現類CombineFileInputFormat讀取資料,在讀取資料的時候進行合併。

三、資料傾斜問題優化

我們都知道mapreduce是一個並行處理,那麼處理的時間肯定是作業中所有任務最慢的那個了,可謂木桶效應?為什麼會這樣呢?

1、資料傾斜,每個reduce處理的資料量不是同一個級別的,所有導致有些已經跑完了,而有些跑的很慢。
2、還有可能就是某些作業所在的NodeManager有問題或者container有問題,導致作業執行緩慢。

那麼為什麼會產生資料傾斜呢?

資料本身就不平衡,所以在預設的hashpartition時造成分割槽資料不一致問題,還有就是程式碼設計不合理等。

那如何解決資料傾斜的問題呢?

1、既然預設的是hash演算法進行分割槽,那我們自定義分割槽,修改分割槽實現邏輯,結合業務特點,使得每個分割槽資料基本平衡
2、既然有預設的分割槽演算法,那麼我們可以修改分割槽的鍵,讓其符合hash分割槽,並且使得最後的分割槽平衡,比如在key前加隨機數n-key。
3、既然reduce處理慢,我們可以增加reduce的記憶體和vcore呀,這樣挺高效能就快了,雖然沒從根本上解決問題,但是還有效果
4、既然一個reduce處理慢,那我們可以增加reduce的個數來分攤一些壓力呀,也不能根本解決問題,還是有一定的效果。

那麼如果不是資料傾斜帶來的問題,而是節點服務有問題造成某些map和reduce執行緩慢呢?

那麼我們可以使用推測執行呀,你跑的慢,我們可以找個其他的節點重啟一樣的任務競爭,誰快誰為準。推測執行時以空間換時間的優化。會帶來叢集資源的浪費,會給叢集增加壓力,所以我司叢集的推測執行都是關閉的。其實在作業執行的時候可以偷偷開啟的呀

推測執行引數控制:

mapreduce.map.speculative
mapreduce.reduce.speculative

四、mapreduce過程優化

4.1、map端

上面我們從hadoop的特性場景等聊了下mapreduce的優化,接下來我們從mapreduce的執行過程進行優化。

好吧,我們就從源頭開始說,從資料的讀取以及map數的確定:

    在前面我們聊過小檔案的問題,所以在資料的讀取這裡也可以做優化,所以選擇一個合適資料的檔案的讀取類(FIleInputFormat的實現類)也很重要我們在作業提交的過程中,會把jar,分片資訊,資源資訊提交到hdfs的臨時目錄,預設會有10個複本,通過引數mapreduce.client.submit.file.replication控制後期作業執行都會去下載這些東西到本地,中間會產生磁碟IO,所以如果叢集很大的時候,可以增加該值,提高下載的效率。

分片的計算公式:

計算切片大小的邏輯:Math.max(minSize, Math.min(maxSize, blockSize))
minSize的預設值是1,而maxSize的預設值是long型別的最大值,即可得切片的預設大小是blockSize(128M)
maxSize引數如果調得比blocksize小,則會讓切片變小,而且就等於配置的這個引數的值
minSize引數調的比blockSize大,則可以讓切片變得比blocksize還大

     因為map數沒有具體的引數指定,所以我們可以通過如上的公式調整切片的大小,這樣我們就可以設定map數了,那麼問題來了,map數該如何設定呢?

這些東西一定要結合業務,map數太多,會產生很多中間結果,導致reduce拉取資料變慢,太少,每個map處理的時間又很長,結合資料的需求,可以把map的執行時間調至到一分鐘左右比較合適,那如果資料量就是很大呢,我們有時候還是需要控制map的數量,這個時候每個map的執行時間就比較長了,那麼我們可以調整每個map的資源來提升map的處理能力呀,我司就調整了mapreduce.map.memory.mb=3G(預設1G)mapreduce.map.cpu.vcores=1(預設也是1)

從源頭上我們確定好map之後。那麼接下來看map的具體執行過程咯。

首先寫環形換衝區,那為啥要寫環形換衝區呢,而不是直接寫磁碟呢?這樣的目的主要是為了減少磁碟i/o。

每個Map任務不斷地將鍵值對輸出到在記憶體中構造的一個環形資料結構中。使用環形資料結構是為了更有效地使用記憶體空間,在記憶體中放置儘可能多的資料。執行流程是,該緩衝預設100M(mapreduce.task.io.sort.mb引數控制),當到達80%(mapreduce.map.sort.spill.percent引數控制)時就會溢寫磁碟。每達到80%都會重寫溢寫到一個新的檔案。那麼,我們完全可以根據機器的配置和資料來兩種這兩個引數,當記憶體足夠,我們增大mapreduce.task.io.sort.mb完全會提高溢寫的過程,而且會減少中間結果的檔案數量。我司調整mapreduce.task.io.sort.mb=512。當檔案溢寫完後,會對這些檔案進行合併,預設每次合併10(mapreduce.task.io.sort.factor引數控制)個溢寫的檔案,我司調整mapreduce.task.io.sort.factor=64。這樣可以提高合併的並行度,減少合併的次數,降低對磁碟操作的次數。

mapreduce.shuffle.max.threads(預設為0,表示可用處理器的兩倍),該引數表示每個節點管理器的工作執行緒,用於map輸出到reduce。

那麼map算是完整了,在reduce拉取資料之前,我們完全還可以combiner呀(不影響最終結果的情況下),此時會根據Combiner定義的函式對map的結果進行合併這樣就可以減少資料的傳輸,降低磁碟io,提高效能了。

終於走到了map到reduce的資料傳輸過程了:
這中間主要的影響無非就是磁碟IO,網路IO,資料量的大小了(是否壓縮),其實減少資料量的大小,就可以做到優化了,所以我們可以選擇性壓縮資料,這樣在傳輸的過程中
就可以降低磁碟IO,網路IO等。可以通過mapreduce.map.output.compress(default:false)設定為true進行壓縮,資料會被壓縮寫入磁碟,讀資料讀的是壓縮資料需要解壓,在實際經驗中Hive在Hadoop的執行的瓶頸一般都是IO而不是CPU,壓縮一般可以10倍的減少IO操作,壓縮的方式Gzip,Lzo,BZip2,Lzma等,其中Lzo是一種比較平衡選擇,mapreduce.map.output.compress.codec(default:org.apache.hadoop.io.compress.DefaultCodec)引數設定。我司使用org.apache.hadoop.io.compress.SnappyCodec演算法,但這個過程會消耗CPU,適合IO瓶頸比較大。

mapreduce.task.io.sort.mb        #排序map輸出所需要使用記憶體緩衝的大小,以兆為單位, 預設為100
mapreduce.map.sort.spill.percent #map輸出緩衝和用來磁碟溢寫過程的記錄邊界索引,這兩者使用的閾值,預設0.8
mapreduce.task.io.sort.factor    #排序檔案時,一次最多合併的檔案數,預設10
mapreduce.map.output.compress    #在map溢寫磁碟的過程是否使用壓縮,預設false
org.apache.hadoop.io.compress.SnappyCodec  #map溢寫磁碟的壓縮演算法,預設org.apache.hadoop.io.compress.DefaultCodec
mapreduce.shuffle.max.threads    #該引數表示每個節點管理器的工作執行緒,用於map輸出到reduce,預設為0,表示可用處理器的兩倍

4.1、reduce端

接下來就是reduce了,首先我們可以通過引數設定合理的reduce個數(mapreduce.job.reduces引數控制),以及通過引數設定每個reduce的資源,mapreduce.reduce.memory.mb=5G(預設1G)
mapreduce.reduce.cpu.vcores=1(預設為1)。

reduce在copy的過程中預設使用5(mapreduce.reduce.shuffle.parallelcopies引數控制)個並行度進行復制資料,我司調了mapreduce.reduce.shuffle.parallelcopies=100.reduce的每一個下載執行緒在下載某個map資料的時候,有可能因為那個map中間結果所在機器發生錯誤,或者中間結果的檔案丟失,或者網路瞬斷等等情況,這樣reduce的下載就有可能失敗,所以reduce的下載執行緒並不會無休止的等待下去,當一定時間後下載仍然失敗,那麼下載執行緒就會放棄這次下載,並在隨後嘗試從另外的地方下載(因為這段時間map可能重跑)。reduce下載執行緒的這個最大的下載時間段是可以通過mapreduce.reduce.shuffle.read.timeout(default180000秒)調整的。

Copy過來的資料會先放入記憶體緩衝區中,然後當使用記憶體達到一定量的時候才spill磁碟。這裡的緩衝區大小要比map端的更為靈活,它基於JVM的heap size設定。這個記憶體大小的控制就不像map一樣可以通過io.sort.mb來設定了,而是通過另外一個引數 mapreduce.reduce.shuffle.input.buffer.percent(default 0.7)控制的。意思是說,shuffile在reduce記憶體中的資料最多使用記憶體量為:0.7 × maxHeap of reduce task,記憶體到磁碟merge的啟動門限可以通過mapreduce.reduce.shuffle.merge.percent(default0.66)配置。

copy完成後,reduce進入歸併排序階段,合併因子預設為10(mapreduce.task.io.sort.factor引數控制),如果map輸出很多,則需要合併很多趟,所以可以提高此引數來減少合併次數。

mapreduce.reduce.shuffle.parallelcopies #把map輸出複製到reduce的執行緒數,預設5
mapreduce.task.io.sort.factor  #排序檔案時一次最多合併檔案的個數
mapreduce.reduce.shuffle.input.buffer.percent #在shuffle的複製階段,分配給map輸出緩衝區佔堆記憶體的百分比,預設0.7
mapreduce.reduce.shuffle.merge.percent #map輸出緩衝區的閾值,用於啟動合併輸出和磁碟溢寫的過程

 

更多hadoop生態文章見: hadoop生態