Hive UDAF開發詳解
明
這篇文章是來自Hadoop Hive UDAF Tutorial - Extending Hive with Aggregation Functions:的不嚴格翻譯,因為翻譯的文章示例寫得比較通俗易懂,此外,我把自己對於Hive的UDAF理解穿插到文章裏面。udfa是hive中用戶自定義的聚集函數,hive內置UDAF函數包括有sum()與count(),UDAF實現有簡單與通用兩種方式,簡單UDAF因為使用Java反射導致性能損失,而且有些特性不能使用,已經被棄用了;在這篇博文中我們將關註Hive中自定義聚類函數-GenericUDAF,UDAF開發主要涉及到以下兩個抽象類:
org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
源碼鏈接
博文中的所有的代碼和數據可以在以下鏈接找到:hive examples示例數據準備
首先先創建一張包含示例數據的表:people,該表只有name一列,該列中包含了一個或多個名字,該表數據保存在people.txt文件中。
~$ cat ./people.txt John Smith John and Ann White Ted Green Dorothy
把該文件上載到hdfs目錄/user/matthew/people中:
hadoop fs -mkdir people hadoop fs -put ./people.txt people
下面要創建hive外部表,在hive shell中執行
CREATE EXTERNAL TABLE people (name string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘\t‘ ESCAPED BY ‘‘ LINES TERMINATED BY ‘\n‘ STORED AS TEXTFILE LOCATION ‘/user/matthew/people‘;
相關抽象類介紹
創建一個GenericUDAF必須先了解以下兩個抽象類:org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
為了更好理解上述抽象類的API,要記住hive只是mapreduce函數,只不過hive已經幫助我們寫好並隱藏mapreduce,向上提供簡潔的sql函數,所以我們要結合Mapper、Combiner與Reducer來幫助我們理解這個函數。要記住在hadoop集群中有若幹臺機器,在不同的機器上Mapper與Reducer任務獨立運行。
所以大體上來說,這個UDAF函數讀取數據(mapper),聚集一堆mapper輸出到部分聚集結果(combiner),並且最終創建一個最終的聚集結果(reducer)。因為我們跨域多個combiner進行聚集,所以我們需要保存部分聚集結果。
AbstractGenericUDAFResolver
Resolver很簡單,要覆蓋實現下面方法,該方法會根據sql傳人的參數數據格式指定調用哪個Evaluator進行處理。
<span style="background-color: rgb(255, 255, 255);"><span style="font-size:14px;">public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException;</span></span>
GenericUDAFEvaluator
UDAF邏輯處理主要發生在Evaluator中,要實現該抽象類的幾個方法。
在理解Evaluator之前,必須先理解objectInspector接口與GenericUDAFEvaluator中的內部類Model。
ObjectInspector
作用主要是解耦數據使用與數據格式,使得數據流在輸入輸出端切換不同的輸入輸出格式,不同的Operator上使用不同的格式。可以參考這兩篇文章:first post on Hive UDFs、Hive中ObjectInspector的作用,裏面有關於objectinspector的介紹。Model
Model代表了UDAF在mapreduce的各個階段。
public static enum Mode { /** * PARTIAL1: 這個是mapreduce的map階段:從原始數據到部分數據聚合 * 將會調用iterate()和terminatePartial() */ PARTIAL1, /** * PARTIAL2: 這個是mapreduce的map端的Combiner階段,負責在map端合並map的數據::從部分數據聚合到部分數據聚合: * 將會調用merge() 和 terminatePartial() */ PARTIAL2, /** * FINAL: mapreduce的reduce階段:從部分數據的聚合到完全聚合 * 將會調用merge()和terminate() */ FINAL, /** * COMPLETE: 如果出現了這個階段,表示mapreduce只有map,沒有reduce,所以map端就直接出結果了:從原始數據直接到完全聚合 * 將會調用 iterate()和terminate() */ COMPLETE };
一般情況下,完整的UDAF邏輯是一個mapreduce過程,如果有mapper和reducer,就會經歷PARTIAL1(mapper),FINAL(reducer),如果還有combiner,那就會經歷PARTIAL1(mapper),PARTIAL2(combiner),FINAL(reducer)。
而有一些情況下的mapreduce,只有mapper,而沒有reducer,所以就會只有COMPLETE階段,這個階段直接輸入原始數據,出結果。
GenericUDAFEvaluator的方法
// 確定各個階段輸入輸出參數的數據格式ObjectInspectors public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException; // 保存數據聚集結果的類 abstract AggregationBuffer getNewAggregationBuffer() throws HiveException; // 重置聚集結果 public void reset(AggregationBuffer agg) throws HiveException; // map階段,叠代處理輸入sql傳過來的列數據 public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException; // map與combiner結束返回結果,得到部分數據聚集結果 public Object terminatePartial(AggregationBuffer agg) throws HiveException; // combiner合並map返回的結果,還有reducer合並mapper或combiner返回的結果。 public void merge(AggregationBuffer agg, Object partial) throws HiveException; // reducer階段,輸出最終結果 public Object terminate(AggregationBuffer agg) throws HiveException;
圖解Model與Evaluator關系
Model各階段對應Evaluator方法調用
Evaluator各個階段下處理mapreduce流程
實例
下面將講述一個聚集函數UDAF的實例,我們將計算people這張表中的name列字母的個數。
下面的函數代碼是計算指定列中字符的總數(包括空格)
代碼
@Description(name = "letters", value = "_FUNC_(expr) - 返回該列中所有字符串的字符總數") public class TotalNumOfLettersGenericUDAF extends AbstractGenericUDAFResolver { @Override public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException { if (parameters.length != 1) { throw new UDFArgumentTypeException(parameters.length - 1, "Exactly one argument is expected."); } ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(parameters[0]); if (oi.getCategory() != ObjectInspector.Category.PRIMITIVE){ throw new UDFArgumentTypeException(0, "Argument must be PRIMITIVE, but " + oi.getCategory().name() + " was passed."); } PrimitiveObjectInspector inputOI = (PrimitiveObjectInspector) oi; if (inputOI.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING){ throw new UDFArgumentTypeException(0, "Argument must be String, but " + inputOI.getPrimitiveCategory().name() + " was passed."); } return new TotalNumOfLettersEvaluator(); } public static class TotalNumOfLettersEvaluator extends GenericUDAFEvaluator { PrimitiveObjectInspector inputOI; ObjectInspector outputOI; PrimitiveObjectInspector integerOI; int total = 0; @Override public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException { assert (parameters.length == 1); super.init(m, parameters); //map階段讀取sql列,輸入為String基礎數據格式 if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) { inputOI = (PrimitiveObjectInspector) parameters[0]; } else { //其余階段,輸入為Integer基礎數據格式 integerOI = (PrimitiveObjectInspector) parameters[0]; } // 指定各個階段輸出數據格式都為Integer類型 outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class, ObjectInspectorOptions.JAVA); return outputOI; } /** * 存儲當前字符總數的類 */ static class LetterSumAgg implements AggregationBuffer { int sum = 0; void add(int num){ sum += num; } } @Override public AggregationBuffer getNewAggregationBuffer() throws HiveException { LetterSumAgg result = new LetterSumAgg(); return result; } @Override public void reset(AggregationBuffer agg) throws HiveException { LetterSumAgg myagg = new LetterSumAgg(); } private boolean warned = false; @Override public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException { assert (parameters.length == 1); if (parameters[0] != null) { LetterSumAgg myagg = (LetterSumAgg) agg; Object p1 = ((PrimitiveObjectInspector) inputOI).getPrimitiveJavaObject(parameters[0]); myagg.add(String.valueOf(p1).length()); } } @Override public Object terminatePartial(AggregationBuffer agg) throws HiveException { LetterSumAgg myagg = (LetterSumAgg) agg; total += myagg.sum; return total; } @Override public void merge(AggregationBuffer agg, Object partial) throws HiveException { if (partial != null) { LetterSumAgg myagg1 = (LetterSumAgg) agg; Integer partialSum = (Integer) integerOI.getPrimitiveJavaObject(partial); LetterSumAgg myagg2 = new LetterSumAgg(); myagg2.add(partialSum); myagg1.add(myagg2.sum); } } @Override public Object terminate(AggregationBuffer agg) throws HiveException { LetterSumAgg myagg = (LetterSumAgg) agg; total = myagg.sum; return myagg.sum; } } }
代碼說明
AggregationBuffer
允許我們保存中間結果,通過定義我們的buffer,我們可以處理任何格式的數據,在代碼例子中字符總數保存在AggregationBuffer
。
/** * 保存當前字符總數的類 */ static class LetterSumAgg implements AggregationBuffer { int sum = 0; void add(int num){ sum += num; } }
這意味著UDAF在不同的mapreduce階段會接收到不同的輸入。Iterate讀取我們表中的一行(或者準確來說是表),然後輸出其他數據格式的聚集結果。
artialAggregation
合並這些聚集結果到另外相同格式的新的聚集結果,然後最終的reducer取得這些聚集結果然後輸出最終結果(該結果或許與接收數據的格式不一致)。
在init()方法中我們指定輸入為string,結果輸出格式為integer,還有,部分聚集結果輸出格式為integer(保存在aggregation buffer中);terminate()
與
terminatePartial()
兩者輸出一個
integer
。
// init方法中根據不同的mode指定輸出數據的格式objectinspector if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) { inputOI = (PrimitiveObjectInspector) parameters[0]; } else { integerOI = (PrimitiveObjectInspector) parameters[0]; } // 不同model階段的輸出數據格式 outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class, ObjectInspectorOptions.JAVA);
iterate()
函數讀取到每行中列的字符串,計算與保存該字符串的長度
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException { ... Object p1 = ((PrimitiveObjectInspector) inputOI).getPrimitiveJavaObject(parameters[0]); myagg.add(String.valueOf(p1).length()); } }
Merge函數增加部分聚集總數到AggregationBuffer
public void merge(AggregationBuffer agg, Object partial) throws HiveException { if (partial != null) { LetterSumAgg myagg1 = (LetterSumAgg) agg; Integer partialSum = (Integer) integerOI.getPrimitiveJavaObject(partial); LetterSumAgg myagg2 = new LetterSumAgg(); myagg2.add(partialSum); myagg1.add(myagg2.sum); } }
Terminate()函數返回AggregationBuffer中的內容,這裏產生了最終結果。
public Object terminate(AggregationBuffer agg) throws HiveException { LetterSumAgg myagg = (LetterSumAgg) agg; total = myagg.sum; return myagg.sum; }
使用自定義函數
ADD JAR ./hive-extension-examples-master/target/hive-extensions-1.0-SNAPSHOT-jar-with-dependencies.jar; CREATE TEMPORARY FUNCTION letters as ‘com.matthewrathbone.example.TotalNumOfLettersGenericUDAF‘; SELECT letters(name) FROM people; OK 44 Time taken: 20.688 seconds
資料參考
http://www.cnblogs.com/ggjucheng/archive/2013/02/01/2888051.htmlhttp://blog.csdn.net/duguduchong/article/details/8684963
來源: http://lib.csdn.net/article/hive/42862
Hive UDAF開發詳解