
Spark 2.x Study Notes: 14. Spark SQL Programming


14.1 Limitations of RDDs

  • An RDD only represents a data set; it carries no metadata, that is, no semantic definition of its fields.
  • RDDs require users to optimize their programs themselves, which places high demands on the programmer.
  • Reading data from different data sources is relatively difficult.
  • Merging data from multiple data sources is also difficult.

14.2 DataFrame and Dataset

(1) DataFrame
Because of these limitations of RDDs, Spark introduced the DataFrame.
DataFrame = RDD + Schema
The Schema is the metadata: the semantic description of the data.
Before Spark 1.3 the DataFrame was called SchemaRDD. It is a distributed collection of data organized into rows, with each column given a name, and it provides abstractions for operators such as select, filter, aggregation, and sort.

  • The internal data is untyped; every record is uniformly a Row.
  • A DataFrame is a special kind of Dataset.
  • DataFrames come with the Catalyst optimizer, which can optimize programs automatically.
  • DataFrames provide a complete set of Data Source APIs.

(2) Dataset
Because a DataFrame's data type is uniformly Row, the DataFrame also has drawbacks.

  • Row is type-checked only at runtime.
    For example, even if salary were a string column, the statement below would only be type-checked when it runs:
    dataframe.filter("salary>1000").show()
  • A Row cannot operate on domain objects directly.
  • The API is functional in style; there is no object-oriented style API.

To address this, Spark SQL introduced the Dataset, which extends the DataFrame API with compile-time type checking and an object-oriented style API.
Datasets can be converted to and from DataFrames and RDDs.
DataFrame = Dataset[Row]


As this shows, a DataFrame is simply a special kind of Dataset.
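The difference is easiest to see side by side. Below is a minimal sketch (assuming an existing SparkSession named spark; the Employee case class and its fields are made up for illustration) that builds the same data as a typed Dataset and as an untyped DataFrame:

import spark.implicits._

// Hypothetical domain class, only for illustration
case class Employee(name: String, salary: Long)

val ds = Seq(Employee("alice", 900L), Employee("bob", 1500L)).toDS()  // Dataset[Employee]
val df = ds.toDF()                                                    // DataFrame = Dataset[Row]

// Typed API: the field reference is checked by the compiler
ds.filter(_.salary > 1000).show()

// Untyped API: the expression string is parsed and resolved only at runtime
df.filter("salary > 1000").show()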

14.3 Why Do We Need DataFrame and Dataset?

We know that Spark SQL offers two ways to work with data:

  • SQL queries
  • The DataFrame and Dataset APIs

Since Spark SQL already offers SQL access, why do we still need the DataFrame and Dataset APIs?
Because although SQL is simple, its expressive power is limited (which is why Oracle provides PL/SQL). DataFrame and Dataset let users express queries in a more general-purpose language such as Scala or Python. In addition, a Dataset catches mistakes earlier: SQL errors surface only at runtime, while Dataset errors are caught at compile time.

14.4 Basic Steps

  • Create a SparkSession object
    SparkSession encapsulates the Spark SQL execution environment and is the single entry point of every Spark SQL program.
  • Create a DataFrame or Dataset
    Spark SQL supports a variety of data sources.
  • Apply transformations and actions to the DataFrame or Dataset
    Spark SQL provides many kinds of transformation and action functions.
  • Return the result
    Save the result to HDFS, or print it directly.

Step 1: Create a SparkSession object

val spark=SparkSession.builder
        .master("local")
        .appName("spark session example")
        .getOrCreate()

Note: SparkSession wraps both spark.sparkContext and spark.sqlContext.
In all programs and code fragments that follow, the variable spark refers to a SparkSession object.

To implicitly convert RDDs to DataFrames, import:

import spark.implicits._

Step 2: Create a DataFrame or Dataset
Spark SQL provides APIs for reading and writing data in many formats, including JSON, JDBC, Parquet, and files on HDFS.
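For instance, each supported source has a corresponding reader method on spark.read (a few illustrative calls; the paths, URL, and table name below are placeholders, not from the original notes):

val jsonDF    = spark.read.json("/path/to/data.json")
val parquetDF = spark.read.parquet("/path/to/data.parquet")
val csvDF     = spark.read.option("header", "true").csv("/path/to/data.csv")
// JDBC needs a connection URL, a table name, and connection properties
val jdbcDF    = spark.read.jdbc("jdbc:mysql://host:3306/db", "tablename", new java.util.Properties())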

Step 3: Apply operations to the DataFrame or Dataset
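For example, a short chain of transformations followed by actions might look like the sketch below (userDF is the users DataFrame built in the next section; the output path is illustrative):

userDF.filter("age > 30")            // transformation
      .groupBy("gender").count()     // transformation
      .show()                        // action: print the result

userDF.select("userID", "age")
      .write.json("/tmp/output")     // action: save the result (HDFS or local)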

14.5 Worked Example

(1) Start spark-shell

[root@node1 ~]# spark-shell
17/10/13 10:05:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://192.168.80.131:4040
Spark context available as 'sc' (master = local[*], app id = local-1507903559300).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information.

scala> 

The Spark session object here is a further wrapper around the Spark context object. In other words, the SparkContext inside the Spark session object (spark) is exactly the Spark context object (sc), as the output below confirms.

scala> spark.sparkContext
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@7bd7c4cf

scala> println(sc)
org.apache.spark.SparkContext@7bd7c4cf

scala>

(2) Import org.apache.spark.sql.Row

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

(3) Define a case class

scala> case class User(userID:Long,gender:String,age:Int,occupation:String,zipcode:String)
defined class User

scala> val usersRDD=sc.textFile("file:///root/data/ml-1m/users.dat")
usersRDD: org.apache.spark.rdd.RDD[String] = file:///root/data/ml-1m/users.dat MapPartitionsRDD[3] at textFile at <console>:25

scala> usersRDD.count
res3: Long = 6040

(4) Use the case class as the RDD's schema

scala> val userRDD =usersRDD.map(_.split("::")).map(p=>User(p(0).toLong,p(1).trim,p(2).toInt,p(3),p(4)))
userRDD: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[5] at map at <console>:29

(5) Convert the RDD to a DataFrame with RDD.toDF

scala> val userDF=userRDD.toDF
userDF: org.apache.spark.sql.DataFrame = [userID: bigint, gender: string ... 3 more fields]
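If you prefer to keep the static type instead of the untyped DataFrame, the same RDD of User objects can also be turned into a typed Dataset (this likewise relies on import spark.implicits._, which spark-shell provides automatically):

val userDS = userRDD.toDS   // org.apache.spark.sql.Dataset[User]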

(6) List all methods of the DataFrame
Type userDF. followed by the Tab key to see all of the DataFrame's methods.

scala> userDF.
agg                             cube               hint             randomSplitAsList      take                
alias                           describe           inputFiles       rdd                    takeAsList          
apply                           distinct           intersect        reduce                 toDF                
as                              drop               isLocal          registerTempTable      toJSON              
cache                           dropDuplicates     isStreaming      repartition            toJavaRDD           
checkpoint                      dtypes             javaRDD          rollup                 toLocalIterator     
coalesce                        except             join             sample                 toString            
col                             explain            joinWith         schema                 transform           
collect                         explode            limit            select                 union               
collectAsList                   filter             map              selectExpr             unionAll            
columns                         first              mapPartitions    show                   unpersist           
count                           flatMap            na               sort                   where               
createGlobalTempView            foreach            orderBy          sortWithinPartitions   withColumn          
createOrReplaceGlobalTempView   foreachPartition   persist          sparkSession           withColumnRenamed   
createOrReplaceTempView         groupBy            printSchema      sqlContext             withWatermark       
createTempView                  groupByKey         queryExecution   stat                   write               
crossJoin                       head               randomSplit      storageLevel           writeStream         

scala>

(7) Print the DataFrame's schema

scala> userDF.printSchema
root
 |-- userID: long (nullable = false)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = false)
 |-- occupation: string (nullable = true)
 |-- zipcode: string (nullable = true)

(8) Other DataFrame methods

scala> userDF.first
res5: org.apache.spark.sql.Row = [1,F,1,10,48067]

scala> userDF.take(10)
res6: Array[org.apache.spark.sql.Row] = Array([1,F,1,10,48067], [2,M,56,16,70072], [3,M,25,15,55117], [4,M,45,7,02460], [5,M,25,20,55455], [6,F,50,9,55117], [7,M,35,1,06810], [8,M,25,12,11413], [9,M,25,17,61614], [10,F,35,1,95370])

scala>

(9) List the formats a DataFrame can be written in
Type userDF.write. followed by the Tab key to see the data formats a DataFrame can be saved as.

scala> userDF.write.
bucketBy   format       jdbc   mode     options   parquet       save          sortBy      
csv        insertInto   json   option   orc       partitionBy   saveAsTable   text        

scala>

(10) Write the DataFrame to HDFS as JSON

scala> userDF.write.json("/tmp/json")

scala>

(11) Check HDFS

[root@node1 ~]# hdfs dfs -ls /tmp/json
Found 2 items
-rw-r--r--   3 root supergroup          0 2017-10-13 10:31 /tmp/json/_SUCCESS
-rw-r--r--   3 root supergroup     442408 2017-10-13 10:31 /tmp/json/part-00000-6f19a241-2f72-4a06-a6bc-81706c89bf5b-c000.json
[root@node1 ~]# 

(12) You can also write to the local file system

scala> userDF.write.json("file:///tmp/json")
[root@node1 ~]# ls /tmp/json
part-00000-66aa0658-0343-4659-a809-468e4fde23a5-c000.json  _SUCCESS
[root@node1 ~]# tail -5 /tmp/json/part-00000-66aa0658-0343-4659-a809-468e4fde23a5-c000.json
{"userID":6036,"gender":"F","age":25,"occupation":"15","zipcode":"32603"}
{"userID":6037,"gender":"F","age":45,"occupation":"1","zipcode":"76006"}
{"userID":6038,"gender":"F","age":56,"occupation":"1","zipcode":"14706"}
{"userID":6039,"gender":"F","age":45,"occupation":"0","zipcode":"01060"}
{"userID":6040,"gender":"M","age":25,"occupation":"6","zipcode":"11106"}
[root@node1 ~]# 

(13) List the data formats Spark SQL can read

scala> val df=spark.read.
csv   format   jdbc   json   load   option   options   orc   parquet   schema   table   text   textFile

scala>

(14) Convert the JSON files back into a DataFrame

scala> val df=spark.read.json("/tmp/json")
df: org.apache.spark.sql.DataFrame = [age: bigint, gender: string ... 3 more fields]

scala> df.take(2)
res9: Array[org.apache.spark.sql.Row] = Array([1,F,10,1,48067], [56,M,16,2,70072])

scala>
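Note that the JSON reader infers the schema, so the columns come back in alphabetical order and the numeric columns are widened to bigint. If you want to keep the original column types, you can supply an explicit schema instead; a sketch using the StructType API:

import org.apache.spark.sql.types._

val userSchema = StructType(Seq(
  StructField("userID", LongType, nullable = false),
  StructField("gender", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("occupation", StringType, nullable = true),
  StructField("zipcode", StringType, nullable = true)
))

val typedDF = spark.read.schema(userSchema).json("/tmp/json")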

(15) Write the DataFrame out again as ORC data (ORC files are binary)

scala> df.write.orc("file:///tmp/orc")
[root@node1 ~]# ls /tmp/orc
part-00000-09cf3025-cc71-4a76-a35d-a7cef4885be8-c000.snappy.orc  _SUCCESS
[root@node1 ~]#

(16) Read all of the ORC files under the /tmp/orc directory

scala> val orcDF=spark.read.orc("file:///tmp/orc")
orcDF: org.apache.spark.sql.DataFrame = [age: bigint, gender: string ... 3 more fields]

scala> orcDF.first
res11: org.apache.spark.sql.Row = [1,F,10,1,48067]

scala>

14.6 select and filter

(1) select

scala> userDF.select("UserID","age").show
+------+---+
|UserID|age|
+------+---+
|     1|  1|
|     2| 56|
|     3| 25|
|     4| 45|
|     5| 25|
|     6| 50|
|     7| 35|
|     8| 25|
|     9| 25|
|    10| 35|
|    11| 25|
|    12| 25|
|    13| 45|
|    14| 35|
|    15| 25|
|    16| 35|
|    17| 50|
|    18| 18|
|    19|  1|
|    20| 25|
+------+---+
only showing top 20 rows


scala> userDF.select("UserID","age").show(2)
+------+---+
|UserID|age|
+------+---+
|     1|  1|
|     2| 56|
+------+---+
only showing top 2 rows

scala> userDF.selectExpr("UserID","ceil(age/10) as newAge").show(2)
+------+------+
|UserID|newAge|
+------+------+
|     1|     1|
|     2|     6|
+------+------+
only showing top 2 rows
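A note on imports: the aggregate functions used below (max, min, avg, and later count and countDistinct) and the 'age symbol syntax for columns come from org.apache.spark.sql.functions and spark.implicits. spark-shell imports both automatically; in a standalone program you would add them yourself:

import org.apache.spark.sql.functions.{max, min, avg, count, countDistinct}
import spark.implicits._   // enables writing a column as 'age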

scala> userDF.select(max('age),min('age),avg('age)).show(2)
+--------+--------+------------------+
|max(age)|min(age)|          avg(age)|
+--------+--------+------------------+
|      56|       1|30.639238410596025|
+--------+--------+------------------+

(2) filter

scala> userDF.filter(userDF("age")>30).show(2)
+------+------+---+----------+-------+
|userID|gender|age|occupation|zipcode|
+------+------+---+----------+-------+
|     2|     M| 56|        16|  70072|
|     4|     M| 45|         7|  02460|
+------+------+---+----------+-------+
only showing top 2 rows


scala> userDF.filter("age>30 and occupation=10").show(2)
+------+------+---+----------+-------+
|userID|gender|age|occupation|zipcode|
+------+------+---+----------+-------+
|  4562|     M| 35|        10|  94133|
|  5223|     M| 56|        10|  11361|
+------+------+---+----------+-------+


scala> 

(3) Combining select and filter

scala> userDF.select("userID","age").filter("age>30").show(2)
+------+---+
|userID|age|
+------+---+
|     2| 56|
|     4| 45|
+------+---+
only showing top 2 rows


scala> userDF.filter("age>30").select("userID","age").show(2)
+------+---+
|userID|age|
+------+---+
|     2| 56|
|     4| 45|
+------+---+
only showing top 2 rows

14.7 groupBy

scala> userDF.groupBy("age").count.show
+---+-----+                                                                     
|age|count|
+---+-----+
|  1|  222|
| 35| 1193|
| 50|  496|
| 45|  550|
| 25| 2096|
| 56|  380|
| 18| 1103|
+---+-----+


scala> userDF.groupBy("age").agg(count('gender),countDistinct('occupation)).show
+---+-------------+--------------------------+                                  
|age|count(gender)|count(DISTINCT occupation)|
+---+-------------+--------------------------+
|  1|          222|                        13|
| 35|         1193|                        21|
| 50|          496|                        20|
| 45|          550|                        20|
| 25|         2096|                        20|
| 56|          380|                        20|
| 18|         1103|                        20|
+---+-------------+--------------------------+


scala> userDF.groupBy("age").agg("gender"->"count","occupation"->"count").show
+---+-------------+-----------------+
|age|count(gender)|count(occupation)|
+---+-------------+-----------------+
|  1|          222|              222|
| 35|         1193|             1193|
| 50|          496|              496|
| 45|          550|              550|
| 25|         2096|             2096|
| 56|          380|              380|
| 18|         1103|             1103|
+---+-------------+-----------------+


scala> 
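The grouped rows come back in no particular order. If you want the age buckets sorted, you can append an orderBy; this is a small addition that is not part of the original transcript:

userDF.groupBy("age").count.orderBy("age").show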

14.8 join

Problem: find the gender and age distribution of the viewers who watched the movie with movieID=2116.
(1) Users DataFrame

scala> userDF.printSchema
root
 |-- userID: long (nullable = false)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = false)
 |-- occupation: string (nullable = true)
 |-- zipcode: string (nullable = true)


scala>

(2) Ratings DataFrame

scala> case class Rating(userID:Long,movieID:Long,Rating:Int,Timestamp:String)
defined class Rating

scala> val ratingsRDD=sc.textFile("file:///root/data/ml-1m/ratings.dat")
ratingsRDD: org.apache.spark.rdd.RDD[String] = file:///root/data/ml-1m/ratings.dat MapPartitionsRDD[65] at textFile at <console>:25

scala> val ratingRDD =ratingsRDD.map(_.split("::")).map(p=>Rating(p(0).toLong,p(1).toLong,p(2).toInt,p(3)))
ratingRDD: org.apache.spark.rdd.RDD[Rating] = MapPartitionsRDD[67] at map at <console>:29

scala> val ratingDF=ratingRDD.toDF
ratingDF: org.apache.spark.sql.DataFrame = [userID: bigint, movieID: bigint ... 2 more fields]

scala> ratingDF.printSchema
root
 |-- userID: long (nullable = false)
 |-- movieID: long (nullable = false)
 |-- Rating: integer (nullable = false)
 |-- Timestamp: string (nullable = true)

scala>

(3) join

scala> val mergeredDF=ratingDF.filter("movieID=2116").join(userDF,"userID").select("gender","age").groupBy("gender","age").count
mergeredDF: org.apache.spark.sql.DataFrame = [gender: string, age: int ... 1 more field]

scala> mergeredDF.show
+------+---+-----+                                                              
|gender|age|count|
+------+---+-----+
|     M| 18|   72|
|     F| 18|    9|
|     M| 56|    8|
|     M| 45|   26|
|     F| 45|    3|
|     M| 25|  169|
|     F| 56|    2|
|     M|  1|   13|
|     F|  1|    4|
|     F| 50|    3|
|     M| 50|   22|
|     F| 25|   28|
|     F| 35|   13|
|     M| 35|   66|
+------+---+-----+


scala> 
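The same query can also be expressed in SQL against temporary views, which anticipates the next section (a sketch; the view names are chosen here for illustration):

ratingDF.createOrReplaceTempView("ratings")
userDF.createOrReplaceTempView("users")

spark.sql("""
  SELECT u.gender, u.age, COUNT(*) AS count
  FROM ratings r JOIN users u ON r.userID = u.userID
  WHERE r.movieID = 2116
  GROUP BY u.gender, u.age
""").show()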

14.9 Temporary Views

scala> userDF.createOrReplaceTempView("users")

scala> val groupedUsers=spark.sql("select gender,age,count(*) as num from users group by gender, age")
groupedUsers: org.apache.spark.sql.DataFrame = [gender: string, age: int ... 1 more field]

scala> groupedUsers.show
+------+---+----+                                                               
|gender|age| num|
+------+---+----+
|     M| 18| 805|
|     F| 18| 298|
|     M| 56| 278|
|     M| 45| 361|
|     F| 45| 189|
|     M| 25|1538|
|     F| 56| 102|
|     M|  1| 144|
|     F|  1|  78|
|     F| 50| 146|
|     M| 50| 350|
|     F| 25| 558|
|     F| 35| 338|
|     M| 35| 855|
+------+---+----+


scala> 

Note: a temporary view exists only while the Spark application is running; when the application finishes, the temporary view is destroyed.
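If needed, a temporary view can also be dropped explicitly before the application ends:

spark.catalog.dropTempView("users")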

14.10 Spark SQL Tables

(1) Session-scoped temporary views

  • df.createOrReplaceTempView("tableName")
  • Valid only within the current session; the view is destroyed automatically when the session ends.

(2) Global temporary views

  • df.createGlobalTempView("tableName")
  • Shared across all sessions and accessed through the global_temp database.

scala> userDF.createGlobalTempView("users")

scala> spark.sql("select * from global_temp.users").show
+------+------+---+----------+-------+
|userID|gender|age|occupation|zipcode|
+------+------+---+----------+-------+
|     1|     F|  1|        10|  48067|
|     2|     M| 56|        16|  70072|
|     3|     M| 25|        15|  55117|
|     4|     M| 45|         7|  02460|
|     5|     M| 25|        20|  55455|
|     6|     F| 50|         9|  55117|
|     7|     M| 35|         1|  06810|
|     8|     M| 25|        12|  11413|
|     9|     M| 25|        17|  61614|
|    10|     F| 35|         1|  95370|
|    11|     F| 25|         1|  04093|
|    12|     M| 25|        12|  32793|
|    13|     M| 45|         1|  93304|
|    14|     M| 35|         0|  60126|
|    15|     M| 25|         7|  22903|
|    16|     F| 35|         0|  20670|
|    17|     M| 50|         1|  95350|
|    18|     F| 18|         3|  95825|
|    19|     M|  1|        10|  48073|
|    20|     M| 25|        14|  55113|
+------+------+---+----------+-------+
only showing top 20 rows


scala> spark.newSession().sql("select * from global_temp.users").show
+------+------+---+----------+-------+
|userID|gender|age|occupation|zipcode|
+------+------+---+----------+-------+
|     1|     F|  1|        10|  48067|
|     2|     M| 56|        16|  70072|
|     3|     M| 25|        15|  55117|
|     4|     M| 45|         7|  02460|
|     5|     M| 25|        20|  55455|
|     6|     F| 50|         9|  55117|
|     7|     M| 35|         1|  06810|
|     8|     M| 25|        12|  11413|
|     9|     M| 25|        17|  61614|
|    10|     F| 35|         1|  95370|
|    11|     F| 25|         1|  04093|
|    12|     M| 25|        12|  32793|
|    13|     M| 45|         1|  93304|
|    14|     M| 35|         0|  60126|
|    15|     M| 25|         7|  22903|
|    16|     F| 35|         0|  20670|
|    17|     M| 50|         1|  95350|
|    18|     F| 18|         3|  95825|
|    19|     M|  1|        10|  48073|
|    20|     M| 25|        14|  55113|
+------+------+---+----------+-------+
only showing top 20 rows


scala> 

(3) Persisting a DataFrame or Dataset to Hive
df.write.mode("overwrite").saveAsTable("database.tableName")
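Writing to a Hive table like this assumes the SparkSession was created with Hive support enabled. A minimal sketch (the database name testdb and table name users below are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("hive example")
  .enableHiveSupport()
  .getOrCreate()

userDF.write.mode("overwrite").saveAsTable("testdb.users")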