1. 程式人生 > >Spark 入門實戰之最好的例項

Spark 入門實戰之最好的例項

轉載:https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice1/

搭建開發環境

  1. 安裝 Scala IDE

    搭建 Scala 語言開發環境很容易,Scala IDE 官網 下載合適的版本並解壓就可以完成安裝,本文使用的版本是 4.1.0。

  2. 安裝 Scala 語言包

    如果下載的 Scala IDE 自帶的 Scala 語言包與 Spark 1.3.1 使用的 Scala 版本 (2.10.x) 不一致,那麼就需要下載和本文所使用的 Spark 所匹配的版本,以確保實現的 Scala 程式不會因為版本問題而執行失敗。

  3. 安裝 JDK

    如果您的機器上沒有安裝 JDK,請下載並安裝 1.6 版本以上的 JDK。

  4. 建立並配置 Spark 工程

    開啟 Scala IDE,建立一個名稱為 spark-exercise 的 Scala 工程。

圖 1. 建立 scala 工程
圖 1. 建立 scala 工程

在工程目錄下建立一個 lib 資料夾,並且把您的 Spark 安裝包下的 spark-assembly jar 包拷貝到 lib 目錄下。

圖 2. Spark 開發 jar 包
圖 2. Spark 開發 jar 包

並且新增該 jar 包到工程的 classpath 並配置工程使用剛剛安裝的 Scala 2.10.5 版本.,工程目錄結構如下。

圖 3. 新增 jar 包到 classpath
圖 3. 新增 jar 包到 classpath

執行環境介紹

為了避免讀者對本文案例執行環境產生困惑,本節會對本文用到的叢集環境的基本情況做個簡單介紹。

  • 本文所有例項資料儲存的環境是一個 8 個機器的 Hadoop 叢集,檔案系統總容量是 1.12T,NameNode 叫 hadoop036166, 服務埠是 9000。讀者可以不關心具體的節點分佈,因為這個不會影響到您閱讀後面的文章。
  • 本文執行例項程式使用的 Spark 叢集是一個包含四個節點的 Standalone 模式的叢集, 其中包含一個 Master 節點 (監聽埠 7077) 和三個 Worker 節點,具體分佈如下:
Server Name Role
hadoop036166 Master
hadoop036187 Worker
hadoop036188 Worker
hadoop036227 Worker
  • Spark 提供一個 Web UI 去檢視叢集資訊並且監控執行結果,預設地址是:http://<spark_master_ip>:8080 ,對於該例項提交後我們也可以到 web 頁面上去檢視執行結果,當然也可以通過檢視日誌去找到執行結果。
圖 4. Spark 的 web console
圖 4. Spark 的 web console

案例分析與程式設計實現

案例一

a. 案例描述

提起 Word Count(詞頻數統計),相信大家都不陌生,就是統計一個或者多個檔案中單詞出現的次數。本文將此作為一個入門級案例,由淺入深的開啟使用 Scala 編寫 Spark 大資料處理程式的大門。

b.案例分析

對於詞頻數統計,用 Spark 提供的運算元來實現,我們首先需要將文字檔案中的每一行轉化成一個個的單詞, 其次是對每一個出現的單詞進行記一次數,最後就是把所有相同單詞的計數相加得到最終的結果。

對於第一步我們自然的想到使用 flatMap 運算元把一行文字 split 成多個單詞,然後對於第二步我們需要使用 map 運算元把單個的單詞轉化成一個有計數的 Key-Value 對,即 word -> (word,1). 對於最後一步統計相同單詞的出現次數,我們需要使用 reduceByKey 運算元把相同單詞的計數相加得到最終結果。
c. 程式設計實現

清單 1.SparkWordCount 類原始碼
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SparkWordCount {
 def FILE_NAME:String = "word_count_results_";
 def main(args:Array[String]) {
 if (args.length < 1) {
 println("Usage:SparkWordCount FileName");
 System.exit(1);
 }
 val conf = new SparkConf().setAppName("Spark Exercise: Spark Version Word Count Program");
 val sc = new SparkContext(conf);
 val textFile = sc.textFile(args(0));
 val wordCounts = textFile.flatMap(line => line.split(" ")).map(
                                        word => (word, 1)).reduceByKey((a, b) => a + b)
 //print the results,for debug use.
 //println("Word Count program running results:");
 //wordCounts.collect().foreach(e => {
 //val (k,v) = e
 //println(k+"="+v)
 //});
 wordCounts.saveAsTextFile(FILE_NAME+System.currentTimeMillis());
 println("Word Count program running results are successfully saved.");
 }
}

d. 提交到叢集執行

