1. 程式人生 > >Spark 新增複用JDBC Schema功能

Spark 新增複用JDBC Schema功能


1)   修改原因

  使用者需要閱讀大量相同的資料庫表,比如相同schema的表有1000張(比如對mysql進行分表分庫)需要全讀,每次建立dataframe的時候需要通過jdbcrelation去查詢每一張表的schema,需要消耗了大量時間。本文對提出一種修改辦法,如果使用者知道表的sechema相同,可以使用sechema複用功能。

 

2)   程式碼流程

val df = sqlContext.read().format("jdbc").options(dfOptions).load();

->resolved= ResolvedDataSource(

      sqlContext,

      userSpecifiedSchema =userSpecifiedSchema,

      partitionColumns = Array.empty[String],

      provider = source,

      options = extraOptions.toMap)  //解析資料來源,獲取jdbc、parquet、josn的schema引數

->dataSource.createRelation(sqlContext,new CaseInsensitiveMap(options)) //傳入options

->JDBCRelation(url, table, parts, properties)(sqlContext)  //獲取jdbc的relation

->override val schema= JDBCRDD.resolveTable(url, table, properties)  //獲取schema

->conn.prepareStatement(s"SELECT * FROM $table WHERE1=0").executeQuery() //直接讀取database,需要優化

 

3)   修改方法

在使用者知道schema的情況下,沒有必要重複獲取schema;

使用者定義是否需要重複使用schema,修改程式碼流程最小;

修改方法:

a)      使用者通過Options傳入需要複用schema的開關:

   dfOptions.put("jdbcschemakey","sparkourtest");

b)     建立一個hashtable,儲存已經獲取的shema

val schemaHashTable= newjava.util.HashMap[String,StructType]()

c)      schema獲取流程:


4)   修改程式碼

29a30

> import org.apache.spark.Logging

40c41

< private[sql] object JDBCRelation {

---

> private[sql] object JDBCRelation{

48a50,55

>

>

>   // add by Ricky for get same table schema

>

>   val schemaHashTable= new java.util.HashMap[String,StructType]()

>

117c124

< private[sql] case class JDBCRelation(

---

> private[sql] case class JDBCRelation  (

124c131

<   with InsertableRelation {

---

>   with InsertableRelation with Logging{

128c135,160

<   override val schema: StructType = JDBCRDD.resolveTable(url, table, properties)

---

>

>   // add by Ricky for get same table schema

>   def  getSchema():StructType={

>     //val schemaKey = properties.getProperty("jdbcSchemaKey")

>     val schemaKey = properties.getProperty("jdbcschemakey")

>     if (schemaKey != null) {

>       val schemaStored = JDBCRelation.schemaHashTable.get(schemaKey)

>       if (schemaStored != null) {

>         schemaStored

>       } else {

>         val schemaStored = JDBCRDD.resolveTable(url, table, properties)

>         logInfo("schemaKey configed,schemaHashTable empty,now put  "+schemaKey.toString)

>         JDBCRelation.schemaHashTable.put(schemaKey, schemaStored)

>         schemaStored

>       }

>     }

>     else

>     {

>       JDBCRDD.resolveTable(url, table, properties)

>     }

>

>   }

>

>   override val schema: StructType = getSchema()

> // end by Ricky

> //  override val schema: StructType = JDBCRDD.resolveTable(url, table, properties)