1. 程式人生 > >09-Hive查詢操作Distributed by 和sort by

09-Hive查詢操作Distributed by 和sort by

宣告:
哈嘍,大家好,我是謝老師。
今天來學習的是Distributed by 和sort by語法。

首先還是要來回顧一下上一講所學的join和mapjoin操作。mapjoin會比join快很多,資料量很小的時候優勢不明顯,資料量很大的時候就快很多了。mapjoin其實就是join的優化。很多人都說Hive語法很簡單,跟寫sql語句差不多,但是hive優化就比較難了,這一點我也感覺到了。
那設定mapjoin的方式有哪些呢?有兩種。我們重新來做一下實驗,當做複習。
1 mapjoin的第一種方式:

set hive.auto.convert.join=true;

這裡有兩個表:

Time taken: 0.311 seconds
hive> select * from city;                                      
OK
20130829234535  china   henan   nanyang
20130829234536  china   henan   xinyang
20130829234537  china   beijing beijing
20130829234538  china   jiang   susuzhou
20130829234539  china   hubei   wuhan
20130829234540
china sandong weizhi 20130829234541 china hebei shijiazhuang 20130829234542 china neimeng eeduosi 20130829234543 china beijing beijing 20130829234544 china jilin jilin Time taken: 0.169 seconds hive> select * from province; OK 20130829234535 china henan nanyang 20130829234536 china henan xinyang 20130829234537
china beijing beijing 20130829234538 china jiang susuzhou 20130829234539 china hubei wuhan 20130829234540 china sandong weizhi 20130829234541 china hebei shijiazhuang 20130829234542 china neimeng eeduosi 20130829234543 china beijing beijing 20130829234544 china jilin jilin Time taken: 0.131 seconds hive>
set hive.auto.convert.join=true;
hive> select m.city,n.province
    > from
    > (select province,city from city)m
    > join
    > (select province from province)n
    > on m.province=n.province;
Total MapReduce jobs = 3
Ended Job = 28094960, job is filtered out (removed at runtime).
Ended Job = -243803491, job is filtered out (removed at runtime).
2016-06-06 06:30:11 Starting to launch local task to process map join;  maximum memory = 518979584
2016-06-06 06:30:12 Processing rows:    8   Hashtable size: 8   Memory usage:   5107040 rate:   0.01
2016-06-06 06:30:12 Dump the hashtable into file: file:/tmp/root/hive_2016-06-06_06-30-06_575_9058687622373010883/-local-10002/HashTable-Stage-3/MapJoin-mapfile21--.hashtable
2016-06-06 06:30:12 Upload 1 File to: file:/tmp/root/hive_2016-06-06_06-30-06_575_9058687622373010883/-local-10002/HashTable-Stage-3/MapJoin-mapfile21--.hashtable File size: 752
2016-06-06 06:30:12 End of local task; Time Taken: 1.476 sec.
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
Mapred Local Task Succeeded . Convert the Join into MapJoin
Launching Job 2 out of 3
Number of reduce tasks is set to 0 since there's no reduce operator
16/06/06 06:30:14 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
16/06/06 06:30:14 INFO Configuration.deprecation: mapred.system.dir is deprecated. Instead, use mapreduce.jobtracker.system.dir
16/06/06 06:30:14 INFO Configuration.deprecation: mapred.local.dir is deprecated. Instead, use mapreduce.cluster.local.dir
Job running in-process (local Hadoop)
Hadoop job information for null: number of mappers: 1; number of reducers: 0
2016-06-06 06:30:26,141 null map = 0%,  reduce = 0%
2016-06-06 06:30:40,209 null map = 100%,  reduce = 0%, Cumulative CPU 0.92 sec
2016-06-06 06:30:41,299 null map = 100%,  reduce = 0%, Cumulative CPU 0.92 sec
2016-06-06 06:30:42,398 null map = 100%,  reduce = 0%, Cumulative CPU 0.92 sec
MapReduce Total cumulative CPU time: 920 msec
Ended Job = job_1465200327080_0033
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
OK
nanyang henan
nanyang henan
xinyang henan
xinyang henan
beijing beijing
beijing beijing
susuzhou    jiang
wuhan   hubei
weizhi  sandong
shijiazhuang    hebei
eeduosi neimeng
beijing beijing
beijing beijing
jilin   jilin
Time taken: 36.849 seconds
hive> 

