Spark 2.4.0 Programming Guide -- Spark DataSources
阿新 • Published: 2018-12-26
More resources
Video
- Spark 2.4.0 Programming Guide -- Spark DataSources (bilibili video): https://www.bilibili.com/video/av38193405/?p=5
Prerequisites
- Java installed (java 1.8.0_191)
- Scala installed (scala 2.11.12)
- Hadoop installed (Hadoop 2.9.2)
- Hive installed (apache-hive-3.1.1-bin)
- Spark installed (spark-2.4.0-bin-hadoop2.7)
Skills covered
- Reading and writing parquet, orc, csv, json, text, and avro files
- Running spark.sql directly on files
- Reading and writing with bucketBy and partitionBy
- Merging DataFrames (schema merging)
- JDBC (MySQL) reads and writes
- Hive operations (create/drop database; create, insert, show, truncate, drop table)
- Official guide:
Basic Load/Save (parquet)
Reading a parquet file
- Read the parquet file users.parquet
- users.parquet is a binary format; opening it directly shows raw bytes
spark.read.load("hdfs://standalone.com:9000/home/liuwen/data/parquest/users.parquet").show
//+------+--------------+----------------+
//|  name|favorite_color|favorite_numbers|
//+------+--------------+----------------+
//|Alyssa|          null|  [3, 9, 15, 20]|
//|   Ben|           red|              []|
//+------+--------------+----------------+
Saving a parquet file
- Read the parquet file users.parquet (a binary format; opening it directly shows raw bytes)
- The saved data is written under the directory namesAndFavColors.parquet
val usersDF = spark.read.load("hdfs://standalone.com:9000/home/liuwen/data/parquest/users.parquet")
usersDF.select("name", "favorite_color").write.save("hdfs://standalone.com:9000/home/liuwen/data/parquest/namesAndFavColors.parquet")
spark.read.load("hdfs://m0:9000/home/liuwen/data/parquest/namesAndFavColors.parquet").show
//+------+--------------+
//|  name|favorite_color|
//+------+--------------+
//|Alyssa|          null|
//|   Ben|           red|
//+------+--------------+
Load/Save (json)
Reading a JSON file
- Read the JSON file people.json
- people.json stores line-delimited JSON records
- Note: each line of a JSON file repeats the field names, which wastes space; the default parquet format is usually the better choice
spark.read.format("json").load("hdfs://standalone.com:9000/home/liuwen/data/json/people.json").show
//+----+-------+
//| age|   name|
//+----+-------+
//|null|Michael|
//|  30|   Andy|
//|  19| Justin|
//+----+-------+
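The space note above can be made concrete with a small sketch; everything below (the local master, the /tmp paths, and the sample rows) is illustrative and not from the original post:

```scala
// Write the same rows as JSON and as Parquet to compare the two formats.
// Assumes Spark is on the classpath (e.g. pasted into spark-shell).
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("json-vs-parquet").getOrCreate()
import spark.implicits._

val people = Seq(("Michael", None: Option[Int]), ("Andy", Some(30)), ("Justin", Some(19)))
  .toDF("name", "age")

// Each JSON line repeats the field names, e.g. {"name":"Andy","age":30};
// Parquet stores the schema once and encodes the columns in a compact binary layout.
people.write.mode("overwrite").json("/tmp/people_json")
people.write.mode("overwrite").parquet("/tmp/people_parquet")
```

On small data the difference is negligible, but on wide tables the repeated field names in JSON add up quickly.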
Saving a JSON file
- Read the JSON file people.json
- people.json stores line-delimited JSON records
val ds = spark.read.format("json").load("hdfs://standalone.com:9000/home/liuwen/data/json/people.json")
ds.show
//+----+-------+
//| age|   name|
//+----+-------+
//|null|Michael|
//|  30|   Andy|
//|  19| Justin|
//+----+-------+
//save the data as JSON to HDFS
ds.select("name", "age").write.format("json").save("hdfs://standalone.com:9000/home/liuwen/output/json/namesAndAges.json")
//read back the saved data
spark.read.format("json").load("hdfs://standalone.com:9000/home/liuwen/output/json/namesAndAges.json").show
//+----+-------+
//| age|   name|
//+----+-------+
//|null|Michael|
//|  30|   Andy|
//|  19| Justin|
//+----+-------+
- Inspect the saved files on HDFS
hdfs dfs -ls -R hdfs://standalone.com:9000/home/liuwen/output/json
//drwxr-xr-x   - liuwen supergroup  0 2018-12-18 17:44 hdfs://standalone.com:9000/home/liuwen/output/json/namesAndAges.json
//-rw-r--r--   1 liuwen supergroup  0 2018-12-18 17:44 hdfs://standalone.com:9000/home/liuwen/output/json/namesAndAges.json/_SUCCESS
//-rw-r--r--   1 liuwen supergroup 71 2018-12-18 17:44 hdfs://standalone.com:9000/home/liuwen/output/json/namesAndAges.json/part-00000-6690fee8-33d3-413c-8364-927f02593ff2-c000.json
hdfs dfs -cat hdfs://standalone.com:9000/home/liuwen/output/json/namesAndAges.json/*
//the data is in namesAndAges.json/part-00000-6690fee8-33d3-413c-8364-927f02593ff2-c000.json
//{"name":"Michael"}
//{"name":"Andy","age":30}
//{"name":"Justin","age":19}
Load/Save (csv)
Reading a CSV file
val peopleDFCsv = spark.read.format("csv")
  .option("sep", ";")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("hdfs://m0:9000/home/liuwen/data/csv/people.csv")
//peopleDFCsv: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]
peopleDFCsv.show
//+-----+---+---------+
//| name|age|      job|
//+-----+---+---------+
//|Jorge| 30|Developer|
//|  Bob| 32|Developer|
//+-----+---+---------+
Writing a CSV file
//save the data as CSV to HDFS
peopleDFCsv.select("name", "age").write.format("csv").save("hdfs://standalone.com:9000/home/liuwen/output/csv/people.csv")
spark.read.format("csv").option("sep", ",").option("inferSchema", "true").option("header", "true").load("hdfs://standalone.com:9000/home/liuwen/output/csv/people.csv").show
//+-----+---+
//|Jorge| 30|
//+-----+---+
//|  Bob| 32|
//+-----+---+
//note: the file was written without a header, so header=true promotes the first data row ("Jorge,30") to column names
- Inspect the CSV files on HDFS
hdfs dfs -ls -R hdfs://m0:9000/home/liuwen/output/csv/
//drwxr-xr-x - liuwen supergroup 0 2018-12-18 18:04 hdfs://m0:9000/home/liuwen/output/csv/people.csv
//-rw-r--r-- 1 liuwen supergroup 0 2018-12-18 18:04 hdfs://m0:9000/home/liuwen/output/csv/people.csv/_SUCCESS
//-rw-r--r-- 1 liuwen supergroup 16 2018-12-18 18:04 hdfs://m0:9000/home/liuwen/output/csv/people.csv/part-00000-d6ad5563-5908-4c0e-8e6f-f13cd0ff445e-c000.csv
hdfs dfs -text hdfs://m0:9000/home/liuwen/output/csv/people.csv/part-00000-d6ad5563-5908-4c0e-8e6f-f13cd0ff445e-c000.csv
//Jorge,30
//Bob,32
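As shown in the output above, write.format("csv") does not emit a header row by default, so reading back with header=true consumes the first data row as column names. A minimal sketch of the fix, with an illustrative local /tmp path and sample rows (assumptions, not from the original post):

```scala
// Write a CSV with an explicit header so the round trip keeps the column names.
// Assumes Spark is on the classpath (e.g. pasted into spark-shell).
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("csv-header").getOrCreate()
import spark.implicits._

val people = Seq(("Jorge", 30), ("Bob", 32)).toDF("name", "age")

// header=true on the writer stores a header row in each output file
people.write.mode("overwrite").option("header", "true").csv("/tmp/people_csv")

// now header=true on the reader recovers the original column names
val back = spark.read.option("header", "true").option("inferSchema", "true").csv("/tmp/people_csv")
back.show()
```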
Load/Save (orc)
Writing an ORC file
val usersDF = spark.read.load("hdfs://standalone.com:9000/home/liuwen/data/parquest/users.parquet")
usersDF.show
//+------+--------------+----------------+
//|  name|favorite_color|favorite_numbers|
//+------+--------------+----------------+
//|Alyssa|          null|  [3, 9, 15, 20]|
//|   Ben|           red|              []|
//+------+--------------+----------------+
usersDF.write.format("orc")
  .option("orc.bloom.filter.columns", "favorite_color")
  .option("orc.dictionary.key.threshold", "1.0")
  .save("hdfs://standalone.com:9000/home/liuwen/output/orc/users_with_options.orc")
Reading an ORC file
spark.read.format("orc").load("hdfs://standalone.com:9000/home/liuwen/output/orc/users_with_options.orc").show
//+------+--------------+----------------+
//|  name|favorite_color|favorite_numbers|
//+------+--------------+----------------+
//|Alyssa|          null|  [3, 9, 15, 20]|
//|   Ben|           red|              []|
//+------+--------------+----------------+
Running SQL directly on files
- Query a file path directly with spark.sql, using the format.`path` syntax
val sqlDF = spark.sql("SELECT * FROM parquet.`hdfs://standalone.com:9000/home/liuwen/data/parquest/users.parquet`")
sqlDF.show
//+------+--------------+----------------+
//|  name|favorite_color|favorite_numbers|
//+------+--------------+----------------+
//|Alyssa|          null|  [3, 9, 15, 20]|
//|   Ben|           red|              []|
//+------+--------------+----------------+
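The same path-as-table syntax works for other formats as well; a small sketch with JSON (the local /tmp path and sample rows are illustrative assumptions):

```scala
// Query a JSON file path directly with the format.`path` syntax.
// Assumes Spark is on the classpath (e.g. pasted into spark-shell).
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("sql-on-files").getOrCreate()
import spark.implicits._

// write a small JSON dataset, then run SQL over the path without loading it first
Seq(("Andy", 30), ("Justin", 19)).toDF("name", "age").write.mode("overwrite").json("/tmp/sql_on_json")
val sqlOnJson = spark.sql("SELECT name, age FROM json.`/tmp/sql_on_json`")
sqlOnJson.show()
```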
saveAsTable
- Save a DataFrame as a Hive table
val sqlDF = spark.read.format("json").load("hdfs://standalone.com:9000/home/liuwen/output/json/employ.json")
sqlDF.show
//+----+-------+
//| age|   name|
//+----+-------+
//|null|Michael|
//|  30|   Andy|
//|  19| Justin|
//+----+-------+
sqlDF.write.saveAsTable("people_bucketed")
val sqlDF2 = spark.sql("select * from people_bucketed")
- Read the data back from the Hive table
val sqlDF = spark.sql("select * from people_bucketed")
bucketBy
- Save a DataFrame as a Hive table bucketed with bucketBy
val sqlDF = spark.read.format("json").load("hdfs://standalone.com:9000/home/liuwen/output/json/employ.json")
sqlDF.show
//+----+-------+
//| age|   name|
//+----+-------+
//|null|Michael|
//|  30|   Andy|
//|  19| Justin|
//+----+-------+
sqlDF.write.bucketBy(42, "name").sortBy("age")
.saveAsTable("people_bucketed3")
val sqlDF2 = spark.sql("select * from people_bucketed3")
sqlDF2.show
- Read the data back from the Hive table
val sqlDF = spark.sql("select * from people_bucketed3")
partitionBy
- Save data partitioned by a column with partitionBy
// assumes an existing SparkSession named spark
val usersDF = spark.read.load("hdfs://standalone.com:9000/home/liuwen/data/parquest/users.parquet")
usersDF.show()
//+------+--------------+----------------+
//|  name|favorite_color|favorite_numbers|
//+------+--------------+----------------+
//|Alyssa|          null|  [3, 9, 15, 20]|
//|   Ben|           red|              []|
//+------+--------------+----------------+
//the relative path is saved under the HDFS home directory: hdfs://standalone.com:9000/user/liuwen/namesPartByColor.parquet
usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
- Read the partitioned data back (query the path directly; it is not a Hive table)
val sqlDF = spark.sql("SELECT * FROM parquet.`namesPartByColor.parquet`")
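As a sketch of what partitionBy produces on disk (the local /tmp path and sample rows are illustrative; null partition values typically land in a favorite_color=__HIVE_DEFAULT_PARTITION__ directory):

```scala
// Write a partitioned Parquet layout and read it back.
// Assumes Spark is on the classpath (e.g. pasted into spark-shell).
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("partition-by").getOrCreate()
import spark.implicits._

val users = Seq(("Alyssa", null: String), ("Ben", "red")).toDF("name", "favorite_color")
users.write.mode("overwrite").partitionBy("favorite_color").parquet("/tmp/namesPartByColor.parquet")

// Expected layout: one subdirectory per partition value, e.g.
//   /tmp/namesPartByColor.parquet/favorite_color=red/part-*.parquet
//   /tmp/namesPartByColor.parquet/favorite_color=__HIVE_DEFAULT_PARTITION__/...  (null rows)
// Reading the root directory recovers favorite_color from the directory names.
val back = spark.read.parquet("/tmp/namesPartByColor.parquet")
back.show()
```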
Merging DataFrames (schema merging)
- Write two DataFrames into subdirectories of the same base path, then read the whole directory back with mergeSchema; the key=… directory names become a partition column
import spark.implicits._
val df1 = Seq(1,2,3,5).map(x => (x,x * x)).toDF("a","b")
val df2 = Seq(10,20,30,50).map(x => (x,x * x)).toDF("a","b")
df1.write.parquet("data/test_table/key=1")
df1.show()
// +---+---+
// |  a|  b|
// +---+---+
// |  1|  1|
// |  2|  4|
// |  3|  9|
// |  5| 25|
// +---+---+
df2.write.parquet("data/test_table/key=2")
df2.show()
// +---+----+
// |  a|   b|
// +---+----+
// | 10| 100|
// | 20| 400|
// | 30| 900|
// | 50|2500|
// +---+----+
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()
// root
// |-- a: integer (nullable = true)
// |-- b: integer (nullable = true)
// |-- key: integer (nullable = true)
mergedDF.show()
// +---+----+---+
// |  a|   b|key|
// +---+----+---+
// | 10| 100|  2|
// | 20| 400|  2|
// | 30| 900|  2|
// | 50|2500|  2|
// |  1|   1|  1|
// |  2|   4|  1|
// |  3|   9|  1|
// |  5|  25|  1|
// +---+----+---+
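In the example above both DataFrames share the same columns, so only the key partition column is added. Schema merging matters more when the columns actually differ; a sketch (the local /tmp path is an assumption, and the value/square/cube columns mirror the pattern in the official guide):

```scala
// Merge two Parquet directories whose schemas differ.
// Assumes Spark is on the classpath (e.g. pasted into spark-shell).
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("merge-schema").getOrCreate()
import spark.implicits._

val squares = (1 to 3).map(i => (i, i * i)).toDF("value", "square")
val cubes = (4 to 6).map(i => (i, i * i * i)).toDF("value", "cube")
squares.write.mode("overwrite").parquet("/tmp/merge_demo/key=1")
cubes.write.mode("overwrite").parquet("/tmp/merge_demo/key=2")

// mergeSchema unions the two schemas: value, square, cube, plus the key partition column
val merged = spark.read.option("mergeSchema", "true").parquet("/tmp/merge_demo")
merged.printSchema()
```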
mysql (jdbc)
- Reading from MySQL
import java.util.Properties
val connectionProperties = new Properties()
connectionProperties.put("user","admin")
connectionProperties.put("password","000000")
val jdbcDF = spark.read.jdbc("jdbc:mysql://mysql.com:3306/test","test.test2",connectionProperties)
jdbcDF.show()
- Writing to MySQL
import java.util.Properties
import org.apache.spark.sql.SaveMode
val connectionProperties = new Properties()
connectionProperties.put("user","admin")
connectionProperties.put("password","000000")
val jdbcDF = spark.read.jdbc("jdbc:mysql://macbookmysql.com:3306/test","test.test",connectionProperties)
jdbcDF.show()
jdbcDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://macbookmysql.com:3306/test","test.test3",connectionProperties)
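The Properties-based calls above can also be written with the option-style JDBC API; a sketch where the URL, table names, and credentials mirror the placeholders above, and a reachable MySQL server plus the MySQL JDBC driver on the classpath are assumed:

```scala
// Option-style JDBC read and write (connection details are placeholders).
// Assumes Spark is on the classpath and the MySQL JDBC driver is available.
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().master("local[1]").appName("jdbc-options").getOrCreate()

val url = "jdbc:mysql://mysql.com:3306/test"

val jdbcDF2 = spark.read.format("jdbc")
  .option("url", url)
  .option("dbtable", "test.test2")
  .option("user", "admin")
  .option("password", "000000")
  .load()

// write the same rows to another table, overwriting it if present
jdbcDF2.write.format("jdbc")
  .mode(SaveMode.Overwrite)
  .option("url", url)
  .option("dbtable", "test.test3")
  .option("user", "admin")
  .option("password", "000000")
  .save()
```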
spark hive
- Spark supports HiveQL directly: queries run inside Spark, and Hive tables are exposed as DataFrames for Spark operators
import java.io.File
val warehouseLocation = new File("spark-warehouse").getAbsolutePath
val spark = SparkSession
.builder()
.master("local")
// .master("spark://standalone.com:7077")
.appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()
import spark.sql
sql("CREATE database IF NOT EXISTS test_tmp")
sql("use test_tmp")
sql("CREATE TABLE IF NOT EXISTS student(name VARCHAR(64), age INT)")
sql("INSERT INTO TABLE student VALUES ('小王', 35), ('小李', 50)")
end