1. 程式人生 > >Hive自定義UDAF詳解

Hive自定義UDAF詳解

        遇到一個Hive需求:有A、B、C三列,按A列進行聚合,求出C列聚合後的最小值和最大值各自對應的B列值。這個需求用hql和內建函式也可完成,但是比較繁瑣,會解析成幾個MR進行執行,如果自定義UDAF便可只利用一個MR完成任務。 

        所用Hive為0.13.1版本。UDAF有兩種,第一種是比較簡單的形式,利用抽象類UDAF和UDAFEvaluator,暫不做討論。主要說一下第二種形式,利用介面GenericUDAFResolver2(或者抽象類AbstractGenericUDAFResolver)和抽象類GenericUDAFEvaluator。 

        這裡用AbstractGenericUDAFResolver做說明。 
Java程式碼  
  1. public abstract class AbstractGenericUDAFResolver implements GenericUDAFResolver2 {  
  2.   @SuppressWarnings("deprecation")  
  3.   @Override  
  4.   public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info)  
  5.     throws SemanticException {  
  6.     if (info.isAllColumns()) {  
  7.       throw new SemanticException(  
  8.           "The specified syntax for UDAF invocation is invalid.");  
  9.     }  
  10.     return
     getEvaluator(info.getParameters());  
  11.   }  
  12.   @Override  
  13.   public GenericUDAFEvaluator getEvaluator(TypeInfo[] info)   
  14.     throws SemanticException {  
  15.     throw new SemanticException(  
  16.           "This UDAF does not support the deprecated getEvaluator() method.");  
  17.   }  
  18. }  

        可以看到,該抽象類有兩個方法,其中一個已經被棄用,所以只需要實現引數型別為TypeInfo的getEvaluator方法即可。 
        該方法其實相當於一個工廠,TypeInfo表示在使用時傳入該UDAF的引數的型別。該方法主要做的工作有: 
  • 檢查引數長度和型別
  • 根據引數返回對應的實際處理物件

        返回的物件型別為GenericUDAFEvaluator,這是一個抽象類: 
Java程式碼  
  1. public abstract class GenericUDAFEvaluator implements Closeable {  
  2.     ......  
  3.     public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {  
  4.         // This function should be overriden in every sub class  
  5.         // And the sub class should call super.init(m, parameters) to get mode set.  
  6.         mode = m;  
  7.         return null;  
  8.     }  
  9.     public abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;  
  10.     public abstract void reset(AggregationBuffer agg) throws HiveException;  
  11.     public abstract void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;  
  12.     public abstract Object terminatePartial(AggregationBuffer agg) throws HiveException;  
  13.     public abstract void merge(AggregationBuffer agg, Object partial) throws HiveException;  
  14.     public abstract Object terminate(AggregationBuffer agg) throws HiveException;  
  15.     ......  
  16. }  


        說明上述方法的之前,需要提一個GenericUDAFEvaluator的內部列舉類Mode 
Java程式碼  
  1. public static enum Mode {  
  2.     /** 
  3.      * 相當於map階段,呼叫iterate()和terminatePartial() 
  4.      */  
  5.     PARTIAL1,  
  6.     /** 
  7.      * 相當於combiner階段,呼叫merge()和terminatePartial() 
  8.      */  
  9.     PARTIAL2,  
  10.     /** 
  11.      * 相當於reduce階段呼叫merge()和terminate() 
  12.      */  
  13.     FINAL,  
  14.     /** 
  15.      * COMPLETE: 相當於沒有reduce階段map,呼叫iterate()和terminate() 
  16.      */  
  17.     COMPLETE  
  18.   };  

        可以看到,UDAF將任務分成了幾種型別,PARTIAL1相當於MR程式的map階段,負責迭代處理記錄並返回該階段的中間結果。PARTIAL2相當於Combiner,對map階段的結果進行一次聚合。FINAL是reduce階段,進行整體聚合以及返回最終結果。COMPLETE有點特殊,是一個沒有reduce階段的map過程,所以在進行記錄迭代之後,直接返回最終結果。 

        再來看GenericUDAFEvaluator中的各方法 
Java程式碼  
  1. public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {...}  

        初始化方法,在Mode的每一個階段啟動時會執行init方法。該方法有兩個引數,第一個引數是Mode,可以根據此引數判斷當前執行的是哪個階段,進行該階段相應的初始化工作。ObjectInspector是一個抽象的型別描述,例如:當引數型別是原生型別時,可以轉化為PrimitiveObjectInspector,除此之外還有StructObjectInspector等等。ObjectInspector只是描述型別,並不儲存實際資料。後面的具體例子中會有一些使用說明。 
        ObjectInspector[]的長度不是固定的,要看當前是處於哪個階段。如果是PARTIAL1,那麼與使用時傳入該UDAF的引數個數一致;如果是FINAL階段,長度就是1了,因為map階段返回的結果只有一個物件。 
Java程式碼  
  1. public abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;  
  2. public abstract void reset(AggregationBuffer agg) throws HiveException;  

        AggregationBuffer是一個標識介面,沒有任何需要實現的方法。實現該介面的類被用於暫存中間結果。reset是為了重置AggregationBuffer,但是在實際應用場景中沒有發現單獨呼叫該方法進行重置,有可能是聚合key的資料量還不夠大,在後面會再說一下這個問題。 
        Java程式碼  
  1. public abstract void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;  
  2. public abstract Object terminatePartial(AggregationBuffer agg) throws HiveException;  
  3. public abstract void merge(AggregationBuffer agg, Object partial) throws HiveException;  
  4. public abstract Object terminate(AggregationBuffer agg) throws HiveException;  
  5. ......  

        iterate方法存在於MR的M階段,用於處理每一條輸入記錄。Object[]作為輸入傳入UFAF,AggregationBuffer作為中間快取暫存結果。需要注意的是,每次呼叫iterate傳入的AggregationBuffer並不一定是同一個物件。Hive呼叫UDAF的時候會用一個Map來管理AggregationBuffer,Map的key即為需要聚合的key。就通過實際執行過程來看,在每一次iterate呼叫之前,會根據聚合key從Map中查詢對應的AggregationBuffer,若能找到則直接返回AggregationBuffer物件,找不到則呼叫getNewAggregationBuffer方法新建並插入Map中並返回結果。 
        terminatePartial方法在iterate處理完所有輸入後呼叫,用於返回初步的聚合結果。 
        merge方法存在於MR的R階段(也同樣存在於Combine階段),用於最後的聚合。Object型別的partial引數與terminatePartial返回值一致,AggregationBuffer引數與上述一致。 
        terminate方法在merge方法執行完畢之後呼叫,用於進行最後的處理,並返回最後結果。 
        像上面提到的Mode一樣,這些方法並不一定都會被呼叫,與Hive解析成的MR程式型別有關。例如解析後的MR程式只有M階段,則只會呼叫iterate和terminate。實際使用過程中,由於聚合key資料量有限,記憶體可以承載,所以沒有發現reset單獨呼叫的情況。每次遇到一個不同的key,則新建一個AggregationBuffer,沒有看原始碼,不知道當聚合key很大的時候,是否會呼叫reset進行物件重用。