第二種方式:手動設定

hive> select /*+mapjoin(n)*/ m.city,n.province
    > from
    > (select province,city from city)m
    > join
    > (select province from province)n
    > on m.province=n.province;
Total MapReduce jobs = 1
2016-06-06 06:32:45 End of local task; Time Taken: 1.648 sec.
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
Mapred Local Task Succeeded . Convert the Join into MapJoin
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
16/06/06 06:32:47 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
16/06/06 06:32:47 INFO Configuration.deprecation: mapred.system.dir is deprecated. Instead, use mapreduce.jobtracker.system.dir
16/06/06 06:32:47 INFO Configuration.deprecation: mapred.local.dir is deprecated. Instead, use mapreduce.cluster.local.dir
Job running in-process (local Hadoop)
Hadoop job information for null: number of mappers: 1; number of reducers: 0
2016-06-06 06:33:04,292 null map = 0%,  reduce = 0%
2016-06-06 06:33:16,471 null map = 100%,  reduce = 0%, Cumulative CPU 0.94 sec
2016-06-06 06:33:17,592 null map = 100%,  reduce = 0%, Cumulative CPU 0.94 sec
2016-06-06 06:33:18,711 null map = 100%,  reduce = 0%, Cumulative CPU 0.94 sec
MapReduce Total cumulative CPU time: 940 msec
Ended Job = job_1465200327080_0034
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
OK
nanyang henan
nanyang henan
xinyang henan
xinyang henan
beijing beijing
beijing beijing
susuzhou    jiang
wuhan   hubei
weizhi  sandong
shijiazhuang    hebei
eeduosi neimeng
beijing beijing
beijing beijing
jilin   jilin
Time taken: 47.668 seconds
hive> 

有木有發現mapjoin的reducers都是0?

2 **接下來學習的是:
**Dirstribute分散資料
distribute by col按照col列把資料分散到不同的reduce
sorted排序
sort by col2 按照col列把資料排序

hive> select col,col2 from M
    > distribute by col
    > sort by col asc,col2 desc;
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapred.reduce.tasks=<number>
16/06/06 06:38:36 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
16/06/06 06:38:36 INFO Configuration.deprecation: mapred.system.dir is deprecated. Instead, use mapreduce.jobtracker.system.dir
16/06/06 06:38:36 INFO Configuration.deprecation: mapred.local.dir is deprecated. Instead, use mapreduce.cluster.local.dir
Job running in-process (local Hadoop)
Hadoop job information for null: number of mappers: 1; number of reducers: 1
2016-06-06 06:38:49,294 null map = 0%,  reduce = 0%
2016-06-06 06:39:04,441 null map = 100%,  reduce = 0%, Cumulative CPU 1.36 sec
2016-06-06 06:39:05,535 null map = 100%,  reduce = 0%, Cumulative CPU 1.36 sec
2016-06-06 06:39:06,628 null map = 100%,  reduce = 0%, Cumulative CPU 1.36 sec
2016-06-06 06:39:07,744 null map = 100%,  reduce = 0%, Cumulative CPU 1.36 sec
2016-06-06 06:39:08,865 null map = 100%,  reduce = 0%, Cumulative CPU 1.36 sec
2016-06-06 06:39:09,929 null map = 100%,  reduce = 0%, Cumulative CPU 1.36 sec
2016-06-06 06:39:10,998 null map = 100%,  reduce = 0%, Cumulative CPU 1.36 sec
2016-06-06 06:39:12,073 null map = 100%,  reduce = 0%, Cumulative CPU 1.36 sec
2016-06-06 06:39:13,136 null map = 100%,  reduce = 0%, Cumulative CPU 1.36 sec
2016-06-06 06:39:14,226 null map = 100%,  reduce = 0%, Cumulative CPU 1.36 sec
2016-06-06 06:39:15,291 null map = 100%,  reduce = 0%, Cumulative CPU 1.36 sec
2016-06-06 06:39:16,376 null map = 100%,  reduce = 0%, Cumulative CPU 1.36 sec
2016-06-06 06:39:17,509 null map = 100%,  reduce = 100%, Cumulative CPU 2.13 sec
2016-06-06 06:39:18,605 null map = 100%,  reduce = 100%, Cumulative CPU 2.13 sec
2016-06-06 06:39:19,687 null map = 100%,  reduce = 100%, Cumulative CPU 2.13 sec
MapReduce Total cumulative CPU time: 2 seconds 130 msec
Ended Job = job_1465200327080_0035
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
OK
A   1
B   2
C   5
C   3
Time taken: 52.716 seconds
hive> 

