1. 程式人生 > >第72課:Spark SQL UDF和UDAF解密與實戰

第72課:Spark SQL UDF和UDAF解密與實戰

內容:

    1.SparkSQL UDF     2.SparkSQL UDAF

一、SparkSQL UDF和SparkSQL UDAF

    1.解決SparkSQL內建函式不足問題,自定義內建函式,     2.UDF:User Define Function,使用者自定義的函式,函式的輸入是一個具體的資料記錄,實現上講就是簡單的scala程式碼     3.UDAF:User Define Aggregation Function,使用者自定義的聚合函式,函式本身作用於資料集合,能夠在聚合操作的基礎上自定義操作     4.實質上講,例如說UDF會被sparkSQL中的Catalyst分裝成為Expression,最終會通過eval方法來計算輸入的資料Row(此處的Row和dataframe中的Row沒有任何關係)     5.通過SQLContext註冊UDF,在scala2.10.x版本UDF函式最多可以接受22個輸入引數

二、SparkSQL UDF和SparkSQL UDAF實戰  

package SparkSQL

import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.types._

/**
  * FileName: SparkSQLUDFAndUDAF
  * Author:   hadoop
  * Email:    
[email protected]
* Date: 18-11-10 上午10:46 * Description: *通過案例實戰SparkSQL下的UDF和UDAF的具體使用 * UDF:User Define Function,使用者自定義的函式,函式的輸入是一個具體的資料記錄,實現上講就是簡單的scala程式碼 * UDAF:User Define Aggregation Function,使用者自定義的聚合函式,函式本身作用於資料集合,能夠在聚合操作的基礎上自定義操作 * 實質上講,例如說UDF會被sparkSQL中的Catalyst分裝成為Expression,最終會通過eval方法來計算輸入的資料Row(此處的Row和dataframe中的Row沒有任何關係) */ object SparkSQLUDFAndUDAF { def main(args: Array[String]): Unit = { //建立SparkConf用於讀取系統資訊並設定運用程式的名稱 val conf = new SparkConf().setAppName("SparkSQLUDFAndUDAF").setMaster("local") //建立JavaSparkContext物件例項作為整個Driver的核心基石 val sc = new SparkContext(conf) //方便檢視輸出的日誌資訊,也可以設定為WARN、ERROR sc.setLogLevel("ERROR") //建立SQLContext上下文物件,用於SqL的分析 val sqlContext = new SQLContext(sc) //模擬資料 val bigData = Array( "Spark","Spark","Hadoop", "Spark","Spark","Hadoop" ) /** * 建立提供的資料常見DAtaframe */ val bigDataRDD = sc.parallelize(bigData) val bigDataRDDRow = bigDataRDD.map((item=>Row(item))) val structType = StructType(Array( StructField("word",StringType,true) )) val bigDataDS = sqlContext.createDataFrame(bigDataRDDRow,structType) bigDataDS.registerTempTable("bigDataTable") /** * 通過SQLContext註冊UDF,在scala2.10.x版本UDF函式最多可以接受22個輸入引數 */ sqlContext.udf.register("computeLength",(input:String)=>input.length) //直接在SQL語句中使用UDF,就像使用SQL自帶的內部函式一樣 sqlContext.sql("select word,computeLength(word) as length from bigDataTable").show() sqlContext.udf.register("wordcount",new MyUDAF) sqlContext.sql("select word,wordcount(word) as count,computeLength(word) as length from bigDataTable group by word").show() while (true){ } } } /** * 按照模板實現UDAF */ class MyUDAF extends UserDefinedAggregateFunction{ /** * 該方法指定具體輸入資料的型別,在這裡指定的資料列名和輸入的列名沒有關係 * @return */ override def inputSchema: StructType = StructType(Array(StructField("input",StringType,true))) /** * 在進行聚合操作的時候所要處理的資料的結果的型別 * @return */ override def bufferSchema: StructType = StructType(Array(StructField("count",IntegerType,true))) /** * 指定UDAF函式計算後返回的結果型別 * @return */ override def dataType: DataType = IntegerType /** *指定資料一致性 * @return */ override def deterministic: Boolean = true /** * 在aggregate之前每組資料的初始化結果 * @param buffer */ override def initialize (buffer: MutableAggregationBuffer): Unit = (buffer(0) = 0) /** * 在進行聚合的時候,每當性的值進來,對分組的聚合如何進行計算 * 本地的聚合操作,相當於Hadoop MapReduce模型這的Combiner * @param buffer * @param input */ override def update (buffer: MutableAggregationBuffer, input: Row): Unit = { buffer(0) = buffer.getAs[Int](0)+1 } /** * 在分散式節點進行local Reduce完成後需要進行全域性級別的Merge操作 * @param buffer1 * @param buffer2 */ override def merge (buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0) = buffer1.getAs[Int](0)+buffer2.getAs[Int](0) } /** * 返回UDAF最後的計算結果 * @param buffer * @return */ override def evaluate (buffer: Row): Any = buffer.getAs[Int](0) }

執行結果: