1. 程式人生 > >Spark通過修改DataFrame的schema給表字段添加註釋

Spark通過修改DataFrame的schema給表字段添加註釋

1、需求背景

通過Spark將關係型資料庫(以Oracle為例)的表同步的Hive表,要求用Spark建表,有欄位註釋的也要加上註釋。Spark建表,有兩種方法:
* 用Spark Sql,在程式裡組建表語句,然後用Spark.sql(“建表語句”)建表,這種方法麻煩的地方在於你要讀取Oracle表的詳細的表結構資訊,且要進行Oracle和Hive的欄位型別進行一一對應
* 用DataFrame 的saveAsTable方法,這種方法如果對應的資料庫裡沒有表,則Spark會根據DataFrame的schema自動建表,比較簡單,不用考慮欄位型別匹配轉化問題,但是這種方法有一個問題,Spark讀取Oracle的表為DataFrame時,並不能將表字段的註釋讀進來,所以就有了如標題所示的需求。(一開始以為DataFrame不能加註釋,經過研究,發現是可以的!)

2、如何檢視DataFrame是否有註釋

前面講到DataFrame裡沒有Oracle的註釋資訊,但是如果資料來源為Hive的話,是可以將註釋獲取到的。

2.1 新建Hive測試表(帶註釋)

create table `test` (
`id` string comment 'ID', 
`Name` string comment '名字'
)
comment '測試';


2.2 Spark讀取hive表並列印註釋(在spark-shell裡執行)

sql("use test")
val df = spark.table("test")
df.printSchema
root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)

用下面這行程式碼便可以列印註釋資訊:

df.schema.foreach(s=>println(s.name,s.metadata))
(id,{"comment":"ID","HIVE_TYPE_STRING":"string"})
(name,{"comment":"名字","HIVE_TYPE_STRING":"string"})

3、讀取Oracle表並列印DataFrmae的元資料資訊

3.1 新建Oracle測試表(帶註釋)

CREATE TABLE ORA_TEST (
ID VARCHAR2(100), 
NAME VARCHAR2(100)
);
COMMENT ON COLUMN ORA_TEST.ID IS 'ID';
COMMENT ON COLUMN ORA_TEST.NAME IS '名字';
COMMENT ON TABLE ORA_TEST IS  '測試';
  • 注:上面的註釋語句和建表語句需要分開執行,或者也可以在資料庫工具執行指令碼,比如我用的DBeaver用快捷鍵Alt+x即可。當然也可以在工具的介面直接建表均可。

3.2 讀取Oracle表,並列印元資料

程式碼:

package com.dkl.leanring.spark.sql.Oracle

import org.apache.spark.sql.SparkSession

object OracleSchemaDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("OracleSchemaDemo").master("local").getOrCreate()
    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:oracle:thin:@192.168.44.128:1521:orcl")
      .option("dbtable", "ORA_TEST")
      .option("user", "bigdata")
      .option("password", "bigdata")
      .option("driver", "oracle.jdbc.driver.OracleDriver")
      .load()
    df.schema.foreach(s => println(s.name, s.metadata))

    spark.stop

  }
}
(ID,{"name":"ID","scale":0})
(NAME,{"name":"NAME","scale":0})


注:Spark2.3.0和Spark2.2.1的元資料不太一樣,上面的結果是Spark2.2.1(也是我寫部落格測試用的),專案中用的Spark2.3.0,2.3.0的元資料是空的,如下

(ID,{})
(NAME,{})

可見並沒有註釋資訊

3.3 給DataFrame添加註釋

import org.apache.spark.sql.types._
val commentMap = Map("ID" -> "ID", "NAME" -> "名字")

val schema = df.schema.map(s => {
  s.withComment(commentMap(s.name))
})

//根據添加了註釋的schema,新建DataFrame
val new_df = spark.createDataFrame(df.rdd, StructType(schema)).repartition(160)

new_df.schema.foreach(s => println(s.name, s.metadata))
(ID,{"comment":"ID","name":"ID","scale":0})
(NAME,{"comment":"名字","name":"NAME","scale":0})

4、 測試寫到Hive表有沒有註釋

需將前面程式碼中的spark改為支援hive,即加上enableHiveSupport()

spark.sql("use test")
new_df.write.mode("overwrite").saveAsTable("ORA_TEST")

然後在hive裡看一下,是否有註釋

可以看到,成功的把註釋也儲存到裡hive裡

5、附錄

附上在Eclipse執行的完整程式碼


package com.dkl.leanring.spark.sql.Oracle

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

object OracleSchemaDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("OracleSchemaDemo").master("local").enableHiveSupport().getOrCreate()
    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:oracle:thin:@192.168.44.128:1521:orcl")
      .option("dbtable", "ORA_TEST")
      .option("user", "bigdata")
      .option("password", "bigdata")
      .option("driver", "oracle.jdbc.driver.OracleDriver")
      .load()
    df.schema.foreach(s => println(s.name, s.metadata))

    val commentMap = Map("ID" -> "ID", "NAME" -> "名字")

    val schema = df.schema.map(s => {
      s.withComment(commentMap(s.name))
    })

    //根據添加了註釋的schema,新建DataFrame
    val new_df = spark.createDataFrame(df.rdd, StructType(schema)).repartition(160)

    new_df.schema.foreach(s => println(s.name, s.metadata))

    spark.sql("use test")
    //儲存到hive
    new_df.write.mode("overwrite").saveAsTable("ORA_TEST")

    spark.stop

  }
}