1. 程式人生 > >hive處理小檔案(進行map、reduce、壓縮、歸檔優化解決)

hive處理小檔案(進行map、reduce、壓縮、歸檔優化解決)

背景

Hive query將運算好的資料寫回hdfs(比如insert into語句),有時候會產生大量的小檔案,如果不採用CombineHiveInputFormat就對這些小檔案進行操作的話會產生大量的map task,耗費大量叢集資源,而且小檔案過多會對namenode造成很大壓力。所以Hive在正常job執行完之後,會起一個conditional task,來判斷是否需要合併小檔案,如果滿足要求就會另外啟動一個map-only job 或者mapred job來完成合並

 

一、    控制hive任務中的map數: 

1.    通常情況下,作業會通過input的目錄產生一個或者多個map任務。 

主要的決定因素有: input的檔案總個數,input的檔案大小,叢集設定的檔案塊大小(目前為128M, 可在hive中通過set dfs.block.size;命令檢視到,該引數不能自定義修改);

2.    舉例:假設input目錄下有1個檔案a,大小為780M,那麼hadoop會將該檔案a分隔成7個塊(6個128m的塊和1個12m的塊),從而產生7個map數。假設input目錄下有3個檔案a,b,c,大小分別為10m,20m,130m,那麼hadoop會分隔成4個塊10m,20m,128m,2m),從而產生4個map數。即,如果檔案大於塊大小(128m),那麼會拆分,如果小於塊大小,則把該檔案當成一個塊。

3.    map數越多越好?否定的。如果一個任務有很多小檔案(遠遠小於塊大小128m),則每個小檔案也會被當做一個塊,用一個map任務來完成,而一個map任務啟動和初始化的時間遠遠大於邏輯處理的時間,就會造成很大的資源浪費。而且,同時可執行的map數是受限的。

4.    是不是保證每個map處理接近128m的檔案塊,就可以了?也不一定。比如有一個127m的檔案,正常會用一個map去完成,但這個檔案只有一個或者兩個小欄位,卻有幾千萬的記錄,如果map處理的邏輯比較複雜,用一個map任務去做,肯定也比較耗時。

解決:減少map數

    假設一個SQL任務:select brand,("hour") time_type,("2018-12-04") times,("05") as hours,("201812") as months,("20181204") as days,count(distinct(openId)) brandCount from log_db.commodlog where create_time like "2018-12-04 05%" and openId !='' group by brand SORT BY brand ASC;

    該任務的inputdir  /user/hive/warehouse/log_db/commodlog/month=201812/day=20181204

    共有194個檔案,其中很多是遠遠小於128m的小檔案,總大小9G,正常執行會用194個map任務。

   Map總共消耗的計算資源: SLOTS_MILLIS_MAPS= 623,020

 

通過以下方法來在map執行前合併小檔案,減少map數:

啟動壓縮

set hive.exec.compress.output=true;

set mapreduce.output.fileoutputformat.compress=true; 
set mapreduce.input.fileinputformat.split.maxsize=1073741824;

set mapreduce.input.fileinputformat.split.minsize=1;

set mapreduce.input.fileinputformat.split.minsize.per.node=536870912;

set mapreduce.input.fileinputformat.split.minsize.per.rack=536870912;

經過測試,這種設定可以在map階段和並小檔案,減少map的數量。

注意:在測試的時候,如果檔案格式為Textfile,並且啟用lzo壓縮,不能生效。 rcfile 以及orc可以生效,Textfile不啟用lzo壓縮也可以生效。如果是新叢集的話,沒有歷史遺留的問題的話,建議hive都使用orc檔案格式,以及啟用lzo壓縮。     

       set mapred.max.split.size=100000000;

       set mapred.min.split.size.per.node=100000000;

       set mapred.min.split.size.per.rack=100000000;

       set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

        再執行上面的語句,用了74個map任務,map消耗的計算資源:SLOTS_MILLIS_MAPS= 333,500. 對於這個簡單SQL任務,執行時間上可能差不多,但節省了一半的計算資源。

   大概解釋一下,100000000表示100M

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;這個引數表示執行前進行小檔案合併,

         前面三個引數確定合併檔案塊的大小,大於檔案塊大小128m的,按照128m來分隔,小於128m,大於100m的,按照100m來分隔,把那些小於100m的(包括小檔案和分隔大檔案剩下的),

         進行合併,最終生成了74個塊。

         

