1. 程式人生 > >詳解spark sql使用者自定義函式:UDF與UDAF

詳解spark sql使用者自定義函式:UDF與UDAF

場景

UDAF = USER DEFINED AGGREGATION FUNCTION
  • 1
  • 1

上一篇文章已經介紹了spark sql的視窗函式,並知道Spark sql提供了豐富的內建函式供猿友們使用,辣為何還要使用者自定義函式呢?實際的業務場景可能很複雜,內建函式hold不住,所以spark sql提供了可擴充套件的內建函式介面:哥們,你的業務太變態了,我滿足不了你,自己按照我的規範去定義一個sql函式,該怎麼折騰就怎麼折騰! 
例如,MySQL資料庫中有一張task表,共兩個欄位taskid (任務ID)與taskParam(JSON格式的任務請求引數)。簡單起見,這裡只列出一條記錄:

taskid 
  1
taskParam
 {"endAge":["50"],"endDate":["2016-06-21"],"startAge":["10"],"startDate":["2016-06-21"]}
  • 1
  • 2
  • 3
  • 4
  • 1
  • 2
  • 3
  • 4

假設應用程式已經讀取了mysql中這張表的記錄,並通過 DateFrame註冊成了一張臨時表 task。問題來了:怎麼獲取taskParam中startAge的第一個值呢?

sqlContext.sql("select taskid,getJsonFieldUDF(taskParm,'startAge')")
  • 1
  • 1

這個時候,我們就需要自定義一個UDF函數了,取名getJsonFieldUDF。

Java版本的程式碼大致如下:

package cool.pengych.sparker.product;
import org.apache.spark.sql.api.java.UDF2;
import com.alibaba.fastjson.JSONObject;
/**
 * 使用者自定義函式
 * @author pengyucheng
 */
public class GetJsonObjectUDF implements UDF2<String,String,String>
{
    /**
     * 獲取陣列型別json字串中某一欄位的值
     */
    @Override
public String call(String json, String field) throws Exception { try { JSONObject jsonObject = JSONObject.parseObject(json); return jsonObject.getJSONArray(field).getString(0); } catch(Exception e) { e.printStackTrace(); } return null; } }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

這樣的需求在實際專案中是很普遍的:請求引數經常以json格式儲存在資料庫中,,,完了,越寫越多 。這裡還是先以Scala實現一個簡單的hello world級別的小樣為例,來體驗udf與udaf的使用好了。

問題

將如下陣列:

val bigData = Array("Spark","Hadoop","Flink","Spark","Hadoop","Flink",
"Spark","Hadoop","Flink","Spark","Hadoop","Flink")
  • 1
  • 2
  • 1
  • 2

中的字元分組聚合並計算出每個字元的長度及字元出現的個數。正常結果 
如下:

+------+-----+------+
|  name|count|length|
+------+-----+------+
| Spark|    4|     5|
| Flink|    4|     5|
|Hadoop|    4|     6|
+------+-----+------+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

注:‘spark’ 這個字元的長度為5 ,共出現了4次。

分析

  • 自定義個一個求字串長度的函式 
    自定義的sql函式,與scala中的普通函式一樣,只不過在使用上前者需要先在sqlContext中進行註冊。
  • 自定義一個聚合函式 
    按照字串名稱分組後,呼叫自定義的聚合函式實現累加。 
    啊,好抽象,直接看程式碼吧!

程式碼

package main.scala

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.expressions.MutableAggregationBuffer

/**
 * Spark SQL UDAS:user defined aggregation function 
 * UDF: 函式的輸入是一條具體的資料記錄,實現上講就是普通的scala函式-只不過需要註冊
 * UDAF:使用者自定義的聚合函式,函式本身作用於資料集合,能夠在具體操作的基礎上進行自定義操作
 */
object SparkSQLUDF {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLWindowFunctionOps")
    val sc = new SparkContext(conf)

    val hiveContext = new SQLContext(sc)

    val bigData = Array("Spark","Hadoop","Flink","Spark","Hadoop","Flink","Spark","Hadoop","Flink","Spark","Hadoop","Flink")
    val bigDataRDD = sc.parallelize(bigData)

     val bigDataRowRDD = bigDataRDD.map(line => Row(line))
     val structType = StructType(Array(StructField("name",StringType,true)))
     val bigDataDF = hiveContext.createDataFrame(bigDataRowRDD, structType)

     bigDataDF.registerTempTable("bigDataTable")

    /*
     * 通過HiveContext註冊UDF,在scala2.10.x版本UDF函式最多可以接受22個輸入引數
     */
     hiveContext.udf.register("computeLength",(input:String) => input.length)
     hiveContext.sql("select name,computeLength(name)  as length from bigDataTable").show

     //while(true){}

