1. 程式人生 > >Hive高階應用

Hive高階應用

目錄

1.Hive shell 操作

1.1.Hive 命令列

1.1.1.進入hive的客戶端之後

1.1.2.進入hive的客戶端之前

1.2.Hive 引數配置方式

2.資料傾斜

2.1.什麼是資料傾斜?

2.2.Hadoop框架的特定

2.3.主要表現

2.4.容易資料傾斜的情況

2.5.產生資料傾斜的原因

2.6.業務場景

3.Hive 執行過程例項分析

3.1.Hive 執行過程概述

3.2.Join

​3.3.Group By

​3.4.Distinct

4.Hive優化策略

4.1.Hadoop 框架的計算特性

4.2.優化常用手段

4.3.排序選擇

4.4.怎樣做笛卡爾積

4.5.怎樣寫in/exists語句

4.6.設定合理的mapTask數量

4.7.小檔案合併

4.8.設定合理的reduceTask數量

4.9.合併MapReduce操作

4.10.合理利用分桶:Bucketing和Sampling

4.11.合理利用分割槽:Partition

4.12.Join 優化

4.13.Group By 優化

4.14.合理利用檔案儲存格式

4.15.本地模式執行MapReduce

4.16.並行化處理

4.17.設定壓縮儲存


1.Hive shell 操作

1.1.Hive 命令列

1.1.1.進入hive的客戶端之後

quit 退出hive客戶端

set key=value 進入hvie客戶端之後進行引數設定

  • key:hive-default.xml hive-site.xml <name>key</name>
  • 這種引數設定只對當前客戶端生效
  • 當前客戶端退出,則引數失效

set key 檢視指定引數的值

set -v 列印所有的hive或hadoop引數配置(瞭解即可)

add file [file] 新增檔案到hive的classpath下

add jar jarname UDF中,新增jar檔案到hive的classpath下

list file/files

list jar/jars 檢視當前classpath下的檔案或jar包資源

!linux命令 在hive的客戶端執行linux命令,只能執行查詢相關的命令

dfs [dfs cmd] 在hive客戶端執行hdfs相關的命令

  • dfs -ls /;
  • hadoop fs -ls / 老的命令
  • hdfs dfs -ls / 二者執行效果一樣,新的命令
  • hadoop/hdfs 開啟hadoop/hdfs的客戶端,而hive就是這個客戶端,所以直接從dfs開始寫,不要前面的。

source file 在hive的客戶端執行sql指令碼的,指令碼是本地的(要掌握)

1.1.2.進入hive的客戶端之前

語法結構:hive [選項]

常用選項

  • -hiveconf key=value 在啟動hive之前設定hive的相關引數,每次只能初始化一個引數。
  • -i 即init 從檔案初始化hive的相關引數,在啟動hive的時候,可以一次性初始化多個引數。
  • -e 即execute,跟hql語句,在不進入hive客戶端的情況下,可以執行hql語句,比如 hive -e "show databases"。在執行查詢語句的時候,表名前是一定要加上庫名的。
  • -f 執行一個hql語句的腳步,類似於source的功能,比如hive -f hql指令碼路徑。

示例程式碼

1.執行一個查詢

2.執行一個檔案

3.執行引數檔案

從配置檔案啟動 hive,並載入配置檔案當中的配置引數

1.2.Hive 引數配置方式

Hive 引數大全:https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties

開發 Hive 應用時,不可避免地需要設定 Hive 的引數。設定 Hive 的引數可以調優 HQL 程式碼的執行效率,或幫助定位問題。然而實踐中經常遇到的一個問題是,為什麼設定的引數沒有起作用?這通常是錯誤的設定方式導致的。

對於一般引數,有以下三種設定方式:

  • 配置檔案 (全域性有效)
  • 命令列引數(對 hive 啟動例項有效)
  • 引數宣告 (對 hive 的連線 session 有效)

配置檔案:Hive 的配置檔案包括

  • 使用者自定義配置檔案:$HIVE_CONF_DIR/hive-site.xml
  • 預設配置檔案:$HIVE_CONF_DIR/hive-default.xml
  • 使用者自定義配置會覆蓋預設配置。
  • 另外,Hive 也會讀入 Hadoop 的配置,因為 Hive 是作為 Hadoop 的客戶端啟動的,Hive 的配置會覆蓋 Hadoop 的配置。
  • 配置檔案的設定對本機啟動的所有 Hive 程序都有效。

