
Using Hive and the ORC Storage Format with Spark

In this blog post we will look at accessing Hive data through Spark, covering the following topics:

1. How to access Spark interactively through the Spark shell

2. How to read an HDFS file and create an RDD

3. How to analyze a dataset interactively with the Spark API

4. How to create a Hive table in the ORC format

5. How to query Hive tables with Spark SQL

6. How to store data in the ORC format

Spark SQL uses the Spark engine to run SQL queries against data stored in HDFS or in existing RDDs, so we can manipulate data with SQL statements inside a Spark program.

1. Get the dataset

Download the sample data (yahoo_stocks.csv) from a Linux server terminal.
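
The download URL is not given in this post, so the command below uses a placeholder URL; substitute the actual location of the file:

wget -O yahoo_stocks.csv https://example.com/path/to/yahoo_stocks.csv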

Upload the downloaded data into an HDFS directory, as follows:

hdfs dfs -put ./yahoo_stocks.csv /tmp/

2. Start the Spark shell

spark-shell

This starts spark-shell with the ability to talk to Hive, because we have already copied hive-site.xml, hdfs-site.xml, and core-site.xml into Spark's conf directory.
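
The exact locations of these configuration files depend on your cluster layout; on a typical installation the copy looks roughly like this (the paths below are assumptions, adjust them to your environment):

cp /etc/hive/conf/hive-site.xml $SPARK_HOME/conf/
cp /etc/hadoop/conf/hdfs-site.xml $SPARK_HOME/conf/
cp /etc/hadoop/conf/core-site.xml $SPARK_HOME/conf/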

Import the required libraries:

scala> import org.apache.spark.sql.hive.orc._

import org.apache.spark.sql.hive.orc._

scala> import org.apache.spark.sql._

import org.apache.spark.sql._

3. Create a SparkSession

Spark 2.0 provides SparkSession, which has built-in support for Hive features, including HiveQL, access to Hive UDFs, and reading data from Hive tables.

Import the class:

scala> import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.SparkSession

When we log in with spark-shell, a SparkSession instance named spark has already been created for us by default, and we can use it directly from here on.

Spark session available as 'spark'.
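
In spark-shell this instance already exists; if you were writing a standalone application instead, you would construct a Hive-enabled session yourself. A minimal sketch (the application name is arbitrary):

scala> val session = SparkSession.builder().appName("HiveOrcExample").enableHiveSupport().getOrCreate()

Inside spark-shell, getOrCreate() simply returns the existing spark session.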

4. Create an ORC-format table

Create the table in Hive:

scala> spark.sql("create table yahoo_orc_table (date STRING,open_price FLOAT, high_price FLOAT, low_price FLOAT, close_price FLOAT, volume INT, adj_price FLOAT) stored as orc")

res0: org.apache.spark.sql.DataFrame = []
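
To confirm the table definition, you can describe it from the same shell (a quick check, not in the original transcript):

scala> spark.sql("describe yahoo_orc_table").show()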

5. Load the data file and create an RDD

scala> val yahoo_stocks = sc.textFile("hdfs://SZB-L0023776:8020/tmp/yahoo_stocks.csv")

yahoo_stocks: org.apache.spark.rdd.RDD[String] =

hdfs://SZB-L0023776:8020/tmp/yahoo_stocks.csv MapPartitionsRDD[2] at textFile at <console>:30

Fetch the first 10 lines:

scala> yahoo_stocks.take(10).foreach(println)

Date,Open,High,Low,Close,Volume,AdjClose

2015-04-28,44.34,44.57,43.94,44.34,7188300,44.34

2015-04-27,44.65,45.10,44.25,44.36,10840900,44.36

2015-04-24,43.73,44.71,43.69,44.52,11267500,44.52

2015-04-23,43.92,44.06,43.58,43.70,14274900,43.70

2015-04-22,44.58,44.85,43.67,43.98,32241200,43.98

2015-04-21,45.15,45.18,44.45,44.49,16103700,44.49

2015-04-20,44.73,44.91,44.41,44.66,10052900,44.66

2015-04-17,45.30,45.44,44.25,44.45,13305700,44.45

2015-04-16,45.82,46.13,45.53,45.78,13800300,45.78

6. The first line of the data contains the field names

scala> val header = yahoo_stocks.first

header: String = Date,Open,High,Low,Close,Volume,Adj Close

Next we create a new RDD that excludes the header line:

scala> val data = yahoo_stocks.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }

data: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at mapPartitionsWithIndex at <console>:32
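
An equivalent and slightly simpler way to drop the header, reusing the header value captured above (shown only for comparison; the original uses mapPartitionsWithIndex):

scala> val dataAlt = yahoo_stocks.filter(line => line != header)

This removes every line equal to the header string, which in this dataset is just the first line.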

7. Create a schema

scala> case class YahooStockPrice(date: String, open: Float, high: Float, low: Float, close: Float, volume: Integer, adjClose: Float)

defined class YahooStockPrice
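
The same schema could also be expressed explicitly with StructType instead of a case class (shown only for comparison, for example when loading the CSV directly with the DataFrame reader via spark.read.schema(...)):

scala> import org.apache.spark.sql.types._
scala> val yahooSchema = StructType(Seq(StructField("date", StringType), StructField("open", FloatType), StructField("high", FloatType), StructField("low", FloatType), StructField("close", FloatType), StructField("volume", IntegerType), StructField("adjClose", FloatType)))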

8. Bind the schema to the processed data

Map each record to a YahooStockPrice and convert the result to a DataFrame (we register it as a table in the next step):

scala> val stockprice = data.map(_.split(",")).map(row => YahooStockPrice(row(0), row(1).trim.toFloat, row(2).trim.toFloat, row(3).trim.toFloat, row(4).trim.toFloat, row(5).trim.toInt, row(6).trim.toFloat)).toDF()

stockprice: org.apache.spark.sql.DataFrame = [date: string, open: float ... 5 more fields]

Look at the data:

scala> stockprice.first

res4: org.apache.spark.sql.Row = [2015-04-28,44.34,44.57,43.94,44.34,7188300,44.34]

Show more of the data:

scala> stockprice.show

Verify the schema:

scala> stockprice.printSchema

root

 |-- date: string (nullable = true)

 |-- open: float (nullable = false)

 |-- high: float (nullable = false)

 |-- low: float (nullable = false)

 |-- close: float (nullable = false)

 |-- volume: integer (nullable = true)

 |-- adjClose: float (nullable = false)

9. Register a temporary view

scala> stockprice.createOrReplaceTempView("yahoo_stocks_temp")

10. Query the temporary view

Note that this is not a table in Hive; it is a temporary view backed by the DataFrame:

scala> val results = spark.sql("SELECT * FROM yahoo_stocks_temp")

scala> results.map(t => "Stock Entry: " + t.toString).collect().foreach(println)

……

Stock Entry: [1996-05-06,32.50008,32.50008,29.37504,30.12504,8214400,1.25521]

Stock Entry: [1996-05-03,32.25,32.50008,31.24992,31.99992,6116800,1.33333]

Stock Entry: [1996-05-02,31.5,33.25008,31.5,32.87496,9731200,1.36979]

Stock Entry: [1996-05-01,30.25008,31.75008,30.0,31.62504,4881600,1.31771]

Stock Entry: [1996-04-30,31.24992,31.5,29.50008,29.74992,5003200,1.23958]

……
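
The temporary view supports arbitrary SQL, not just SELECT *; for example, an aggregation such as the yearly average closing price (added here purely as an illustration):

scala> spark.sql("SELECT substr(date, 1, 4) AS year, avg(close) AS avg_close FROM yahoo_stocks_temp GROUP BY substr(date, 1, 4) ORDER BY year").show()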

11. Save in the ORC file format

We write the data above into a Hive table whose files are stored in the ORC format.

scala> results.write.format("orc").saveAsTable("yahoo_stocks_orc")
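
Besides saveAsTable, the same data can also be written as plain ORC files to an HDFS path of your choosing (the path below is a hypothetical example):

scala> results.write.format("orc").save("/tmp/yahoo_stocks_orc_files")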

12. Read the ORC files

scala> val yahoo_stocks_orc = spark.read.format("orc").load("yahoo_stocks_orc")

yahoo_stocks_orc: org.apache.spark.sql.DataFrame = [date: string, open: float ... 5 more fields]

Register a temporary in-memory view mapped to this ORC data:

scala> yahoo_stocks_orc.createOrReplaceTempView("orcTest")

Run a query:

scala> spark.sql("SELECT * from orcTest").collect.foreach(println)

……

[1996-04-29,31.5,31.99992,30.49992,31.00008,5928000,1.29167]

[1996-04-26,31.99992,32.25,31.24992,31.75008,7561600,1.32292]

[1996-04-25,30.0,32.25,28.99992,31.24992,19478400,1.30208]

[1996-04-24,28.5,29.12496,27.75,28.99992,7795200,1.20833]

……
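
Because ORC is a columnar format, queries that read only a few columns benefit from column pruning; for example, selecting a subset of columns from the ORC-backed DataFrame:

scala> yahoo_stocks_orc.select("date", "close").show(5)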

13. Query the Hive table data

When we start spark-shell, a spark instance is initialized for us by default:

Spark session available as 'spark'.

We can use this spark instance to access data in Hive tables.

scala> val tableDF = spark.sql("select * from yahoo_stocks_orc limit 10")

tableDF: org.apache.spark.sql.DataFrame = [date: string, open: float ... 5 more fields]

View 10 rows of data:

scala> tableDF.take(10).foreach(println)

[2015-04-28,44.34,44.57,43.94,44.34,7188300,44.34]

[2015-04-27,44.65,45.1,44.25,44.36,10840900,44.36]

[2015-04-24,43.73,44.71,43.69,44.52,11267500,44.52]

[2015-04-23,43.92,44.06,43.58,43.7,14274900,43.7]

[2015-04-22,44.58,44.85,43.67,43.98,32241200,43.98]

[2015-04-21,45.15,45.18,44.45,44.49,16103700,44.49]

[2015-04-20,44.73,44.91,44.41,44.66,10052900,44.66]

[2015-04-17,45.3,45.44,44.25,44.45,13305700,44.45]

[2015-04-16,45.82,46.13,45.53,45.78,13800300,45.78]

[2015-04-15,45.46,45.83,45.23,45.73,15033500,45.73]
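
To confirm that both yahoo_orc_table and yahoo_stocks_orc are registered in the Hive metastore, you can list the tables from the same session (a quick final check, not part of the original walkthrough):

scala> spark.sql("show tables").show()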