1. 程式人生 > >Hive UDAF開發詳解

Hive UDAF開發詳解

-s 聚合 而且 pri ros cal 關系 方法調用 evaluator

這篇文章是來自Hadoop Hive UDAF Tutorial - Extending Hive with Aggregation Functions:的不嚴格翻譯,因為翻譯的文章示例寫得比較通俗易懂,此外,我把自己對於Hive的UDAF理解穿插到文章裏面。

udfa是hive中用戶自定義的聚集函數,hive內置UDAF函數包括有sum()與count(),UDAF實現有簡單與通用兩種方式,簡單UDAF因為使用Java反射導致性能損失,而且有些特性不能使用,已經被棄用了;在這篇博文中我們將關註Hive中自定義聚類函數-GenericUDAF,UDAF開發主要涉及到以下兩個抽象類:

org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator

源碼鏈接

博文中的所有的代碼和數據可以在以下鏈接找到:hive examples

示例數據準備

首先先創建一張包含示例數據的表:people,該表只有name一列,該列中包含了一個或多個名字,該表數據保存在people.txt文件中。

~$ cat ./people.txt

John Smith
John and Ann White
Ted Green
Dorothy

把該文件上載到hdfs目錄/user/matthew/people中:

hadoop fs -mkdir people
hadoop fs -put ./people.txt people

下面要創建hive外部表,在hive shell中執行

CREATE EXTERNAL TABLE people (name string)
ROW FORMAT DELIMITED FIELDS 
	TERMINATED BY ‘\t‘ 
	ESCAPED BY ‘‘ 
	LINES TERMINATED BY ‘\n‘
STORED AS TEXTFILE 
LOCATION ‘/user/matthew/people‘;

相關抽象類介紹

創建一個GenericUDAF必須先了解以下兩個抽象類:
org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver 
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator

為了更好理解上述抽象類的API,要記住hive只是mapreduce函數,只不過hive已經幫助我們寫好並隱藏mapreduce,向上提供簡潔的sql函數,所以我們要結合Mapper、Combiner與Reducer來幫助我們理解這個函數。要記住在hadoop集群中有若幹臺機器,在不同的機器上Mapper與Reducer任務獨立運行。

所以大體上來說,這個UDAF函數讀取數據(mapper),聚集一堆mapper輸出到部分聚集結果(combiner),並且最終創建一個最終的聚集結果(reducer)。因為我們跨域多個combiner進行聚集,所以我們需要保存部分聚集結果。

AbstractGenericUDAFResolver

Resolver很簡單,要覆蓋實現下面方法,該方法會根據sql傳人的參數數據格式指定調用哪個Evaluator進行處理。

<span style="background-color: rgb(255, 255, 255);"><span style="font-size:14px;">public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException;</span></span>

GenericUDAFEvaluator

UDAF邏輯處理主要發生在Evaluator中,要實現該抽象類的幾個方法。

在理解Evaluator之前,必須先理解objectInspector接口與GenericUDAFEvaluator中的內部類Model。

ObjectInspector

作用主要是解耦數據使用與數據格式,使得數據流在輸入輸出端切換不同的輸入輸出格式,不同的Operator上使用不同的格式。可以參考這兩篇文章:first post on Hive UDFs、Hive中ObjectInspector的作用,裏面有關於objectinspector的介紹。

Model

Model代表了UDAF在mapreduce的各個階段。

public static enum Mode {
    /**
     * PARTIAL1: 這個是mapreduce的map階段:從原始數據到部分數據聚合
     * 將會調用iterate()和terminatePartial()
     */
    PARTIAL1,
        /**
     * PARTIAL2: 這個是mapreduce的map端的Combiner階段,負責在map端合並map的數據::從部分數據聚合到部分數據聚合:
     * 將會調用merge() 和 terminatePartial() 
     */
    PARTIAL2,
        /**
     * FINAL: mapreduce的reduce階段:從部分數據的聚合到完全聚合 
     * 將會調用merge()和terminate()
     */
    FINAL,
        /**
     * COMPLETE: 如果出現了這個階段,表示mapreduce只有map,沒有reduce,所以map端就直接出結果了:從原始數據直接到完全聚合
      * 將會調用 iterate()和terminate()
     */
    COMPLETE
  };

一般情況下,完整的UDAF邏輯是一個mapreduce過程,如果有mapper和reducer,就會經歷PARTIAL1(mapper),FINAL(reducer),如果還有combiner,那就會經歷PARTIAL1(mapper),PARTIAL2(combiner),FINAL(reducer)。

而有一些情況下的mapreduce,只有mapper,而沒有reducer,所以就會只有COMPLETE階段,這個階段直接輸入原始數據,出結果。

GenericUDAFEvaluator的方法

// 確定各個階段輸入輸出參數的數據格式ObjectInspectors
public  ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException;

// 保存數據聚集結果的類
abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;

// 重置聚集結果
public void reset(AggregationBuffer agg) throws HiveException;

// map階段,叠代處理輸入sql傳過來的列數據
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;

// map與combiner結束返回結果,得到部分數據聚集結果
public Object terminatePartial(AggregationBuffer agg) throws HiveException;

// combiner合並map返回的結果,還有reducer合並mapper或combiner返回的結果。
public void merge(AggregationBuffer agg, Object partial) throws HiveException;

// reducer階段,輸出最終結果
public Object terminate(AggregationBuffer agg) throws HiveException;

圖解Model與Evaluator關系

技術分享

Model各階段對應Evaluator方法調用


技術分享

Evaluator各個階段下處理mapreduce流程

實例

下面將講述一個聚集函數UDAF的實例,我們將計算people這張表中的name列字母的個數。

下面的函數代碼是計算指定列中字符的總數(包括空格)

代碼

@Description(name = "letters", value = "_FUNC_(expr) - 返回該列中所有字符串的字符總數")
public class TotalNumOfLettersGenericUDAF extends AbstractGenericUDAFResolver {

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
            throws SemanticException {
        if (parameters.length != 1) {
            throw new UDFArgumentTypeException(parameters.length - 1,
                    "Exactly one argument is expected.");
        }
        
        ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(parameters[0]);
        
        if (oi.getCategory() != ObjectInspector.Category.PRIMITIVE){
            throw new UDFArgumentTypeException(0,
                            "Argument must be PRIMITIVE, but "
                            + oi.getCategory().name()
                            + " was passed.");
        }
        
        PrimitiveObjectInspector inputOI = (PrimitiveObjectInspector) oi;
        
        if (inputOI.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING){
            throw new UDFArgumentTypeException(0,
                            "Argument must be String, but "
                            + inputOI.getPrimitiveCategory().name()
                            + " was passed.");
        }
        
        return new TotalNumOfLettersEvaluator();
    }

    public static class TotalNumOfLettersEvaluator extends GenericUDAFEvaluator {

        PrimitiveObjectInspector inputOI;
        ObjectInspector outputOI;
        PrimitiveObjectInspector integerOI;
        
        int total = 0;

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters)
                throws HiveException {
        	
            assert (parameters.length == 1);
            super.init(m, parameters);
           
             //map階段讀取sql列,輸入為String基礎數據格式
            if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
                inputOI = (PrimitiveObjectInspector) parameters[0];
            } else {
			//其余階段,輸入為Integer基礎數據格式
            	integerOI = (PrimitiveObjectInspector) parameters[0];
            }

