
08 - Hive Advanced Queries: join

Note: my friend, there is no need to repost this article, because you can read it right here.

Hello everyone. Today we will learn the join syntax for advanced Hive queries.

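Are you hoping to learn Hive from end to end? I plan to cover everything I know. What I write is fairly down-to-earth, because I know less about the fancy high-level material. Let's keep at it and improve together!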

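1 First, a quick review of the previous lesson. There we learned that order by is a global operation (it sorts the entire result set) and that group by is an aggregation operation. One way to mitigate data skew is to set the parameter hive.groupby.skewindata=true;
When the data is skewed, this option load-balances the work: with it set to true, the generated query plan contains two MR jobs. The first job spreads the map output randomly across the reducers for partial aggregation, and the second job groups those partial results by key to produce the final aggregation. If you want to learn more about data skew, here is a link: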

Causes of data skew
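
As a minimal sketch of the setting reviewed above, the following turns on skew handling for a group by over the table m that is created later in this article. The aggregation itself is only an illustration and is not taken from the previous lesson.

```sql
-- Enable the two-job plan for skewed GROUP BY keys (session-level setting).
set hive.groupby.skewindata=true;

-- Illustrative aggregation: count the rows for each value of col in table m.
select col, count(*) as cnt
from m
group by col;
```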

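2 Today's topic: join
Table joins
2.1 Two tables m and n are joined according to the on condition; one record from m and one record from n are combined into one new record.
2.2 join (equi-join): a row is output only when the join value exists in both m and n.
2.3 left outer join: every row of the left table is output whether or not its value exists in n; a row of the right table is output only when its value also exists in the left table.
2.4 right outer join: the mirror image of left outer join.
2.5 left semi join: similar to exists.
2.6 mapjoin: the join is completed on the map side, in memory, with no reduce phase needed; it is an optimization.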
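
left semi join is not run later in this article, so here is a minimal sketch of it against the tables m and n defined in section 3. With the sample data it should return only the rows of m whose col also appears in n (the A row and both C rows); the row order is not guaranteed. The exists variant is an assumption about your Hive version, as noted in the comment.

```sql
-- Keep only the rows of m whose col value also appears in n.
-- In a LEFT SEMI JOIN the right-hand table may be referenced only in the ON
-- clause, so the select list can use columns of m only.
select m.col, m.col2
from m
left semi join n
on m.col = n.col;

-- Roughly equivalent form with EXISTS (assumes a Hive version that supports
-- subqueries in the WHERE clause, 0.13 or later).
select m.col, m.col2
from m
where exists (select 1 from n where n.col = m.col);
```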

3 Create table m and table n with the following columns and data:

Table m
col col2
A   1
C   5
B   2
C   3

Table n
col col3
C   4
D   5
A   6

Run the following statements in the Hive console:

create table m(
col string,
col2 string
)
row format delimited fields terminated by '\t' 
lines terminated by '\n'
stored as textfile;

create table n(
col string,
col3 string
)
row format delimited fields terminated by '\t'
lines terminated by '\n'
stored as textfile;

load data local inpath '/usr/host/m' into table m;
load data local inpath '/usr/host/n' into table n;

The statements above should be easy to follow: they simply create the tables and load the data.

hive> select * from n;
OK
C   4
D   5
A   6
Time taken: 0.415 seconds
hive> select * from n;
OK
C   4
D   5
A   6
Time taken: 0.288 seconds
hive> select * from m;
OK
A   1
C   5
B   2
C   3
Time taken: 0.317 seconds
hive> 

Now let's start the join operations. The statement is as follows:

hive> set hive.auto.convert.join=true;
hive> select s.col,s.col2,t.col3
    > from
    > (select col,col2 from m)s
    > join
    > (select col,col3 from n)t
    > on s.col=t.col;
java.lang.InstantiationException: org.antlr.runtime.CommonToken
Continuing ...
java.lang.RuntimeException: failed to evaluate: <unbound>=Class.new();
Continuing ...
java.lang.InstantiationException: org.antlr.runtime.CommonToken
Continuing ...
java.lang.RuntimeException: failed to evaluate: <unbound>=Class.new();
Continuing ...
Total MapReduce jobs = 3
Ended Job = -1393545778, job is filtered out (removed at runtime).
Ended Job = -1334217954, job is filtered out (removed at runtime).
2016-06-06 05:25:03 Starting to launch local task to process map join;  maximum memory = 518979584
2016-06-06 05:25:05 Processing rows:    3   Hashtable size: 3   Memory usage:   5078208 rate:   0.01
2016-06-06 05:25:05 Dump the hashtable into file: file:/tmp/root/hive_2016-06-06_05-24-57_982_7288788097348023892/-local-10002/HashTable-Stage-3/MapJoin-mapfile01--.hashtable
2016-06-06 05:25:05 Upload 1 File to: file:/tmp/root/hive_2016-06-06_05-24-57_982_7288788097348023892/-local-10002/HashTable-Stage-3/MapJoin-mapfile01--.hashtable File size: 432
2016-06-06 05:25:05 End of local task; Time Taken: 1.587 sec.
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
Mapred Local Task Succeeded . Convert the Join into MapJoin
Launching Job 2 out of 3
Number of reduce tasks is set to 0 since there's no reduce operator
Job running in-process (local Hadoop)
Hadoop job information for null: number of mappers: 1; number of reducers: 0
2016-06-06 05:25:19,055 null map = 0%,  reduce = 0%
2016-06-06 05:25:28,989 null map = 100%,  reduce = 0%, Cumulative CPU 0.97 sec
2016-06-06 05:25:30,087 null map = 100%,  reduce = 0%, Cumulative CPU 0.97 sec
2016-06-06 05:25:31,173 null map = 100%,  reduce = 0%, Cumulative CPU 0.97 sec
MapReduce Total cumulative CPU time: 970 msec
Ended Job = job_1465200327080_0019
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
OK
A   1   6
C   5   4
C   3   4
Time taken: 35.081 seconds
hive> 

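join is an equi-join: a row is output only when the join value exists in both m and n. That is why B (present only in m) and D (present only in n) are missing from the result shown above.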
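
The subqueries in the statement above only select columns, so the same join can also be written directly against the two tables. The sketch below shows that form, followed by an explain of it, which prints the query plan without launching a job and is one way to check whether Hive picked a map-side join.

```sql
-- Same inner join, written without the subqueries.
select m.col, m.col2, n.col3
from m
join n
on m.col = n.col;

-- Print the query plan instead of running the job; with automatic conversion
-- enabled, a Map Join Operator in the plan indicates a map-side join.
explain
select m.col, m.col2, n.col3
from m
join n
on m.col = n.col;
```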

2 Left outer join

hive> set hive.optimize.skewjoin=true;
hive> set hive.auto.convert.join=true;
hive> select s.col,s.col2,t.col3
    > from
    > (select col,col2 from m)s
    > left outer join
    > (select col,col3 from n)t
    > on s.col=t.col;
Total MapReduce jobs = 2
Ended Job = 1311401655, job is filtered out (removed at runtime).
16/06/06 05:57:17 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
16/06/06 05:57:17 INFO Configuration.deprecation: mapred.system.dir is deprecated. Instead, use mapreduce.jobtracker.system.dir
2016-06-06 05:57:18 Starting to launch local task to process map join;  maximum memory = 518979584
2016-06-06 05:57:20 Processing rows:    3   Hashtable size: 3   Memory usage:   5081104 rate:   0.01
2016-06-06 05:57:20 Dump the hashtable into file: file:/tmp/root/hive_2016-06-06_05-57-12_989_8198446239599600254/-local-10002/HashTable-Stage-3/MapJoin-mapfile111--.hashtable
2016-06-06 05:57:20 Upload 1 File to: file:/tmp/root/hive_2016-06-06_05-57-12_989_8198446239599600254/-local-10002/HashTable-Stage-3/MapJoin-mapfile111--.hashtable File size: 435
2016-06-06 05:57:20 End of local task; Time Taken: 1.401 sec.
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
Mapred Local Task Succeeded . Convert the Join into MapJoin
Launching Job 2 out of 2
Number of reduce tasks is set to 0 since there's no reduce operator
16/06/06 05:57:21 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
16/06/06 05:57:21 INFO Configuration.deprecation: mapred.system.dir is deprecated. Instead, use mapreduce.jobtracker.system.dir
Job running in-process (local Hadoop)
Hadoop job information for null: number of mappers: 1; number of reducers: 0
2016-06-06 05:57:32,269 null map = 0%,  reduce = 0%
2016-06-06 05:57:41,293 null map = 100%,  reduce = 0%, Cumulative CPU 1.04 sec
2016-06-06 05:57:42,388 null map = 100%,  reduce = 0%, Cumulative CPU 1.04 sec
MapReduce Total cumulative CPU time: 1 seconds 40 msec
Ended Job = job_1465200327080_0027
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
OK
A   1   6
C   5   4
B   2   NULL
C   3   4
Time taken: 30.428 seconds
hive> 

3 Right outer join

hive> set hive.auto.convert.join=true;
hive> select s.col,s.col2,t.col3
    > from
    > (select col,col2 from m)s
    > right outer join
    > (select col,col3 from n)t
    > on s.col=t.col;
java.lang.InstantiationException: org.antlr.runtime.CommonToken
Continuing ...
java.lang.RuntimeException: failed to evaluate: <unbound>=Class.new();
Continuing ...
java.lang.InstantiationException: org.antlr.runtime.CommonToken
Continuing ...
java.lang.RuntimeException: failed to evaluate: <unbound>=Class.new();
Continuing ...
Total MapReduce jobs = 2
Ended Job = 84151671, job is filtered out (removed at runtime).
16/06/06 06:01:40 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
16/06/06 06:01:40 INFO Configuration.deprecation: mapred.system.dir is deprecated. Instead, use mapreduce.jobtracker.system.dir
16/06/06 06:01:40 INFO Configuration.deprecation: mapred.local.dir is deprecated. Instead, use mapreduce.cluster.local.dir
2016-06-06 06:01:41 Starting to launch local task to process map join;  maximum memory = 518979584
2016-06-06 06:01:43 Processing rows:    3   Hashtable size: 3   Memory usage:   5105544 rate:   0.01
2016-06-06 06:01:43 Dump the hashtable into file: file:/tmp/root/hive_2016-06-06_06-01-36_820_8095318663586481472/-local-10002/HashTable-Stage-3/MapJoin-mapfile00--.hashtable
2016-06-06 06:01:43 Upload 1 File to: file:/tmp/root/hive_2016-06-06_06-01-36_820_8095318663586481472/-local-10002/HashTable-Stage-3/MapJoin-mapfile00--.hashtable File size: 451
2016-06-06 06:01:43 End of local task; Time Taken: 1.559 sec.
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
Mapred Local Task Succeeded . Convert the Join into MapJoin
Launching Job 2 out of 2
Number of reduce tasks is set to 0 since there's no reduce operator
16/06/06 06:01:44 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
16/06/06 06:01:44 INFO Configuration.deprecation: mapred.system.dir is deprecated. Instead, use mapreduce.jobtracker.system.dir
16/06/06 06:01:44 INFO Configuration.deprecation: mapred.local.dir is deprecated. Instead, use mapreduce.cluster.local.dir
Job running in-process (local Hadoop)
Hadoop job information for null: number of mappers: 1; number of reducers: 0
2016-06-06 06:01:56,531 null map = 0%,  reduce = 0%
2016-06-06 06:02:05,414 null map = 100%,  reduce = 0%, Cumulative CPU 0.71 sec
2016-06-06 06:02:06,504 null map = 100%,  reduce = 0%, Cumulative CPU 0.71 sec
MapReduce Total cumulative CPU time: 710 msec
Ended Job = job_1465200327080_0029
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
OK
C   5   4
C   3   4
NULL    NULL    5
A   1   6
Time taken: 31.606 seconds
hive> 

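Why does the output look like this? In short, the right outer join keeps every row of n; D has no match in m, so the columns taken from m (s.col and s.col2) come back as NULL. Take a good look at the figure below.
4 Comparing the outputs
[Figure: comparison of the join outputs; image not available]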
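
To see which row of n produced the NULL values in the right outer join, a small variation of the query can take the join key from the right-hand side instead. This is only a sketch against the same sample tables; with that data the unmatched row should read D, NULL, 5 rather than NULL, NULL, 5.

```sql
-- Take the join key from the right-hand table t (n), which is always present
-- in a right outer join, instead of from s (m).
select t.col, s.col2, t.col3
from m s
right outer join n t
on s.col = t.col;
```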

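5 Optimization parameter settings
If you run into problems and a statement will not run, try setting the following parameter before executing it: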

set hive.optimize.skewjoin=true;

This parameter plays a role similar to the one we set earlier for group by to avoid data skew: it turns on Hive's skew handling for joins.
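
A minimal sketch of how the parameter might be combined with its threshold setting, hive.skewjoin.key. The threshold value shown is the documented default rather than a recommendation, and on tables as small as m and n it will never be reached; the query is only there to show where the settings go.

```sql
-- Turn on runtime skew handling for joins.
set hive.optimize.skewjoin=true;
-- A join key with more rows than this threshold is treated as skewed
-- (100000 is the documented default; tune it for your data).
set hive.skewjoin.key=100000;

select s.col, s.col2, t.col3
from m s
join n t
on s.col = t.col;
```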

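6 mapjoin
mapjoin is simply an optimization of join.
mapjoin (map-side join)
On the map side, the small table is loaded into memory; the large table is then read and joined against the in-memory small table.
It relies on the distributed cache.
How mapjoin works:
[Figure: how mapjoin works; image not available]
Advantages:
Uses no reduce resources on the cluster (reduce slots are relatively scarce)
Skips the reduce phase, so the job runs faster
Reduces network traffic
Disadvantages:
Uses some memory, so the table loaded into memory must not be too large, because every compute node loads it once
Generates many small files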

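There are two ways to enable mapjoin:
The first way: set the following parameters so that Hive automatically chooses between common join and map join based on the SQL:
set hive.auto.convert.join=true;
hive.mapjoin.smalltable.filesize sets the size threshold for the small table; the default is 25000000 bytes (about 25 MB)
The second way: specify it manually with a hint, using the following form: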

select /*+mapjoin(n)*/ m.col,m.col2,n.col3 from m
join n
on m.col=n.col;

Running it in the Hive console gives:
hive> select /*+mapjoin(n)*/ m.col,m.col2,n.col3 from m
    > join n
    > on m.col=n.col;
Total MapReduce jobs = 1
16/06/06 06:04:14 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
16/06/06 06:04:14 INFO Configuration.deprecation: mapred.system.dir is deprecated. Instead, use mapreduce.jobtracker.system.dir
16/06/06 06:04:14 INFO Configuration.deprecation: mapred.local.dir is deprecated. Instead, use mapreduce.cluster.local.dir
2016-06-06 06:04:16 Starting to launch local task to process map join;  maximum memory = 518979584
2016-06-06 06:04:17 Processing rows:    3   Hashtable size: 3   Memory usage:   5062584 rate:   0.01
2016-06-06 06:04:17 Dump the hashtable into file: file:/tmp/root/hive_2016-06-06_06-04-09_627_2210755106302628931/-local-10002/HashTable-Stage-1/MapJoin-n-11--.hashtable
2016-06-06 06:04:17 Upload 1 File to: file:/tmp/root/hive_2016-06-06_06-04-09_627_2210755106302628931/-local-10002/HashTable-Stage-1/MapJoin-n-11--.hashtable File size: 432
2016-06-06 06:04:17 End of local task; Time Taken: 1.451 sec.
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
Mapred Local Task Succeeded . Convert the Join into MapJoin
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
16/06/06 06:04:18 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
16/06/06 06:04:18 INFO Configuration.deprecation: mapred.system.dir is deprecated. Instead, use mapreduce.jobtracker.system.dir
16/06/06 06:04:18 INFO Configuration.deprecation: mapred.local.dir is deprecated. Instead, use mapreduce.cluster.local.dir
Job running in-process (local Hadoop)
Hadoop job information for null: number of mappers: 1; number of reducers: 0
2016-06-06 06:04:29,775 null map = 0%,  reduce = 0%
2016-06-06 06:04:38,832 null map = 100%,  reduce = 0%, Cumulative CPU 1.01 sec
2016-06-06 06:04:39,896 null map = 100%,  reduce = 0%, Cumulative CPU 1.01 sec
2016-06-06 06:04:40,995 null map = 100%,  reduce = 0%, Cumulative CPU 1.01 sec
MapReduce Total cumulative CPU time: 1 seconds 10 msec
Ended Job = job_1465200327080_0030
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
OK
A   1   6
C   5   4
C   3   4
Time taken: 32.387 seconds
hive> 
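
The run above uses the hint. For comparison, here is a minimal sketch of the first, automatic approach, with the threshold written out explicitly at its default value so that it is easy to adjust.

```sql
-- Let Hive convert a common join into a map join automatically.
set hive.auto.convert.join=true;
-- Size threshold in bytes below which a table counts as small
-- (25000000 is the default).
set hive.mapjoin.smalltable.filesize=25000000;

-- No hint needed: Hive decides from the table sizes.
select m.col, m.col2, n.col3
from m
join n
on m.col = n.col;
```

Depending on the Hive version, the mapjoin hint may be ignored unless hive.ignore.mapjoin.hint is set to false, so the automatic approach is usually the safer one to rely on.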

A brief summary of when to use mapjoin:
1 One of the tables in the join is very small
2 Non-equi join operations
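
One way to read the second point: older Hive versions accept only equality conditions in the on clause, so a non-equi join is commonly written as a cross join that is filtered in where, and keeping the small table in memory avoids shuffling that Cartesian product through reducers. The sketch below assumes this reading; the condition col2 < col3 is invented purely for illustration, and the casts are needed because both columns were declared as string.

```sql
-- Hypothetical non-equi condition: pair every m row with the n rows whose
-- col3 is larger than its col2. The columns are strings, so cast to int.
select /*+ mapjoin(n) */ m.col, m.col2, n.col, n.col3
from m
cross join n
where cast(m.col2 as int) < cast(n.col3 as int);
```

If the session runs in strict mode, Hive may reject the Cartesian product, so this form is only practical when one side really is tiny.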

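OK, I'm a little tired, so let's stop here for today. If you read this article and would like to learn more or get in touch with me, follow my WeChat official account, named: 五十年後
See you again!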