1. 程式人生 > >Spark 實戰,第 3 部分: 使用 Spark SQL 對結構化資料進行統計分析

Spark 實戰,第 3 部分: 使用 Spark SQL 對結構化資料進行統計分析

引言

在很多領域,如電信,金融等,每天都會產生大量的結構化資料,當資料量不斷變大,傳統的資料儲存 (DBMS) 和計算方式 (單機程式) 已經不能滿足企業對資料儲存,統計分析以及知識挖掘的需要。在過去的數年裡,傳統的軟體開發和維護人員已經積累了大量的基於 DBMS 的操作資料知識和經驗,他們已經習慣了通過編寫 SQL 語句來對資料記錄進行統計分析。於是大資料工程師們開始探索如何使用類 SQL 的方式來操作和分析大資料,通過大量的努力,目前業界已經出現很多 SQL on Hadoop 的方案,如 Hive, Impala 等。Spark SQL 就是其中的一個,實際上 Spark SQL 並不是一開始就存在於 Spark 生態系統裡的,它的前身是 Shark。隨著 Spark 自身的發展,Spark 團隊開始試圖放棄 Shark 這個對於 Hive 有太多依賴 (查詢優化,語法解析) 的東西,於是才有了 Spark SQL 這個全新的模組,通過幾個版本的發展,目前 Spark SQL 已經趨於穩定,功能也逐漸豐富。本文將以 Spark1.4.1 版本為基礎,由淺入深地向讀者介紹 Spark SQL/DataFrame 的基本概念和原理,並且通過例項向讀者展示如何使用 Spark SQL/DataFrame API 開發應用程式。接下來,就讓我們開始 Spark SQL 的體驗之旅吧。

關於 Spark SQL/DataFrame

Spark SQL 是 Spark 生態系統裡用於處理結構化大資料的模組,該模組裡最重要的概念就是 DataFrame, 相信熟悉 R 語言的工程師對此並不陌生。Spark 的 DataFrame 是基於早期版本中的 SchemaRDD,所以很自然的使用分散式大資料處理的場景。Spark DataFrame 以 RDD 為基礎,但是帶有 Schema 資訊,它類似於傳統資料庫中的二維表格。

Spark SQL 模組目前支援將多種外部資料來源的資料轉化為 DataFrame,並像操作 RDD 或者將其註冊為臨時表的方式處理和分析這些資料。當前支援的資料來源有:

  • Json
  • 文字檔案
  • RDD
  • 關係資料庫
  • Hive
  • Parquet

一旦將 DataFrame 註冊成臨時表,我們就可以使用類 SQL 的方式操作這些資料,我們將在下文的案例中詳細向讀者展示如何使用 Spark SQL/DataFrame 提供的 API 完成資料讀取,臨時表註冊,統計分析等步驟。

案例介紹與程式設計實現

案例一

a.案例描述與分析

本案例中,我們將使用 Spark SQL 分析包含 5 億條人口資訊的結構化資料,資料儲存在文字檔案上,總大小為 7.81G。檔案總共包含三列,第一列是 ID,第二列是性別資訊 (F -> 女,M -> 男),第三列是人口的身高資訊,單位是 cm。實際上這個檔案與我們在本系列文章第一篇中的案例三使用的格式是一致的,讀者可以參考相關章節,並使用提供的測試資料生成程式,生成 5 億條資料,用於本案例中。為了便於讀者理解,本案例依然把用於分析的文字檔案的內容片段貼出來,具體格式如下。

圖 1. 案例一測試資料格式預覽
圖 1. 案例一測試資料格式預覽