本例項中, 我們將統計 HDFS 檔案系統中/user/fams 目錄下所有 txt 檔案中詞頻數。其中 spark-exercise.jar 是 Spark 工程打包後的 jar 包,這個 jar 包執行時會被上傳到目標伺服器的/home/fams 目錄下。執行此例項的具體命令如下:

清單 2.SparkWordCount 類執行命令
 ./spark-submit \
--class com.ibm.spark.exercise.basic.SparkWordCount \
--master spark://hadoop036166:7077 \
--num-executors 3 \
--driver-memory 6g --executor-memory 2g \
--executor-cores 2 \
/home/fams/sparkexercise.jar \
hdfs://hadoop036166:9000/user/fams/*.txt

e. 監控執行狀態

該例項把最終的結果儲存在了 HDFS 上,那麼如果程式執行正常我們可以在 HDFS 上找到生成的檔案資訊

圖 5. 案例一輸出結果
圖 5. 案例一輸出結果

開啟 Spark 叢集的 Web UI, 可以看到剛才提交的 job 的執行結果。

圖 6. 案例一完成狀態
圖 6. 案例一完成狀態

如果程式還沒執行完成,那麼我們可以在 Running Applications 列表裡找到它。

案例二

a. 案例描述

該案例中,我們將假設我們需要統計一個 1000 萬人口的所有人的平均年齡,當然如果您想測試 Spark 對於大資料的處理能力,您可以把人口數放的更大,比如 1 億人口,當然這個取決於測試所用叢集的儲存容量。假設這些年齡資訊都儲存在一個檔案裡,並且該檔案的格式如下,第一列是 ID,第二列是年齡。

圖 7. 案例二測試資料格式預覽
圖 7. 案例二測試資料格式預覽

現在我們需要用 Scala 寫一個生成 1000 萬人口年齡資料的檔案,源程式如下:

清單 3. 年齡資訊檔案生成類原始碼
 import java.io.FileWriter
 import java.io.File
 import scala.util.Random

 object SampleDataFileGenerator {
 
 def main(args:Array[String]) {
 val writer = new FileWriter(new File("C: \\sample_age_data.txt"),false)
 val rand = new Random()
 for ( i <- 1 to 10000000) {
 writer.write( i + " " + rand.nextInt(100))
 writer.write(System.getProperty("line.separator"))
 }
 writer.flush()
 writer.close()
 }
 }

b. 案例分析

要計算平均年齡,那麼首先需要對原始檔對應的 RDD 進行處理,也就是將它轉化成一個只包含年齡資訊的 RDD,其次是計算元素個數即為總人數,然後是把所有年齡數加起來,最後平均年齡=總年齡/人數。

對於第一步我們需要使用 map 運算元把原始檔對應的 RDD 對映成一個新的只包含年齡資料的 RDD,很顯然需要對在 map 運算元的傳入函式中使用 split 方法,得到陣列後只取第二個元素即為年齡資訊;第二步計算資料元素總數需要對於第一步對映的結果 RDD 使用 count 運算元;第三步則是使用 reduce 運算元對只包含年齡資訊的 RDD 的所有元素用加法求和;最後使用除法計算平均年齡即可。

由於本例輸出結果很簡單,所以只打印在控制檯即可。

c. 程式設計實現

清單 4.AvgAgeCalculator 類原始碼
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object AvgAgeCalculator {
 def main(args:Array[String]) {
 if (args.length < 1){
 println("Usage:AvgAgeCalculator datafile")
 System.exit(1)
 }
 val conf = new SparkConf().setAppName("Spark Exercise:Average Age Calculator")
 val sc = new SparkContext(conf)
 val dataFile = sc.textFile(args(0), 5);
 val count = dataFile.count()
 val ageData = dataFile.map(line => line.split(" ")(1))
 val totalAge = ageData.map(age => Integer.parseInt(
                                String.valueOf(age))).collect().reduce((a,b) => a+b)
 println("Total Age:" + totalAge + ";Number of People:" + count )
 val avgAge : Double = totalAge.toDouble / count.toDouble
 println("Average Age is " + avgAge)
 }
}

d. 提交到叢集執行

要執行本例項的程式,需要將剛剛生成的年齡資訊檔案上傳到 HDFS 上,假設您剛才已經在目標機器上執行生成年齡資訊檔案的 Scala 類,並且檔案被生成到了/home/fams 目錄下。

那麼您需要執行一下 HDFS 命令把檔案拷貝到 HDFS 的/user/fams 目錄。