命令列引數:啟動 Hive(客戶端或 Server 方式)時,可以在命令列新增-hiveconf param=value
來設定引數,例如:
bin/hive -hiveconf hive.root.logger=INFO,console
這一設定對本次啟動的 session(對於 server 方式啟動,則是所有請求的 session)有效。

引數宣告:可以在 HQL 中使用 SET 關鍵字設定引數,例如:

  • set mapred.reduce.tasks = 10;
  • set mapreduce.job.reduces = 10;
  • 這一設定的作用域也是 session 級的。

set hive.exec.reducers.bytes.per.reducer=<number> 每個 reduce task 的平均負載資料量。Hive 會估算總資料量,然後用該值除以上述引數值,就能得出需要執行的 reduceTask 數。
set hive.exec.reducers.max=<number> 設定 reduce task 數量的上限
set mapreduce.job.reduces=<number> 指定固定的 reduce task 數量。但是,這個引數在必要時<業務邏輯決定只能用一個 reduce task> hive 會忽略,比如在設定了 set mapreduce.job.reduces = 3,但是 HQL 語句當中使用了 order by 的話,那麼就會忽略該引數的設定。

上述三種設定方式的優先順序依次遞增。即引數宣告覆蓋命令列引數,命令列引數覆蓋配置檔案設定。注意某些系統級的引數,例如 log4j 相關的設定,必須用前兩種方式設定,因為那些引數的讀取在 session 建立以前已經完成了。

2.資料傾斜

2.1.什麼是資料傾斜?

由於資料分佈不均勻,造成資料大量的集中到一點,造成資料熱點。

2.2.Hadoop框架的特定

1.不怕資料大,怕資料傾斜

2.Jobs 數比較多的作業執行效率相對比較低,如子查詢比較多

3.sum,count,max,min 等聚集函式,通常不會有資料傾斜問題

2.3.主要表現

任務進度長時間維持在 99%或者 100%的附近,檢視任務監控頁面,發現只有少量 reduce子任務未完成,因為其處理的資料量和其他的 reduce 差異過大。單一 reduce 處理的記錄數和平均記錄數相差太大,通常達到好幾倍之多,最長時間遠大於平均時長。

2.4.容易資料傾斜的情況

操作:

關鍵詞 情形 後果
Join 其中一個表較小,但是key集中 分發到某一個或幾個Reduce上的資料遠高於平均值
大表和大表,但是分桶的判斷欄位0值或空值過多 這些空值都由一個reduce處理非常慢
group by group by緯度過小,某值的數量過多 處理某值的reduce非常耗時
count(distinct) 某特殊值過多 處理此特殊值的reduce耗時

1.group by 不和聚集函式搭配使用的時候

2.count(distinct),在資料量大的情況下,容易資料傾斜,因為 count(distinct)是按 group by 欄位分組,按 distinct 欄位排序

3.小表關聯超大表 join

2.5.產生資料傾斜的原因

A:key 分佈不均勻

B:業務資料本身的特性

C:建表考慮不周全

D:某些 HQL 語句本身就存在資料傾斜

2.6.業務場景

1.空值產生的資料傾斜

場景說明:在日誌中,常會有資訊丟失的問題,比如日誌中的 user_id,如果取其中的user_id 和使用者表中的 user_id 相關聯,就會碰到資料傾斜的問題。

解決方案 1:user_id 為空的不參與關聯
select * from log a join user b on a.user_id is not null and a.user_id = b.user_id 
union all 
select * from log c where c.user_id is null;

解決方案 2:賦予空值新的 key 值
select * from log a left outer join user b on 
case when a.user_id is null then concat('hive',rand()) else a.user_id end = b.user_id

總結:方法 2 比方法 1 效率更好,不但 IO 少了,而且作業數也少了,方案 1 中,log 表讀了兩次,jobs 肯定是 2,而方案 2 是 1。這個優化適合無效 id(比如-99,’’,null)產生的資料傾斜,把空值的 key 變成一個字串加上一個隨機數,就能把造成資料傾斜的資料分到不同的 reduce 上解決資料傾斜的問題。

