1. 程式人生 > >hive中使用自定義函式(UDF)實現分析函式row_number的功能

hive中使用自定義函式(UDF)實現分析函式row_number的功能

1. hive0.10及之前的版本沒有row_number這個函式,假設我們現在出現如下業務場景,現在我們在hdfs上有個log日誌檔案,為了方便敘述,該檔案只有2個欄位,第一個是使用者的id,第二個是當天登入的timestamp,現在我們需要求每個使用者最早登入的那條記錄(注意不是僅僅只要那個登入的timestamp),可以方便計算NewUser。

2. 我們的資料是這樣的:

1,32
2,46
3,312
4,4643
5,54
6,456
7,437
8,5347
9,47
1,466
2,546
3,4
4,886
5,546
6,57
7,235
8,765
9,634
這裡是假設的資料。

3.我們可以用hive建立一張表,其中第一個欄位是id string, 第二個是login_time bigint,假設我們的表名是log。

4. 這樣的場景可以用很多方法解決,但是我們可以用RowNumber函式,在0.11及以上的版本才整合到了hive中,但是我們公司用的是CDH4.5.0,hive才到0.10,所以只能自己寫個這樣的函式,具體的程式碼如下:

  1. import org.apache.hadoop.hive.ql.exec.UDF;  
  2. publicclass RowN extends UDF {  
  3.     privatestaticint MAX_VALUE = 50;  
  4.     privatestatic String comparedColumn[] = new String[MAX_VALUE];  
  5.     privatestatic
    int rowNum = 1;  
  6.     publicint evaluate(Object... args) {  
  7.         String columnValue[] = new String[args.length];  
  8.         for (int i = 0; i < args.length; i++) {  
  9.             columnValue[i] = args[i].toString();  
  10.         }  
  11.         if (rowNum == 1) {  
  12.             for (int i = 0; i < columnValue.length; i++)  
  13.                 comparedColumn[i] = columnValue[i];  
  14.         }  
  15.         for (int i = 0; i < columnValue.length; i++) {  
  16.             if (!comparedColumn[i].equals(columnValue[i])) {  
  17.                 for (int j = 0; j < columnValue.length; j++) {  
  18.                     comparedColumn[j] = columnValue[j];  
  19.                 }  
  20.                 rowNum = 1;  
  21.                 return rowNum++;  
  22.             }  
  23.         }  
  24.         return rowNum++;  
  25.     }  
  26. }  
  27. </span>  

5. 稍微解釋下這個UDF,首先我們的UDF函式輸入是多個列的值,傳入多個值表示用多個值是否相同來打序號,對於我們的場景只要1個(就是id),函式row_number(),必須帶一個或者多個列引數,如ROW_NUMBER(col1, ....),它的作用是按指定的列進行分組生成行序列。在ROW_NUMBER(a,b) 時,若兩條記錄的a,b列相同,則行序列+1,否則重新計數。

6. 接下去關鍵的就是怎麼取使用這個函數了,我們必須保證查出來的資料是有序的,這樣才好加序號,而且要根據某個欄位排序,但是如果資料量大或者我們自己設定了多個reducer咋辦,這樣的話我們就想到了使用distribute by和sort by的配合使用,可以使key相同的資料進入同一個reducer,這樣就好辦了,那麼我們的hql語句其實就是一句話:

  1. createtemporaryfunction RowNumber as'xxx.xxx.xxx.udf.RowNumber';  
  2. select id, login_time from (select * from log distribute by id sort by id, login_time asc) tmp where RowNumber(id)=1;  
結果為:

132
246
34
4886
554
625
775
8534
947

注:

1. 如果對distribute by不熟悉可以看另一個我的部落格,有具體的解釋:http://blog.csdn.net/jthink_/article/details/38903775

2. 這個函式最關鍵的部分就是得先有序,所以加序號前必須保證資料有序