1. 程式人生 > >基於Jupyter平臺通過python實現Spark的應用程式之wordCount

基於Jupyter平臺通過python實現Spark的應用程式之wordCount

1、啟動spark平臺,介面如下:
這裡寫圖片描述
2、啟動Jupyter,介面如下圖所示:
這裡寫圖片描述
如果你對以上啟動存在疑問的話,請看我的上一篇部落格,關於Jupyter配置Spark的。
3、功能分析
- 我們要實現的一個功能是統計詞頻
- 我們需要把統計的檔案上傳到hdfs裡面
- 編寫python指令碼
4、程式碼實現
- 上傳檔案到hdfs
我有一個hello.txt檔案,裡面有兩行內容,如下圖所示:
這裡寫圖片描述
檔案放到了~/Downloads下,進入hadoop/bin檔案,輸入如下命令:
./hadoop fs -put ~/Downloads/hello.txt
檢視上傳是否成功,輸入如下命令:
./hadoop fs -ls /data |grep he*


*代表匹配he後邊所有的內容,輸出結果截圖如下:
這裡寫圖片描述
- 在Jupyter平臺開啟python程式設計頁面,引入spark包,程式碼如下:from pyspark import SparkContext, SparkConf

  • 初始化sc,這裡不瞭解spark程式設計原理的話建議看下spark官網,程式碼如下:

conf = SparkConf().setAppName("wordcount").setMaster("local")
sc = SparkContext(conf = conf)

setAppName設定的我這個執行例項的名字,setMaster設定的是執行的模式,這裡選擇本地模式。
- 讀取本地檔案,程式碼如下:
lines = sc.textFile("hdfs://hadoop0:9000/data/hello.txt")


- 使用spark的flatMap運算元,將讀入的檔案拆成一個個單詞,程式碼如下:
words = lines.flatMap(lambda line:line.split())
注意這個時候words是一個字串陣列,裡面存的是hello,you,hello,me。這裡用到了spark中提供transformations和pair兩種RDD,前一種只是做一個標記不執行,後一種才是真正的執行,這裡具體的執行流程我稍後會做一個詳細的分析給大家,讓大家徹底理解spark的執行原理。
- 使用spark的map運算元,將words設定為(key,value) 形式,程式碼如下:
pairs = words.map(lambda word:(word,1))

注意這個時候pairs 存放的是一個字串陣列,內容是:(hello,1),(you,1),(hello,1),(me,1)
- 使用spark的reduceByKey運算元,統計相同key出現的次數,即key相同的把value值相加,key合併為一個,程式碼如下:
counts = pairs.reduceByKey(lambda a,b:a + b)
注意這個時候counts為一個字串陣列,內容是:(hello,2),(you,1),(me,1)
- 使用spark的collect運算元,將結果打印出來,程式碼如下:
counts.collect() ``
- 還可以使用spark的reduceBykey,對結果按key排序,程式碼如下:
counts.countByKey()#根據key排序“`
注意輸出結果變成(hello,2),(me,1),(you,1),整個截圖如下:
這裡寫圖片描述