1. 程式人生 > >30分鐘--Spark快速入門指南

30分鐘--Spark快速入門指南

Apache Spark 是一個新興的大資料處理通用引擎,提供了分散式的記憶體抽象。Spark 正如其名,最大的特點就是快(Lightning-fast),可比 Hadoop MapReduce 的處理速度快 100 倍。此外,Spark 提供了簡單易用的 API,幾行程式碼就能實現 WordCount。本教程主要參考官網快速入門教程,介紹了 Spark 的安裝,Spark shell 、RDD、Spark SQL、Spark Streaming 等的基本使用。

本教程的具體執行環境如下:

  • CentOS 6.4
  • Spark 1.6
  • Hadoop 2.6.0
  • Java JDK 1.7
  • Scala 2.10.5

準備工作

執行 Spark 需要 Java JDK 1.7,CentOS 6.x 系統預設只安裝了 Java JRE,還需要安裝 Java JDK,並配置好 JAVA_HOME 變數。此外,Spark 會用到 HDFS 與 YARN,因此請先安裝 Hadoop,具體請瀏覽Hadoop安裝教程,在此就不再複述。

安裝 Spark

待 Hadoop 安裝好之後,我們再開始安裝 Spark。

從官網下載 Spark從官網下載 Spark

Package type
  • Source code: Spark 原始碼,需要編譯才能使用,另外 Scala 2.11 需要使用原始碼編譯才可使用
  • Pre-build with user-provided Hadoop: “Hadoop free” 版,可應用到任意 Hadoop 版本
  • Pre-build for Hadoop 2.6 and later: 基於 Hadoop 2.6 的預先編譯版,需要與本機安裝的 Hadoop 版本對應。可選的還有 Hadoop 2.4 and later、Hadoop 2.3、Hadoop 1.x,以及 CDH 4。

為方便,本教程選擇的是 Pre-build with user-provided Hadoop,簡單配置後可應用到任意 Hadoop 版本。

下載後,執行如下命令進行安裝:

  1. sudo tar -zxf ~/下載/spark-1.6.0-bin-without-hadoop.tgz -C /usr/local/
  2. cd /usr/local
  3. sudo mv
    ./spark-1.6.0-bin-without-hadoop/ ./spark
  4. sudo chown -R hadoop:hadoop ./spark # 此處的 hadoop 為你的使用者名稱
Shell 命令

安裝後,需要在 ./conf/spark-env.sh 中修改 Spark 的 Classpath,執行如下命令拷貝一個配置檔案:

  1. cd /usr/local/spark
  2. cp ./conf/spark-env.sh.template ./conf/spark-env.sh
Shell 命令

編輯 ./conf/spark-env.sh(vim ./conf/spark-env.sh) ,在最後面加上如下一行:

export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)

儲存後,Spark 就可以啟動、運行了。

執行 Spark 示例

注意,必須安裝 Hadoop 才能使用 Spark,但如果使用 Spark 過程中沒用到 HDFS,不啟動 Hadoop 也是可以的。此外,接下來教程中出現的命令、目錄,若無說明,則一般以 Spark 的安裝目錄(/usr/local/spark)為當前路徑,請注意區分。

在 ./examples/src/main 目錄下有一些 Spark 的示例程式,有 Scala、Java、Python、R 等語言的版本。我們可以先執行一個示例程式 SparkPi(即計算 π 的近似值),執行如下命令:

  1. cd /usr/local/spark
  2. ./bin/run-example SparkPi
Shell 命令

執行時會輸出非常多的執行資訊,輸出結果不容易找到,可以通過 grep 命令進行過濾(命令中的 2>&1 可以將所有的資訊都輸出到 stdout 中,否則由於輸出日誌的性質,還是會輸出到螢幕中):

  1. ./bin/run-example SparkPi 2>&1 | grep "Pi is roughly"
Shell 命令

過濾後的執行結果如下圖所示,可以得到 π 的 5 位小數近似值 :

從官網下載 Spark從官網下載 Spark

Python 版本的 SparkPi 則需要通過 spark-submit 執行:

  1. ./bin/spark-submit examples/src/main/python/pi.py
