1. 程式人生 > >hive的查詢注意事項以及優化總結

hive的查詢注意事項以及優化總結

Hive是將符合SQL語法的字串解析生成可以在Hadoop上執行的MapReduce的工具。使用Hive儘量按照分散式計算的一些特點來設計sql,和傳統關係型資料庫有區別,

所以需要去掉原有關係型資料庫下開發的一些固有思維。

基本原則:

1:儘量儘早地過濾資料,減少每個階段的資料量,對於分割槽表要加分割槽,同時只選擇需要使用到的欄位

select ... from A

join B

on A.key = B.key

where A.userid>10

     and B.userid<10

        and A.dt='20120417'

        and B.dt='20120417';

應該改寫為:

select .... from (select .... from A

                  where dt='201200417'

                                    and userid>10

                              ) a

join ( select .... from B

       where dt='201200417'

                     and userid < 10   

     ) b

on a.key = b.key;

2、對歷史庫的計算經驗  (這項是說根據不同的使用目的優化使用方法)

   歷史庫計算和使用,分割槽

3:儘量原子化操作,儘量避免一個SQL包含複雜邏輯

可以使用中間表來完成複雜的邏輯   

4 jion操作   小表要注意放在join的左邊(目前TCL裡面很多都小表放在join的右邊)。

否則會引起磁碟和記憶體的大量消耗

5:如果union all的部分個數大於2,或者每個union部分資料量大,應該拆成多個insert into 語句,實際測試過程中,執行時間能提升50%

insert overwite table tablename partition (dt= ....)

select ..... from (

                   select ... from A

                   union all

                   select ... from B

                   union all

                   select ... from C

                               ) R

where ...;

可以改寫為:

insert into table tablename partition (dt= ....)

select .... from A

WHERE ...;

insert into table tablename partition (dt= ....)

select .... from B

WHERE ...;

insert into table tablename partition (dt= ....)

select .... from C

WHERE ...; 

5:寫SQL要先了解資料本身的特點,如果有join ,group操作的話,要注意是否會有資料傾斜

如果出現數據傾斜,應當做如下處理:

set hive.exec.reducers.max=200;

set mapred.reduce.tasks= 200;---增大Reduce個數

set hive.groupby.mapaggr.checkinterval=100000 ;--這個是group的鍵對應的記錄條數超過這個值則會進行分拆,值根據具體資料量設定

set hive.groupby.skewindata=true; --如果是group by過程出現傾斜 應該設定為true

set hive.skewjoin.key=100000; --這個是join的鍵對應的記錄條數超過這個值則會進行分拆,值根據具體資料量設定

set hive.optimize.skewjoin=true;--如果是join 過程出現傾斜 應該設定為true

(1)  啟動一次job儘可能的多做事情,一個job能完成的事情,不要兩個job來做

 通常來說前面的任務啟動可以稍帶一起做的事情就一起做了,以便後續的多個任務重用,與此緊密相連的是模型設計,好的模型特別重要.

(2) 合理設定reduce個數

reduce個數過少沒有真正發揮hadoop平行計算的威力,但reduce個數過多,會造成大量小檔案問題,資料量、資源情況只有自己最清楚,找到個折衷點,

(3) 使用hive.exec.parallel引數控制在同一個sql中的不同的job是否可以同時執行,提高作業的併發

2、讓伺服器儘量少做事情,走最優的路徑,以資源消耗最少為目標

 比如:

(1) 注意join的使用

若其中有一個表很小使用map join,否則使用普通的reduce join,注意hive會將join前面的表資料裝載記憶體,所以較小的一個表在較大的表之前,減少記憶體資源的消耗

(2)注意小檔案的問題

在hive裡有兩種比較常見的處理辦法

第一是使用Combinefileinputformat,將多個小檔案打包作為一個整體的inputsplit,減少map任務數

set mapred.max.split.size=256000000;

set mapred.min.split.size.per.node=256000000

set  Mapred.min.split.size.per.rack=256000000

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat

第二是設定hive引數,將額外啟動一個MR Job打包小檔案

hive.merge.mapredfiles = false 是否合併 Reduce 輸出檔案,預設為 False 

 hive.merge.size.per.task = 256*1000*1000 合併檔案的大小 

(3)注意資料傾斜

在hive裡比較常用的處理辦法

第一通過hive.groupby.skewindata=true控制生成兩個MR Job,第一個MR Job Map的輸出結果隨機分配到reduce做次預彙總,減少某些key值條數過多某些key條數過小造成的資料傾斜問題

第二通過hive.map.aggr = true(預設為true)在Map端做combiner,假如map各條資料基本上不一樣, 聚合沒什麼意義,做combiner反而畫蛇添足,hive裡也考慮的比較周到通過引數hive.groupby.mapaggr.checkinterval = 100000 (預設)hive.map.aggr.hash.min.reduction=0.5(預設),預先取100000條資料聚合,如果聚合後的條數/100000>0.5,則不再聚合

(4)善用multi insert,union all

multi insert適合基於同一個源表按照不同邏輯不同粒度處理插入不同表的場景,做到只需要掃描源表一次,job個數不變,減少源表掃描次數

union all用好,可減少表的掃描次數,減少job的個數,通常預先按不同邏輯不同條件生成的查詢union all後,再統一group by計算,不同表的union all相當於multiple inputs,同一個表的union all,相當map一次輸出多條

(5) 引數設定的調優

叢集引數種類繁多,舉個例子比如

可針對特定job設定特定引數,比如jvm重用,reduce copy執行緒數量設定(適合map較快,輸出量較大)

如果任務數多且小,比如在一分鐘之內完成,減少task數量以減少任務初始化的消耗。可以通過配置JVM重用選項減少task的消耗

-----------------------------------------------------------

一、控制Hive中Map和reduce的數量

Hive中的sql查詢會生成執行計劃,執行計劃以MapReduce的方式執行,那麼結合資料和叢集的大小,map和reduce的數量就會影響到sql執行的效率。

除了要控制Hive生成的Job的數量,也要控制map和reduce的數量。

1、 map的數量,通常情況下和split的大小有關係,之前寫的一篇blog“map和reduce的數量是如何定義的”有描述。

 hive中預設的hive.input.format是org.apache.hadoop.hive.ql.io.CombineHiveInputFormat,對於combineHiveInputFormat,它的輸入的map數量

由三個配置決定,

mapred.min.split.size.per.node, 一個節點上split的至少的大小

mapred.min.split.size.per.rack 一個交換機下split至少的大小

mapred.max.split.size 一個split最大的大小

它的主要思路是把輸入目錄下的大檔案分成多個map的輸入, 併合並小檔案, 做為一個map的輸入. 具體的原理是下述三步:

a、根據輸入目錄下的每個檔案,如果其長度超過mapred.max.split.size,以block為單位分成多個split(一個split是一個map的輸入),每個split的長度都大於mapred.max.split.size, 因為以block為單位, 因此也會大於blockSize, 此檔案剩下的長度如果大於mapred.min.split.size.per.node, 則生成一個split, 否則先暫時保留.

b、現在剩下的都是一些長度效短的碎片,把每個rack下碎片合併, 只要長度超過mapred.max.split.size就合併成一個split, 最後如果剩下的碎片比mapred.min.split.size.per.rack大, 就合併成一個split, 否則暫時保留.

c、把不同rack下的碎片合併, 只要長度超過mapred.max.split.size就合併成一個split, 剩下的碎片無論長度, 合併成一個split.

舉例: mapred.max.split.size=1000

mapred.min.split.size.per.node=300

mapred.min.split.size.per.rack=100

輸入目錄下五個檔案,rack1下三個檔案,長度為2050,1499,10, rack2下兩個檔案,長度為1010,80. 另外blockSize為500.

經過第一步, 生成五個split: 1000,1000,1000,499,1000. 剩下的碎片為rack1下:50,10; rack2下10:80

由於兩個rack下的碎片和都不超過100, 所以經過第二步, split和碎片都沒有變化.

第三步,合併四個碎片成一個split, 長度為150.

如果要減少map數量, 可以調大mapred.max.split.size, 否則調小即可.

其特點是: 一個塊至多作為一個map的輸入,一個檔案可能有多個塊,一個檔案可能因為塊多分給做為不同map的輸入, 一個map可能處理多個塊,可能處理多個檔案。

2、 reduce數量

可以在hive執行sql的時,打印出來,如下:

Number of reduce tasks not specified. Estimated from input data size: 1

In order to change the average load for a reducer (in bytes):

  set hive.exec.reducers.bytes.per.reducer=<number>

In order to limit the maximum number of reducers:

  set hive.exec.reducers.max=<number>

In order to set a constant number of reducers:

  set mapred.reduce.tasks=<number>

reduce數量由以下三個引數決定,

mapred.reduce.tasks(強制指定reduce的任務數量)

hive.exec.reducers.bytes.per.reducer(每個reduce任務處理的資料量,預設為1000^3=1G)

hive.exec.reducers.max(每個任務最大的reduce數,預設為999)

計算reducer數的公式很簡單N=min( hive.exec.reducers.max ,總輸入資料量/ hive.exec.reducers.bytes.per.reducer )

  只有一個reduce的場景:

  a、沒有group by 的彙總

  b、order by

  c、笛卡爾積

二、join和Group的優化

        對於普通的join操作,會在map端根據key的hash值,shuffle到某一個reduce上去,在reduce端做join連線操作,記憶體中快取join左邊的表,遍歷右邊的表,一次做join操作。所以在做join操作時候,將資料量多的表放在join的右邊。

       當資料量比較大,並且key分佈不均勻,大量的key都shuffle到一個reduce上了,就出現了資料的傾斜。

              對於Group操作,首先在map端聚合,最後在reduce端坐聚合,hive預設是這樣的,以下是相關的引數          · hive.map.aggr = true是否在 Map 端進行聚合,預設為 True         · hive.groupby.mapaggr.checkinterval = 100000在 Map 端進行聚合操作的條目數目

       對於join和Group操作都可能會出現資料傾斜。

        以下有幾種解決這個問題的常見思路

      1、引數hive.groupby.skewindata = true,解決資料傾斜的萬能鑰匙,查詢計劃會有兩個 MR Job。第一個 MR Job 中,Map 的輸出結果集合會隨機分佈到 Reduce 中,每個 Reduce 做部分聚合操作,並輸出結果,這樣處理的結果是相同的 Group By Key 有可能被分發到不同的 Reduce 中,從而達到負載均衡的目的;第二個 MR Job 再根據預處理的資料結果按照 Group By Key 分佈到 Reduce 中(這個過程可以保證相同的 Group By Key 被分佈到同一個 Reduce 中),最後完成最終的聚合操作。

      2、where的條件寫在join裡面,使得減少join的數量(經過map端過濾,只輸出複合條件的)

      3、mapjoin方式,無reduce操作,在map端做join操作(map端cache小表的全部資料),這種方式下無法執行Full/RIGHT OUTER join操作

      4、對於count(distinct)操作,在map端以group by的欄位和count的欄位聯合作為key,如果有大量相同的key,那麼會存在資料傾斜的問題       5、資料的傾斜還包括,大量的join連線key為空的情況,空的key都hash到一個reduce上去了,解決這個問題,最好把空的key和非空的key做區分          空的key不做join操作。

   當然有的hive操作,不存在資料傾斜的問題,比如資料聚合類的操作,像sum、count,因為已經在map端做了聚合操作了,到reduce端的資料相對少一些,所以不存在這個問題。

四、小檔案的合併

       大量的小檔案導致檔案數目過多,給HDFS帶來壓力,對hive處理的效率影響比較大,可以合併map和reduce產生的檔案

          · hive.merge.mapfiles = true是否和並 Map 輸出檔案,預設為 True           · hive.merge.mapredfiles = false是否合併 Reduce 輸出檔案,預設為 False          · hive.merge.size.per.task = 256*1000*1000合併檔案的大小

五、in/exists(not)

         通過left semi join 實現 in操作,一個限制就是join右邊的表只能出現在join條件中

六、分割槽裁剪

         通過在條件中指定分割槽,來限制資料掃描的範圍,可以極大提高查詢的效率

七、排序

        order by 排序,只存在一個reduce,這樣效率比較低。

        可以用sort by操作,通常結合distribute by使用做reduce分割槽鍵