1. 程式人生 > >spark 將dataframe資料寫入Hive分割槽表

spark 將dataframe資料寫入Hive分割槽表

從spark1.2 到spark1.3,spark SQL中的SchemaRDD變為了DataFrame,DataFrame相對於SchemaRDD有了較大改變,同時提供了更多好用且方便的API。
DataFrame將資料寫入hive中時,預設的是hive預設資料庫,insertInto沒有指定資料庫的引數,本文使用了下面方式將資料寫入hive表或者hive表的分割槽中,僅供參考。
1、將DataFrame資料寫入到Hive表中
從DataFrame類中可以看到與hive表有關的寫入Api有以下幾個:


registerTempTable(tableName: String): Unit,
insertInto(tableName: String): Unit
insertInto(tableName: String, overwrite: Boolean): Unit
saveAsTable(tableName: String, source: String, mode: SaveMode, options: Map[String, String]): Unit


有很多過載函式,不一一列舉
registerTempTable函式是建立spark臨時表
insertInto函式是向表中寫入資料,可以看出此函式不能指定資料庫和分割槽等資訊,不可以直接進行寫入。
向hive資料倉庫寫入資料必須指定資料庫,hive資料表建立可以在hive上建立,或者使用hiveContext.sql(“create table ....")
下面語句是向指定資料庫資料表中寫入資料:

1 2 3 4 5 6 7 case class Person(name:String,col1:Int,col2:String) val sc = new org.apache.spark.SparkContext  
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) import hiveContext.implicits._ hiveContext.sql("use DataBaseName") val data = sc.textFile("path").map(x=>x.split("\\s+")).map(x=>Person(x(0),x(1).toInt,x(2))) data.toDF().insertInto("tableName")

建立一個case類將RDD中資料型別轉為case類型別,然後通過toDF轉換為DataFrame,呼叫insertInto函式時,首先指定資料庫,使用的是hiveContext.sql("use DataBaseName")語句,就可以將DataFrame資料寫入hive資料表中了


2、將DataFrame資料寫入hive指定資料表的分割槽中
hive資料表建立可以在hive上建立,或者使用hiveContext.sql(“create table ...."),使用saveAsTable時資料儲存格式有限,預設格式為parquet,可以指定為json,如果有其他格式指定,儘量使用語句來建立hive表。
將資料寫入分割槽表的思路是:首先將DataFrame資料寫入臨時表,之後是由hiveContext.sql語句將資料寫入hive分割槽表中。具體操作如下:

1 2 3 4 5 6 7 8 case class Person(name:String,col1:Int,col2:String) val sc = new org.apache.spark.SparkContext   val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) import hiveContext.implicits._ hiveContext.sql("use DataBaseName") val data = sc.textFile("path").map(x=>x.split("\\s+")).map(x=>Person(x(0),x(1).toInt,x(2))) data.toDF().registerTempTable("table1") hiveContext.sql("insert into table2 partition(date='2015-04-02') select name,col1,col2 from table1")

使用以上方式就可以將dataframe資料寫入hive分割槽表了