Spark學習(陸)- Spark操作外部資料來源
文章目錄
產生背景
每個Spark應用程式從讀取資料開始,到儲存資料結束
- 載入和儲存資料是不容易的
比如大資料場景解析關係型資料庫需要用工具如sqoop將資料轉到hbase等。 - 解析原始資料也不容易 :text/json/parquet
- 資料直接的轉換也麻煩
- 資料集儲存在各個儲存系統中無法統一拉取和推送
使用者的需要:
方便快速從不同的資料來源(json、parquet、rdbms),經過混合處理(json join parquet),再將處理結果以特定的格式(json、parquet)寫回到指定的系統(HDFS、S3)上去
Spark SQL 1.2 ==> 外部資料來源API
概念
External Data Source API
- 一種整合各種外部資料的擴充套件方法
- 可以使用各種格式和儲存系統讀寫DataFrame
- Data Source API可以自動裁剪列和推送過濾器到源(謂詞下推):parquet/JDBO
- Data Source API在Spark 1.2提出
目標
- 對於開發人員只需要構建針對外部資料來源的庫
開發人員:是否需要把程式碼合併到spark中????
比如weibo資料只需要通過–jars傳入就行。 - 對於使用人員通過DataFrames很容易載入和保持資料來源
使用者
讀:spark.read.format(format)
format
build-in:內建的 json parquet jdbc csv(2.0+後屬於內建)
packages: 外部的比如微博的資料 並不是spark內建 https://spark-packages.org/
寫:people.write.format(“parquet”).save(“path”)
操作Parquet檔案資料
parquet是無法直接檢視的;所以這裡就不提供資料了
- spark.read.format(“parquet”).load(path)
- df.write.format(“parquet”).save(path)
package com.kun.ExternalDataSource
import org.apache.spark.sql.SparkSession
/**
* Parquet檔案操作
*/
object ParquetApp {
def main(args: Array[String]) {
val spark = SparkSession.builder().appName("SparkSessionApp")
.master("local[2]").getOrCreate()
/**
* spark.read.format("parquet").load 這是標準寫法
*/
val userDF = spark.read.format("parquet").load("file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet")
userDF.printSchema()
userDF.show()
userDF.select("name","favorite_color").show
userDF.select("name","favorite_color").write.format("json").save("file:///home/hadoop/tmp/jsonout")
spark.read.load("file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet").show
//會報錯,因為sparksql預設處理的format就是parquet
spark.read.load("file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json").show
spark.read.format("parquet").option("path","file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet").load().show
spark.stop()
}
}
啟動spark-shell來測試;速度快
載入parquet資料
資料型別
檢視所有列
只儲存前兩列
spark預設處理parquet資料
spark.read.load(“file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json”).show
會報錯:
RuntimeException: file:/home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json is not a Parquet file
//原始碼中:
val DEFAULT_DATA_SOURCE_NAME = SQLConfigBuilder("spark.sql.sources.default")
.doc("The default data source to use in input/output.")
.stringConf
.createWithDefault("parquet")
利用spark中的sql來處理parquet
#注意USING的用法
CREATE TEMPORARY VIEW parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (
path "/home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet"
)
SELECT * FROM parquetTable
讀取parquet很多種寫法
操作Hive表資料
- spark.table(tableName)
- df.write.saveAsTable(tableName)
測試資料:
dept表
10 ACCOUNTING NEW YORK
20 RESEARCH DALLAS
30 SALES CHICAGO
40 OPERATIONS BOSTON
emp表
7369 SMITH CLERK 7902 1980-12-17 800.00 20
7499 ALLEN SALESMAN 7698 1981-2-20 1600.00 300.00 30
7521 WARD SALESMAN 7698 1981-2-22 1250.00 500.00 30
7566 JONES MANAGER 7839 1981-4-2 2975.00 20
7654 MARTIN SALESMAN 7698 1981-9-28 1250.00 1400.00 30
7698 BLAKE MANAGER 7839 1981-5-1 2850.00 30
7782 CLARK MANAGER 7839 1981-6-9 2450.00 10
7788 SCOTT ANALYST 7566 1987-4-19 3000.00 20
7839 KING PRESIDENT 1981-11-17 5000.00 10
7844 TURNER SALESMAN 7698 1981-9-8 1500.00 0.00 30
7876 ADAMS CLERK 7788 1987-5-23 1100.00 20
7900 JAMES CLERK 7698 1981-12-3 950.00 30
7902 FORD ANALYST 7566 1981-12-3 3000.00 20
7934 MILLER CLERK 7782 1982-1-23 1300.00 10
8888 HIVE PROGRAM 7839 1988-1-23 10300.00
前置條件:啟動spark-shell
檢視hive表:
遍歷emp
spark.sql("select deptno, count(1) as mount from emp where group by deptno").filter("deptno is not null").write.saveAsTable("hive_table_1")
//會報錯:
org.apache.spark.sql.AnalysisException: Attribute name "count(1)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.;
將資料寫入hive
注意:
spark.sqlContext.setConf(“spark.sql.shuffle.partitions”,“10”)配置的是分割槽的數量
在生產環境中一定要注意設定spark.sql.shuffle.partitions,預設是200
操作MySQL表資料
操作MySQL的資料方法一:
spark.read.format(“jdbc”).option(“url”, “jdbc:mysql://localhost:3306/hive”).option(“dbtable”, “hive.TBLS”).option(“user”, “root”).option(“password”, “root”).option(“driver”, “com.mysql.jdbc.Driver”).load()
不加option(“driver”, “com.mysql.jdbc.Driver”)會有錯誤:java.sql.SQLException: No suitable driver
可以看到結構和mysql裡的是一樣的
指定列輸出:
操作MySQL的資料方法二:
import java.util.Properties
val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "root")
connectionProperties.put("driver", "com.mysql.jdbc.Driver")
val jdbcDF2 = spark.read.jdbc("jdbc:mysql://localhost:3306", "hive.TBLS", connectionProperties)
在spark-shell裡測試
寫入MySQL
// Saving data to a JDBC source
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save()
jdbcDF2.write
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
操作MySQL的資料方法三:
進入spark sql操作
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
url "jdbc:mysql://localhost:3306",
dbtable "hive.TBLS",
user 'root',
password 'root',
driver 'com.mysql.jdbc.Driver'
)
show tables
select * from jdbctable;
Hive和MySQL綜合使用
關聯MySQL和Hive表資料關聯操作
外部資料來源綜合案例
在MySQL裡建立:
create database spark;
use spark;
CREATE TABLE DEPT(
DEPTNO int(2) PRIMARY KEY,
DNAME VARCHAR(14) ,
LOC VARCHAR(13) ) ;
INSERT INTO DEPT VALUES(10,'ACCOUNTING','NEW YORK');
INSERT INTO DEPT VALUES(20,'RESEARCH','DALLAS');
INSERT INTO DEPT VALUES(30,'SALES','CHICAGO');
INSERT INTO DEPT VALUES(40,'OPERATIONS','BOSTON');
進行下列兩表的jion
package com.kun.ExternalDataSource
import org.apache.spark.sql.SparkSession
/**
* 使用外部資料來源綜合查詢Hive和MySQL的表資料
*/
object HiveMySQLApp {
def main(args: Array[String]) {
val spark = SparkSession.builder().appName("HiveMySQLApp")
.master("local[2]").getOrCreate()
// 載入Hive表資料
val hiveDF = spark.table("emp")
// 載入MySQL表資料
val mysqlDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306").option("dbtable", "spark.DEPT").option("user", "root").option("password", "root").option("driver", "com.mysql.jdbc.Driver").load()
// JOIN
val resultDF = hiveDF.join(mysqlDF, hiveDF.col("deptno") === mysqlDF.col("DEPTNO"))
resultDF.show
resultDF.select(hiveDF.col("empno"),hiveDF.col("ename"),
mysqlDF.col("deptno"), mysqlDF.col("dname")).show
spark.stop()
}
}
spark-shell裡測試
同理json;parquet;csv等都是可以進行同樣操作的。