Shell 命令

通過 Spark Shell 進行互動分析

Spark shell 提供了簡單的方式來學習 API,也提供了互動的方式來分析資料。Spark Shell 支援 Scala 和 Python,本教程選擇使用 Scala 來進行介紹。

Scala

Scala 是一門現代的多正規化程式語言,志在以簡練、優雅及型別安全的方式來表達常用程式設計模式。它平滑地集成了面向物件和函式語言的特性。Scala 運行於 Java 平臺(JVM,Java 虛擬機器),併兼容現有的 Java 程式。

Scala 是 Spark 的主要程式語言,如果僅僅是寫 Spark 應用,並非一定要用 Scala,用 Java、Python 都是可以的。使用 Scala 的優勢是開發效率更高,程式碼更精簡,並且可以通過 Spark Shell 進行互動式實時查詢,方便排查問題。

執行如下命令啟動 Spark Shell:

  1. ./bin/spark-shell
Shell 命令

啟動成功後如圖所示,會有 “scala >” 的命令提示符。

成功啟動Spark Shell成功啟動Spark Shell

基礎操作

Spark 的主要抽象是分散式的元素集合(distributed collection of items),稱為RDD(Resilient Distributed Dataset,彈性分散式資料集),它可被分發到叢集各個節點上,進行並行操作。RDDs 可以通過 Hadoop InputFormats 建立(如 HDFS),或者從其他 RDDs 轉化而來。

我們從 ./README 檔案新建一個 RDD,程式碼如下(本文出現的 Spark 互動式命令程式碼中,與位於同一行的註釋內容為該命令的說明,命令之後的註釋內容表示互動式輸出結果):

  1. val textFile = sc.textFile("file:///usr/local/spark/README.md")
  2. // textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:27
scala

程式碼中通過 “file://” 字首指定讀取本地檔案。Spark shell 預設是讀取 HDFS 中的檔案,需要先上傳檔案到 HDFS 中,否則會有“org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/user/hadoop/README.md”的錯誤。

上述命令的輸出結果如下圖所示:

新建RDD新建RDD

RDDs 支援兩種型別的操作

  • actions: 在資料集上執行計算後返回值

下面我們就來演示 count() 和 first() 操作:

  1. textFile.count()// RDD 中的 item 數量,對於文字檔案,就是總行數
  2. // res0: Long = 95
  3. textFile.first()// RDD 中的第一個 item,對於文字檔案,就是第一行內容
  4. // res1: String = # Apache Spark
scala

接著演示 transformation,通過 filter transformation 來返回一個新的 RDD,程式碼如下:

  1. val linesWithSpark = textFile.filter(line => line.contains("Spark"))// 篩選出包含 Spark 的行
  2. linesWithSpark.count()// 統計行數
  3. // res4: Long = 17
scala

可以看到一共有 17 行內容包含 Spark,這與通過 Linux 命令 cat ./README.md | grep "Spark" -c 得到的結果一致,說明是正確的。action 和 transformation 可以用鏈式操作的方式結合使用,使程式碼更為簡潔:

  1. textFile.filter(line => line.contains("Spark")).count()// 統計包含 Spark 的行數
  2. // res4: Long = 17
scala

RDD的更多操作

RDD 的 actions 和 transformations 可用在更復雜的計算中,例如通過如下程式碼可以找到包含單詞最多的那一行內容共有幾個單詞:

  1. textFile.map(line => line.split(" ").size).reduce((a, b)=>if(a > b) a else b)
  2. // res5: Int = 14
scala

程式碼首先將每一行內容 map 為一個整數,這將建立一個新的 RDD,並在這個 RDD 中執行 reduce 操作,找到最大的數。map()、reduce() 中的引數是 Scala 的函式字面量(function literals,也稱為閉包 closures),並且可以使用語言特徵或 Scala/Java 的庫。例如,通過使用 Math.max() 函式(需要匯入 Java 的 Math 庫),可以使上述程式碼更容易理解:

  1. import java.lang.Math
  2. textFile.map(line => line.split(" ").size).reduce((a, b)=>Math.max(a, b))
  3. // res6: Int = 14