改變之處:使本身為 null 的所有記錄不會擁擠在同一個 reduceTask 了,會由於有替代的隨機字串值,而分散到了多個 reduceTask 中了,由於 null 值關聯不上,處理後並不影響最終結果。

2.不同資料型別關聯產生資料傾斜

場景說明:使用者表中 user_id 欄位為 int,log 表中 user_id 為既有 string 也有 int 的型別,當按照兩個表的 user_id 進行 join 操作的時候,預設的 hash 操作會按照 int 型別的 id 進行分配,這樣就會導致所有的 string 型別的 id 就被分到同一個 reducer 當中。

解決方案:把數字型別 id 轉換成 string 型別的 id
select * from user a left outer join log b on b.user_id = cast(a.user_id as string)

3.大小表關聯查詢產生資料傾斜

注意:使用map join解決小表關聯大表造成的資料傾斜問題。這個方法使用的頻率很高。
map join 概念:將其中做連線的小表(全量資料)分發到所有 MapTask 端進行 Join,從
而避免了 reduceTask,前提要求是記憶體足以裝下該全量資料

以大表 a 和小表 b 為例,所有的 maptask 節點都裝載小表 b 的所有資料,然後大表 a 的一個數據塊資料比如說是 a1 去跟 b 全量資料做連結,就省去了 reduce 做彙總的過程。所以相對來說,在記憶體允許的條件下使用 map join 比直接使用 MapReduce 效率還高些,當然這隻限於做 join 查詢的時候。

在 hive 中,直接提供了能夠在 HQL 語句指定該次查詢使用 map join,map join 的用法是在查詢/子查詢的SELECT關鍵字後面新增/*+ MAPJOIN(tablelist) */提示優化器轉化為map join(早期的 Hive 版本的優化器是不能自動優化 map join 的)。其中 tablelist 可以是一個表,或以逗號連線的表的列表。tablelist 中的表將會讀入記憶體,通常應該是將小表寫在這裡。

MapJoin 具體用法:
select /* +mapjoin(a) */ a.id aid, name, age from a join b on a.id = b.id;
select /* +mapjoin(movies) */ a.title, b.rating from movies a join ratings b on a.movieid = b.movieid;

在 hive0.11 版本以後會自動開啟 map join 優化,由兩個引數控制:
set hive.auto.convert.join=true;//設定 MapJoin 優化自動開啟
set hive.mapjoin.smalltable.filesize=25000000//設定小表不超過多大時開啟 mapjoin 優化

如果是大大表關聯呢?那就大事化小,小事化了。把大表切分成小表,然後分別 map join

那麼如果小表不大不小,那該如何處理呢???
使用 map join 解決小表(記錄數少)關聯大表的資料傾斜問題,這個方法使用的頻率非常高,但如果小表很大,大到 map join 會出現 bug 或異常,這時就需要特別的處理

舉一例:日誌表和使用者表做連結
select * from log a left outer join users b on a.user_id = b.user_id;
users 表有 600w+的記錄,把 users 分發到所有的 map 上也是個不小的開銷,而且 map join 不支援這麼大的小表。如果用普通的 join,又會碰到資料傾斜的問題。

改進方案:
select /*+mapjoin(x)*/* from log a
          left outer join (
                    select /*+mapjoin(c)*/ d.*
                              from ( select distinct user_id from log ) c join users d on c.user_id = d.user_id
          ) x
on a.user_id = x.user_id;

假如,log 裡 user_id 有上百萬個,這就又回到原來 map join 問題。所幸,每日的會員 uv不會太多,有交易的會員不會太多,有點選的會員不會太多,有佣金的會員不會太多等等。所以這個方法能解決很多場景下的資料傾斜問題

3.Hive 執行過程例項分析

3.1.Hive 執行過程概述

Hive 將 HQL 轉換成一組操作符(Operator),比如 GroupByOperator, JoinOperator 等

操作符 Operator 是 Hive 的最小處理單元

每個操作符代表一個 HDFS 操作或者 MapReduce 作業

Hive 通過 ExecMapper 和 ExecReducer 執行 MapReduce 程式,執行模式有本地模式和分散式兩種模式

Hive 操作符列表:

操作符 描述
TableScanOperator 掃描Hive表資料
ReduceSinkOperator 建立將傳送到Reducer端的<Key,Value>對
JoinOperator

Join兩份資料

SelectOperator 選擇輸出列
FileSinkOperator 建立結果資料,輸出至檔案
FilterOperator 過濾輸入資料
GroupByOperator Group By語句
MapJoinOperator /*+ mapjoin(t) */
LimitOperator Limit語句
UnionOperator Union語句

Hive 編譯器的工作職責:

  • Parser:將 HQL 語句轉換成抽象語法樹(AST:Abstract Syntax Tree)
  • Semantic Analyzer:將抽象語法樹轉換成查詢塊
  • Logic Plan Generator:將查詢塊轉換成邏輯查詢計劃
  • Logic Optimizer:重寫邏輯查詢計劃,優化邏輯執行計劃
  • Physical Plan Gernerator:將邏輯計劃轉化成物理計劃(MapReduce Jobs)
  • Physical Optimizer:選擇最佳的 Join 策略,優化物理執行計劃

優化器型別:

上表中帶①符號的,優化目的都是儘量將任務合併到一個 Job 中,以減少 Job 數量,帶②的優化目的是儘量減少 shuffle 資料量

3.2.Join

對於 join 操作:SELECT pv.pageid, u.age FROM page_view pv JOIN user u ON pv.userid = u.userid;

實現過程:

Map:

  1. 以 JOIN ON 條件中的列作為 Key,如果有多個列,則 Key 是這些列的組合
  2. 以 JOIN 之後所關心的列作為 Value,當有多個列時,Value 是這些列的組合。在Value 中還會包含表的 Tag 資訊,用於標明此 Value 對應於哪個表
  3. 按照 Key 進行排序

Shuffle:根據 Key 的值進行 Hash,並將 Key/Value 對按照 Hash 值推至不同對 Reduce 中

Reduce:Reducer 根據 Key 值進行 Join 操作,並且通過 Tag 來識別不同的表中的資料

具體實現過程:

3.3.Group By

對於 group by:SELECT pageid, age, count(1) FROM pv_users GROUP BY pageid, age; 

3.4.Distinct

對於 distinct:SELECT age, count(distinct pageid) FROM pv_users GROUP BY age; 按照 age 分組,然後統計每個分組裡面的不重複的 pageid 有多少個。

詳細過程解釋:該 SQL 語句會按照 age 和 pageid 預先分組,進行 distinct 操作。然後會再按
照 age 進行分組,再進行一次 distinct 操作。

4.Hive優化策略

4.1.Hadoop 框架的計算特性

1.資料量大不是問題,資料傾斜是個問題。

2.jobs 數比較多的作業執行效率相對比較低,比如即使有幾百行的表,如果多次關聯多次彙總,產生十幾個 jobs,耗時很長。原因是 map reduce 作業初始化的時間是比較長的。

3.sum,count,max,min 等 UDAF,不怕資料傾斜問題,hadoop 在 map 端的彙總合併優化,使資料傾斜不成問題。

4.count(distinct userid),在資料量大的情況下,效率較低,如果是多 count(distinct userid,month)效率更低,因為 count(distinct)是按 group by 欄位分組,按 distinct 欄位排序,一般這種分佈方式是很傾斜的,比如 PV 資料,淘寶一天 30 億的 pv,如果按性別分組,分配 2 個 reduce,每個 reduce 期望處理 15 億資料,但現實必定是男少女多。

4.2.優化常用手段

1.好的模型設計事半功倍。

2.解決資料傾斜問題。

3.減少 job 數。

4.設定合理的 MapReduce 的 task 數,能有效提升效能。(比如,10w+級別的計算,用 160個 reduce,那是相當的浪費,1 個足夠)。

5.瞭解資料分佈,自己動手解決資料傾斜問題是個不錯的選擇。這是通用的演算法優化,但演算法優化有時不能適應特定業務背景,開發人員瞭解業務,瞭解資料,可以通過業務邏輯精確有效的解決資料傾斜問題。

6.資料量較大的情況下,慎用 count(distinct),group by 容易產生傾斜問題。

7.對小檔案進行合併,是行之有效的提高排程效率的方法,假如所有的作業設定合理的檔案數,對雲梯的整體排程效率也會產生積極的正向影響。

8.優化時把握整體,單個作業最優不如整體最優。

4.3.排序選擇

cluster by:對同一欄位分桶並排序,不能和 sort by 連用

distribute by + sort by:分桶,保證同一欄位值只存在一個結果檔案當中,結合 sort by 保證每個 reduceTask 結果有序

sort by:單機排序,單個 reduce 結果有序

order by:全域性排序,缺陷是隻能使用一個 reduce

一定要區分這四種排序的使用方式和適用場景。

4.4.怎樣做笛卡爾積

當 Hive 設定為嚴格模式(hive.mapred.mode=strict)時,不允許在 HQL 語句中出現笛卡爾積,這實際說明了 Hive 對笛卡爾積支援較弱。因為找不到 Join key,Hive 只能使用 1 個 reducer來完成笛卡爾積。

當然也可以使用 limit 的辦法來減少某個表參與 join 的資料量,但對於需要笛卡爾積語義的需求來說,經常是一個大表和一個小表的 Join 操作,結果仍然很大(以至於無法用單機處理),這時 MapJoin 才是最好的解決辦法。MapJoin,顧名思義,會在 Map 端完成 Join 操作。這需要將 Join 操作的一個或多個表完全讀入記憶體。

PS:MapJoin 在子查詢中可能出現未知 BUG。在大表和小表做笛卡爾積時,規避笛卡爾積的方法是,給 Join 新增一個 Join key,原理很簡單:將小表擴充一列 join key,並將小表的條目複製數倍,join key 各不相同;將大表擴充一列 join key 為隨機數

精髓就在於複製幾倍,最後就有幾個 reduce 來做,而且大表的資料是前面小表擴張 key 值範圍裡面隨機出來的,所以複製了幾倍 n,就相當於這個隨機範圍就有多大 n,那麼相應的,大表的資料就被隨機的分為了 n 份。並且最後處理所用的 reduce 數量也是 n,而且也不會出現資料傾斜

4.5.怎樣寫in/exists語句

雖然經過測驗,hive1.2.1 也支援 in/exists 操作,但還是推薦使用 hive 的一個高效替代方案:left semi join
比如說:
select a.id, a.name from a where a.id in (select b.id from b);
select a.id, a.name from a where exists (select id from b where a.id = b.id);
應該轉換成:
select a.id, a.name from a left semi join b on a.id = b.id;

4.6.設定合理的mapTask數量

Map 數過大:

  • Map 階段輸出檔案太小,產生大量小檔案
  • 初始化和建立 Map 的開銷很大

Map 數太小:

  • 檔案處理或查詢併發度小,Job 執行時間過長
  • 大量作業時,容易堵塞叢集

在 MapReduce 的程式設計案例中,我們得知,一個MR Job的 MapTask 數量是由輸入分片 InputSplit決定的。而輸入分片是由 FileInputFormat.getSplit()決定的。一個輸入分片對應一個 MapTask,而輸入分片是由三個引數決定的:

dfs.blocksize 128M  HDFS 預設資料塊大小
mapreduce.input.fileinputformat.split.minsize 1 最小分片大小
mapreduce.input.fileinputformat.split.maxsize Long.MAX_VALUE 最大分片大小(MR)
mapreduce.input.fileinputformat.split.maxsize 256M 最大處理資料量 Hive

輸入分片大小的計算是這麼計算出來的:
long splitSize = Math.max(minSize, Math.min(maxSize, blockSize))
預設情況下,輸入分片大小和 HDFS 叢集預設資料塊大小一致,也就是預設一個數據塊,啟用一個 MapTask 進行處理,這樣做的好處是避免了伺服器節點之間的資料傳輸,提高 job 處理效率。

兩種經典的控制 MapTask 的個數方案:減少 MapTask 數或者增加 MapTask 數
1.減少 MapTask 數是通過合併小檔案來實現,這一點主要是針對資料來源
2.增加 MapTask 數可以通過控制上一個 job 的 reduceTask 個數

