Hive自定義UDAF詳解
阿新 • • 發佈:2019-01-22
遇到一個Hive需求:有A、B、C三列,按A列進行聚合,求出C列聚合後的最小值和最大值各自對應的B列值。這個需求用hql和內建函式也可完成,但是比較繁瑣,會解析成幾個MR進行執行,如果自定義UDAF便可只利用一個MR完成任務。
所用Hive為0.13.1版本。UDAF有兩種,第一種是比較簡單的形式,利用抽象類UDAF和UDAFEvaluator,暫不做討論。主要說一下第二種形式,利用介面GenericUDAFResolver2(或者抽象類AbstractGenericUDAFResolver)和抽象類GenericUDAFEvaluator。
這裡用AbstractGenericUDAFResolver做說明。
Java程式碼
可以看到,該抽象類有兩個方法,其中一個已經被棄用,所以只需要實現引數型別為TypeInfo的getEvaluator方法即可。
該方法其實相當於一個工廠,TypeInfo表示在使用時傳入該UDAF的引數的型別。該方法主要做的工作有:
返回的物件型別為GenericUDAFEvaluator,這是一個抽象類:
Java程式碼
說明上述方法的之前,需要提一個GenericUDAFEvaluator的內部列舉類Mode
Java程式碼
可以看到,UDAF將任務分成了幾種型別,PARTIAL1相當於MR程式的map階段,負責迭代處理記錄並返回該階段的中間結果。PARTIAL2相當於Combiner,對map階段的結果進行一次聚合。FINAL是reduce階段,進行整體聚合以及返回最終結果。COMPLETE有點特殊,是一個沒有reduce階段的map過程,所以在進行記錄迭代之後,直接返回最終結果。
再來看GenericUDAFEvaluator中的各方法
Java程式碼
初始化方法,在Mode的每一個階段啟動時會執行init方法。該方法有兩個引數,第一個引數是Mode,可以根據此引數判斷當前執行的是哪個階段,進行該階段相應的初始化工作。ObjectInspector是一個抽象的型別描述,例如:當引數型別是原生型別時,可以轉化為PrimitiveObjectInspector,除此之外還有StructObjectInspector等等。ObjectInspector只是描述型別,並不儲存實際資料。後面的具體例子中會有一些使用說明。
ObjectInspector[]的長度不是固定的,要看當前是處於哪個階段。如果是PARTIAL1,那麼與使用時傳入該UDAF的引數個數一致;如果是FINAL階段,長度就是1了,因為map階段返回的結果只有一個物件。
Java程式碼
AggregationBuffer是一個標識介面,沒有任何需要實現的方法。實現該介面的類被用於暫存中間結果。reset是為了重置AggregationBuffer,但是在實際應用場景中沒有發現單獨呼叫該方法進行重置,有可能是聚合key的資料量還不夠大,在後面會再說一下這個問題。
Java程式碼
iterate方法存在於MR的M階段,用於處理每一條輸入記錄。Object[]作為輸入傳入UFAF,AggregationBuffer作為中間快取暫存結果。需要注意的是,每次呼叫iterate傳入的AggregationBuffer並不一定是同一個物件。Hive呼叫UDAF的時候會用一個Map來管理AggregationBuffer,Map的key即為需要聚合的key。就通過實際執行過程來看,在每一次iterate呼叫之前,會根據聚合key從Map中查詢對應的AggregationBuffer,若能找到則直接返回AggregationBuffer物件,找不到則呼叫getNewAggregationBuffer方法新建並插入Map中並返回結果。
terminatePartial方法在iterate處理完所有輸入後呼叫,用於返回初步的聚合結果。
merge方法存在於MR的R階段(也同樣存在於Combine階段),用於最後的聚合。Object型別的partial引數與terminatePartial返回值一致,AggregationBuffer引數與上述一致。
terminate方法在merge方法執行完畢之後呼叫,用於進行最後的處理,並返回最後結果。
像上面提到的Mode一樣,這些方法並不一定都會被呼叫,與Hive解析成的MR程式型別有關。例如解析後的MR程式只有M階段,則只會呼叫iterate和terminate。實際使用過程中,由於聚合key資料量有限,記憶體可以承載,所以沒有發現reset單獨呼叫的情況。每次遇到一個不同的key,則新建一個AggregationBuffer,沒有看原始碼,不知道當聚合key很大的時候,是否會呼叫reset進行物件重用。
所用Hive為0.13.1版本。UDAF有兩種,第一種是比較簡單的形式,利用抽象類UDAF和UDAFEvaluator,暫不做討論。主要說一下第二種形式,利用介面GenericUDAFResolver2(或者抽象類AbstractGenericUDAFResolver)和抽象類GenericUDAFEvaluator。
這裡用AbstractGenericUDAFResolver做說明。
Java程式碼
- public abstract class AbstractGenericUDAFResolver implements GenericUDAFResolver2 {
- @SuppressWarnings("deprecation")
- @Override
- public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info)
- throws SemanticException {
- if (info.isAllColumns()) {
- throw new SemanticException(
- "The specified syntax for UDAF invocation is invalid.");
- }
- return
- }
- @Override
- public GenericUDAFEvaluator getEvaluator(TypeInfo[] info)
- throws SemanticException {
- throw new SemanticException(
- "This UDAF does not support the deprecated getEvaluator() method.");
- }
- }
可以看到,該抽象類有兩個方法,其中一個已經被棄用,所以只需要實現引數型別為TypeInfo的getEvaluator方法即可。
該方法其實相當於一個工廠,TypeInfo表示在使用時傳入該UDAF的引數的型別。該方法主要做的工作有:
- 檢查引數長度和型別
- 根據引數返回對應的實際處理物件
返回的物件型別為GenericUDAFEvaluator,這是一個抽象類:
Java程式碼
- public abstract class GenericUDAFEvaluator implements Closeable {
- ......
- public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
- // This function should be overriden in every sub class
- // And the sub class should call super.init(m, parameters) to get mode set.
- mode = m;
- return null;
- }
- public abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;
- public abstract void reset(AggregationBuffer agg) throws HiveException;
- public abstract void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;
- public abstract Object terminatePartial(AggregationBuffer agg) throws HiveException;
- public abstract void merge(AggregationBuffer agg, Object partial) throws HiveException;
- public abstract Object terminate(AggregationBuffer agg) throws HiveException;
- ......
- }
說明上述方法的之前,需要提一個GenericUDAFEvaluator的內部列舉類Mode
Java程式碼
- public static enum Mode {
- /**
- * 相當於map階段,呼叫iterate()和terminatePartial()
- */
- PARTIAL1,
- /**
- * 相當於combiner階段,呼叫merge()和terminatePartial()
- */
- PARTIAL2,
- /**
- * 相當於reduce階段呼叫merge()和terminate()
- */
- FINAL,
- /**
- * COMPLETE: 相當於沒有reduce階段map,呼叫iterate()和terminate()
- */
- COMPLETE
- };
可以看到,UDAF將任務分成了幾種型別,PARTIAL1相當於MR程式的map階段,負責迭代處理記錄並返回該階段的中間結果。PARTIAL2相當於Combiner,對map階段的結果進行一次聚合。FINAL是reduce階段,進行整體聚合以及返回最終結果。COMPLETE有點特殊,是一個沒有reduce階段的map過程,所以在進行記錄迭代之後,直接返回最終結果。
再來看GenericUDAFEvaluator中的各方法
Java程式碼
- public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {...}
初始化方法,在Mode的每一個階段啟動時會執行init方法。該方法有兩個引數,第一個引數是Mode,可以根據此引數判斷當前執行的是哪個階段,進行該階段相應的初始化工作。ObjectInspector是一個抽象的型別描述,例如:當引數型別是原生型別時,可以轉化為PrimitiveObjectInspector,除此之外還有StructObjectInspector等等。ObjectInspector只是描述型別,並不儲存實際資料。後面的具體例子中會有一些使用說明。
ObjectInspector[]的長度不是固定的,要看當前是處於哪個階段。如果是PARTIAL1,那麼與使用時傳入該UDAF的引數個數一致;如果是FINAL階段,長度就是1了,因為map階段返回的結果只有一個物件。
Java程式碼
- public abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;
- public abstract void reset(AggregationBuffer agg) throws HiveException;
AggregationBuffer是一個標識介面,沒有任何需要實現的方法。實現該介面的類被用於暫存中間結果。reset是為了重置AggregationBuffer,但是在實際應用場景中沒有發現單獨呼叫該方法進行重置,有可能是聚合key的資料量還不夠大,在後面會再說一下這個問題。
Java程式碼
- public abstract void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;
- public abstract Object terminatePartial(AggregationBuffer agg) throws HiveException;
- public abstract void merge(AggregationBuffer agg, Object partial) throws HiveException;
- public abstract Object terminate(AggregationBuffer agg) throws HiveException;
- ......
iterate方法存在於MR的M階段,用於處理每一條輸入記錄。Object[]作為輸入傳入UFAF,AggregationBuffer作為中間快取暫存結果。需要注意的是,每次呼叫iterate傳入的AggregationBuffer並不一定是同一個物件。Hive呼叫UDAF的時候會用一個Map來管理AggregationBuffer,Map的key即為需要聚合的key。就通過實際執行過程來看,在每一次iterate呼叫之前,會根據聚合key從Map中查詢對應的AggregationBuffer,若能找到則直接返回AggregationBuffer物件,找不到則呼叫getNewAggregationBuffer方法新建並插入Map中並返回結果。
terminatePartial方法在iterate處理完所有輸入後呼叫,用於返回初步的聚合結果。
merge方法存在於MR的R階段(也同樣存在於Combine階段),用於最後的聚合。Object型別的partial引數與terminatePartial返回值一致,AggregationBuffer引數與上述一致。
terminate方法在merge方法執行完畢之後呼叫,用於進行最後的處理,並返回最後結果。
像上面提到的Mode一樣,這些方法並不一定都會被呼叫,與Hive解析成的MR程式型別有關。例如解析後的MR程式只有M階段,則只會呼叫iterate和terminate。實際使用過程中,由於聚合key資料量有限,記憶體可以承載,所以沒有發現reset單獨呼叫的情況。每次遇到一個不同的key,則新建一個AggregationBuffer,沒有看原始碼,不知道當聚合key很大的時候,是否會呼叫reset進行物件重用。