Hive中的UDAF(用戶自定義聚合函數)是多進一出的:多行輸入、一個輸出,類似聚合函數Count()、Sum()。
以前的寫法是繼承UDAF類,然後實現UDAFEvaluator接口
並實現其中的方法。下面是一個sum的demo(注意:iterate中每遇到一個非null值只加1,所以實際計算的是非null值的個數):
/**
 * Old-style Hive UDAF written against the deprecated {@code UDAF} / {@code UDAFEvaluator} API.
 *
 * <p>NOTE: despite its name, {@link Evaluator#iterate(String)} adds 1 for every non-null input
 * instead of adding the input's value, so the result is the number of non-null input values.
 * An aggregation that sees no non-null input returns 0 (not SQL NULL).
 */
public class Test_Sum extends UDAF {

  /** Evaluator that keeps a running {@code int} total across iterate/merge calls. */
  public static class Evaluator implements UDAFEvaluator {
    // Running total; cleared by init().
    private int mSum;

    public Evaluator() {
      super();
      init();
    }

    /** Initializes the evaluator: resets the running total to 0. */
    public void init() {
      mSum = 0;
    }

    /**
     * Map phase: consumes one input value.
     *
     * @param o the input value; null values are skipped
     * @return always {@code true}
     */
    public boolean iterate(String o) {
      if (o != null) {
        // Adds 1 rather than the value of o, i.e. this counts non-null inputs.
        mSum += 1;
      }
      return true;
    }

    /**
     * Returns the partial result, which is later passed to {@link #merge(int)} (similar to a
     * combiner).
     *
     * @return the running total so far; 0 if nothing has been counted
     */
    public int terminatePartial() {
      return mSum;
    }

    /**
     * Reduce phase: adds a partial result produced by {@link #terminatePartial()}.
     *
     * @param o a partial total
     * @return always {@code true}
     */
    public boolean merge(int o) {
      mSum += o;
      return true;
    }

    /**
     * Returns the final aggregation result.
     *
     * @return the running total; 0 (not null) when no non-null input was seen
     */
    public int terminate() {
      return mSum;
    }
  }
}
具體的使用同UDF
加載Jar包、創建臨時函數後即可使用。
也可將JAR包放到HDFS上,從而創建永久函數。
------------------------------------------------------------------------------------------------------
UDAF類已經過時棄用,現在的做法是實現GenericUDAFResolver2接口。
下面為Count的Demo
public class New_Count implements GenericUDAFResolver2 { private static final Log LOG = LogFactory.getLog(GenericUDAFCount.class.getName()); //檢查參數類型,多用 public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException { // This method implementation is preserved for backward compatibility. return new GenericUDAFCountEvaluator(); } //檢查參數類型 public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo paramInfo) throws SemanticException { TypeInfo[] parameters = paramInfo.getParameters(); if (parameters.length == 0) { if (!paramInfo.isAllColumns()) { throw new UDFArgumentException("Argument expected"); } assert !paramInfo.isDistinct() : "DISTINCT not supported with *"; } else { if (parameters.length > 1 && !paramInfo.isDistinct()) { throw new UDFArgumentException("DISTINCT keyword must be specified"); } assert !paramInfo.isAllColumns() : "* not supported in expression list"; } return new GenericUDAFCountEvaluator().setCountAllColumns( paramInfo.isAllColumns()); } public static class GenericUDAFCountEvaluator extends GenericUDAFEvaluator { private boolean countAllColumns = false; //判斷是否為所有的列,即* private LongObjectInspector partialCountAggOI; private LongWritable result; @Override public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException { super.init(m, parameters); partialCountAggOI = PrimitiveObjectInspectorFactory.writableLongObjectInspector; result = new LongWritable(0); //初始為0 return PrimitiveObjectInspectorFactory.writableLongObjectInspector; } private GenericUDAFCountEvaluator setCountAllColumns(boolean countAllCols) { countAllColumns = countAllCols; return this; } /** class for storing count value. 
*/ @AggregationType(estimable = true) static class CountAgg extends AbstractAggregationBuffer { long value; @Override public int estimate() { return JavaDataModel.PRIMITIVES2; } } @Override public AggregationBuffer getNewAggregationBuffer() throws HiveException { CountAgg buffer = new CountAgg(); reset(buffer); return buffer; } @Override public void reset(AggregationBuffer agg) throws HiveException { ((CountAgg) agg).value = http://www.ithao123.cn/0; } @Override //map階段 public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException { // parameters == null means the input table/split is empty if (parameters == null) { return; } if (countAllColumns) { //判斷是否為全部的列 assert parameters.length == 0; ((CountAgg) agg).value++; } else { boolean countThisRow = true; //若非全部的列,則需要判斷相應列是否為null for (Object nextParam : parameters) { if (nextParam == null) { countThisRow = false; break; } } if (countThisRow) { ((CountAgg) agg).value++; } } } @Override //reducer階段 public void merge(AggregationBuffer agg, Object partial) throws HiveException { if (partial != null) { long p = partialCountAggOI.get(partial); ((CountAgg) agg).value += p; } } @Override public Object terminate(AggregationBuffer agg) throws HiveException { result.set(((CountAgg) agg).value); return result; } @Override public Object terminatePartial(AggregationBuffer agg) throws HiveException { return terminate(agg); } } }
參考:
http://www.cnblogs.com/ggjucheng/archive/2013/02/01/2888051.html
http://paddy-w.iteye.com/blog/2081409
Tags: standard private public return reduce
文章來源: