08-Hive高階查詢join
宣告:我的朋友,這一篇不要轉載,因為你可以直接在這裡看。
大家好,我們今天來學習Hive高階查詢join語法。
你有沒有期待把Hive學完整?我打算寫完整,只要我知道。我寫的都是比較接地氣的,因為高大上的我知道的少。砸門一起加油進步吧!
1 我們先回顧一下上一節課說的。上一節我們知道了:order by是一個全域性的操作,groupby是一個聚合的操作。避免資料傾斜的方法之一是設定引數:hive.groupby.skewindata=true;
當有資料傾斜的時候進行負載均衡,當選項設定為 true,生成的查詢計劃會有兩個 MR Job。如果你想學多一點關於資料傾斜的知識,分享一個連結:
2 今天學習【join】
表連線
2.1兩個表m,n之間按照on條件連線,m中的一條記錄和n中的一條記錄組成一條新紀錄 。
2.2 join等值連線,只有某個值在m和n中同時存在時
2.3 left outer join左外連線,左邊表中的值無論是否是在b中存在時,都輸出,右邊表中的值只有在左邊表中存在時才輸出。
2.4 right outer join 和left outer join相反。
2.5left semi join 類似exits。
2.6 mapjoin 在map端完成join操作,不需要用ruduce,甚於記憶體做join,屬於優化操作。
3 分別建立表m和表n,具有的欄位如下:
clo col2 m
A 1
C 5
B 2
C 3
col col3 n
C 4
D 5
A 6
在hive控制檯上執行以下語句:
create table m(
col string,
col2 string
)
row format delimited fields terminated by '\t'
lines terminated by '\n'
stored as textfile;
create table n(
col string,
col3 string
)
row format delimited fields terminated by '\t'
lines terminated by '\n'
stored as textfile;
load data local inpath '/usr/host/m' into table m;
load data local inpath '/usr/host/n' into table n;
以上的語句都看得懂吧,其實就是建立表和載入資料。
hive> select * from n;
OK
C 4
D 5
A 6
Time taken: 0.415 seconds
hive> select * from n;
OK
C 4
D 5
A 6
Time taken: 0.288 seconds
hive> select * from m;
OK
A 1
C 5
B 2
C 3
Time taken: 0.317 seconds
hive>
接下來我們開始join操作,語句如下:
hive> set hive.auto.convert.join=true;
hive> select s.col,s.col2,t.col3
> from
> (select col,col2 from m)s
> join
> (select col,col3 from n)t
> on s.col=t.col;
java.lang.InstantiationException: org.antlr.runtime.CommonToken
Continuing ...
java.lang.RuntimeException: failed to evaluate: <unbound>=Class.new();
Continuing ...
java.lang.InstantiationException: org.antlr.runtime.CommonToken
Continuing ...
java.lang.RuntimeException: failed to evaluate: <unbound>=Class.new();
Continuing ...
Total MapReduce jobs = 3
Ended Job = -1393545778, job is filtered out (removed at runtime).
Ended Job = -1334217954, job is filtered out (removed at runtime).
2016-06-06 05:25:03 Starting to launch local task to process map join; maximum memory = 518979584
2016-06-06 05:25:05 Processing rows: 3 Hashtable size: 3 Memory usage: 5078208 rate: 0.01
2016-06-06 05:25:05 Dump the hashtable into file: file:/tmp/root/hive_2016-06-06_05-24-57_982_7288788097348023892/-local-10002/HashTable-Stage-3/MapJoin-mapfile01--.hashtable
2016-06-06 05:25:05 Upload 1 File to: file:/tmp/root/hive_2016-06-06_05-24-57_982_7288788097348023892/-local-10002/HashTable-Stage-3/MapJoin-mapfile01--.hashtable File size: 432
2016-06-06 05:25:05 End of local task; Time Taken: 1.587 sec.
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
Mapred Local Task Succeeded . Convert the Join into MapJoin
Launching Job 2 out of 3
Number of reduce tasks is set to 0 since there's no reduce operator
Job running in-process (local Hadoop)
Hadoop job information for null: number of mappers: 1; number of reducers: 0
2016-06-06 05:25:19,055 null map = 0%, reduce = 0%
2016-06-06 05:25:28,989 null map = 100%, reduce = 0%, Cumulative CPU 0.97 sec
2016-06-06 05:25:30,087 null map = 100%, reduce = 0%, Cumulative CPU 0.97 sec
2016-06-06 05:25:31,173 null map = 100%, reduce = 0%, Cumulative CPU 0.97 sec
MapReduce Total cumulative CPU time: 970 msec
Ended Job = job_1465200327080_0019
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
OK
A 1 6
C 5 4
C 3 4
Time taken: 35.081 seconds
hive>
join等值連線,只有某個值在m和n中同時存在時才輸出。所以輸出的結果就是如上所示了。
2 左外連線
hive> set hive.optimize.skewjoin=true;
hive> set hive.auto.convert.join=true;
hive> select s.col,s.col2,t.col3
> from
> (select col,col2 from m)s
> left outer join
> (select col,col3 from n)t
> on s.col=t.col;
Total MapReduce jobs = 2
Ended Job = 1311401655, job is filtered out (removed at runtime).
16/06/06 05:57:17 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
16/06/06 05:57:17 INFO Configuration.deprecation: mapred.system.dir is deprecated. Instead, use mapreduce.jobtracker.system.dir
2016-06-06 05:57:18 Starting to launch local task to process map join; maximum memory = 518979584
2016-06-06 05:57:20 Processing rows: 3 Hashtable size: 3 Memory usage: 5081104 rate: 0.01
2016-06-06 05:57:20 Dump the hashtable into file: file:/tmp/root/hive_2016-06-06_05-57-12_989_8198446239599600254/-local-10002/HashTable-Stage-3/MapJoin-mapfile111--.hashtable
2016-06-06 05:57:20 Upload 1 File to: file:/tmp/root/hive_2016-06-06_05-57-12_989_8198446239599600254/-local-10002/HashTable-Stage-3/MapJoin-mapfile111--.hashtable File size: 435
2016-06-06 05:57:20 End of local task; Time Taken: 1.401 sec.
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
Mapred Local Task Succeeded . Convert the Join into MapJoin
Launching Job 2 out of 2
Number of reduce tasks is set to 0 since there's no reduce operator
16/06/06 05:57:21 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
16/06/06 05:57:21 INFO Configuration.deprecation: mapred.system.dir is deprecated. Instead, use mapreduce.jobtracker.system.dir
Job running in-process (local Hadoop)
Hadoop job information for null: number of mappers: 1; number of reducers: 0
2016-06-06 05:57:32,269 null map = 0%, reduce = 0%
2016-06-06 05:57:41,293 null map = 100%, reduce = 0%, Cumulative CPU 1.04 sec
2016-06-06 05:57:42,388 null map = 100%, reduce = 0%, Cumulative CPU 1.04 sec
MapReduce Total cumulative CPU time: 1 seconds 40 msec
Ended Job = job_1465200327080_0027
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
OK
A 1 6
C 5 4
B 2 NULL
C 3 4
Time taken: 30.428 seconds
hive>
3 右外連線
hive> set hive.auto.convert.join=true;
hive> select s.col,s.col2,t.col3
> from
> (select col,col2 from m)s
> right outer join
> (select col,col3 from n)t
> on s.col=t.col;
java.lang.InstantiationException: org.antlr.runtime.CommonToken
Continuing ...
java.lang.RuntimeException: failed to evaluate: <unbound>=Class.new();
Continuing ...
java.lang.InstantiationException: org.antlr.runtime.CommonToken
Continuing ...
java.lang.RuntimeException: failed to evaluate: <unbound>=Class.new();
Continuing ...
Total MapReduce jobs = 2
Ended Job = 84151671, job is filtered out (removed at runtime).
16/06/06 06:01:40 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
16/06/06 06:01:40 INFO Configuration.deprecation: mapred.system.dir is deprecated. Instead, use mapreduce.jobtracker.system.dir
16/06/06 06:01:40 INFO Configuration.deprecation: mapred.local.dir is deprecated. Instead, use mapreduce.cluster.local.dir
2016-06-06 06:01:41 Starting to launch local task to process map join; maximum memory = 518979584
2016-06-06 06:01:43 Processing rows: 3 Hashtable size: 3 Memory usage: 5105544 rate: 0.01
2016-06-06 06:01:43 Dump the hashtable into file: file:/tmp/root/hive_2016-06-06_06-01-36_820_8095318663586481472/-local-10002/HashTable-Stage-3/MapJoin-mapfile00--.hashtable
2016-06-06 06:01:43 Upload 1 File to: file:/tmp/root/hive_2016-06-06_06-01-36_820_8095318663586481472/-local-10002/HashTable-Stage-3/MapJoin-mapfile00--.hashtable File size: 451
2016-06-06 06:01:43 End of local task; Time Taken: 1.559 sec.
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
Mapred Local Task Succeeded . Convert the Join into MapJoin
Launching Job 2 out of 2
Number of reduce tasks is set to 0 since there's no reduce operator
16/06/06 06:01:44 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
16/06/06 06:01:44 INFO Configuration.deprecation: mapred.system.dir is deprecated. Instead, use mapreduce.jobtracker.system.dir
16/06/06 06:01:44 INFO Configuration.deprecation: mapred.local.dir is deprecated. Instead, use mapreduce.cluster.local.dir
Job running in-process (local Hadoop)
Hadoop job information for null: number of mappers: 1; number of reducers: 0
2016-06-06 06:01:56,531 null map = 0%, reduce = 0%
2016-06-06 06:02:05,414 null map = 100%, reduce = 0%, Cumulative CPU 0.71 sec
2016-06-06 06:02:06,504 null map = 100%, reduce = 0%, Cumulative CPU 0.71 sec
MapReduce Total cumulative CPU time: 710 msec
Ended Job = job_1465200327080_0029
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
OK
C 5 4
C 3 4
NULL NULL 5
A 1 6
Time taken: 31.606 seconds
hive>
為什麼輸出的結果會是這樣,下面這幅圖好好琢磨一下吧。
4 資料輸出對比
5 優化引數設定
如果你發現有問題,執行不了,可以在執行語句前設定以下引數嘗試一下:
set hive.optimize.skewjoin=true;
這一個引數設定的意思是:類似於之前的groupby操作的時候設定優化引數避免資料傾斜問題,這一個也是具有類似意義。
6 mapjoin
mapjoin其實就是join的優化操作。
mapjoin(map side join)
在map端把小表載入到記憶體中,然後讀取大表,和記憶體中的小表完成連線操作
其中使用了分散式快取技術
mapjoin的原理:
優點:
不消耗叢集的reduce資源(reduce相對緊缺)
減少了reduce操作,加快程式執行
降低網路
缺點:
佔用部分記憶體,所以載入到記憶體中的表不能過大,因為每個計算節點都會載入一次
生成較多的小檔案
設定成mapjoin有兩種方式:
配置以下引數,是hive自動根據sql,選擇使用common join或者map join
第一種方式:
set hive.auto.convert.join=true;
set hive.mapjoin.smalltable.filesize,預設值是25mb
第二種方式,手動指定,句式如下:
select /*+mapjoin(n)*/ m.col,m.col2,n.col3 from m
join n
on m.col=n.col;
hive> select /*+mapjoin(n)*/ m.col,m.col2,n.col3 from m
> join n
> on m.col=n.col;
Total MapReduce jobs = 1
16/06/06 06:04:14 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
16/06/06 06:04:14 INFO Configuration.deprecation: mapred.system.dir is deprecated. Instead, use mapreduce.jobtracker.system.dir
16/06/06 06:04:14 INFO Configuration.deprecation: mapred.local.dir is deprecated. Instead, use mapreduce.cluster.local.dir
2016-06-06 06:04:16 Starting to launch local task to process map join; maximum memory = 518979584
2016-06-06 06:04:17 Processing rows: 3 Hashtable size: 3 Memory usage: 5062584 rate: 0.01
2016-06-06 06:04:17 Dump the hashtable into file: file:/tmp/root/hive_2016-06-06_06-04-09_627_2210755106302628931/-local-10002/HashTable-Stage-1/MapJoin-n-11--.hashtable
2016-06-06 06:04:17 Upload 1 File to: file:/tmp/root/hive_2016-06-06_06-04-09_627_2210755106302628931/-local-10002/HashTable-Stage-1/MapJoin-n-11--.hashtable File size: 432
2016-06-06 06:04:17 End of local task; Time Taken: 1.451 sec.
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
Mapred Local Task Succeeded . Convert the Join into MapJoin
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
16/06/06 06:04:18 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
16/06/06 06:04:18 INFO Configuration.deprecation: mapred.system.dir is deprecated. Instead, use mapreduce.jobtracker.system.dir
16/06/06 06:04:18 INFO Configuration.deprecation: mapred.local.dir is deprecated. Instead, use mapreduce.cluster.local.dir
Job running in-process (local Hadoop)
Hadoop job information for null: number of mappers: 1; number of reducers: 0
2016-06-06 06:04:29,775 null map = 0%, reduce = 0%
2016-06-06 06:04:38,832 null map = 100%, reduce = 0%, Cumulative CPU 1.01 sec
2016-06-06 06:04:39,896 null map = 100%, reduce = 0%, Cumulative CPU 1.01 sec
2016-06-06 06:04:40,995 null map = 100%, reduce = 0%, Cumulative CPU 1.01 sec
MapReduce Total cumulative CPU time: 1 seconds 10 msec
Ended Job = job_1465200327080_0030
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
OK
A 1 6
C 5 4
C 3 4
Time taken: 32.387 seconds
hive>
簡單總結以下,mapjoin的使用場景:
1 關聯操作中有一張表非常小
2 不等值的連結操作
好了,有點累了,今天就先玩到這裡吧。如果你看到此文,想進一步學習或者和我溝通,加我微信公眾號:名字:五十年後
see you again!