Hive的UDAF

分類:存儲 時間:2017-02-13

Hive中的UDAF是多進一出,類似聚合函數Count(),Sum()

以前的寫法是繼承UDAF類,然後實現UDAFEvaluator接口

實現裏面的方法,具體如下,是一個sum的demo

public class Test_Sum extends UDAF {
    public static class Evaluator implements UDAFEvaluator {

        private int mSum;
        public Evaluator() {
            super();
            init();
        }
        //初始化,初始值為0
        public void init() {
            mSum = 0;

        }
        //map階段,返回值為boolean
        public boolean iterate(String o) {
            if (o != null) {
                mSum += 1;
            }
            return true;
        }
        // 類似於Combiner,不過只傳給merge結果
        public int terminatePartial() {
            // This is SQL standard - sum of zero items should be null.
            return mSum;
        }
        // reduce 階段,返回值為boolean
        public boolean merge(int o) {
            mSum +=o;

            return true;
        }
        //返回聚合結果
        public int terminate() {
            // This is SQL standard - sum of zero items should be null.
            return mSum;
        }
    }
}
具體的使用同UDF

加載Jar包,創建臨時函數,使用。

也可將JAR包放置HDFS上,可變為永久函數


------------------------------------------------------------------------------------------------------

UDAF類已經過時棄用了,現在是實現GenericUDAFResolver2接口

下面為Count的Demo

public class New_Count implements GenericUDAFResolver2 {
    private static final Log LOG = LogFactory.getLog(GenericUDAFCount.class.getName());
    //檢查參數類型,多用
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
            throws SemanticException {
        // This method implementation is preserved for backward compatibility.
        return new GenericUDAFCountEvaluator();
    }

     //檢查參數類型
    public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo paramInfo)
            throws SemanticException {

        TypeInfo[] parameters = paramInfo.getParameters();

        if (parameters.length == 0) {
            if (!paramInfo.isAllColumns()) {
                throw new UDFArgumentException("Argument expected");
            }
            assert !paramInfo.isDistinct() : "DISTINCT not supported with *";
        } else {
            if (parameters.length > 1 && !paramInfo.isDistinct()) {
                throw new UDFArgumentException("DISTINCT keyword must be specified");
            }
            assert !paramInfo.isAllColumns() : "* not supported in expression list";
        }

        return new GenericUDAFCountEvaluator().setCountAllColumns(
                paramInfo.isAllColumns());
    }
    public static class GenericUDAFCountEvaluator extends GenericUDAFEvaluator {
        private boolean countAllColumns = false; //判斷是否為所有的列,即*
        private LongObjectInspector partialCountAggOI;
        private LongWritable result;

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters)
                throws HiveException {
            super.init(m, parameters);
            partialCountAggOI =
                    PrimitiveObjectInspectorFactory.writableLongObjectInspector;
            result = new LongWritable(0);  //初始為0
            return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
        }

        private GenericUDAFCountEvaluator setCountAllColumns(boolean countAllCols) {
            countAllColumns = countAllCols;
            return this;
        }

        /** class for storing count value. */
        @AggregationType(estimable = true)
        static class CountAgg extends AbstractAggregationBuffer {
            long value;
            @Override
            public int estimate() { return JavaDataModel.PRIMITIVES2; }
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            CountAgg buffer = new CountAgg();
            reset(buffer);
            return buffer;
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            ((CountAgg) agg).value = http://www.ithao123.cn/0;
        }

        @Override  //map階段
        public void iterate(AggregationBuffer agg, Object[] parameters)
                throws HiveException {
            // parameters == null means the input table/split is empty
            if (parameters == null) {
                return;
            }
            if (countAllColumns) {  //判斷是否為全部的列
                assert parameters.length == 0;
                ((CountAgg) agg).value++;
            } else {
                boolean countThisRow = true; //若非全部的列,則需要判斷相應列是否為null
                for (Object nextParam : parameters) {
                    if (nextParam == null) {
                        countThisRow = false;
                        break;
                    }
                }
                if (countThisRow) {
                    ((CountAgg) agg).value++;
                }
            }
        }

        @Override //reducer階段
        public void merge(AggregationBuffer agg, Object partial)
                throws HiveException {
            if (partial != null) {
                long p = partialCountAggOI.get(partial);
                ((CountAgg) agg).value += p;
            }
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            result.set(((CountAgg) agg).value);
            return result;
        }

        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            return terminate(agg);
        }
    }
}

 

參考:

http://www.cnblogs.com/ggjucheng/archive/2013/02/01/2888051.html

http://paddy-w.iteye.com/blog/2081409




Tags: standard private public return reduce

文章來源:


ads
ads

相關文章
ads

相關文章

ad