清單 5. 年齡資訊檔案拷貝到 HDFS 目錄的命令
hdfs dfs –copyFromLocal /home/fams /user/fams
清單 6.AvgAgeCalculator 類的執行命令
 ./spark-submit \
 --class com.ibm.spark.exercise.basic.AvgAgeCalculator \
 --master spark://hadoop036166:7077 \
 --num-executors 3 \
 --driver-memory 6g \
 --executor-memory 2g \
 --executor-cores 2 \
 /home/fams/sparkexercise.jar \
 hdfs://hadoop036166:9000/user/fams/inputfiles/sample_age_data.txt

e. 監控執行狀態

在控制檯您可以看到如下所示資訊:

圖 8. 案例二輸出結果
圖 8. 案例二輸出結果

我們也可以到 Spark Web Console 去檢視 Job 的執行狀態

圖 9. 案例二完成狀態
圖 9. 案例二完成狀態

案例三

a. 案例描述

本案例假設我們需要對某個省的人口 (1 億) 性別還有身高進行統計,需要計算出男女人數,男性中的最高和最低身高,以及女性中的最高和最低身高。本案例中用到的原始檔有以下格式, 三列分別是 ID,性別,身高 (cm)。

圖 10. 案例三測試資料格式預覽
圖 10. 案例三測試資料格式預覽

我們將用以下 Scala 程式生成這個檔案,原始碼如下:

清單 7. 人口資訊生成類原始碼
import java.io.FileWriter
import java.io.File
import scala.util.Random

object PeopleInfoFileGenerator {
 def main(args:Array[String]) {
 val writer = new FileWriter(new File("C:\\LOCAL_DISK_D\\sample_people_info.txt"),false)
 val rand = new Random()
 for ( i <- 1 to 100000000) {
 var height = rand.nextInt(220)
 if (height < 50) {
 height = height + 50
 }
 var gender = getRandomGender
 if (height < 100 && gender == "M")
 height = height + 100
 if (height < 100 && gender == "F")
 height = height + 50
 writer.write( i + " " + getRandomGender + " " + height)
 writer.write(System.getProperty("line.separator"))
 }
 writer.flush()
 writer.close()
 println("People Information File generated successfully.")
 }
 
 def getRandomGender() :String = {
 val rand = new Random()
 val randNum = rand.nextInt(2) + 1
 if (randNum % 2 == 0) {
 "M"
 } else {
 "F"
 }
 }
}

b. 案例分析

對於這個案例,我們要分別統計男女的資訊,那麼很自然的想到首先需要對於男女資訊從原始檔的對應的 RDD 中進行分離,這樣會產生兩個新的 RDD,分別包含男女資訊;其次是分別對男女資訊對應的 RDD 的資料進行進一步對映,使其只包含身高資料,這樣我們又得到兩個 RDD,分別對應男性身高和女性身高;最後需要對這兩個 RDD 進行排序,進而得到最高和最低的男性或女性身高。

對於第一步,也就是分離男女資訊,我們需要使用 filter 運算元,過濾條件就是包含”M” 的行是男性,包含”F”的行是女性;第二步我們需要使用 map 運算元把男女各自的身高資料從 RDD 中分離出來;第三步我們需要使用 sortBy 運算元對男女身高資料進行排序。

c. 程式設計實現

在實現上,有一個需要注意的點是在 RDD 轉化的過程中需要把身高資料轉換成整數,否則 sortBy 運算元會把它視為字串,那麼排序結果就會受到影響,例如 身高資料如果是:123,110,84,72,100,那麼升序排序結果將會是 100,110,123,72,84,顯然這是不對的。

清單 8.PeopleInfoCalculator 類原始碼
object PeopleInfoCalculator {
 def main(args:Array[String]) {
 if (args.length < 1){
 println("Usage:PeopleInfoCalculator datafile")
 System.exit(1)
 }
 val conf = new SparkConf().setAppName("Spark Exercise:People Info(Gender & Height) Calculator")
 val sc = new SparkContext(conf)
 val dataFile = sc.textFile(args(0), 5);
 val maleData = dataFile.filter(line => line.contains("M")).map(
                              line => (line.split(" ")(1) + " " + line.split(" ")(2)))
 val femaleData = dataFile.filter(line => line.contains("F")).map(
                              line => (line.split(" ")(1) + " " + line.split(" ")(2)))
 //for debug use
 //maleData.collect().foreach { x => println(x)}
 //femaleData.collect().foreach { x => println(x)}
 val maleHeightData = maleData.map(line => line.split(" ")(1).toInt)
 val femaleHeightData = femaleData.map(line => line.split(" ")(1).toInt)
 //for debug use
 //maleHeightData.collect().foreach { x => println(x)}
 //femaleHeightData.collect().foreach { x => println(x)}
 val lowestMale = maleHeightData.sortBy(x => x,true).first()
 val lowestFemale = femaleHeightData.sortBy(x => x,true).first()
 //for debug use
 //maleHeightData.collect().sortBy(x => x).foreach { x => println(x)}
 //femaleHeightData.collect().sortBy(x => x).foreach { x => println(x)}
 val highestMale = maleHeightData.sortBy(x => x, false).first()
 val highestFemale = femaleHeightData.sortBy(x => x, false).first()
 println("Number of Male Peole:" + maleData.count())
 println("Number of Female Peole:" + femaleData.count())
 println("Lowest Male:" + lowestMale)
 println("Lowest Female:" + lowestFemale)
 println("Highest Male:" + highestMale)
 println("Highest Female:" + highestFemale)
 }
}