生成該測試檔案後,讀者需要把檔案上傳到 HDFS 上,您可以選擇使用 HDFS shell 命令或者 HDSF 的 eclipse 外掛。上傳到 HDFS 後,我們可以通過訪問 HDFS web console(http://namenode:50070),檢視檔案具體資訊。

圖 2. 案例一測試資料檔案基本資訊
圖 2. 案例一測試資料檔案基本資訊

本例中,我們的統計任務如下:

  • 用 SQL 語句的方式統計男性中身高超過 180cm 的人數。
  • 用 SQL 語句的方式統計女性中身高超過 170cm 的人數。
  • 對人群按照性別分組並統計男女人數。
  • 用類 RDD 轉換的方式對 DataFrame 操作來統計並列印身高大於 210cm 的前 50 名男性。
  • 對所有人按身高進行排序並列印前 50 名的資訊。
  • 統計男性的平均身高。
  • 統計女性身高的最大值。

讀者可以看到,上述統計任務中有些是相似的,但是我們要用不同的方式實現它,以向讀者展示不同的語法。

b.編碼實現

清單 1. 案例一示例程式原始碼
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
object PeopleDataStatistics2 {
 private val schemaString = "id,gender,height"
 def main(args: Array[String]) {
 if (args.length < 1) {
 println("Usage:PeopleDataStatistics2 filePath")
 System.exit(1)
 }
 val conf = new SparkConf().setAppName("Spark Exercise:People Data Statistics 2")
 val sc = new SparkContext(conf)
 val peopleDataRDD = sc.textFile(args(0))
 val sqlCtx = new SQLContext(sc)
 // this is used to implicitly convert an RDD to a DataFrame.
 import sqlCtx.implicits._
 val schemaArray = schemaString.split(",")
 val schema = StructType(schemaArray.map(fieldName =
                         > StructField(fieldName, StringType, true)))
 val rowRDD: RDD[Row] = peopleDataRDD.map(_.split(" ")).map(
                         eachRow => Row(eachRow(0), eachRow(1), eachRow(2)))
 val peopleDF = sqlCtx.createDataFrame(rowRDD, schema)
 peopleDF.registerTempTable("people")
 //get the male people whose height is more than 180
 val higherMale180 = sqlCtx.sql("select id,gender,
                     height from people where height > 180 and gender='M'")
 println("Men whose height are more than 180: " + higherMale180.count())
 println("<Display #1>")
 //get the female people whose height is more than 170
 val higherFemale170 = sqlCtx.sql("select id,gender,
                          height from people where height > 170 and gender='F'")
 println("Women whose height are more than 170: " + higherFemale170.count())
 println("<Display #2>")
 //Grouped the people by gender and count the number
 peopleDF.groupBy(peopleDF("gender")).count().show()
 println("People Count Grouped By Gender")
 println("<Display #3>")
 //
 peopleDF.filter(peopleDF("gender").equalTo("M")).filter(
                                   peopleDF("height") > 210).show(50)
 println("Men whose height is more than 210")
 println("<Display #4>")
 //
 peopleDF.sort($"height".desc).take(50).foreach { row => println(row(0) + "," + row(1) + "," + row(2)) }
 println("Sorted the people by height in descend order,Show top 50 people")
 println("<Display #5>")
 //
 peopleDF.filter(peopleDF("gender").equalTo("M")).agg(Map("height" -> "avg")).show()
 println("The Average height for Men")
 println("<Display #6>")
 //
 peopleDF.filter(peopleDF("gender").equalTo("F")).agg("height" -> "max").show()
 println("The Max height for Women:")
 println("<Display #7>")
 //......
 println("All the statistics actions are finished on structured People data.")
 }
}

c.提交併執行

編碼完成後,把專案打成 jar 包,在這裡,我們將原始碼打成名為 spark_exercise-1.0.jar, 筆者使用 Maven 來管理專案,也建議讀者可以嘗試下 Maven 管理您的 Scala 專案。

清單 2. 案例一示例程式執行命令
./spark-submit --class com.ibm.spark.exercise.sql.PeopleDataStatistics2 \
--master spark://<spark_master_node_ip>:7077 \
--driver-memory 8g \
--executor-memory 2g \
--total-executor-cores 12 \
 /home/fams/spark_exercise-1.0.jar \ hdfs://<hdfs_namenode_ip>:9000/
                                  user/fams/inputfiles/sample_people_info2.txt

d.監控執行過程

在提交後,我們可以在 Spark web console(http://<spark_master_node_ip>:8080) 中監控程式執行過程。下面我們將分別向讀者展示如何監控程式產生的 Jobs,Stages,以及 D 視覺化的檢視 DAG 資訊。

圖 3. 案例一程式監控圖 1
圖 3. 案例一程式監控圖 1
圖 4. 案例一程式監控圖 2
圖 4. 案例一程式監控圖 2
圖 5. 案例一程式監控圖 3
圖 5. 案例一程式監控圖 3
圖 6. 案例一程式監控圖 4
圖 6. 案例一程式監控圖 4

其實在 Spark web console 中還可以檢視很多資訊,如執行環境資訊,Executor 程序資訊,讀者可以在介面上一一檢視,在這裡不再贅述。

e.執行結果

圖 7. 案例一程式執行結果 (部分)
圖 7. 案例一程式執行結果 (部分)

案例二

a.案例描述與分析

在案例一中,我們將儲存於 HDFS 中的檔案轉換成 DataFrame 並進行了一系列的統計,細心的讀者會發現,都是一個關聯於一個 DataFrame 的簡單查詢和統計,那麼在案例二中,我們將向讀者展示如何連線多個 DataFrame 做更復雜的統計分析。

在本案例中,我們將統計分析 1 千萬使用者和 1 億條交易資料。對於使用者資料,它是一個包含 6 個列 (ID, 性別, 年齡, 註冊日期, 角色 (從事行業), 所在區域) 的文字檔案,具有以下格式。

圖 8. 案例二測試使用者資料格式預覽
圖 8. 案例二測試使用者資料格式預覽

我們使用以下 Scala 程式來生成本案例所需的測試資料。

清單 3. 案例二使用者測試資料生成類原始碼
import java.io.{File, FileWriter}
import scala.util.Random
object UserDataGenerator {
 private val FILE_PATH = "C:\\LOCAL_DISK_D\\sample_user_data.txt"
 private val ROLE_ID_ARRAY = Array[String]("ROLE001","ROLE002","ROLE003","ROLE004","ROLE005")
 private val REGION_ID_ARRAY = Array[String]("REG001","REG002","REG003","REG004","REG005")
 private val MAX_USER_AGE = 60
 //how many records to be generated
 private val MAX_RECORDS = 10000000
 def main(args:Array[String]): Unit = {
 generateDataFile(FILE_PATH , MAX_RECORDS)
 }

 private def generateDataFile(filePath : String, recordNum: Int): Unit = {
 var writer:FileWriter = null
 try {
 writer = new FileWriter(filePath,true)
 val rand = new Random()
 for (i <- 1 to recordNum) {
 //generate the gender of the user
 var gender = getRandomGender
 //
 var age = rand.nextInt(MAX_USER_AGE)
 if (age < 10) {
 age = age + 10
 }
 //generate the registering date for the user
 var year = rand.nextInt(16) + 2000
 var month = rand.nextInt(12)+1
 //to avoid checking if it is a valid day for specific month
 //we always generate a day which is no more than 28
 var day = rand.nextInt(28) + 1
 var registerDate = year + "-" + month + "-" + day
 //generate the role of the user
 var roleIndex:Int = rand.nextInt(ROLE_ID_ARRAY.length)
 var role = ROLE_ID_ARRAY(roleIndex)
 //generate the region where the user is
 var regionIndex:Int = rand.nextInt(REGION_ID_ARRAY.length)
 var region = REGION_ID_ARRAY(regionIndex)

 writer.write(i + " " + gender + " " + age + " " + registerDate
 + " " + role + " " + region)
 writer.write(System.getProperty("line.separator"))
 }
 writer.flush()
 } catch {
 case e:Exception => println("Error occurred:" + e)
 } finally {
 if (writer != null)
 writer.close()
 }
 println("User Data File generated successfully.")
 }

 private def getRandomGender() :String = {
 val rand = new Random()
 val randNum = rand.nextInt(2) + 1
 if (randNum % 2 == 0) {
 "M"
 } else {
 "F"
 }
 }
}

對於交易資料,它是一個包含 5 個列 (交易單號, 交易日期, 產品種類, 價格, 使用者 ID) 的文字檔案,具有以下格式。

圖 9. 案例二測試交易資料格式預覽
圖 9. 案例二測試交易資料格式預覽

對於交易資料,我們使用以下 Scala 程式來生成。

清單 4. 案例二交易測試資料生成類原始碼
import java.io.{File, FileWriter}
import scala.util.Random
object ConsumingDataGenerator {
 private val FILE_PATH = "C:\\LOCAL_DISK_D\\sample_consuming_data.txt"
 // how many records to be generated
 private val MAX_RECORDS = 100000000
 // we suppose only 10 kinds of products in the consuming data
 private val PRODUCT_ID_ARRAY = Array[Int](1,2,3,4,5,6,7,8,9,10)
 // we suppose the price of most expensive product will not exceed 2000 RMB
 private val MAX_PRICE = 2000
 // we suppose the price of cheapest product will not be lower than 10 RMB
 private val MIN_PRICE = 10
 //the users number which should be same as the one in UserDataGenerator object
 private val USERS_NUM = 10000000

 def main(args:Array[String]): Unit = {
 generateDataFile(FILE_PATH,MAX_RECORDS);
 }

 private def generateDataFile(filePath : String, recordNum: Int): Unit = {
 var writer:FileWriter = null
 try {
 writer = new FileWriter(filePath,true)
 val rand = new Random()
 for (i <- 1 to recordNum) {
 //generate the buying date
 var year = rand.nextInt(16) + 2000
 var month = rand.nextInt(12)+1
 //to avoid checking if it is a valid day for specific 
 // month,we always generate a day which is no more than 28
 var day = rand.nextInt(28) + 1
 var recordDate = year + "-" + month + "-" + day
 //generate the product ID
 var index:Int = rand.nextInt(PRODUCT_ID_ARRAY.length)
 var productID = PRODUCT_ID_ARRAY(index)
 //generate the product price
 var price:Int = rand.nextInt(MAX_PRICE)
 if (price == 0) {
 price = MIN_PRICE
 }
 // which user buys this product
 val userID = rand.nextInt(10000000)+1
 writer.write(i + " " + recordDate + " " + productID
 + " " + price + " " + userID)
 writer.write(System.getProperty("line.separator"))
 }
 writer.flush()
 } catch {
 case e:Exception => println("Error occurred:" + e)
 } finally {
 if (writer != null)
 writer.close()
 }
 println("Consuming Data File generated successfully.")
 }
}

b.編碼實現

清單 5. 案例二示例程式原始碼
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkContext, SparkConf}

//define case class for user
case class User(userID: String, gender: String, age: Int,
                                registerDate: String,role: String, region: String)
//define case class for consuming data
case class Order(orderID: String, orderDate: String, productID: Int, price: Int, userID: String)

object UserConsumingDataStatistics {
 def main(args: Array[String]) {
 if (args.length < 1) {
 println("Usage:UserConsumingDataStatistics userDataFilePath consumingDataFilePath")
 System.exit(1)
 }
 val conf = new SparkConf().setAppName("Spark Exercise:User Consuming Data Statistics")
 //Kryo serializer is more quickly by default java serializer
 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
 val ctx = new SparkContext(conf)
 val sqlCtx = new SQLContext(ctx)
 import sqlCtx.implicits._
 //Convert user data RDD to a DataFrame and register it as a temp table
 val userDF = ctx.textFile(args(0)).map(_.split(" ")).map(
                          u => User(u(0), u(1), u(2).toInt,u(3),u(4),u(5))).toDF()
 userDF.registerTempTable("user")
 //Convert consuming data RDD to a DataFrame and register it as a temp table
 val orderDF = ctx.textFile(args(1)).map(_.split(" ")).map(o => Order(
                                        o(0), o(1), o(2).toInt,o(3).toInt,o(4))).toDF()
 orderDF.registerTempTable("orders")
 //cache the DF in memory with serializer should make the program run much faster
 userDF.persist(StorageLevel.MEMORY_ONLY_SER)
 orderDF.persist(StorageLevel.MEMORY_ONLY_SER)

 //The number of people who have orders in the year 2015
 val count = orderDF.filter(orderDF("orderDate").contains("2015")).join(
                               userDF, orderDF("userID").equalTo(userDF("userID"))).count()
 println("The number of people who have orders in the year 2015:" + count)
 //total orders produced in the year 2014
 val countOfOrders2014 = sqlCtx.sql("SELECT * FROM orders where 
                                     orderDate like '2014%'").count()
 println("total orders produced in the year 2014:" + countOfOrders2014)
 //Orders that are produced by user with ID 1 information overview
 val countOfOrdersForUser1 = sqlCtx.sql("SELECT o.orderID,o.productID,
                  o.price,u.userID FROM orders o,user u where u.userID =
                                      1 and u.userID = o.userID").show()
 println("Orders produced by user with ID 1 showed.")
 //Calculate the max,min,avg prices for the orders that are producted by user with ID 10
 val orderStatsForUser10 = sqlCtx.sql("SELECT max(o.price) as maxPrice,
                min(o.price) as minPrice,avg(o.price) as avgPrice,u.userID FROM orders o,
                       user u where u.userID = 10 and u.userID = o.userID group by u.userID")
 println("Order statistic result for user with ID 10:")
 orderStatsForUser10.collect().map(order => "Minimum Price=" + order.getAs("minPrice")
 + ";Maximum Price=" + order.getAs("maxPrice")
 + ";Average Price=" + order.getAs("avgPrice")
 ).foreach(result => println(result))
 }
}

c.提交併執行

./spark-submit –class com.ibm.spark.exercise.sql.UserConsumingDataStatistics \
 --master spark://<spark_master_node_ip>:7077 \
--num-executors 6 \
--driver-memory 8g \
--executor-memory 2g \
--executor-cores 2 \
/home/fams/spark_exercise-1.0.jar \ 
 hdfs://<hdfs_namenode_ip>:9000/user/fams/inputfiles/sample_user_data.txt \ 
 hdfs://<hdfs_namenode_ip>:9000/user/fams/inputfiles/sample_consuming_data.txt

d.監控執行過程

程式提交後,讀者可以用案例一描述的方式在 Spark web console 監控執行過程,這樣也能幫助您深入的理解 Spark SQL 程式的執行過程。

e.執行結果

圖 10. 案例二程式執行結果 (部分)
圖 10. 案例二程式執行結果 (部分)

總結

關於 Spark SQL 程式開發,我們通常有以下需要注意的地方。

  1. Spark SQL 程式開發過程中,我們有兩種方式確定 schema,第一種是反射推斷 schema,如本文的案例二,這種方式下,我們需要定義樣本類 (case class) 來對應資料的列;第二種方式是通過程式設計方式來確定 schema,這種方式主要是通過 Spark SQL 提供的 StructType 和 StructField 等 API 來程式設計實現,這種方式下我們不需要定義樣本類,如本文中的案例一。

    在程式實現中,我們需要使用以便隱式的把 RDD 轉化成 DataFrame 來操作。

  2. 本文展示的 DataFrame API 使用的方法只是其中很小的一部分,但是一旦讀者掌握了開發的基本流程,就能通過參考 DataFrame API 文件 寫出更為複雜的程式。
  3. 通常來說,我們有兩種方式瞭解 Spark 程式的執行流程。第一種是通過在控制檯觀察輸出日誌,另一種則更直觀,就是通過 Spark Web Console 來觀察 Driver 程式裡各個部分產生的 job 資訊以及 job 裡包含的 stages 資訊。
  4. 需要指出的是,熟練的掌握 Spark SQL/DataFrame 的知識對學習最新的 Spark 機器學習庫 ML Pipeline 至關重要,因為 ML Pipeline 使用 DataFrame 作為資料集來支援多種的資料型別。
  5. 筆者在測試的過程中發現,處理相同的資料集和類似的操作,Spark SQL/DataFrame 比傳統的 RDD 轉換操作具有更好的效能。這是由於 SQL 模組的 Catalyst 對使用者 SQL 做了很好的查詢優化。在以後的文章中會向讀者詳細的介紹該元件。