1. 程式人生 > >SparkSQL中的內建函式

SparkSQL中的內建函式

    使用Spark SQL中的內建函式對資料進行分析,Spark SQL API不同的是,DataFrame中的內建函式操作的結果是返回一個Column物件,而DataFrame天生就是"A distributed collection of data organized into named columns.",這就為資料的複雜分析建立了堅實的基礎並提供了極大的方便性,例如說,我們在操作DataFrame的方法中可以隨時呼叫內建函式進行業務需要的處理,這之於我們構建附件的業務邏輯而言是可以極大的減少不必須的時間消耗(基於上就是實際模型的對映),讓我們聚焦在資料分析上,這對於提高工程師的生產力而言是非常有價值的Spark 1.5.x開始提供了大量的內建函式,例如agg:

def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
groupBy().agg(aggExpr, aggExprs : _*)
}

還有max、mean、min、sum、avg、explode、size、sort_array、day、to_date、abs、acros、asin、atan
總體上而言內建函式包含了五大基本型別:
1、聚合函式,例如countDistinct、sumDistinct等;
2、集合函式,例如sort_array、explode等
3、日期、時間函式,例如hour、quarter、next_day
4、數學函式,例如asin、atan、sqrt、tan、round等;
5、開窗函式,例如rowNumber等
6、字串函式,concat、format_number、rexexp_extract
7、其它函式,isNaN、sha、randn、callUDF

第一步:建立Spark的配置物件SparkConf,設定Spark程式的執行時的配置資訊,例如說通過setMaster來設定程式要連結的Spark叢集的Master的URL,Spark程式在本地執行

val conf = new SparkConf() //建立SparkConf物件
conf.setAppName("SparkSQL") //設定應用程式的名稱,在程式執行的監控介面可以看到名稱
//conf.setMaster("spark://DaShuJu-040:7077") //此時,程式在Spark叢集
conf.setMaster("local")
第二步:建立SparkContext物件
      SparkContext是Spark程式所有功能的唯一入口,無論是採用Scala、Java、Python、R等都必須有一個SparkContext
      SparkContext核心作用:初始化Spark應用程式執行所需要的核心元件,包括DAGScheduler、TaskScheduler、SchedulerBackend
      同時還會負責Spark程式往Master註冊程式等
      SparkContext是整個Spark應用程式中最為至關重要的一個物件
val sc = new SparkContext(conf) //建立SparkContext物件,通過傳入SparkConf例項來定製Spark執行的具體引數和配置資訊
val sqlContext = new SQLContext(sc)   //構建SQL上下文</span>
//要使用Spark SQL的內建函式,就一定要匯入SQLContext下的隱式轉換 
import sqlContext.implicits._


第三步:模擬資料,最後生成RDD
val userData = Array(
      "2016-3-27,001,http://spark.apache.org/,1000",
      "2016-3-27,001,http://hadoop.apache.org/,1001",
      "2016-3-27,002,http://fink.apache.org/,1002",
      "2016-3-28,003,http://kafka.apache.org/,1020",
      "2016-3-28,004,http://spark.apache.org/,1010",
      "2016-3-28,002,http://hive.apache.org/,1200",
      "2016-3-28,001,http://parquet.apache.org/,1500",
      "2016-3-28,001,http://spark.apache.org/,1800"
    )</span>
val userDataRDD = sc.parallelize(userData)  //生成DD分散式集合物件
</span>

第四步:根據業務需要對資料進行預處理生成DataFrame,要想把RDD轉換成DataFrame,需要先把RDD中的元素型別變成Row型別
      於此同時要提供DataFrame中的Columns的元資料資訊描述

val userDataRDDRow = userDataRDD.map(row => {val splited = row.split(",") ;Row(splited(0),splited(1).toInt,splited(2),splited(3).toInt)})
val structTypes = StructType(Array(
      StructField("time", StringType, true),
      StructField("id", IntegerType, true),
      StructField("url", StringType, true),
      StructField("amount", IntegerType, true)
))
<span style="font-family: Arial, Helvetica, sans-serif;">val userDataDF = sqlContext.createDataFrame(userDataRDDRow,structTypes)</span>
第五步:使用Spark SQL提供的內建函式對DataFrame進行操作,特別注意:內建函式生成的Column物件且自定進行CG;
userDataDF.groupBy("time").agg('time, countDistinct('id)).map(row=>Row(row(1),row(2))).collect.foreach(println)
userDataDF.groupBy("time").agg('time, sum('amount)).show()