1. 程式人生 > >Spark學習(柒)- Spark SQL擴充套件和總結

Spark學習(柒)- Spark SQL擴充套件和總結

文章目錄

Spark SQL使用場景

  • 檔案中資料的特殊查詢(即席查詢;即席查詢是可以進行特殊的欄位查詢自定義的查詢;普通查詢就是別人已經定義好的查詢方式)
  • 實時SQL分析流資料
  • 可以進行ETL操作
  • 與外部資料庫的互動
  • 具有更大叢集的可伸縮查詢效能

Spark SQL載入資料

  • 直接將資料載入到一個DataFrame中
  • 將資料載入到RDD並進行轉換
  • 可以從本地和雲端載入資料

啟動一個spark-shell
在這裡插入圖片描述

1) RDD DataFrame/Dataset

用本地spark的啟動日誌來進行測試

//將資料載入成RDD
val masterLog = sc.textFile("file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/logs/spark-hadoop-org.apache.spark.deploy.master.Master-1-hadoop001.out"
) val workerLog = sc.textFile("file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/logs/spark-hadoop-org.apache.spark.deploy.worker.Worker-1-hadoop001.out") val allLog = sc.textFile("file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/logs/*out*")

在這裡插入圖片描述
在這裡插入圖片描述
在這裡插入圖片描述
輸出檢視
在這裡插入圖片描述
存在的問題:如何使用SQL進行查詢呢?

//轉換成DataFrame
import org.apache.spark.
sql.Row val masterRDD = masterLog.map(x => Row(x)) import org.apache.spark.sql.types._ val schemaString = "line" val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true)) val schema = StructType(fields) val masterDF = spark.createDataFrame(masterRDD, schema) masterDF.show

在這裡插入圖片描述
在這裡插入圖片描述
在這裡插入圖片描述
把DF轉換成一個表;使用SQL操作
在這裡插入圖片描述

如果檔案是JSON/Parquet格式;不需要建立schema;DF可以直接拿取。

val usersDF = spark.read.format("parquet").load("file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet")
usersDF.show

sql裡Spark提供了直接使用sql來查詢parquet檔案。

spark.sql("select * from  parquet.`file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet`").show

2) Local Cloud(HDFS/S3)

從Cloud讀取資料: HDFS/S3(s3a/s3n)

val hdfsRDD = sc.textFile("hdfs://path/file")
val s3RDD = sc.textFile("s3a://bucket/object")
spark.read.format("text").load("hdfs://path/file")
spark.read.format("text").load("s3a://bucket/object")

DataFrame與SQL的對比

  • DataFrame=RDD+Schema
  • DataFrame只是一個Dataset的row類型別名
  • 在RDD上的DataFrame:Catalyst optimization&schemas
  • DataFrame可以處理:Text、JSON、Parquet等等
  • DF中的API和SQL函式都是經過Catalyst優化的

Schema

隱式的(inferred):比如Parquet,orc等
顯式的(explicit):比如文字檔案

示例操作
https://blog.csdn.net/bingdianone/article/details/84580342#t5

SaveMode

Loading&Saving Results
Save操作可以選擇使用SaveMode,它指定如何處理現有資料。
在這裡插入圖片描述

val df=spark.read.format("json").load("file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json")

df.show

df.select("name").write.format("parquet").mode("overwrite").save("file:///home/hadoop/data/overwrite")

在這裡插入圖片描述

處理複雜的JSON資料

  • JSON資料最容易在換行時被讀入
  • Schema是很容易進行推導的
  • 如果你希望flat你的JSON資料,請使用explode方法
  • 使用點語法訪問巢狀物件
    在這裡插入圖片描述
    臨時表操作
    在這裡插入圖片描述
    在這裡插入圖片描述
    內嵌式的json訪問
    在這裡插入圖片描述
    在這裡插入圖片描述
    註冊成臨時表
    在這裡插入圖片描述
    在這裡插入圖片描述

SQL的覆蓋程度

  • SQL 2003的支援
  • 執行99個TPC-DS基準測試查詢
  • 子查詢支援
  • 向量化支援(一次可以讀取1024行)

外部資料來源

  • rdbms、need JDBC jars
  • Parquet、Phoenix、csv、 avro etc

https://blog.csdn.net/bingdianone/article/details/84585293