因為 Hive 語句最終要轉換為一系列的 MapReduce Job 的,而每一個 MapReduce Job 是由一系列的 MapTask 和 ReduceTask 組成的,預設情況下, MapReduce 中一個 MapTask 或者一個ReduceTask 就會啟動一個 JVM 程序,一個 Task 執行完畢後, JVM 程序就退出。這樣如果任務花費時間很短,又要多次啟動 JVM 的情況下,JVM 的啟動時間會變成一個比較大的消耗,
這個時候,就可以通過重用 JVM 來解決:set mapred.job.reuse.jvm.num.tasks=5

4.7.小檔案合併

檔案數目過多,會給 HDFS 帶來壓力,並且會影響處理效率,可以通過合併 Map 和 Reduce 的結果檔案來消除這樣的影響:
set hive.merge.mapfiles = true ##在 map only 的任務結束時合併小檔案
set hive.merge.mapredfiles = false ## true 時在 MapReduce 的任務結束時合併小檔案
set hive.merge.size.per.task = 256*1000*1000 ##合併檔案的大小
set mapred.max.split.size=256000000; ##每個 Map 最大分割大小
set mapred.min.split.size.per.node=1; ##一個節點上 split 的最少值
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; ##執行 Map 前進行小檔案合併

4.8.設定合理的reduceTask數量

Hadoop MapReduce 程式中,reducer 個數的設定極大影響執行效率,這使得 Hive 怎樣決定reducer 個數成為一個關鍵問題。遺憾的是 Hive 的估計機制很弱,不指定 reducer 個數的情況下,Hive 會猜測確定一個 reducer 個數,基於以下兩個設定:

1.hive.exec.reducers.bytes.per.reducer(預設為 256000000)
2.hive.exec.reducers.max(預設為 1009)
3.mapreduce.job.reduces=-1(設定一個常量 reducetask 數量)

計算 reducer 數的公式很簡單:
N=min(引數 2,總輸入資料量/引數 1)
通常情況下,有必要手動指定 reducer 個數。考慮到 map 階段的輸出資料量通常會比輸入有大幅減少,因此即使不設定 reducer 個數,重設引數 2 還是必要的。

依據 Hadoop 的經驗,可以將引數 2 設定為 0.95*(叢集中 datanode 個數)。

4.9.合併MapReduce操作

Multi-group by 是 Hive 的一個非常好的特性,它使得 Hive 中利用中間結果變得非常方便。

例如:
FROM (SELECT a.status, b.school, b.gender FROM status_updates a JOIN profiles b ON (a.userid = 
b.userid and a.ds='2009-03-20' ) ) subq1
INSERT OVERWRITE TABLE gender_summary PARTITION(ds='2009-03-20')
SELECT subq1.gender, COUNT(1) GROUP BY subq1.gender
INSERT OVERWRITE TABLE school_summary PARTITION(ds='2009-03-20')
SELECT subq1.school, COUNT(1) GROUP BY subq1.school

上述查詢語句使用了 multi-group by 特性連續 group by 了 2 次資料,使用不同的 group by key。這一特性可以減少一次 MapReduce 操作。

4.10.合理利用分桶:Bucketing和Sampling

Bucket 是指將資料以指定列的值為 key 進行 hash,hash 到指定數目的桶中。這樣就可以支
持高效取樣了。如下例就是以 userid 這一列為 bucket 的依據,共設定 32 個 buckets
CREATE TABLE page_view(viewTime INT, userid BIGINT, page_url STRING, referrer_url STRING, ip STRING COMMENT 'IP Address of the User') COMMENT 'This is the page view table'
 PARTITIONED BY(dt STRING, country STRING)
 CLUSTERED BY(userid) SORTED BY(viewTime) INTO 32 BUCKETS
 ROW FORMAT DELIMITED
 FIELDS TERMINATED BY '1'
 COLLECTION ITEMS TERMINATED BY '2'
 MAP KEYS TERMINATED BY '3'
 STORED AS SEQUENCEFILE;
通常情況下,Sampling 在全體資料上進行取樣,這樣效率自然就低,它要去訪問所有資料。而如果一個表已經對某一列製作了 bucket,就可以取樣所有桶中指定序號的某個桶,這就減少了訪問量。

如下例所示就是取樣了 page_view 中 32 個桶中的第三個桶的全部資料:
SELECT * FROM page_view TABLESAMPLE(BUCKET 3 OUT OF 32);
如下例所示就是取樣了 page_view 中 32 個桶中的第三個桶的一半資料:
SELECT * FROM page_view TABLESAMPLE(BUCKET 3 OUT OF 64);

