SparkSQL學習(一)SparkSQL簡單使用
1、SparkSQL的進化之路
1.0以前:
Shark
1.1.x開始:
SparkSQL(只是測試性的) SQL
1.3.x:
SparkSQL(正式版本)+Dataframe
1.5.x:
SparkSQL 鎢絲計劃
1.6.x:
SparkSQL+DataFrame+DataSet(測試版本)
1.x:
SparkSQL+DataFrame+DataSet(正式版本)
SparkSQL:還有其他的優化
StructuredStreaming(DataSet)2、認識SparkSQL
2.1、什麼是SparkSQL
SparkSQL是Spark的一個模組,主要用於進行結構化資料的處理。它提供的最核心的程式設計抽象就是DataFrame。
2.2、SparkSQL的作用
提供一個程式設計抽象(DataFrame)並且作為分散式SQL查詢引擎。
DataFrame:它可以根據很多源進行構建,包括:結構化的資料檔案,Hive中的表,外部的關係型資料庫,以及RDD
2.3、執行原理
將SparkSQL 轉化為 RDD ,然後提交到叢集執行。
2.4、特點
(1)容易整合
(2)統一的資料訪問方式
(3)相容Hive
(4)標準的資料連線
2.5、SparkSession
SparkSession是Spark 2.0引如的新概念。SparkSession為使用者提供了統一的切入點,來讓使用者學習spark的各項功能。
在spark的早期版本中,SparkContext是spark的主要切入點,由於RDD是主要的API,我們通過sparkcontext來建立和操作RDD。對於每個其他的API,我們需要使用不同的context。例如,對於Streming,我們需要使用StreamingContext;對於sql,使用sqlContext;對於Hive,使用hiveContext。但是隨著DataSet和DataFrame的API逐漸成為標準的API,就需要為他們建立接入點。所以在spark2.0中,引入SparkSession作為DataSet和DataFrame API的切入點,SparkSession封裝了SparkConf、SparkContext和SQLContext。為了向後相容,SQLContext和HiveContext也被儲存下來。
SparkSession實質上是SQLContext和HiveContext的組合(未來可能還會加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同樣是可以使用的。SparkSession內部封裝了sparkContext,所以計算實際上是由sparkContext完成的。
特點:
---- 為使用者提供一個統一的切入點使用Spark 各項功能
---- 允許使用者通過它呼叫 DataFrame 和 Dataset 相關 API 來編寫程式
---- 減少了使用者需要了解的一些概念,可以很容易的與 Spark 進行互動
2.6、DataFrames
在Spark中,DataFrame是一種以RDD為基礎的分散式資料集,類似於傳統資料庫中的二維表格。DataFrame與RDD的主要區別在於,前者帶有schema元資訊,即DataFrame所表示的二維表資料集的每一列都帶有名稱和型別。這使得Spark SQL得以洞察更多的結構資訊,從而對藏於DataFrame背後的資料來源以及作用於DataFrame之上的變換進行了針對性的優化,最終達到大幅提升執行時效率的目標,反觀RDD,由於無從得知所存資料元素的具體內部結構,Spark Core只能在stage層面進行簡單、通用的流水線優化。
3、RDD轉換成為DataFrame
使用spark1.x版本的方式
測試資料目錄:/home/hadoop/apps/spark-2.3.0-bin-hadoop2.7/examples/src/main/resources(spark的安裝目錄裡面)
people.txt
3.1、方式一:通過case class 建立 DataFrames(反射)
package Lession2
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
case class People(var name: String,var age:Int)
object TestDataFrame1 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDDToDataFrame").setMaster("local")
val sc = new SparkContext(conf)
val context = new SQLContext(sc)
//將本地的資料讀入 RDD , 並將 RDD 與 case slass關聯
val peopleRDD: RDD[People] = sc.textFile("D:\\111\\222\\people.txt")
.map(line => People(line.split(",")(0), line.split(",")(1).trim.toInt))
import context.implicits._
//將 RDD 轉換成 DataFrames
val df: DataFrame = peopleRDD.toDF
//將 DataFrames 建立成一個臨時的檢視
df.createOrReplaceTempView("people")
//使用SQL語句進行查詢
context.sql("select * from people").show()
}
}
執行結果:
3.2、方式二:通過 structType 建立 DataFrames(程式設計介面)
package Lession2
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
object TestDataFrame2 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local")
val sc: SparkContext = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val fileRDD: RDD[String] = sc.textFile("D:\\111\\222\\people.txt")
//將 RDD 資料對映成 Row, 需要 import org.apache.spark.sql.Row
val rowRDD: RDD[Row] = fileRDD.map(line => {
val fields = line.split(",")
Row(fields(0), fields(1).trim.toInt)
})
//建立 StructType 來定義結構
val structType: StructType = StructType(
//欄位名,欄位型別,是否可以為空
StructField("name", StringType, true) ::
StructField("age", IntegerType, true) :: Nil
)
/**
* rows: java.util.List[Row]
* schema: StructType
*/
val df: DataFrame = sqlContext.createDataFrame(rowRDD,structType)
df.createOrReplaceTempView("people")
sqlContext.sql("select * from people").show()
}
}
執行結果:
3.3、方式三:通過 json 檔案建立 DataFrames
package Lession2
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
object TestDataFrame3 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("TestDataFrame3 ").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext: SQLContext = new SQLContext(sc)
val df: DataFrame = sqlContext.read.json("D:\\111\\222\\people.json")
df.createOrReplaceTempView("people")
sqlContext.sql("select * from people").show()
}
}
執行結果:
4、DataFrame 的 read 和 save 和 savemode
4.1、資料的讀取
package Lession2
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
object TestRead {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("TestRead ").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext: SQLContext = new SQLContext(sc)
//方式一
val df1: DataFrame = sqlContext.read.json("D:\\111\\222\\people.json")
val df2: DataFrame = sqlContext.read.parquet("D:\\111\\222\\users.parquet")
//方式二
val df3: DataFrame = sqlContext.read.format("json").load("D:\\111\\222\\people.json")
val df4: DataFrame = sqlContext.read.format("parquet").load("D:\\111\\222\\users.parquet")
//方式三,預設是 parquet 模式
val df5: DataFrame = sqlContext.load("D:\\111\\222\\users.parquet")
}
}
4.2、資料的儲存
package Lession2
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}
object TestSave {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("TestSave ").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext: SQLContext = new SQLContext(sc)
val df1: DataFrame = sqlContext.read.json("D:\\111\\222\\people.json")
//方式一
// df1.write.json("D:\\11")
df1.write.parquet("D:\\11\\2")
//方式二
df1.write.format("json").save("D:\\11\\3")
df1.write.format("parquet").save("D:\\11\\4")
//方式三
df1.write.save("D:\\11\\5")
}
}
4.3、資料的儲存模式
使用mode
df1.write.format("parquet").mode(SaveMode.Ignore).save("E:\\444")
5、資料來源
5.1、資料來源之 json
參考 4.1
5.2 資料來源之parquet
參考4.15.3、資料來源之 Mysql
package Lession2
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
object TestMysql {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("TestMysql").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
var url = "jdbc:mysql://192.168.123.102:3306/hivedb"
val table = "DBS"
val properties = new Properties()
properties.setProperty("user","root")
properties.setProperty("password","root")
//需要傳入Mysql的URL、表明、properties(連線資料庫的使用者名稱和密碼)
val df: DataFrame = sqlContext.read.jdbc(url,table,properties)
df.createOrReplaceTempView("dbs")
sqlContext.sql("select * from dbs").show()
}
}
執行結果:
5.4、資料來源之 Hive
(1)準備工作
在pom.xml檔案中新增依賴
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.3.0</version>
</dependency>
開發環境則在 resource 資料夾下新增 hive-site.xml 檔案,叢集環境把 hive 的配置檔案要發到 $SPARK_HOME/conf 目錄下
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://hadoop02:3306/hivedb?createDatabaseIfNotExist=true</value>
<description>JDBC connect string for a JDBC metastore</description>
<!-- 濡傛灉 mysql 鍜hive 鍦ㄥ悓涓€涓湇鍔″櫒鑺傜偣錛岄偅涔堣鏇存敼 hadoop02 涓localhost -->
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>Driver class name for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
<description>username to use against metastore database</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>root</value>
<description>password to use against metastore database</description>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
<description>hive default warehouse, if nessecory, change it</description>
</property>
</configuration>
(2)測試程式碼
object TestHive {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName)
val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)
sqlContext.sql("select * from myhive.student").show()
}
}
執行結果: