1. 程式人生 > >【Spark核心原始碼】Word Count程式的簡單分析

【Spark核心原始碼】Word Count程式的簡單分析

目錄

啟動Spark Shell

日誌級別的設定

解析word count程式

第0步:設定日誌級別(“可選”)

第1步:讀取檔案

第2步:將每行的內容根據空格進行拆分成單詞

第3步:設定每一個單詞的計數為1

第4步:單詞根據Key進行計數值累加聚合

第5步:輸出結果與分析


剛接觸Spark那會,還是Spark1.3版本,那時覺得Spark好厲害,但由於能力和工作的原因,沒有沉澱下來仔細研究這門技術,只是膚淺地使用,並沒有深入瞭解追溯其本源。接下來,我要仔細研究Spark核心,也算是給自己一個交代吧。

這裡我就不寫Spark環境搭建的過程了,這個真的是太多太多了。基於Spark2.1.0,從Spark Shell入手,慢慢走向Spark核心深處。

啟動Spark Shell

輸入spark-shell,啟動成功會列印如下日誌資訊:

 日誌資訊中有幾點需要關注一下:

  1. 預設的日誌配置檔案是"org/apache/spark/log4j-defaults.properties",日誌級別是"WARN";
  2. 我們可以通過“sc.setLogLevel(newLevel)”方法指定日誌級別;
  3. Spark Context Web UI 地址是"http://192.168.31.115:4040","192.168.31.115"是本機IP地址,4040是埠號;
  4. Spark Shell部署模式是“master = local[*]”,當前應用ID是“local-153....”;
  5. Spark Sehll預設建立了SparkContext物件,名叫sc;建立了SparkSession物件,名叫spark。

日誌級別的設定

在使用spark-shell編寫word count程式之前,我們先設定輸出日誌的級別,修改為INFO級別,因為WARN級別輸出的東西有點少。方法有二,其一:在spark的repl中使用sc.setLogLevel()方法直接設定日誌等級。

其二:進入spark的conf目錄中,找到“log4j.properties.template”,並改名為“log4j.properties”。

cp log4j.properties.template log4j.properties

編輯“log4j.properties”檔案,將“log4j.logger.org.apache.spark.repl.Main=WARN”改成“log4j.logger.org.apache.spark.repl.Main=INFO”。

這裡我採用的是第一種方法,下面是完整的編寫過程。

解析word count程式

第0步:設定日誌級別(“可選”)

第1步:讀取檔案

逐行讀取檔案,並建立了一個MapPartitionsRDD,MapPartitionsRDD繼承了“org.apache.spark.rdd.RDD”這個抽象類

第2步:將每行的內容根據空格進行拆分成單詞

第3步:設定每一個單詞的計數為1

雖然此時依舊生成了MapPartitionsRDD,但是它的泛型改變成了“(String,Int)”

第4步:單詞根據Key進行計數值累加聚合

把相同單詞的計數1,進行累加,生成了一個ShuffledRDD。(這一步非常重要,是spark作業中最容易出現效能問題的一個過程--shuffle過程)

第5步:輸出結果與分析

1-4步驟屬於transformation操作,最後一個步驟處於action操作,只有在action操作,spark作業才真正的執行。

執行的日誌如下:

sc提交的job的ID是0

一共產生的4個RDD,被換分成了ShuffleMapStage(ID=0,嘗試號=0)和ResultStage(ID=1,嘗試號=0)。

此時對應步驟1,2,3

提交了ShuffleMapStage,做了一些準備工作之後(比如建立廣播變數等),開始執行任務。

“Executor: Running task 0.0 in stage 0.0 (TID 0)”,

task 0.0表示任務ID為0,嘗試號為0

stage 0.0表示stageID為0也就是ShuffleMapStage,嘗試號為0

執行任務時進行讀取檔案,這裡是HadoopRDD進行讀取的,也就是說第一個MapPartionsRDD的上游RDD是HadoopRDD。

上圖的結尾表示,此時的task執行結束了。

第一個stage的任務執行結束,需要從任務列表中去除,然後查詢、等待機新的stage任務,然後尋找到了ID為1的stage,也就是ResultStage。

此時對應步驟4,5

執行ResultStage,同ShuffledStage執行階段一樣先有個初期準備,然後執行任務

“Executor: Running task 0.0 in stage 1.0 (TID 1)”

task 0.0表示任務ID為0,嘗試號為0

stage 0.0表示stageID為1也就是ResultStage,嘗試號為0

結尾還是表示執行結束,彙報一下工作。

最後從任務列表中去除ResultStage的任務,並宣佈ID為0的job也就是整個word count任務結束。