1. 程式人生 > >大話Spark(3)-一圖深入理解WordCount程式在Spark中的執行過程

大話Spark(3)-一圖深入理解WordCount程式在Spark中的執行過程

本文以WordCount為例, 畫圖說明spark程式的執行過程
WordCount就是統計一段資料中每個單詞出現的次數,
例如hello spark hello you 這段文字中hello出現2次, spark出現1次, you出現1次.
先上完整程式碼:

object WordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("WordCount");
    val sc = new SparkContext(conf)
  
    val lines = sc.textFile("hdfs://xxx:9000/spark.txt", 3); 
    val words = lines.flatMap { line => line.split("\s+") }   
    val pairs = words.map { word => (word, 1) }   
    val wordCounts = pairs.reduceByKey { _ + _ }
    wordCounts.foreach(wordCount => println(wordCount._1 + " appeared " + wordCount._2 + " times."))  
  }
}

 

上面幾行程式碼就把hdfs上的spark.txt中每個單詞出現的個數計算完成.
Spark叢集的執行單位是Application,任何提交的任務都會產生一個Application。一個Application只會關聯上一個Spark上下文,也就是SparkContext。構建SparkContext時可以傳入Spark相關配置,也就是SparkConf,它可以用來指定Application的名稱,任務需要的CPU核數/記憶體大小,調優需要的配置等等. 以下兩行建立了SparkContext:

val conf = new SparkConf().setAppName("WordCount");
val sc = new SparkContext(conf)

 

建立完SparkContext之後, spark.txt的檔案數如何被spark處理的呢,讓我們一起看一下:
首先我們假設spark.txt在hdfs上對應著3個檔案,檔案內容都一樣,sc.textFile("hdfs://xxx:9000/spark.txt", 3)也執行了最小分割槽數為3.
然後wordcount執行過程如下:


說明:

  1. 綠,紅,黃色箭頭的地方發生了`Shuffer,把整個任務分成了2個Stage(2個藍色虛線框)
  2. 紅色虛線框代表一個Partition窄依賴(每個分割槽只被子RDD的一個分割槽所使用)的執行過程, 多個partition是並行執行的
  3. reduceByKey會先把每個Partition中的資料預聚合(groupByKey不會)
  4. Stage中的資料都是在記憶體中,不像MapReduce會頻繁寫磁碟,速度很快.
  5. 補充:其實textFile,flatMap,map,reduceByKey等transformation操作都是lazy的,程式執行到這裡不會立即執行,只有再觸發action操作的時候才會執行,此例中為wordCounts.foreach這個action操作.

&n