scala

Hadoop MapReduce 是常見的資料流模式,在 Spark 中同樣可以實現(下面這個例子也就是 WordCount):

  1. val wordCounts = textFile.flatMap(line => line.split(" ")).map(word =>(word,1)).reduceByKey((a, b)=> a + b)// 實現單詞統計
  2. // wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:29
  3. wordCounts.collect()// 輸出單詞統計結果
  4. // res7: Array[(String, Int)] = Array((package,1), (For,2), (Programs,1), (processing.,1), (Because,1), (The,1)...)
scala

快取

Spark 支援在叢集範圍內將資料集快取至每一個節點的記憶體中,可避免資料傳輸,當資料需要重複訪問時這個特徵非常有用,例如查詢體積小的“熱”資料集,或是執行如 PageRank 的迭代演算法。呼叫 cache(),就可以將資料集進行快取:

  1. linesWithSpark.cache()
scala

Spark SQL 和 DataFrames

Spark SQL 是 Spark 內嵌的模組,用於結構化資料。在 Spark 程式中可以使用 SQL 查詢語句或 DataFrame API。DataFrames 和 SQL 提供了通用的方式來連線多種資料來源,支援 Hive、Avro、Parquet、ORC、JSON、和 JDBC,並且可以在多種資料來源之間執行 join 操作。

Spark SQL 的功能是通過 SQLContext 類來使用的,而建立 SQLContext 是通過 SparkContext 建立的。在 Spark shell 啟動時,輸出日誌的最後有這麼幾條資訊

16/01/16 13:25:41 INFO repl.SparkILoop: Created spark context..
Spark context available as sc.
16/01/16 13:25:41 INFO repl.SparkILoop: Created sql context..
SQL context available as sqlContext.

這些資訊表明 SparkContent 和 SQLContext 都已經初始化好了,可通過對應的 sc、sqlContext 變數直接進行訪問。

使用 SQLContext 可以從現有的 RDD 或資料來源建立 DataFrames。作為示例,我們通過 Spark 提供的 JSON 格式的資料來源檔案 ./examples/src/main/resources/people.json 來進行演示,該資料來源內容如下:

  1. {"name":"Michael"}
  2. {"name":"Andy","age":30}
  3. {"name":"Justin","age":19}
json

執行如下命令匯入資料來源,並輸出內容:

  1. val df = sqlContext.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
  2. // df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
  3. df.show()// 輸出資料來源內容
  4. // +----+-------+
  5. // | age| name|
  6. // +----+-------+
  7. // |null|Michael|
  8. // | 30| Andy|
  9. // | 19| Justin|
  10. // +----+-------+
scala

接著,我們來演示 DataFrames 處理結構化資料的一些基本操作:

  1. df.select("name").show()// 只顯示 "name" 列
  2. // +-------+
  3. // | name|
  4. // +-------+
  5. // |Michael|
  6. // | Andy|
  7. // | Justin|
  8. // +-------+
  9. df.select(df("name"), df("age")+1).show()// 將 "age" 加 1
  10. // +-------+---------+
  11. // | name|(age + 1)|
  12. // +-------+---------+
  13. // |Michael| null|
  14. // | Andy| 31|
  15. // | Justin| 20|
  16. // +-------+---------+
  17. df.filter(df("age")>21).show()# 條件語句
  18. // +---+----+
  19. // |age|name|
  20. // +---+----+
  21. // | 30|Andy|
  22. // +---+----+
  23. df.groupBy("age").count().show()// groupBy 操作
  24. // +----+-----+
  25. // | age|count|
  26. // +----+-----+
  27. // |null| 1|
  28. // | 19| 1|
  29. // | 30| 1|
  30. // +----+-----+
scala

當然,我們也可以使用 SQL 語句來進行操作:

  1. df.registerTempTable("people")// 將 DataFrame 註冊為臨時表 people
  2. val result = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")// 執行 SQL 查詢
  3. result.show()// 輸出結果
  4. // +------+---+
  5. // | name|age|
  6. // +------+---+
  7. // |Justin| 19|
  8. // +------+---+