解決:增加map數

         當input的檔案都很大,任務邏輯複雜,map執行非常慢的時候,可以考慮增加Map數,來使得每個map處理的資料量減少,從而提高任務的執行效率。

         假設有這樣一個任務:

         Select data_desc,

                count(1),

                count(distinct id),

                sum(case when …),

                sum(case when ...),

                sum(…)

        from a group by data_desc

                   如果表a只有一個檔案,大小為120M,但包含幾千萬的記錄,如果用1個map去完成這個任務,肯定是比較耗時的,這種情況下,我們要考慮將這一個檔案合理的拆分成多個,

                   這樣就可以用多個map任務去完成。

                   set mapred.reduce.tasks=10;

                   create table a_1 as 

                   select * from a 

                   distribute by rand(123); 

                   

                   這樣會將a表的記錄,隨機的分散到包含10個檔案的a_1表中,再用a_1代替上面sql中的a表,則會用10個map任務去完成。

                   每個map任務處理大於12M(幾百萬記錄)的資料,效率肯定會好很多。

    

   看上去,貌似這兩種有些矛盾,一個是要合併小檔案,一個是要把大檔案拆成小檔案,這點正是重點需要關注的地方,

   根據實際情況,控制map數量需要遵循兩個原則:使大資料量利用合適的map數;使單個map任務處理合適的資料量;

二、    控制hive任務的reduce數: 

1. 如何確定reduce數: 

reduce個數的設定極大影響任務執行效率,不指定reduce個數的情況下,Hive會猜測確定一個reduce個數,基於以下兩個設定:

hive.exec.reducers.bytes.per.reducer(每個reduce任務處理的資料量,預設為1000^3=1G) 

hive.exec.reducers.max(每個任務最大的reduce數,預設為1009)

計算reducer數的公式很簡單N=min(引數2,總輸入資料量/引數1)

即,如果reduce的輸入(map的輸出)總大小不超過1G,那麼只會有一個reduce任務;

如:select brand,("hour") time_type,("2018-12-04") times,("05") as hours,("201812") as months,("20181204") as days,count(distinct(openId)) brandCount from log_db.commodlog where create_time like "2018-12-04 05%" and openId !='' group by brand SORT BY brand ASC; 總大小為9G多,因此這句有10個reduce

2.    調整reduce個數方法一: 

調整hive.exec.reducers.bytes.per.reducer引數的值;

set hive.exec.reducers.bytes.per.reducer=500000000; (500M)

select brand,("hour") time_type,("2018-12-04") times,("05") as hours,("201812") as months,("20181204") as days,count(distinct(openId)) brandCount from log_db.commodlog where create_time like "2018-12-04 05%" and openId !='' group by brand SORT BY brand ASC;這次有20個reduce

3.    調整reduce個數方法二; 

set mapred.reduce.tasks = 15;

select brand,("hour") time_type,("2018-12-04") times,("05") as hours,("201812") as months,("20181204") as days,count(distinct(openId)) brandCount from log_db.commodlog where create_time like "2018-12-04 05%" and openId !='' group by brand SORT BY brand ASC;

這次有15個reduce

4.    reduce個數並不是越多越好; 

同map一樣,啟動和初始化reduce也會消耗時間和資源;

另外,有多少個reduce,就會有多少個輸出檔案,如果生成了很多個小檔案,那麼如果這些小檔案作為下一個任務的輸入,則也會出現小檔案過多的問題;

5.    什麼情況下只有一個reduce; 

很多時候你會發現任務中不管資料量多大,不管你有沒有設定調整reduce個數的引數,任務中一直都只有一個reduce任務;

其實只有一個reduce任務的情況,除了資料量小於hive.exec.reducers.bytes.per.reducer引數值的情況外,還有以下原因:

a)    沒有group by的彙總.這點非常常見,希望大家儘量改寫。

b)    用了Order by

c)    有笛卡爾積

通常這些情況下,除了找辦法來變通和避免,我暫時沒有什麼好的辦法,因為這些操作都是全域性的,所以hadoop不得不用一個reduce去完成;

同樣的,在設定reduce個數的時候也需要考慮這兩個原則:使大資料量利用合適的reduce數;使單個reduce任務處理合適的資料量;

 

MR作業結束後,判斷生成檔案的平均大小,如果小於閥值,就再啟動一個job來合併檔案

set hive.merge.mapredfiles=true;

set hive.merge.mapfiles=true;

set hive.merge.smallfiles.avgsize=268435456;

 

引數解釋

set hive.mergejob.maponly (預設為true)