兩者結合出現,確保每個reduce的輸出都是有序的

3 【對比】
3.1distribute by 和group by
都是按照key值劃分資料
都是使用reduce操作
唯一不同,distribute by 只是單純的分散資料,而group by 把相同key的資料聚集到一起,後續必須是聚合操作
3.2order by 與sort by
order by 是全域性排序
sort by 只是確保每個reduce上面輸出的資料有序,如果只有一個reduce時,和order by作用一樣

4 應用場景
map輸出的檔案大小不均
reduce輸出檔案大小不均
小檔案過多
檔案超大

5 ****cluseter by**把有相同值得資料聚集到一起,並排序**
效果:
cluster by col
distribute by col order by col

hive> desc city;
OK
time    string  
country string  
province    string  
city    string  
Time taken: 2.9 seconds
hive> 

建立表city3:

hive> create table city3(
    > time string,
    > country string,
    > province string,
    > city string
    > )
    > row format delimited fields terminated by '\t' 
    > lines terminated by '\n'
    > stored as textfile;
OK
Time taken: 0.731 seconds
hive> 

配置引數:
hive> set mapred.reduce.tasks=5; 結果會出現五個檔案

hive>insert overwrite table city3
select time,
country,
province,
city
from city 
distribute by province;

以province為元素將表格打散,最後會輸出五個檔案。有圖作證。
這裡寫圖片描述

6 把小檔案合成一個大檔案

set mapred.reduce.tasks=1; 這樣子就把檔案合成一個了
insert overwrite table city1 partition(dt=’20160519’)
select time,
country,
province,
city
from province
distribute by country;

這裡寫圖片描述
這裡寫圖片描述
這裡寫圖片描述

7 union all 操作
多個表的資料合併成一個表,hive不支援union,但是支援union all
樣例:

select col
from(
select a as col from t1)
union all
select b as col from t2
)tmp

**注意一下:要確保欄位型別和別名都要一樣。
union執行是比較快的,因為沒有reduce操作,只有map操作。**
要求:
欄位名字一樣
欄位型別一樣
欄位個數一樣
欄位不能有別名
如果需要從合併之後的表中查詢

union是不需要別名的,和join不一樣
實驗一下吧:

hive> select * from(
    > select col,col2  from m
    > union all
    > select col,col3 as col2 from n
    > )tmp
    > ;
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
16/06/06 06:54:23 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
16/06/06 06:54:23 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
16/06/06 06:54:23 INFO Configuration.deprecation: mapred.system.dir is deprecated. Instead, use mapreduce.jobtracker.system.dir
16/06/06 06:54:23 INFO Configuration.deprecation: mapred.local.dir is deprecated. Instead, use mapreduce.cluster.local.dir
Job running in-process (local Hadoop)
Hadoop job information for null: number of mappers: 1; number of reducers: 0
2016-06-06 06:54:35,705 null map = 0%,  reduce = 0%
2016-06-06 06:54:47,507 null map = 100%,  reduce = 0%, Cumulative CPU 0.98 sec
2016-06-06 06:54:48,615 null map = 100%,  reduce = 0%, Cumulative CPU 0.98 sec
2016-06-06 06:54:49,703 null map = 100%,  reduce = 0%, Cumulative CPU 0.98 sec
2016-06-06 06:54:50,774 null map = 100%,  reduce = 0%, Cumulative CPU 0.98 sec
MapReduce Total cumulative CPU time: 980 msec
Ended Job = job_1465200327080_0038
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin
OK
C   4
D   5
A   6
A   1
C   5
B   2
C   3
Time taken: 31.35 seconds
hive> 

好了,有點累了,今天就先玩到這裡吧。如果你看到此文,想進一步學習或者和我溝通,加我微信公眾號:名字:五十年後
see you again! !
一萬小時的訓練