1. 程式人生 > >【Spark系列7】Spark如何讀寫hive

【Spark系列7】Spark如何讀寫hive

hive資料表建立可以在hive上建立,或者使用hiveContext.sql(“create table ....")

1) 寫入hive表

case class Person(name:String,col1:Int,col2:String)
val sc = new org.apache.spark.SparkContext  
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext.implicits._
hiveContext.sql("use DataBaseName")
val data = sc.textFile("path").map(x=>x.split("\\s+")).map(x=>Person(x(0),x(1).toInt,x(2)))
data.toDF().insertInto("tableName")

2)寫入hive分割槽中

case class Person(name:String,col1:Int,col2:String)
val sc = new org.apache.spark.SparkContext  
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext.implicits._
hiveContext.sql("use DataBaseName")
val data = sc.textFile("path").map(x=>x.split("\\s+")).map(x=>Person(x(0),x(1).toInt,x(2)))
data.toDF().registerTempTable("table1")
hiveContext.sql("insert into table2 partition(date='2015-04-02') select name,col1,col2 from table1")

將資料寫入分割槽表的思路是:首先將DataFrame資料寫入臨時表,之後是由hiveContext.sql語句將資料寫入hive分割槽表中。

3)優化

將檔案存為符合hive table檔案的格式,然後使用hive load將產生的結果檔案直接move到指定目錄下。程式碼如下:

result.rdd.map { r => r.mkString("\001") }.repartition(partitions).saveAsTextFile(output_tmp_dir)
sql(s"""load data inpath '$output_tmp_dir' overwrite into table $output partition (dt='$dt')""")
hive column預設分隔符在scala/java中的表示為“/001”,r.mkString("/001")既是將column以分隔符/001進行分割,hive在匯入時會自動識別。
使用hive load data命令,將hdfs檔案load到hive表中。後臺操作為直接將目錄下的檔案移到hive table所在目錄,所以只是hdfs move資料的過程,執行非常快。

需要注意的是,此處要求hive建表時,以textfile格式建表。

參考:

http://blog.csdn.net/zgc625238677/article/details/53928320

如果是命令列操作,可以參考http://blog.csdn.net/fansy1990/article/details/53401102

《如何解決spark寫hive慢的問題》http://blog.csdn.net/lulynn/article/details/51543567