d. 提交到叢集執行

在提交該程式到叢集執行之前,我們需要將剛才生成的人口資訊資料檔案上傳到 HDFS 叢集,具體命令可以參照上文。

清單 9.PeopleInfoCalculator 類的執行命令
 ./spark-submit \
 --class com.ibm.spark.exercise.basic.PeopleInfoCalculator \
 --master spark://hadoop036166:7077 \
 --num-executors 3 \
 --driver-memory 6g \
 --executor-memory 3g \
 --executor-cores 2 \
 /home/fams/sparkexercise.jar \
 hdfs://hadoop036166:9000/user/fams/inputfiles/sample_people_info.txt

e. 監控執行狀態

對於該例項,如程式中列印的一樣,會在控制檯顯示如下資訊:

圖 11. 案例三輸出結果
圖 11. 案例三輸出結果

在 Spark Web Console 裡可以看到具體的執行狀態資訊

圖 12. 案例三完成狀態
圖 12. 案例三完成狀態

案例四

a. 案例描述

該案例中我們假設某搜尋引擎公司要統計過去一年搜尋頻率最高的 K 個科技關鍵詞或片語,為了簡化問題,我們假設關鍵片語已經被整理到一個或者多個文字檔案中,並且文件具有以下格式。

圖 13. 案例四測試資料格式預覽
圖 13. 案例四測試資料格式預覽

我們可以看到一個關鍵詞或者片語可能出現多次,並且大小寫格式可能不一致。

b. 案例分析

要解決這個問題,首先我們需要對每個關鍵詞出現的次數進行計算,在這個過程中需要識別不同大小寫的相同單詞或者片語,如”Spark”和“spark” 需要被認定為一個單詞。對於出現次數統計的過程和 word count 案例類似;其次我們需要對關鍵詞或者片語按照出現的次數進行降序排序,在排序前需要把 RDD 資料元素從 (k,v) 轉化成 (v,k);最後取排在最前面的 K 個單詞或者片語。

對於第一步,我們需要使用 map 運算元對源資料對應的 RDD 資料進行全小寫轉化並且給片語記一次數,然後呼叫 reduceByKey 運算元計算相同片語的出現次數;第二步我們需要對第一步產生的 RDD 的資料元素用 sortByKey 運算元進行降序排序;第三步再對排好序的 RDD 資料使用 take 運算元獲取前 K 個數據元素。

c. 程式設計實現

清單 10.TopKSearchKeyWords 類原始碼
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object TopKSearchKeyWords {
 def main(args:Array[String]){
 if (args.length < 2) {
 println("Usage:TopKSearchKeyWords KeyWordsFile K");
 System.exit(1)
 }
 val conf = new SparkConf().setAppName("Spark Exercise:Top K Searching Key Words")
 val sc = new SparkContext(conf)
 val srcData = sc.textFile(args(0))
 val countedData = srcData.map(line => (line.toLowerCase(),1)).reduceByKey((a,b) => a+b)
 //for debug use
 //countedData.foreach(x => println(x))
 val sortedData = countedData.map{ case (k,v) => (v,k) }.sortByKey(false)
 val topKData = sortedData.take(args(1).toInt).map{ case (v,k) => (k,v) }
 topKData.foreach(println)
 }
}

d. 提交到叢集執行

清單 11.TopKSearchKeyWords 類的執行命令
 ./spark-submit \
 --class com.ibm.spark.exercise.basic.TopKSearchKeyWords \
 --master spark://hadoop036166:7077 \
 --num-executors 3 \
 --driver-memory 6g \
 --executor-memory 2g \
 --executor-cores 2 \
 /home/fams/sparkexercise.jar \
 hdfs://hadoop036166:9000/user/fams/inputfiles/search_key_words.txt