如果hadoop版本支援CombineFileInputFormat,則啟動Map-only job for merge,否則啟動MapReduce merge job,map端combine file是比較高效的做法

 

hive.merge.mapfiles(預設為true)

正常的map-only job後,是否啟動merge job來合併map端輸出的結果

 

hive.merge.mapredfiles(預設為false)

正常的map-reduce job後,是否啟動merge job來合併reduce端輸出的結果,建議開啟

 

hive.merge.smallfiles.avgsize(預設為16MB)

如果不是partitioned table的話,輸出table檔案的平均大小小於這個值,啟動merge job,如果是partitioned table,則分別計算每個partition下檔案平均大小,只merge平均大小小於這個值的partition。這個值只有當hive.merge.mapfiles或hive.merge.mapredfiles設定為true時,才有效

hive.exec.reducers.bytes.per.reducer(預設為1G)

如果使用者不主動設定mapred.reduce.tasks數,則會根據input directory計算出所有讀入檔案的input summary size,然後除以這個值算出reduce number

reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);

reducers = Math.max(1, reducers);

reducers = Math.min(maxReducers, reducers);

 

hive.merge.size.per.task(預設是256MB)

merge job後每個檔案的目標大小(targetSize),用之前job輸出檔案的total size除以這個值,就可以決定merge job的reduce數目。merge job的map端相當於identity map,然後shuffle到reduce,每個reduce dump一個檔案,通過這種方式控制檔案的數量和大小

MapredWork work = (MapredWork) mrTask.getWork();

if (work.getNumReduceTasks() > 0) {

int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);

int reducers = (int) ((totalSize +targetSize - 1) / targetSize);

reducers = Math.max(1, reducers);

reducers = Math.min(maxReducers, reducers);

work.setNumReduceTasks(reducers);

}

 

mapred.max.split.size(預設256MB)

mapred.min.split.size.per.node(預設1 byte)

mapred.min.split.size.per.rack(預設1 byte)

這三個引數CombineFileInputFormat中會使用,Hive預設的InputFormat是CombineHiveInputFormat,裡面所有的呼叫(包括最重要的getSplits和getRecordReader)都會轉換成CombineFileInputFormat的呼叫,所以可以看成是它的一個包裝。CombineFileInputFormat 可以將許多小檔案合併成一個map的輸入,如果檔案很大,也可以對大檔案進行切分,分成多個map的輸入。一個CombineFileSplit對應一個map的輸入,包含一組path(hdfs路徑list),startoffset, lengths, locations(檔案所在hostname list)mapred.max.split.size是一個split 最大的大小,mapred.min.split.size.per.node是一個節點上(datanode)split至少的大小,mapred.min.split.size.per.rack是同一個交換機(rack locality)下split至少的大小通過這三個數的調節,組成了一串CombineFileSplit使用者可以通過增大mapred.max.split.size的值來減少Map Task數量

 

壓縮檔案的處理

對於輸出結果為壓縮檔案形式儲存的情況,要解決小檔案問題,如果在Map輸入前合併,對輸出的檔案儲存格式並沒有限制。但是如果使用輸出合併,則必須配合SequenceFile來儲存,否則無法進行合併,以下是示例:

set mapred.output.compression. type=BLOCK;

set hive.exec.compress.output= true;

set mapred.output.compression.codec=org.apache.hadoop.io.compress.LzoCodec;

set hive.merge.smallfiles.avgsize=100000000;

drop table if exists dw_stage.zj_small;

create table dw_stage.zj_small

STORED AS SEQUENCEFILE

as select *

from dw_db.dw_soj_imp_dtl

where log_dt = '2014-04-14'

and paid like '%baidu%' ;

使用HAR歸檔檔案

 

Hadoop的歸檔檔案格式也是解決小檔案問題的方式之一。而且Hive提供了原生支援:

set hive.archive.enabled= true;

set hive.archive.har.parentdir.settable= true;

set har.partfile.size=1099511627776;

ALTER TABLE srcpart ARCHIVE PARTITION(ds= '2008-04-08', hr= '12' );

ALTER TABLE srcpart UNARCHIVE PARTITION(ds= '2008-04-08', hr= '12' );

 

如果使用的不是分割槽表,則可建立成外部表,並使用har://協議來指定路徑。

 

結論

hive 通過上述幾個值來控制是否啟動merge file job,通常是建議大家都開啟,如果是一堆順序執行的作業鏈,只有最後一張表需要固化落地,中間表用好就刪除的話,可以在最後一個insert into table之前再開啟,防止之前的作業也會launch merge job使得作業變慢。