             // 指定各個階段輸出數據格式都為Integer類型
            outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,
                    ObjectInspectorOptions.JAVA);
            return outputOI;

        }

        /**
         * 存儲當前字符總數的類
         */
        static class LetterSumAgg implements AggregationBuffer {
            int sum = 0;
            void add(int num){
            	sum += num;
            }
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            LetterSumAgg result = new LetterSumAgg();
            return result;
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
        	LetterSumAgg myagg = new LetterSumAgg();
        }
        
        private boolean warned = false;

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters)
                throws HiveException {
            assert (parameters.length == 1);
            if (parameters[0] != null) {
                LetterSumAgg myagg = (LetterSumAgg) agg;
                Object p1 = ((PrimitiveObjectInspector) inputOI).getPrimitiveJavaObject(parameters[0]);
                myagg.add(String.valueOf(p1).length());
            }
        }

        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            LetterSumAgg myagg = (LetterSumAgg) agg;
            total += myagg.sum;
            return total;
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial)
                throws HiveException {
            if (partial != null) {
                
                LetterSumAgg myagg1 = (LetterSumAgg) agg;
                
                Integer partialSum = (Integer) integerOI.getPrimitiveJavaObject(partial);
                
                LetterSumAgg myagg2 = new LetterSumAgg();
                
                myagg2.add(partialSum);
                myagg1.add(myagg2.sum);
            }
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            LetterSumAgg myagg = (LetterSumAgg) agg;
            total = myagg.sum;
            return myagg.sum;
        }

    }
}

代碼說明

AggregationBuffer 允許我們保存中間結果,通過定義我們的buffer,我們可以處理任何格式的數據,在代碼例子中字符總數保存在AggregationBuffer

/**
* 保存當前字符總數的類
*/
static class LetterSumAgg implements AggregationBuffer {
	int sum = 0;
	void add(int num){
		sum += num;
	}
}

這意味著UDAF在不同的mapreduce階段會接收到不同的輸入。Iterate讀取我們表中的一行(或者準確來說是表),然後輸出其他數據格式的聚集結果。

artialAggregation合並這些聚集結果到另外相同格式的新的聚集結果,然後最終的reducer取得這些聚集結果然後輸出最終結果(該結果或許與接收數據的格式不一致)。

在init()方法中我們指定輸入為string,結果輸出格式為integer,還有,部分聚集結果輸出格式為integer(保存在aggregation buffer中);terminate()terminatePartial()兩者輸出一個integer

// init方法中根據不同的mode指定輸出數據的格式objectinspector
if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
	inputOI = (PrimitiveObjectInspector) parameters[0];
} else {
	integerOI = (PrimitiveObjectInspector) parameters[0];
}

// 不同model階段的輸出數據格式
outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,
                    ObjectInspectorOptions.JAVA);

iterate()函數讀取到每行中列的字符串,計算與保存該字符串的長度

public void iterate(AggregationBuffer agg, Object[] parameters)
	throws HiveException {
	...
	Object p1 = ((PrimitiveObjectInspector) inputOI).getPrimitiveJavaObject(parameters[0]);
	myagg.add(String.valueOf(p1).length());
	}
}

Merge函數增加部分聚集總數到AggregationBuffer

public void merge(AggregationBuffer agg, Object partial)
      	throws HiveException {
	if (partial != null) {
                
		LetterSumAgg myagg1 = (LetterSumAgg) agg;
                
		Integer partialSum = (Integer) integerOI.getPrimitiveJavaObject(partial);
                
		LetterSumAgg myagg2 = new LetterSumAgg();
                
		myagg2.add(partialSum);
		myagg1.add(myagg2.sum);
	}
}

Terminate()函數返回AggregationBuffer中的內容,這裏產生了最終結果。

public Object terminate(AggregationBuffer agg) throws HiveException {
	LetterSumAgg myagg = (LetterSumAgg) agg;
	total = myagg.sum;
	return myagg.sum;
}

使用自定義函數

ADD JAR ./hive-extension-examples-master/target/hive-extensions-1.0-SNAPSHOT-jar-with-dependencies.jar;
CREATE TEMPORARY FUNCTION letters as ‘com.matthewrathbone.example.TotalNumOfLettersGenericUDAF‘;

SELECT letters(name) FROM people;
OK
44
Time taken: 20.688 seconds

資料參考

http://www.cnblogs.com/ggjucheng/archive/2013/02/01/2888051.html

http://blog.csdn.net/duguduchong/article/details/8684963
來源: http://lib.csdn.net/article/hive/42862

Hive UDAF開發詳解