e. 監控執行狀態

如果程式成功執行,我們將在控制檯看到以下資訊。當然讀者也可以仿照案例二和案例三那樣,自己嘗試使用 Scala 寫一段小程式生成此案例需要的源資料檔案,可以根據您的 HDFS 叢集的容量,生成儘可能大的檔案,用來測試本案例提供的程式。

圖 14. 案例四輸出結果
圖 14. 案例四輸出結果
圖 15. 案例四完成狀態
圖 15. 案例四完成狀態

Spark job 的執行流程簡介

我們可以發現,Spark 應用程式在提交執行後,控制檯會列印很多日誌資訊,這些資訊看起來是雜亂無章的,但是卻在一定程度上體現了一個被提交的 Spark job 在叢集中是如何被排程執行的,那麼在這一節,將會向大家介紹一個典型的 Spark job 是如何被排程執行的。

我們先來了解以下幾個概念:

DAG: 即 Directed Acyclic Graph,有向無環圖,這是一個圖論中的概念。如果一個有向圖無法從某個頂點出發經過若干條邊回到該點,則這個圖是一個有向無環圖。

Job:我們知道,Spark 的計算操作是 lazy 執行的,只有當碰到一個動作 (Action) 運算元時才會觸發真正的計算。一個 Job 就是由動作運算元而產生包含一個或多個 Stage 的計算作業。

Stage:Job 被確定後,Spark 的排程器 (DAGScheduler) 會根據該計算作業的計算步驟把作業劃分成一個或者多個 Stage。Stage 又分為 ShuffleMapStage 和 ResultStage,前者以 shuffle 為輸出邊界,後者會直接輸出結果,其邊界可以是獲取外部資料,也可以是以一個 ShuffleMapStage 的輸出為邊界。每一個 Stage 將包含一個 TaskSet。

TaskSet: 代表一組相關聯的沒有 shuffle 依賴關係的任務組成任務集。一組任務會被一起提交到更加底層的 TaskScheduler。

Task:代表單個數據分割槽上的最小處理單元。分為 ShuffleMapTask 和 ResultTask。ShuffleMapTask 執行任務並把任務的輸出劃分到 (基於 task 的對應的資料分割槽) 多個 bucket(ArrayBuffer) 中,ResultTask 執行任務並把任務的輸出傳送給驅動程式。

Spark 的作業任務排程是複雜的,需要結合原始碼來進行較為詳盡的分析,但是這已經超過本文的範圍,所以這一節我們只是對大致的流程進行分析。

Spark 應用程式被提交後,當某個動作運算元觸發了計算操作時,SparkContext 會向 DAGScheduler 提交一個作業,接著 DAGScheduler 會根據 RDD 生成的依賴關係劃分 Stage,並決定各個 Stage 之間的依賴關係,Stage 之間的依賴關係就形成了 DAG。Stage 的劃分是以 ShuffleDependency 為依據的,也就是說當某個 RDD 的運算需要將資料進行 Shuffle 時,這個包含了 Shuffle 依賴關係的 RDD 將被用來作為輸入資訊,進而構建一個新的 Stage。我們可以看到用這樣的方式劃分 Stage,能夠保證有依賴關係的資料可以以正確的順序執行。根據每個 Stage 所依賴的 RDD 資料的 partition 的分佈,會產生出與 partition 數量相等的 Task,這些 Task 根據 partition 的位置進行分佈。其次對於 finalStage 或是 mapStage 會產生不同的 Task,最後所有的 Task 會封裝到 TaskSet 內提交到 TaskScheduler 去執行。有興趣的讀者可以通過閱讀 DAGScheduler 和 TaskScheduler 的原始碼獲取更詳細的執行流程。

結束語

通過本文,相信讀者對如何使用 Scala 編寫 Spark 應用程式處理大資料已經有了較為深入的瞭解。當然在處理實際問題時,情況可能比本文舉得例子複雜很多,但是解決問題的基本思想是一致的。在碰到實際問題的時候,首先要對源資料結構格式等進行分析,然後確定如何去使用 Spark 提供的運算元對資料進行轉化,最終根據實際需求選擇合適的運算元操作資料並計算結果。本文並未介紹其它 Spark 模組的知識,顯然這不是一篇文章所能完成的,希望以後會有機會總結更多的 Spark 應用程式開發以及效能調優方面的知識,寫成文章與更多的 Spark 技術愛好者分享,一起進步。由於時間倉促並且本人知識水平有限,文章難免有未考慮周全的地方甚至是錯誤,希望各位朋友不吝賜教。有任何問題,都可以在文末留下您的評論,我會及時回覆。