1. 程式人生 > >Spark學習(陸)- Spark操作外部資料來源

Spark學習(陸)- Spark操作外部資料來源

文章目錄

產生背景

每個Spark應用程式從讀取資料開始,到儲存資料結束

  • 載入和儲存資料是不容易的
    比如大資料場景解析關係型資料庫需要用工具如sqoop將資料轉到hbase等。
  • 解析原始資料也不容易 :text/json/parquet
  • 資料直接的轉換也麻煩
  • 資料集儲存在各個儲存系統中無法統一拉取和推送
    在這裡插入圖片描述

使用者的需要:
方便快速從不同的資料來源(json、parquet、rdbms),經過混合處理(json join parquet),再將處理結果以特定的格式(json、parquet)寫回到指定的系統(HDFS、S3)上去

Spark SQL 1.2 ==> 外部資料來源API

概念

External Data Source API

  • 一種整合各種外部資料的擴充套件方法
  • 可以使用各種格式和儲存系統讀寫DataFrame
  • Data Source API可以自動裁剪列和推送過濾器到源(謂詞下推):parquet/JDBO
  • Data Source API在Spark 1.2提出
    在這裡插入圖片描述

目標

  • 對於開發人員只需要構建針對外部資料來源的庫
    開發人員:是否需要把程式碼合併到spark中????
    比如weibo資料只需要通過–jars傳入就行。
  • 對於使用人員通過DataFrames很容易載入和保持資料來源
    使用者
    讀:spark.read.format(format)
    format
    build-in:內建的 json parquet jdbc csv(2.0+後屬於內建)
    packages: 外部的比如微博的資料 並不是spark內建 https://spark-packages.org/
    寫:people.write.format(“parquet”).save(“path”)

操作Parquet檔案資料

parquet是無法直接檢視的;所以這裡就不提供資料了

  • spark.read.format(“parquet”).load(path)
  • df.write.format(“parquet”).save(path)
package com.kun.ExternalDataSource

import org.apache.spark.sql.SparkSession

/**
 * Parquet檔案操作
 */
object ParquetApp {

  def main(args: Array[String]) {

    val spark = SparkSession.builder().appName("SparkSessionApp")
      .master("local[2]").getOrCreate()


    /**
     * spark.read.format("parquet").load 這是標準寫法
     */
    val userDF = spark.read.format("parquet").load("file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet")

    userDF.printSchema()
    userDF.show()

    userDF.select("name","favorite_color").show

    userDF.select("name","favorite_color").write.format("json").save("file:///home/hadoop/tmp/jsonout")

    spark.read.load("file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet").show

    //會報錯,因為sparksql預設處理的format就是parquet
    spark.read.load("file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json").show

    spark.read.format("parquet").option("path","file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet").load().show
    spark.stop()
  }

}

啟動spark-shell來測試;速度快
在這裡插入圖片描述
載入parquet資料
在這裡插入圖片描述
資料型別
在這裡插入圖片描述
檢視所有列
在這裡插入圖片描述
只儲存前兩列
在這裡插入圖片描述
在這裡插入圖片描述
在這裡插入圖片描述

spark預設處理parquet資料
spark.read.load(“file:///home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json”).show
會報錯:

RuntimeException: file:/home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json is not a Parquet file

//原始碼中:
  val DEFAULT_DATA_SOURCE_NAME = SQLConfigBuilder("spark.sql.sources.default")
    .doc("The default data source to use in input/output.")
    .stringConf
    .createWithDefault("parquet")

利用spark中的sql來處理parquet

#注意USING的用法
CREATE TEMPORARY VIEW parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (
  path "/home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet"
)

SELECT * FROM parquetTable

讀取parquet很多種寫法
在這裡插入圖片描述

操作Hive表資料

  • spark.table(tableName)
  • df.write.saveAsTable(tableName)

測試資料:
dept表

10 ACCOUNTING NEW YORK
20 RESEARCH DALLAS
30 SALES CHICAGO
40 OPERATIONS BOSTON

emp表

7369 SMITH CLERK 7902 1980-12-17 800.00 20
7499 ALLEN SALESMAN 7698 1981-2-20 1600.00 300.00 30
7521 WARD SALESMAN 7698 1981-2-22 1250.00 500.00 30
7566 JONES MANAGER 7839 1981-4-2 2975.00 20
7654 MARTIN SALESMAN 7698 1981-9-28 1250.00 1400.00 30
7698 BLAKE MANAGER 7839 1981-5-1 2850.00 30
7782 CLARK MANAGER 7839 1981-6-9 2450.00 10
7788 SCOTT ANALYST 7566 1987-4-19 3000.00 20
7839 KING PRESIDENT 1981-11-17 5000.00 10
7844 TURNER SALESMAN 7698 1981-9-8 1500.00 0.00 30
7876 ADAMS CLERK 7788 1987-5-23 1100.00 20
7900 JAMES CLERK 7698 1981-12-3 950.00 30
7902 FORD ANALYST 7566 1981-12-3 3000.00 20
7934 MILLER CLERK 7782 1982-1-23 1300.00 10
8888 HIVE PROGRAM 7839 1988-1-23 10300.00

前置條件:啟動spark-shell

檢視hive表:
在這裡插入圖片描述
遍歷emp
在這裡插入圖片描述
在這裡插入圖片描述

在這裡插入圖片描述

spark.sql("select deptno, count(1) as mount from emp where group by deptno").filter("deptno is not null").write.saveAsTable("hive_table_1")

//會報錯:
org.apache.spark.sql.AnalysisException: Attribute name "count(1)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.;

將資料寫入hive
在這裡插入圖片描述
注意:
spark.sqlContext.setConf(“spark.sql.shuffle.partitions”,“10”)配置的是分割槽的數量
在生產環境中一定要注意設定spark.sql.shuffle.partitions,預設是200
在這裡插入圖片描述
在這裡插入圖片描述

操作MySQL表資料

在這裡插入圖片描述

操作MySQL的資料方法一:

spark.read.format(“jdbc”).option(“url”, “jdbc:mysql://localhost:3306/hive”).option(“dbtable”, “hive.TBLS”).option(“user”, “root”).option(“password”, “root”).option(“driver”, “com.mysql.jdbc.Driver”).load()

不加option(“driver”, “com.mysql.jdbc.Driver”)會有錯誤:java.sql.SQLException: No suitable driver
在這裡插入圖片描述
可以看到結構和mysql裡的是一樣的
在這裡插入圖片描述
在這裡插入圖片描述
指定列輸出:
在這裡插入圖片描述

操作MySQL的資料方法二:

import java.util.Properties
val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "root")
connectionProperties.put("driver", "com.mysql.jdbc.Driver")

val jdbcDF2 = spark.read.jdbc("jdbc:mysql://localhost:3306", "hive.TBLS", connectionProperties)

在spark-shell裡測試
在這裡插入圖片描述
在這裡插入圖片描述
寫入MySQL

// Saving data to a JDBC source
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save()

jdbcDF2.write
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

操作MySQL的資料方法三:

進入spark sql操作

CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://localhost:3306",
  dbtable "hive.TBLS",
  user 'root',
  password 'root',
  driver 'com.mysql.jdbc.Driver'
)

在這裡插入圖片描述
show tables
在這裡插入圖片描述
select * from jdbctable;
在這裡插入圖片描述

Hive和MySQL綜合使用

關聯MySQL和Hive表資料關聯操作

外部資料來源綜合案例
在MySQL裡建立:

create database spark;
use spark;

CREATE TABLE DEPT(
DEPTNO int(2) PRIMARY KEY,
DNAME VARCHAR(14) ,
LOC VARCHAR(13) ) ;

INSERT INTO DEPT VALUES(10,'ACCOUNTING','NEW YORK');
INSERT INTO DEPT VALUES(20,'RESEARCH','DALLAS');
INSERT INTO DEPT VALUES(30,'SALES','CHICAGO');
INSERT INTO DEPT VALUES(40,'OPERATIONS','BOSTON');

進行下列兩表的jion
在這裡插入圖片描述
在這裡插入圖片描述

package com.kun.ExternalDataSource

import org.apache.spark.sql.SparkSession

/**
 * 使用外部資料來源綜合查詢Hive和MySQL的表資料
 */
object HiveMySQLApp {

  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("HiveMySQLApp")
      .master("local[2]").getOrCreate()

    // 載入Hive表資料
    val hiveDF = spark.table("emp")

    // 載入MySQL表資料
    val mysqlDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306").option("dbtable", "spark.DEPT").option("user", "root").option("password", "root").option("driver", "com.mysql.jdbc.Driver").load()

    // JOIN
    val resultDF = hiveDF.join(mysqlDF, hiveDF.col("deptno") === mysqlDF.col("DEPTNO"))
    resultDF.show


    resultDF.select(hiveDF.col("empno"),hiveDF.col("ename"),
      mysqlDF.col("deptno"), mysqlDF.col("dname")).show

    spark.stop()
  }

}

spark-shell裡測試
在這裡插入圖片描述
在這裡插入圖片描述
在這裡插入圖片描述
在這裡插入圖片描述
同理json;parquet;csv等都是可以進行同樣操作的。