Spark學習(柒)- Spark SQL擴充套件和總結
阿新 • • 發佈:2018-12-09
文章目錄
Spark SQL使用場景
- 檔案中資料的特殊查詢(即席查詢;即席查詢是可以進行特殊的欄位查詢自定義的查詢;普通查詢就是別人已經定義好的查詢方式)
- 實時SQL分析流資料
- 可以進行ETL操作
- 與外部資料庫的互動
- 具有更大叢集的可伸縮查詢效能
Spark SQL載入資料
- 直接將資料載入到一個DataFrame中
- 將資料載入到RDD並進行轉換
- 可以從本地和雲端載入資料
啟動一個spark-shell
1) RDD DataFrame/Dataset
用本地spark的啟動日誌來進行測試
//將資料載入成RDD
val masterLog = sc.textFile("file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/logs/spark-hadoop-org.apache.spark.deploy.master.Master-1-hadoop001.out" )
val workerLog = sc.textFile("file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/logs/spark-hadoop-org.apache.spark.deploy.worker.Worker-1-hadoop001.out")
val allLog = sc.textFile("file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/logs/*out*")
輸出檢視
存在的問題:如何使用SQL進行查詢呢?
//轉換成DataFrame
import org.apache.spark. sql.Row
val masterRDD = masterLog.map(x => Row(x))
import org.apache.spark.sql.types._
val schemaString = "line"
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
val masterDF = spark.createDataFrame(masterRDD, schema)
masterDF.show
把DF轉換成一個表;使用SQL操作
如果檔案是JSON/Parquet格式;不需要建立schema;DF可以直接拿取。
val usersDF = spark.read.format("parquet").load("file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet")
usersDF.show
sql裡Spark提供了直接使用sql來查詢parquet檔案。
spark.sql("select * from parquet.`file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet`").show
2) Local Cloud(HDFS/S3)
從Cloud讀取資料: HDFS/S3(s3a/s3n)
val hdfsRDD = sc.textFile("hdfs://path/file")
val s3RDD = sc.textFile("s3a://bucket/object")
spark.read.format("text").load("hdfs://path/file")
spark.read.format("text").load("s3a://bucket/object")
DataFrame與SQL的對比
- DataFrame=RDD+Schema
- DataFrame只是一個Dataset的row類型別名
- 在RDD上的DataFrame:Catalyst optimization&schemas
- DataFrame可以處理:Text、JSON、Parquet等等
- DF中的API和SQL函式都是經過Catalyst優化的
Schema
隱式的(inferred):比如Parquet,orc等
顯式的(explicit):比如文字檔案
示例操作
https://blog.csdn.net/bingdianone/article/details/84580342#t5
SaveMode
Loading&Saving Results
Save操作可以選擇使用SaveMode,它指定如何處理現有資料。
val df=spark.read.format("json").load("file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json")
df.show
df.select("name").write.format("parquet").mode("overwrite").save("file:///home/hadoop/data/overwrite")
處理複雜的JSON資料
- JSON資料最容易在換行時被讀入
- Schema是很容易進行推導的
- 如果你希望flat你的JSON資料,請使用explode方法
- 使用點語法訪問巢狀物件
臨時表操作
內嵌式的json訪問
註冊成臨時表
SQL的覆蓋程度
- SQL 2003的支援
- 執行99個TPC-DS基準測試查詢
- 子查詢支援
- 向量化支援(一次可以讀取1024行)
外部資料來源
- rdbms、need JDBC jars
- Parquet、Phoenix、csv、 avro etc