1. 程式人生 > >hive------ Group by、join、distinct等實現原理

hive------ Group by、join、distinct等實現原理

map etc 條件 val log in use ins none 操作

1. Hive 的 distribute by

Order by 能夠預期產生完全排序的結果,但是它是通過只用一個reduce來做到這點的。所以對於大規模的數據集它的效率非常低。在很多情況下,並不需要全局排序,此時可以換成Hive的非標準擴展sort by。Sort by為每個reducer產生一個排序文件。在有些情況下,你需要控制某個特定行應該到哪個reducer,通常是為了進行後續的聚集操作。Hive的distribute by 子句可以做這件事。

// 根據年份和氣溫對氣象數據進行排序,以確保所有具有相同年份的行最終都在一個reducer分區中

from record2

select year, temperature

distribute by year

sort by year asc, temperature desc;

2. Distinct 的實現

準備數據

語句

SELECT COUNT, COUNT(DISTINCT uid) FROM logs GROUP BY COUNT;
hive> SELECT * FROM logs;
OK
a	蘋果	3
a	橙子	3
a	燒雞	1
b	燒雞	3
 
hive> SELECT COUNT, COUNT(DISTINCT uid) FROM logs GROUP BY COUNT;

根據count分組,計算獨立用戶數。

計算過程

技術分享

1. 第一步先在mapper計算部分值,會以count和uid作為key,如果是distinct並且之前已經出現過,則忽略這條計算。第一步是以組合為key,第二步是以count為key.
2. ReduceSink是在mapper.close()時才執行的,在GroupByOperator.close()時,把結果輸出。註意這裏雖然key是count和uid,但是在reduce時分區是按count來的!
3. 第一步的distinct計算的值沒用,要留到reduce計算的才準確。這裏只是減少了key組合相同的行。不過如果是普通的count,後面是會合並起來的。
4. distinct通過比較lastInvoke判斷要不要+1(因為在reduce是排序過了的,所以判斷distict的字段變了沒有,如果沒變,則不+1)

Operator

技術分享

Explain

hive> explain select count, count(distinct uid) from logs group by count;
OK
ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME logs))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL count)) (TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL uid)))) (TOK_GROUPBY (TOK_TABLE_OR_COL count))))
 
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 is a root stage
 
STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        logs 
          TableScan //表掃描
            alias: logs
            Select Operator//列裁剪,取出uid,count字段就夠了
              expressions:
                    expr: count
                    type: int
                    expr: uid
                    type: string
              outputColumnNames: count, uid
              Group By Operator //先來map聚集
                aggregations:
                      expr: count(DISTINCT uid) //聚集表達式
                bucketGroup: false
                keys:
                      expr: count
                      type: int
                      expr: uid
                      type: string
                mode: hash //hash方式
                outputColumnNames: _col0, _col1, _col2
                Reduce Output Operator
                  key expressions: //輸出的鍵
                        expr: _col0 //count
                        type: int
                        expr: _col1 //uid
                        type: string
                  sort order: ++
                  Map-reduce partition columns: //這裏是按group by的字段分區的
                        expr: _col0 //這裏表示count
                        type: int
                  tag: -1
                  value expressions:
                        expr: _col2
                        type: bigint
      Reduce Operator Tree:
        Group By Operator //第二次聚集
          aggregations:
                expr: count(DISTINCT KEY._col1:0._col0) //uid:count
          bucketGroup: false
          keys:
                expr: KEY._col0 //count
                type: int
          mode: mergepartial //合並
          outputColumnNames: _col0, _col1
          Select Operator //列裁剪
            expressions:
                  expr: _col0
                  type: int
                  expr: _col1
                  type: bigint
            outputColumnNames: _col0, _col1
            File Output Operator //輸出結果到文件
              compressed: false
              GlobalTableId: 0
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
 
  Stage: Stage-0
    Fetch Operator
      limit: -1

3.
Group By 的實現
數據準備
SELECT uid, SUM(COUNT) FROM logs GROUP BY uid;
hive> SELECT * FROM logs;
a	蘋果	5
a	橙子	3
a      蘋果   2
b	燒雞	1
 
hive> SELECT uid, SUM(COUNT) FROM logs GROUP BY uid;
a	10
b	1

計算過程

技術分享
默認設置了hive.map.aggr=true,所以會在mapper端先group by一次,最後再把結果merge起來,為了減少reducer處理的數據量。註意看explain的mode是不一樣的。mapper是hash,reducer是mergepartial。如果把hive.map.aggr=false,那將groupby放到reducer才做,他的mode是complete.

Operator

技術分享

Explain

hive> explain SELECT uid, sum(count) FROM logs group by uid;
OK
ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME logs))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL uid)) (TOK_SELEXPR (TOK_FUNCTION sum (TOK_TABLE_OR_COL count)))) (TOK_GROUPBY (TOK_TABLE_OR_COL uid))))
 
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 is a root stage
 
STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        logs 
          TableScan // 掃描表
            alias: logs
            Select Operator //選擇字段
              expressions:
                    expr: uid
                    type: string
                    expr: count
                    type: int
              outputColumnNames: uid, count
              Group By Operator //這裏是因為默認設置了hive.map.aggr=true,會在mapper先做一次聚合,減少reduce需要處理的數據
                aggregations:
                      expr: sum(count) //聚集函數
                bucketGroup: false
                keys: //鍵
                      expr: uid
                      type: string
                mode: hash //hash方式,processHashAggr()
                outputColumnNames: _col0, _col1
                Reduce Output Operator //輸出key,value給reducer
                  key expressions:
                        expr: _col0
                        type: string
                  sort order: +
                  Map-reduce partition columns:
                        expr: _col0
                        type: string
                  tag: -1
                  value expressions:
                        expr: _col1
                        type: bigint
      Reduce Operator Tree:
        Group By Operator
 
          aggregations:
                expr: sum(VALUE._col0)
//聚合
          bucketGroup: false
          keys:
                expr: KEY._col0
                type: string
          mode: mergepartial //合並值
          outputColumnNames: _col0, _col1
          Select Operator //選擇字段
            expressions:
                  expr: _col0
                  type: string
                  expr: _col1
                  type: bigint
            outputColumnNames: _col0, _col1
            File Output Operator //輸出到文件
              compressed: false
              GlobalTableId: 0
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
 
  Stage: Stage-0
    Fetch Operator
      limit: -1
4. join原理 

準備數據

語句
SELECT a.uid,a.name,b.age FROM logs a JOIN users b ON (a.uid=b.uid);
我們希望的結果是把users表join進來獲取age字段。

hive> SELECT * FROM logs;
OK
a	蘋果	5
a	橙子	3
b	燒雞	1
 
hive> SELECT * FROM users;
OK
a	23
b	21
 
hive> SELECT a.uid,a.name,b.age FROM logs a JOIN users b ON (a.uid=b.uid);
a	蘋果	23
a	橙子	23
b	燒雞	21

計算過程

技術分享

  1. key這裏後面的數字是tag,後面在reduce階段用來區分來自於那個表的數據。tag是附屬在key後面的。那為什麽會把a(0)和a(1)匯集在一起了呢,是因為對先對a求了hashcode,設在了HiveKey上,所以同一個key還是在一起的。
  2. Map階段只是拆分key和value。
  3. reduce階段主要看它是如何把它合並起來了,從圖上可以直觀的看到,其實就是把tag=1的內容,都加到tag=0的後面,就是這麽簡單。
  4. 代碼實現上,就是先臨時用個變量把值存儲起來在storage裏面, storage(0) = [{a, 蘋果}, {a, 橙子}] storage(1) = [{23}],當key變化(如a變為b)或全部結束時,會調用endGroup()方法,把內容合並起來。變成[{a,蘋果,23}, {a, 橙子,23}]

Operator

技術分享

Explain

hive> explain SELECT a.uid,a.name,b.age FROM logs a JOIN users b ON (a.uid=b.uid);
OK
 
//語法樹
ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME logs) a) (TOK_TABREF (TOK_TABNAME users) b) (= (. (TOK_TABLE_OR_COL a) uid) (. (TOK_TABLE_OR_COL b) uid)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) uid)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) name)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL b) age)))))
 
//階段
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 is a root stage
 
STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree: //mapper階段
        a 
          TableScan //掃描表, 就只是一行一行的傳遞下去而已
            alias: a
            Reduce Output Operator //輸出給reduce的內容
              key expressions: // key啦,這裏的key是uid,就是我們寫在ON子句那個,你可以試試加多幾個條件
                    expr: uid
                    type: string
              sort order: + //排序
              Map-reduce partition columns://分區字段,貌似是和key一樣的
                    expr: uid
                    type: string
              tag: 0 //用來區分這個key是來自哪個表的
              value expressions: //reduce用到的value字段
                    expr: uid
                    type: string
                    expr: name
                    type: string
        b 
          TableScan //掃描表, 就只是一行一行的傳遞下去而已
            alias: b
            Reduce Output Operator //輸出給reduce的內容
              key expressions: //key
                    expr: uid
                    type: string
              sort order: +
              Map-reduce partition columns: //分區字段
                    expr: uid
                    type: string
              tag: 1 //用來區分這個key是來自哪個表的
              value expressions: //值
                    expr: age
                    type: int
      Reduce Operator Tree: // reduce階段
        Join Operator // JOIN的Operator
          condition map:
               Inner Join 0 to 1 // 內連接0和1表
          condition expressions: // 第0個表有兩個字段,分別是uid和name, 第1個表有一個字段age
 {VALUE._col0} {VALUE._col1}
 {VALUE._col1}
          handleSkewJoin: false //是否處理傾斜join,如果是,會分為兩個MR任務
          outputColumnNames: _col0, _col1, _col6 //輸出字段
          Select Operator //列裁剪(我們sql寫的select字段)
            expressions:
                  expr: _col0
                  type: string
                  expr: _col1
                  type: string
                  expr: _col6
                  type: int
            outputColumnNames: _col0, _col1, _col2
            File Output Operator //把結果輸出到文件
              compressed: false
              GlobalTableId: 0
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
 
  Stage: Stage-0
    Fetch Operator
      limit: -1

可以看到裏面都是一個個Operator順序的執行下來

 

hive------ Group by、join、distinct等實現原理