     hiveContext.udf.register("wordCount",new MyUDAF)
     hiveContext.sql("select name,wordCount(name) as count,computeLength(name) as length from bigDataTable group by name ").show
  }
}

/**
 * 使用者自定義函式
 */
 class MyUDAF extends UserDefinedAggregateFunction
 {
  /**
   * 指定具體的輸入資料的型別
   * 自段名稱隨意:Users can choose names to identify the input arguments - 這裡可以是“name”,或者其他任意串
   */
  override def inputSchema:StructType = StructType(Array(StructField("name",StringType,true)))

  /**
   * 在進行聚合操作的時候所要處理的資料的中間結果型別
   */
  override def bufferSchema:StructType = StructType(Array(StructField("count",IntegerType,true)))

  /**
   * 返回型別
   */
  override def dataType:DataType = IntegerType

  /**
   * whether given the same input,
   * always return the same output
   * true: yes 
   */
  override def deterministic:Boolean = true

  /**
   * Initializes the given aggregation buffer
   */
  override def initialize(buffer:MutableAggregationBuffer):Unit = {buffer(0)=0}

  /**
   * 在進行聚合的時候,每當有新的值進來,對分組後的聚合如何進行計算
   * 本地的聚合操作,相當於Hadoop MapReduce模型中的Combiner
   */
  override def update(buffer:MutableAggregationBuffer,input:Row):Unit={
    buffer(0) = buffer.getInt(0)+1
  }

  /**
   * 最後在分散式節點進行local reduce完成後需要進行全域性級別的merge操作
   */
  override def merge(buffer1:MutableAggregationBuffer,buffer2:Row):Unit={
    buffer1(0) = buffer1.getInt(0)+buffer2.getInt(0)
  }

  /**
   * 返回UDAF最後的計算結果
   */
  override def evaluate(buffer:Row):Any = buffer.getInt(0)
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104

執行結果

16/06/29 19:30:24 INFO DAGScheduler: ResultStage 5 (show at SparkSQLUDF.scala:48) finished in 1.625 s
+------+-----+------+
|  name|count|length|
+------+-----+------+
| Spark|    4|     5|
| Flink|    4|     5|
|Hadoop|    4|     6|
+------+-----+------+

16/06/29 19:30:24 INFO DAGScheduler: Job 3 finished: show at SparkSQLUDF.scala:48, took 1.717878 s
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

總結

  • 呼叫spark大神升級udaf實現 
    為了自己實現一個sql聚合函式,我需要繼承UserDefinedAggregateFunction並實現8個抽象方法!8個方法啊!what’s a disaster ! 然而,要想在sql中完成符合特定業務場景的聚合類(a = aggregation)功能,就得udaf。 
    怎麼理解MutableAggregationBuffer呢?就是儲存中間結果的,聚合就意味著多條記錄的累加等操作。

  • udf與udaf註冊語法

 hiveContext.udf.register("computeLength",(input:String) => input.length)
  • 1
  • 1
 hiveContext.udf.register("wordCount",new MyUDAF)

相關推薦

spark sql使用者定義函式:UDFUDAF

場景 UDAF = USER DEFINED AGGREGATION FUNCTION11 上一篇文章已經介紹了spark sql的視窗函式,並知道Spark sql提供了豐富的內建函式供猿友們使用,辣為何還要使用者自定義函式呢?實際的業務場景可能很複雜,內建函式hold

SparkSQL之定義函式UDFUDAF

SparkSQL中有兩種自定函式,在我們使用自帶的函式時無法滿足自己的需求時,可以使用自定義函式,SparkSQL中有兩種自定義函式,一種是UDF,另一種是UDAF,和Hive 很類似,但是hive中還有UDTF,一進多出,但是sparkSQL中沒有,這是因為spark中用 flatMap這

Hive定義函式(UDFUDAF)

當Hive提供的內建函式無法滿足你的業務處理需要時,此時就可以考慮使用使用者自定義函式。 UDF 使用者自定義函式(user defined function)–針對單條記錄。 建立函式流程 1、自定義一個Java類 2、繼承UDF類 3、重寫e

Hive 使用者定義函式UDF

本例自定義一個Hive UDF函式,功能是將從Hive資料倉庫查詢出來的字串進行大小寫轉換。 第一步,建立java工程,新增jar包。 Ø匯入Hive的lib目錄下的jar包以及hadoop安裝目錄下的hadoop-core.jar 第二步,新建package包,包中新

今晚8點直播 | 基於百度定義模板的OCR結果結構化處理技術

隨著行業的發展和技術的成熟,文字識別(OCR)目前已經應用到了多個行業中,比如物流行業快遞包裹的分揀,金融行業的支票單據識別輸入,交通領域中的車牌識別,以及日常生活中的卡證、票據識別等等。OCR(文字識別)技術是目前常用的一種AI能力。但一般OCR的識別結果是一種按行輸出的半結構化輸出。

Juniper SSG5(bgroupx介面及刪除bgroupx定義

從console口登陸檢視介面,標紅部分預設從eth0/2-6都屬於Trust介面 ssg5-serial-> get interface A - Active, I - Inactive, U -Up, D - Down, R - Ready Interfa

T-SQL 語句——定義函式

CREATE FUNCTION fn_chinese_week_day(@week_day INT) -- RETURNS NVARCHAR(3) BEGIN DECLARE @w INT SET @w = @week_day % 7 RETURN CASE @w

把IP字串轉化為數值格式的SQL Server定義函式

create function ip2number (@ip varchar(16))returns bigintasbeginset @[email protected]+'.'declare @pos tinyintdeclare @num bigintdeclare @bin int,@off

spring:通過FactoryBean定義工廠初始化Bean

FactoryBean的作用: FactoryBean是一個介面,擴充套件功能提供給使用者自定義工廠方法和工廠物件用於例項化物件。 FactoryBean定義了3個方法介面: T getObject():自定義的工廠方法; booleanisSingleton():定義B

RecyclerView+BGARefreshLayout實現定義下拉重新整理、上拉載入和側滑刪除效果

前言 還有2個月就過年了,對於我們這樣在外漂泊的異鄉人來說,一家人團聚在一起,吃一頓團圓飯,那是再幸福不過的事了。我們之所以遠離家鄉來到異鄉就像異鄉人這首歌寫的一樣,只為一扇窗! 正文 上篇文章給大家講解了一下關於RecyclerView的使用,今天給

SQL Server 定義函式(Function)

sql server 自定義函式分為三種類型:標量函式(Scalar Function)、內嵌表值函式(Inline Function)、多宣告表值函式(Multi-Statement Function) 標量函式:標量函式是對單一值操作,返回單一值。 內嵌表值函式:

Android事件傳遞機制(巢狀定義View示例)

一、概述   自定義View如果嵌套了自定義View,可能簡單寫一個onTouchEvent處理事件已經不能解決你的需要。簡單舉個例子: 你自定義了一個容器View,簡稱為父View,在這裡監聽點選事件,做事情A,監聽滑動做事情B 然後你又自定了一個View,放入該容器

Spark SQL定義刪除外部表

前言 Spark SQL 在刪除外部表時,本不能刪除外部表的資料的。本篇文章主要介紹如何修改Spark SQL 原始碼實現在刪除外部表的時候,可以帶額外選項來刪除外部表的資料。 本文的環境是我一直使用的 spark 2.4.3 版本。 1. 修改ANTLR4 語法檔案 修改 SqlBase.g4檔案中drop

hive 定義函式UDF

1. 在Hive中給我們內建了很多函式 進入hive客戶端,檢視hive內建函式: hive > show functions; OK ! != % & * + - / < <= <=> <> = == > &g

hive中使用定義函式(UDF)實現分析函式row_number的功能

1. hive0.10及之前的版本沒有row_number這個函式,假設我們現在出現如下業務場景,現在我們在hdfs上有個log日誌檔案,為了方便敘述,該檔案只有2個欄位,第一個是使用者的id,第二個是當天登入的timestamp,現在我們需要求每個使用者最早登入的那條記錄(

spark三種清理資料的方式:UDF定義函式spark.sql;Python中的zip()*zip()函式//及python中的*args和**kwargs

(1)UDF的方式清理資料 import sys reload(sys) sys.setdefaultencoding('utf8') import re import json from pyspark.sql import SparkSession

MySQL定義函式用法-複合結構定義變數/流程控制

自定義函式 (user-defined function UDF)就是用一個象ABS() 或 CONCAT()這樣的固有(內建)函式一樣作用的新函式去擴充套件MySQL。 所以UDF是對MySQL功能的一個擴充套件 建立和刪除自定義函式語法: 建立UDF:   CREATE 

Spark機器學習 定義sql函式處理Vector型別

對於sparksql處理不了的型別,可以使用spark.udf.register自定義函式方法處理。 spark.udf.register("getPCA0", (s: Vector) => s(0)) spark.udf.register("getPCA1", (s

mysql定義函式

mysql有自己的編寫函式的語法,轉載別人的分享作為自己的參考: 自定義函式 (user-defined function UDF)就是用一個象ABS() 或 CONCAT()這樣的固有(內建)函式一樣作用的新函式去擴充套件MySQL。 所以UDF是對MySQL功能的一

MySQL定義函式用法

自定義函式 (user-defined function UDF)就是用一個象ABS() 或 CONCAT()這樣的固有(內建)函式一樣作用的新函式去擴充套件MySQL。 所以UDF是對MySQL功能的一個擴充套件 建立和刪除自定義函式語法: 建立UDF:   CREATE [AGGREGATE] FUN