1. 程式人生 > >Spark SQL與DataSet

Spark SQL與DataSet

Spark SQL的架構圖
這裡寫圖片描述
Spark SQL是用於結構化資料處理的Spark模組。與基本的Spark RDD API不同,Spark SQL提供的介面為Spark提供了有關資料結構和正在執行的計算的更多資訊。在內部,Spark SQL使用此額外資訊來執行額外的優化

Spark SQL執行計劃生成和優化都由Catalyst完成

DataSet是分散式資料集合。Dataset是Spark 1.6中新增的一個新介面,它提供了RDD的優勢(強型別,使用強大的lambda函式的能力)以及Spark SQL優化執行引擎的優點。資料集可以被構造從JVM物件,然後使用功能性的轉換(操作map,flatMap,filter等等)。
DataFrame

是組織為命名列的資料集。它在概念上等同於關係資料庫中的表或R / Python中的資料框,但在引擎蓋下具有更豐富的優化。DataFrame可以從多種來源構建,例如:結構化資料檔案,Hive中的表,外部資料庫或現有RDD。DataFrame API在Scala,Java,Python和R中可用。在Scala和Java中,DataFrame由Rows 的資料集表示。在Scala API中,DataFrame它只是一個類型別名Dataset[Row]。而在Java API中,使用者需要使用Dataset來表示DataFrame。

RDD 優點

  • JVM物件組成的分散式資料集合
  • 不可變並且有容錯能力
  • 可處理機構化和非結構化的資料
  • 支援函式式轉換

RDD缺點

  • 沒有Schema
  • 使用者自己優化程式
  • 從不同的資料來源讀取資料非常困難
  • 合併多個數據源中的資料也非常困難

DataFrame API

  • Row物件組成的分散式資料集
  • 不可變並且有容錯能力
  • 處理結構化資料
  • 自帶優化器Catalyset,可自動優化程式
  • Data source API
    DataFrame讓Spark對結構化資料有了處理能力

DataFrame的缺點:
1.編譯時不能型別轉化安全檢查,執行時才能確定是否有問題
2.對於物件支援不友好,rdd內部資料直接以java物件儲存,dataframe記憶體儲存的是row物件而不能是自定義物件

DateSet的優點:
1.DateSet整合了RDD和DataFrame的優點,支援結構化和非結構化資料
2.和RDD一樣,支援自定義物件儲存
3.和DataFrame一樣,支援結構化資料的sql查詢
4.採用堆外記憶體儲存,gc友好
5.型別轉化安全,程式碼友好

foreach 在Executor端遍歷
cache
persist //持久化
printSchema
toDF
unpersist //清除持久化的

建立SparkSession 命名為spark 下面使用

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

利用反射機制推斷RDD模式

在利用反射機制推斷RDD模式時,需要首先定義一個case class,因為,只有case class才能被Spark隱式地轉換為DataFrame
程式碼片段

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder 
import spark.implicits._  //匯入包,支援把一個RDD隱式轉換為一個DataFrame
//定義一個case class
case class Person(name: String, age: Long)  
//轉換成DataFrame
val peopleDF = spark.sparkContext
                .textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
                .map(_.split(","))
                .map(attributes =>Person(attributes(0),attributes(1).trim.toInt))
                .toDF()
//必須註冊為臨時表才能供下面的查詢使用
peopleDF.createOrReplaceTempView("people") 
val personsRDD = spark.sql("select name,age from people where age > 20")
//最終生成一個DataFrame,下面是系統執行返回的資訊
//personsRDD: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
personsRDD.map(t => "Name: "+t(0)+ ","+"Age: "+t(1)).show()  //DataFrame中的每個元素都是一行記錄,包含name和age兩個欄位,分別用t(0)和t(1)來獲取值
//下面是系統執行返回的資訊
+------------------+ 
| value|
+------------------+
|Name:Michael,Age:29|
| Name:Andy,Age:30|
+------------------+

當無法提前定義case class時,就需要採用程式設計方式定義RDD模式。
程式碼片段

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
//生成欄位
val fields = Array(StructField("name",StringType,true), StructField("age",IntegerType,true))
val schema = StructType(fields)
//從上面資訊可以看出,schema描述了模式資訊,模式中包含name和age兩個欄位
//shcema就是“表頭”
val peopleRDD = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
//對peopleRDD 這個RDD中的每一行元素都進行解析
val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1).trim.toInt))
//上面得到的rowRDD就是“表中的記錄”
//下面把“表頭”和“表中的記錄”拼裝起來
 val peopleDF = spark.createDataFrame(rowRDD, schema)
 //必須註冊為臨時表才能供下面查詢使用
peopleDF.createOrReplaceTempView("people")
val results = spark.sql("SELECT name,age FROM people")
results.map(attributes => "name: " + attributes(0)+","+"age:"+attributes(1)).show()
+--------------------+
| value|
+--------------------+
|name: Michael,age:29|
| name: Andy,age:30|
| name: Justin,age:19|

下面的spark代表SparkSession
讀寫json資料

val peopleDF = spark.read.format("json").load("/people.json")
或者 
val peopleDF = spark.read.json("/people.json")

peopleDF.select("name", "age").write.format("json").save("namesAndAges.json")

讀寫parquet資料

val peopleDF = spark.read.format("parquet").load("/people.parquet")
或者
val peopleDF = spark.read.parquet("/people.parquet")

peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

讀取text資料

 spark.read.text(…) // 返 回 Dataset<Row>
或者
 spark.read.textFile(…) // 返 回 Dataset<String>

讀取寫jdbc資料 如:mysql

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/spark")
  .option("driver","com.mysql.jdbc.Driver").
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save()

Spark SQL調優
DataFrame快取
dataFrame.cache()
Reduce task數目: spark.sql.shuffle.partitions (預設是200)可以根據自己的情況設定partitions個數
讀資料時每個Partition大小:spark.sql.files.maxPartitionBytes(預設128MB)
小檔案合併讀: spark.sql.files.openCostInBytes (預設是4194304 (4 MB) )
廣播小表大小: spark.sql.autoBroadcastJoinThreshold(預設是10485760 (10 MB))