Spark 2.x Study Notes: 14. Spark SQL Programming
14.1 Limitations of RDDs
- An RDD only represents a dataset; it carries no metadata, i.e., no field-level semantics.
- RDDs require users to optimize programs themselves, which demands a lot of the programmer.
- Reading data from different data sources is relatively difficult.
- Merging data from multiple data sources is also difficult.
14.2 DataFrame and Dataset
(1) DataFrame
Because of these limitations of RDDs, Spark introduced the DataFrame.
DataFrame = RDD + Schema
The Schema is the metadata: a semantic description of the data.
Before Spark 1.3 the DataFrame was called SchemaRDD. It is a distributed collection of data organized into rows, with each column given a name, and it abstracts operators such as select, filter, aggregation, and sort.
- Internally the data is untyped; every record is uniformly a Row.
- A DataFrame is a special kind of Dataset.
- DataFrames come with the Catalyst optimizer, which optimizes programs automatically.
- DataFrames provide a complete set of Data Source APIs.
(2) Dataset
Because a DataFrame's element type is uniformly Row, DataFrames also have drawbacks:
- Row is only type-checked at runtime.
For example, even if salary were a string column, the following statement would not be type-checked until it runs:
dataframe.filter("salary>1000").show()
- Row cannot operate directly on domain objects.
- The API is functional in style; there is no object-oriented API.
For these reasons Spark SQL introduced the Dataset, which extends the DataFrame API with compile-time type checking and an object-oriented style of API.
Datasets can be converted to and from DataFrames and RDDs.
DataFrame = Dataset[Row]
So a DataFrame is just a special kind of Dataset.
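A minimal sketch of the difference (assuming the spark-shell environment used later in this chapter, with import spark.implicits._ in scope; the Employee case class and its data are illustrative):

case class Employee(name: String, salary: Int)
val ds = Seq(Employee("alice", 900), Employee("bob", 1200)).toDS()

// Dataset: a typed lambda, checked at compile time.
// A typo such as _.salry would fail to compile instead of failing at runtime.
ds.filter(_.salary > 1000).show()

// DataFrame: a string expression, only checked when the query runs.
ds.toDF().filter("salary > 1000").show()

// Conversions between the three abstractions:
val df = ds.toDF()        // Dataset -> DataFrame
val ds2 = df.as[Employee] // DataFrame -> Dataset
val rdd = ds.rdd          // Dataset -> RDD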
14.3 Why DataFrame and Dataset?
We know that Spark SQL offers two ways to work with data:
- SQL queries
- The DataFrame and Dataset APIs
Since Spark SQL already provides SQL access, why do we also need the DataFrame and Dataset APIs?
Because although SQL statements are simple, SQL's expressive power is limited (which is why Oracle added PL/SQL to its database). DataFrames and Datasets let users express queries in a more general-purpose language (Scala or Python). In addition, Datasets catch errors sooner: SQL errors only surface at runtime, whereas Dataset errors are caught at compile time.
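As a small illustration (a sketch only, reusing the userDF built in 14.5 below), here is the same query in both styles:

// SQL style: the query is one opaque string, so a misspelled column name
// only fails at runtime with an AnalysisException.
userDF.createOrReplaceTempView("users")
spark.sql("select age, count(*) as num from users where age > 30 group by age").show()

// API style: the method chain is ordinary Scala, so the compiler verifies it
// before anything runs (and a typed Dataset would also verify the fields).
userDF.filter("age > 30").groupBy("age").count().show()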
14.4 Basic steps
- Create a SparkSession object. SparkSession encapsulates the Spark SQL execution environment and is the single entry point of every Spark SQL program.
- Create a DataFrame or Dataset. Spark SQL supports many data sources.
- Apply transformations and actions to the DataFrame or Dataset. Spark SQL provides a variety of transformation and action functions.
- Return the result: save it to HDFS, or print it directly.
Step 1: Create a SparkSession object
val spark = SparkSession.builder
  .master("local")
  .appName("spark session example")
  .getOrCreate()
Note: SparkSession encapsulates both spark.sparkContext and spark.sqlContext.
In all programs and code fragments below, the spark variable refers to this SparkSession object.
To enable implicit conversion of RDDs to DataFrames:
import spark.implicits._
Step 2: Create a DataFrame or Dataset
Spark SQL provides APIs for reading and writing data in many formats, including the common JSON, JDBC, Parquet, and HDFS sources; a sketch of typical reader calls follows.
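A minimal sketch of the DataFrameReader API (the paths and connection settings are illustrative):

// Files: one call per format
val jsonDF    = spark.read.json("hdfs:///tmp/json")
val parquetDF = spark.read.parquet("hdfs:///tmp/parquet")
val csvDF     = spark.read.option("header", "true").csv("file:///tmp/data.csv")

// JDBC: the generic format/option/load form (hypothetical database settings)
val jdbcDF = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://dbhost:3306/testdb")
  .option("dbtable", "users")
  .option("user", "root")
  .option("password", "secret")
  .load()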
Step 3: Perform operations on the DataFrame or Dataset
14.5 Worked example
(1) Start spark-shell
[root@node1 ~]# spark-shell
17/10/13 10:05:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://192.168.80.131:4040
Spark context available as 'sc' (master = local[*], app id = local-1507903559300).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.2.0
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
Here the Spark session object is a further wrapper around the Spark context object. In other words, the SparkContext inside the Spark session object (spark) is exactly the Spark context object (sc), which the output below confirms.
scala> spark.sparkContext
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@7bd7c4cf
scala> println(sc)
org.apache.spark.SparkContext@7bd7c4cf
scala>
(2) Import org.apache.spark.sql.Row
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
(3) Define a case class
scala> case class User(userID:Long,gender:String,age:Int,occupation:String,zipcode:String)
defined class User
scala> val usersRDD=sc.textFile("file:///root/data/ml-1m/users.dat")
usersRDD: org.apache.spark.rdd.RDD[String] = file:///root/data/ml-1m/users.dat MapPartitionsRDD[3] at textFile at <console>:25
scala> usersRDD.count
res3: Long = 6040
(4) Use the case class as the RDD's schema
scala> val userRDD =usersRDD.map(_.split("::")).map(p=>User(p(0).toLong,p(1).trim,p(2).toInt,p(3),p(4)))
userRDD: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[5] at map at <console>:29
(5) Convert the RDD to a DataFrame with RDD.toDF
scala> val userDF=userRDD.toDF
userDF: org.apache.spark.sql.DataFrame = [userID: bigint, gender: string ... 3 more fields]
(6) List all DataFrame methods
Type userDF. and then press the Tab key to see all of the DataFrame's methods.
scala> userDF.
agg cube hint randomSplitAsList take
alias describe inputFiles rdd takeAsList
apply distinct intersect reduce toDF
as drop isLocal registerTempTable toJSON
cache dropDuplicates isStreaming repartition toJavaRDD
checkpoint dtypes javaRDD rollup toLocalIterator
coalesce except join sample toString
col explain joinWith schema transform
collect explode limit select union
collectAsList filter map selectExpr unionAll
columns first mapPartitions show unpersist
count flatMap na sort where
createGlobalTempView foreach orderBy sortWithinPartitions withColumn
createOrReplaceGlobalTempView foreachPartition persist sparkSession withColumnRenamed
createOrReplaceTempView groupBy printSchema sqlContext withWatermark
createTempView groupByKey queryExecution stat write
crossJoin head randomSplit storageLevel writeStream
scala>
(7) Print the DataFrame's schema
scala> userDF.printSchema
root
|-- userID: long (nullable = false)
|-- gender: string (nullable = true)
|-- age: integer (nullable = false)
|-- occupation: string (nullable = true)
|-- zipcode: string (nullable = true)
(8) Other DataFrame methods
scala> userDF.first
res5: org.apache.spark.sql.Row = [1,F,1,10,48067]
scala> userDF.take(10)
res6: Array[org.apache.spark.sql.Row] = Array([1,F,1,10,48067], [2,M,56,16,70072], [3,M,25,15,55117], [4,M,45,7,02460], [5,M,25,20,55455], [6,F,50,9,55117], [7,M,35,1,06810], [8,M,25,12,11413], [9,M,25,17,61614], [10,F,35,1,95370])
scala>
(9) See which formats a DataFrame can be written as
Type userDF.write. and then press the Tab key to see the output formats a DataFrame supports.
scala> userDF.write.
bucketBy format jdbc mode options parquet save sortBy
csv insertInto json option orc partitionBy saveAsTable text
scala>
(10) Write the DataFrame to HDFS as JSON
scala> userDF.write.json("/tmp/json")
scala>
(11) Check HDFS
[root@node1 ~]# hdfs dfs -ls /tmp/json
Found 2 items
-rw-r--r-- 3 root supergroup 0 2017-10-13 10:31 /tmp/json/_SUCCESS
-rw-r--r-- 3 root supergroup 442408 2017-10-13 10:31 /tmp/json/part-00000-6f19a241-2f72-4a06-a6bc-81706c89bf5b-c000.json
[root@node1 ~]#
(12) Writing to the local filesystem also works
scala> userDF.write.json("file:///tmp/json")
[root@node1 ~]# ls /tmp/json
part-00000-66aa0658-0343-4659-a809-468e4fde23a5-c000.json _SUCCESS
[root@node1 ~]# tail -5 /tmp/json/part-00000-66aa0658-0343-4659-a809-468e4fde23a5-c000.json
{"userID":6036,"gender":"F","age":25,"occupation":"15","zipcode":"32603"}
{"userID":6037,"gender":"F","age":45,"occupation":"1","zipcode":"76006"}
{"userID":6038,"gender":"F","age":56,"occupation":"1","zipcode":"14706"}
{"userID":6039,"gender":"F","age":45,"occupation":"0","zipcode":"01060"}
{"userID":6040,"gender":"M","age":25,"occupation":"6","zipcode":"11106"}
[root@node1 ~]#
(13) See which formats Spark SQL can read
scala> val df=spark.read.
csv format jdbc json load option options orc parquet schema table text textFile
scala>
(14) Load the JSON files into a DataFrame
scala> val df=spark.read.json("/tmp/json")
df: org.apache.spark.sql.DataFrame = [age: bigint, gender: string ... 3 more fields]
scala> df.take(2)
res9: Array[org.apache.spark.sql.Row] = Array([1,F,10,1,48067], [56,M,16,2,70072])
scala>
(15) Write the DataFrame back out as ORC (a binary file format)
scala> df.write.orc("file:///tmp/orc")
[root@node1 ~]# ls /tmp/orc
part-00000-09cf3025-cc71-4a76-a35d-a7cef4885be8-c000.snappy.orc _SUCCESS
[root@node1 ~]#
(16) Read all ORC files under the directory /tmp/orc
scala> val orcDF=spark.read.orc("file:///tmp/orc")
orcDF: org.apache.spark.sql.DataFrame = [age: bigint, gender: string ... 3 more fields]
scala> orcDF.first
res11: org.apache.spark.sql.Row = [1,F,10,1,48067]
scala>
14.6 select and filter
(1) select
scala> userDF.select("UserID","age").show
+------+---+
|UserID|age|
+------+---+
| 1| 1|
| 2| 56|
| 3| 25|
| 4| 45|
| 5| 25|
| 6| 50|
| 7| 35|
| 8| 25|
| 9| 25|
| 10| 35|
| 11| 25|
| 12| 25|
| 13| 45|
| 14| 35|
| 15| 25|
| 16| 35|
| 17| 50|
| 18| 18|
| 19| 1|
| 20| 25|
+------+---+
only showing top 20 rows
scala> userDF.select("UserID","age").show(2)
+------+---+
|UserID|age|
+------+---+
| 1| 1|
| 2| 56|
+------+---+
only showing top 2 rows
scala> userDF.selectExpr("UserID","ceil(age/10) as newAge").show(2)
+------+------+
|UserID|newAge|
+------+------+
| 1| 1|
| 2| 6|
+------+------+
only showing top 2 rows
scala> userDF.select(max('age),min('age),avg('age)).show(2)
+--------+--------+------------------+
|max(age)|min(age)| avg(age)|
+--------+--------+------------------+
| 56| 1|30.639238410596025|
+--------+--------+------------------+
(2) filter
scala> userDF.filter(userDF("age")>30).show(2)
+------+------+---+----------+-------+
|userID|gender|age|occupation|zipcode|
+------+------+---+----------+-------+
| 2| M| 56| 16| 70072|
| 4| M| 45| 7| 02460|
+------+------+---+----------+-------+
only showing top 2 rows
scala> userDF.filter("age>30 and occupation=10").show(2)
+------+------+---+----------+-------+
|userID|gender|age|occupation|zipcode|
+------+------+---+----------+-------+
| 4562| M| 35| 10| 94133|
| 5223| M| 56| 10| 11361|
+------+------+---+----------+-------+
scala>
(3) Combining select and filter
scala> userDF.select("userID","age").filter("age>30").show(2)
+------+---+
|userID|age|
+------+---+
| 2| 56|
| 4| 45|
+------+---+
only showing top 2 rows
scala> userDF.filter("age>30").select("userID","age").show(2)
+------+---+
|userID|age|
+------+---+
| 2| 56|
| 4| 45|
+------+---+
only showing top 2 rows
14.7 groupBy
scala> userDF.groupBy("age").count.show
+---+-----+
|age|count|
+---+-----+
| 1| 222|
| 35| 1193|
| 50| 496|
| 45| 550|
| 25| 2096|
| 56| 380|
| 18| 1103|
+---+-----+
scala> userDF.groupBy("age").agg(count('gender),countDistinct('occupation)).show
+---+-------------+--------------------------+
|age|count(gender)|count(DISTINCT occupation)|
+---+-------------+--------------------------+
| 1| 222| 13|
| 35| 1193| 21|
| 50| 496| 20|
| 45| 550| 20|
| 25| 2096| 20|
| 56| 380| 20|
| 18| 1103| 20|
+---+-------------+--------------------------+
scala> userDF.groupBy("age").agg("gender"->"count","occupation"->"count").show
+---+-------------+-----------------+
|age|count(gender)|count(occupation)|
+---+-------------+-----------------+
| 1| 222| 222|
| 35| 1193| 1193|
| 50| 496| 496|
| 45| 550| 550|
| 25| 2096| 2096|
| 56| 380| 380|
| 18| 1103| 1103|
+---+-------------+-----------------+
scala>
14.8 join
Task: find the distribution of gender and age among the viewers who rated the movie with movieID=2116.
(1)Users DataFrame
scala> userDF.printSchema
root
|-- userID: long (nullable = false)
|-- gender: string (nullable = true)
|-- age: integer (nullable = false)
|-- occupation: string (nullable = true)
|-- zipcode: string (nullable = true)
scala>
(2)Ratings DataFrame
scala> case class Rating(userID:Long,movieID:Long,Rating:Int,Timestamp:String)
defined class Rating
scala> val ratingsRDD=sc.textFile("file:///root/data/ml-1m/ratings.dat")
ratingsRDD: org.apache.spark.rdd.RDD[String] = file:///root/data/ml-1m/ratings.dat MapPartitionsRDD[65] at textFile at <console>:25
scala> val ratingRDD =ratingsRDD.map(_.split("::")).map(p=>Rating(p(0).toLong,p(1).toLong,p(2).toInt,p(3)))
ratingRDD: org.apache.spark.rdd.RDD[Rating] = MapPartitionsRDD[67] at map at <console>:29
scala> val ratingDF=ratingRDD.toDF
ratingDF: org.apache.spark.sql.DataFrame = [userID: bigint, movieID: bigint ... 2 more fields]
scala> ratingDF.printSchema
root
|-- userID: long (nullable = false)
|-- movieID: long (nullable = false)
|-- Rating: integer (nullable = false)
|-- Timestamp: string (nullable = true)
scala>
(3) join
scala> val mergeredDF=ratingDF.filter("movieID=2116").join(userDF,"userID").select("gender","age").groupBy("gender","age").count
mergeredDF: org.apache.spark.sql.DataFrame = [gender: string, age: int ... 1 more field]
scala> mergeredDF.show
+------+---+-----+
|gender|age|count|
+------+---+-----+
| M| 18| 72|
| F| 18| 9|
| M| 56| 8|
| M| 45| 26|
| F| 45| 3|
| M| 25| 169|
| F| 56| 2|
| M| 1| 13|
| F| 1| 4|
| F| 50| 3|
| M| 50| 22|
| F| 25| 28|
| F| 35| 13|
| M| 35| 66|
+------+---+-----+
scala>
14.9 Temporary tables
scala> userDF.createOrReplaceTempView("users")
scala> val groupedUsers=spark.sql("select gender,age,count(*) as num from users group by gender, age")
groupedUsers: org.apache.spark.sql.DataFrame = [gender: string, age: int ... 1 more field]
scala> groupedUsers.show
+------+---+----+
|gender|age| num|
+------+---+----+
| M| 18| 805|
| F| 18| 298|
| M| 56| 278|
| M| 45| 361|
| F| 45| 189|
| M| 25|1538|
| F| 56| 102|
| M| 1| 144|
| F| 1| 78|
| F| 50| 146|
| M| 50| 350|
| F| 25| 558|
| F| 35| 338|
| M| 35| 855|
+------+---+----+
scala>
Note: a temporary table exists only while the Spark application is running; when the application ends, the temporary table is destroyed.
14.10 Spark SQL tables
(1) Session-scoped temporary views
- df.createOrReplaceTempView("tableName")
- Valid only within the current session; destroyed automatically when the session ends.
(2) Global temporary views
- df.createGlobalTempView("tableName")
- Shared by all sessions (referenced through the global_temp database, as shown below).
scala> userDF.createGlobalTempView("users")
scala> spark.sql("select * from global_temp.users").show
+------+------+---+----------+-------+
|userID|gender|age|occupation|zipcode|
+------+------+---+----------+-------+
| 1| F| 1| 10| 48067|
| 2| M| 56| 16| 70072|
| 3| M| 25| 15| 55117|
| 4| M| 45| 7| 02460|
| 5| M| 25| 20| 55455|
| 6| F| 50| 9| 55117|
| 7| M| 35| 1| 06810|
| 8| M| 25| 12| 11413|
| 9| M| 25| 17| 61614|
| 10| F| 35| 1| 95370|
| 11| F| 25| 1| 04093|
| 12| M| 25| 12| 32793|
| 13| M| 45| 1| 93304|
| 14| M| 35| 0| 60126|
| 15| M| 25| 7| 22903|
| 16| F| 35| 0| 20670|
| 17| M| 50| 1| 95350|
| 18| F| 18| 3| 95825|
| 19| M| 1| 10| 48073|
| 20| M| 25| 14| 55113|
+------+------+---+----------+-------+
only showing top 20 rows
scala> spark.newSession().sql("select * from global_temp.users").show
+------+------+---+----------+-------+
|userID|gender|age|occupation|zipcode|
+------+------+---+----------+-------+
| 1| F| 1| 10| 48067|
| 2| M| 56| 16| 70072|
| 3| M| 25| 15| 55117|
| 4| M| 45| 7| 02460|
| 5| M| 25| 20| 55455|
| 6| F| 50| 9| 55117|
| 7| M| 35| 1| 06810|
| 8| M| 25| 12| 11413|
| 9| M| 25| 17| 61614|
| 10| F| 35| 1| 95370|
| 11| F| 25| 1| 04093|
| 12| M| 25| 12| 32793|
| 13| M| 45| 1| 93304|
| 14| M| 35| 0| 60126|
| 15| M| 25| 7| 22903|
| 16| F| 35| 0| 20670|
| 17| M| 50| 1| 95350|
| 18| F| 18| 3| 95825|
| 19| M| 1| 10| 48073|
| 20| M| 25| 14| 55113|
+------+------+---+----------+-------+
only showing top 20 rows
scala>
(3) Persisting a DataFrame or Dataset to Hive
df.write.mode("overwrite").saveAsTable("database.tableName")
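A minimal end-to-end sketch (assuming a Hive-enabled Spark build with hive-site.xml on the classpath; the database and table names are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("hive example")
  .enableHiveSupport() // required for access to the Hive metastore
  .getOrCreate()

// Persist the DataFrame as a managed Hive table, replacing any existing one.
userDF.write.mode("overwrite").saveAsTable("testdb.users")

// The table outlives the application and can be read back later.
val restored = spark.table("testdb.users")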