詳細說明:http://blog.csdn.net/zhongqi2513/article/details/74612701

4.11.合理利用分割槽:Partition

Partition 就是分割槽。分割槽通過在建立表時啟用 partitioned by 實現,用來 partition 的維度並不是實際資料的某一列,具體分割槽的標誌是由插入內容時給定的。當要查詢某一分割槽的內容時可以採用 where 語句,形似 where tablename.partition_column = a 來實現。

建立含分割槽的表
CREATE TABLE page_view(viewTime INT, userid BIGINT,
 page_url STRING, referrer_url STRING,
 ip STRING COMMENT 'IP Address of the User')
PARTITIONED BY(date STRING, country STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '1'
STORED AS TEXTFILE;

載入內容,並指定分割槽標誌
load data local inpath '/home/hadoop/pv_2008-06-08_us.txt' into table page_view 
partition(date='2008-06-08', country='US');

查詢指定標誌的分割槽內容
SELECT page_views.* FROM page_views
 WHERE page_views.date >= '2008-03-01' AND page_views.date <= '2008-03-31' AND 
page_views.referrer_url like '%xyz.com';

4.12.Join 優化

總體原則:
1.優先過濾後再進行 Join 操作,最大限度的減少參與 join 的資料量
2.小表 join 大表,最好啟動 mapjoin
3.Join on 的條件相同的話,最好放入同一個 job,並且 join 表的排列順序從小到大

在使用寫有 Join 操作的查詢語句時有一條原則:應該將條目少的表/子查詢放在 Join 操作符的左邊。原因是在 Join 操作的 Reduce 階段,位於 Join 操作符左邊的表的內容會被載入進記憶體,將條目少的表放在左邊,可以有效減少發生 OOM 錯誤的機率。對於一條語句中有多個 Join 的情況,如果 Join 的條件相同,比如查詢
INSERT OVERWRITE TABLE pv_users 
SELECT pv.pageid, u.age FROM page_view p 
JOIN user u ON (pv.userid = u.userid) 
JOIN newuser x ON (u.userid = x.userid);

如果 Join 的 key 相同,不管有多少個表,都會則會合併為一個 Map-Reduce 任務,而不是”n”個,在做 OUTER JOIN 的時候也是一樣。

如果 join 的條件不相同,比如:
INSERT OVERWRITE TABLE pv_users 
 SELECT pv.pageid, u.age FROM page_view p 
 JOIN user u ON (pv.userid = u.userid) 
 JOIN newuser x on (u.age = x.age);
Map-Reduce 的任務數目和 Join 操作的數目是對應的,上述查詢和以下查詢是等價的:
// 先 page_view 表和 user 表做連結
INSERT OVERWRITE TABLE tmptable
 SELECT * FROM page_view p JOIN user u ON (pv.userid = u.userid);
// 然後結果表 temptable 和 newuser 表做連結
INSERT OVERWRITE TABLE pv_users 
 SELECT x.pageid, x.age FROM tmptable x JOIN newuser y ON (x.age = y.age); 

在編寫 Join 查詢語句時,如果確定是由於 join 出現的資料傾斜,那麼請做如下設定:
set hive.skewjoin.key=100000; // 這個是 join 的鍵對應的記錄條數超過這個值則會進行分拆,值根據具體資料量設定
set hive.optimize.skewjoin=true; // 如果是 join 過程出現傾斜應該設定為 true

4.13.Group By 優化

Map 端部分聚合:
並不是所有的聚合操作都需要在 Reduce 端完成,很多聚合操作都可以先在 Map 端進行部分聚合,最後在 Reduce 端得出最終結果。
MapReduce 的 combiner 元件,引數包括:
set hive.map.aggr = true 是否在 Map 端進行聚合,預設為 True 
set hive.groupby.mapaggr.checkinterval = 100000 在 Map 端進行聚合操作的條目數目

當使用 Group By 有資料傾斜的時候進行負載均衡:
set hive.groupby.skewindata = true
當 sql 語句使用 groupby 時資料出現傾斜時,如果該變數設定為 true,那麼 Hive 會自動進行負載均衡。策略就是把 MR 任務拆分成兩個:第一個先做預彙總,第二個再做最終彙總。

在 MR 的第一個階段中,Map 的輸出結果集合會快取到 maptaks 中,每個 Reduce 做部分聚合操作,並輸出結果,這樣處理的結果是相同 Group By Key 有可能被分發到不同的 Reduce 中,從而達到負載均衡的目的;第二個階段 再根據預處理的資料結果按照 Group By Key 分佈到Reduce 中(這個過程可以保證相同的 Group By Key 被分佈到同一個 Reduce 中),最後完成最終的聚合操作。

4.14.合理利用檔案儲存格式

建立表時,儘量使用 orc、parquet 這些列式儲存格式,因為列式儲存的表,每一列的資料在物理上是儲存在一起的,Hive 查詢時會只遍歷需要列資料,大大減少處理的資料量。

4.15.本地模式執行MapReduce

Hive 在叢集上查詢時,預設是在叢集上 N 臺機器上執行, 需要多個機器進行協調執行,這個方式很好地解決了大資料量的查詢問題。但是當 Hive 查詢處理的資料量比較小時,其實沒有必要啟動分散式模式去執行,因為以分散式方式執行就涉及到跨網路傳輸、多節點協調等,並且消耗資源。這個時間可以只使用本地模式來執行 mapreduce job,只在一臺機器上執行,速度會很快。啟動本地模式涉及到三個引數:

引數名 預設值 備註
hive.exec.mode.local.auto  false  讓 hive 決定是否在本地模式自動執行
hive.exec.mode.local.auto.input.files.max 4 不啟用本地模式的 task 最大個數
hive.exec.mode.local.auto.inputbytes.max 128M  不啟動本地模式的最大輸入檔案大小

set hive.exec.mode.local.auto=true 是開啟 hive 自動判斷是否啟動本地模式的開關,但是隻是開啟這個引數並不能保證啟動本地模式,要當 map 任務數不超過hive.exec.mode.local.auto.input.files.max 的個數並且 map 輸入檔案大小不超過
hive.exec.mode.local.auto.inputbytes.max 所指定的大小時,才能啟動本地模式。

4.16.並行化處理

一個 hive sql 語句可能會轉為多個 mapreduce Job,每一個 job 就是一個 stage,這些 job 順序執行,這個在 cli 的執行日誌中也可以看到。但是有時候這些任務之間並不是是相互依賴的,如果叢集資源允許的話,可以讓多個並不相互依賴 stage 併發執行,這樣就節約了時間,提高了執行速度,但是如果叢集資源匱乏時,啟用並行化反倒是會導致各個 job 相互搶佔資源而導致整體執行效能的下降。啟用並行化:
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=8; //同一個 sql 允許並行任務的最大執行緒數

4.17.設定壓縮儲存

1.壓縮的原因

Hive 最終是轉為 MapReduce 程式來執行的,而 MapReduce 的效能瓶頸在於網路 IO 和磁碟 IO,要解決效能瓶頸,最主要的是減少資料量,對資料進行壓縮是個好的方式。壓縮雖然是減少了資料量,但是壓縮過程要消耗 CPU 的,但是在 Hadoop 中, 往往效能瓶頸不在於 CPU,CPU 壓力並不大,所以壓縮充分利用了比較空閒的 CPU。

2.常用壓縮方法對比

各個壓縮方式所對應的 Class 類:

3.壓縮方式的選擇

壓縮比率
壓縮解壓縮速度
是否支援 Split

4.壓縮使用

Job 輸出檔案按照 block 以 GZip 的方式進行壓縮:

set mapreduce.output.fileoutputformat.compress=true // 預設值是 false
set mapreduce.output.fileoutputformat.compress.type=BLOCK // 預設值是 Recordset 
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec 
 // 預設值是 org.apache.hadoop.io.compress.DefaultCodec

Map 輸出結果也以 Gzip 進行壓縮:
set mapred.map.output.compress=true
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.GzipCodec
 // 預設值是 org.apache.hadoop.io.compress.DefaultCodec

對 Hive 輸出結果和中間都進行壓縮:
set hive.exec.compress.output=true // 預設值是 false,不壓縮
set hive.exec.compress.intermediate=true // 預設值是 false,為 true 時 MR 設定的壓縮才啟用