scala

更多的功能可以檢視完整的 DataFrames API ,此外 DataFrames 也包含了豐富的 DataFrames Function 可用於字串處理、日期計算、數學計算等。

Spark Streaming

流計算除了使用 Storm 框架,使用 Spark Streaming 也是一個很好的選擇。基於 Spark Streaming,可以方便地構建可拓展、高容錯的流計算應用程式。Spark Streaming 使用 Spark API 進行流計算,這意味著在 Spark 上進行流處理與批處理的方式一樣。因此,你可以複用批處理的程式碼,使用 Spark Streaming 構建強大的互動式應用程式,而不僅僅是用於分析資料。

下面以一個簡單的 Spark Streaming 示例(基於流的單詞統計)來演示一下 Spark Streaming:本地伺服器通過 TCP 接收文字資料,實時輸出單詞統計結果。該部分內容主要參考了 Spark Streaming 程式設計指南

執行該示例需要 Netcat(在網路上通過 TCP 或 UDP 讀寫資料),CentOS 6.x 系統中預設沒有安裝,經過測試,如果通過 yum 直接安裝,執行時會有 “nc: Protocol not available” 的錯誤,需要下載較低版本的 nc 才能正常使用。我們選擇 Netcat 0.6.1 版本,在終端中執行如下命令進行安裝:

  1. wget http://downloads.sourceforge.net/project/netcat/netcat/0.6.1/netcat-0.6.1-1.i386.rpm -O ~/netcat-0.6.1-1.i386.rpm # 下載
  2. sudo rpm -iUv ~/netcat-0.6.1-1.i386.rpm # 安裝
Shell 命令

安裝好 NetCat 之後,使用如下命令建立本地資料服務,監聽 TCP 埠 9999:

  1. # 記為終端 1
  2. nc -l -p 9999
Shell 命令

啟動後,該埠就被佔用了,需要開啟另一個終端執行示例程式,執行如下命令:

  1. # 需要另外開啟一個終端,記為終端 2,然後執行如下命令
  2. /usr/local/spark/bin/run-example streaming.NetworkWordCount localhost 9999
Shell 命令

接著在終端 1 中輸入文字,在終端 2 中就可以實時看到單詞統計結果了。

Spark Streaming 的內容較多,本教程就簡單介紹到這,更詳細的內容可檢視官網教程。最後需要關掉終端 2,並按 ctrl+c 退出 終端 1 的Netcat。

獨立應用程式(Self-Contained Applications)

接著我們通過一個簡單的應用程式 SimpleApp 來演示如何通過 Spark API 編寫一個獨立應用程式。使用 Scala 編寫的程式需要使用 sbt 進行編譯打包,相應的,Java 程式使用 Maven 編譯打包,而 Python 程式通過 spark-submit 直接提交。

應用程式程式碼

在終端中執行如下命令建立一個資料夾 sparkapp 作為應用程式根目錄:

  1. cd ~ # 進入使用者主資料夾
  2. mkdir ./sparkapp # 建立應用程式根目錄
  3. mkdir -p ./sparkapp/src/main/scala # 建立所需的資料夾結構
Shell 命令

在 ./sparkapp/src/main/scala 下建立一個名為 SimpleApp.scala 的檔案(vim ./sparkapp/src/main/scala/SimpleApp.scala),新增程式碼如下:

  1. /* SimpleApp.scala */
  2. import org.apache.spark.SparkContext
  3. import org.apache.spark.SparkContext._
  4. import org.apache.spark.SparkConf
  5. objectSimpleApp{
  6. def main(args:Array[String]){
  7. val logFile ="file:///usr/local/spark/README.md"// Should be some file on your system
  8. val conf =newSparkConf().setAppName("Simple Application")
  9. val sc =newSparkContext(conf)
  10. val logData = sc.textFile(logFile,2).cache()
  11. val numAs = logData.filter(line => line.contains("a")).count()
  12. val numBs = logData.filter(line => line.contains("b")).count()
  13. println(