1. 程式人生 > >Spark快速入門指南 – Spark安裝與基礎使用

Spark快速入門指南 – Spark安裝與基礎使用

Apache Spark 是一個新興的大資料處理通用引擎,提供了分散式的記憶體抽象。Spark 正如其名,最大的特點就是快(Lightning-fast),可比 Hadoop MapReduce 的處理速度快 100 倍。此外,Spark 提供了簡單易用的 API,幾行程式碼就能實現 WordCount。本教程主要參考官網快速入門教程,介紹了 Spark 的安裝,Spark shell 、RDD、Spark SQL、Spark Streaming 等的基本使用。

本教程的具體執行環境如下:

  • CentOS 6.4
  • Spark 1.6
  • Hadoop 2.6.0
  • Java JDK 1.7
  • Scala 2.10.5

準備工作

執行 Spark 需要 Java JDK 1.7,CentOS 6.x 系統預設只安裝了 Java JRE,還需要安裝 Java JDK,並配置好 JAVA_HOME 變數。此外,Spark 會用到 HDFS 與 YARN,因此請先安裝 Hadoop,具體請瀏覽Hadoop安裝教程,在此就不再複述。

安裝 Spark

待 Hadoop 安裝好之後,我們再開始安裝 Spark。

從官網下載 Spark從官網下載 Spark

Package type

  • Source code: Spark 原始碼,需要編譯才能使用,另外 Scala 2.11 需要使用原始碼編譯才可使用
  • Pre-build with user-provided Hadoop: “Hadoop free” 版,可應用到任意 Hadoop 版本
  • Pre-build for Hadoop 2.6 and later: 基於 Hadoop 2.6 的預先編譯版,需要與本機安裝的 Hadoop 版本對應。可選的還有 Hadoop 2.4 and later、Hadoop 2.3、Hadoop 1.x,以及 CDH 4。

為方便,本教程選擇的是 Pre-build with user-provided Hadoop,簡單配置後可應用到任意 Hadoop 版本。

下載後,執行如下命令進行安裝:

  1. sudo tar -zxf ~/下載/spark-1.6.0-bin-without-hadoop.tgz -C /usr/local/
  2. cd /usr/local
  3. sudo mv ./spark-1.6.0-bin-without-hadoop/ ./spark
  4. sudo chown -R hadoop:hadoop ./spark # 此處的 hadoop 為你的使用者名稱

Shell 命令

安裝後,需要在 ./conf/spark-env.sh 中修改 Spark 的 Classpath,執行如下命令拷貝一個配置檔案:

  1. cd /usr/local/spark
  2. cp ./conf/spark-env.sh.template ./conf/spark-env.sh

Shell 命令

編輯 ./conf/spark-env.sh(vim ./conf/spark-env.sh) ,在最後面加上如下一行:

export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)

儲存後,Spark 就可以啟動、運行了。

執行 Spark 示例

注意,必須安裝 Hadoop 才能使用 Spark,但如果使用 Spark 過程中沒用到 HDFS,不啟動 Hadoop 也是可以的。此外,接下來教程中出現的命令、目錄,若無說明,則一般以 Spark 的安裝目錄(/usr/local/spark)為當前路徑,請注意區分。

在 ./examples/src/main 目錄下有一些 Spark 的示例程式,有 Scala、Java、Python、R 等語言的版本。我們可以先執行一個示例程式 SparkPi(即計算 π 的近似值),執行如下命令:

  1. cd /usr/local/spark
  2. ./bin/run-example SparkPi

Shell 命令

執行時會輸出非常多的執行資訊,輸出結果不容易找到,可以通過 grep 命令進行過濾(命令中的 2>&1 可以將所有的資訊都輸出到 stdout 中,否則由於輸出日誌的性質,還是會輸出到螢幕中):

  1. ./bin/run-example SparkPi 2>&1 | grep "Pi is roughly"

Shell 命令

過濾後的執行結果如下圖所示,可以得到 π 的 5 位小數近似值 :

從官網下載 Spark從官網下載 Spark

Python 版本的 SparkPi 則需要通過 spark-submit 執行:

  1. ./bin/spark-submit examples/src/main/python/pi.py

Shell 命令

通過 Spark Shell 進行互動分析

Spark shell 提供了簡單的方式來學習 API,也提供了互動的方式來分析資料。Spark Shell 支援 Scala 和 Python,本教程選擇使用 Scala 來進行介紹。

Scala

Scala 是一門現代的多正規化程式語言,志在以簡練、優雅及型別安全的方式來表達常用程式設計模式。它平滑地集成了面向物件和函式語言的特性。Scala 運行於 Java 平臺(JVM,Java 虛擬機器),併兼容現有的 Java 程式。

Scala 是 Spark 的主要程式語言,如果僅僅是寫 Spark 應用,並非一定要用 Scala,用 Java、Python 都是可以的。使用 Scala 的優勢是開發效率更高,程式碼更精簡,並且可以通過 Spark Shell 進行互動式實時查詢,方便排查問題。

執行如下命令啟動 Spark Shell:

  1. ./bin/spark-shell

Shell 命令

啟動成功後如圖所示,會有 “scala >” 的命令提示符。

成功啟動Spark Shell成功啟動Spark Shell

基礎操作

Spark 的主要抽象是分散式的元素集合(distributed collection of items),稱為RDD(Resilient Distributed Dataset,彈性分散式資料集),它可被分發到叢集各個節點上,進行並行操作。RDDs 可以通過 Hadoop InputFormats 建立(如 HDFS),或者從其他 RDDs 轉化而來。

我們從 ./README 檔案新建一個 RDD,程式碼如下(本文出現的 Spark 互動式命令程式碼中,與位於同一行的註釋內容為該命令的說明,命令之後的註釋內容表示互動式輸出結果):

  1. val textFile = sc.textFile("file:///usr/local/spark/README.md")
  2. // textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:27

scala

程式碼中通過 “file://” 字首指定讀取本地檔案。Spark shell 預設是讀取 HDFS 中的檔案,需要先上傳檔案到 HDFS 中,否則會有“org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/user/hadoop/README.md”的錯誤。

上述命令的輸出結果如下圖所示:

新建RDD新建RDD

RDDs 支援兩種型別的操作

  • actions: 在資料集上執行計算後返回值

下面我們就來演示 count() 和 first() 操作:

  1. textFile.count() // RDD 中的 item 數量,對於文字檔案,就是總行數
  2. // res0: Long = 95
  3. textFile.first() // RDD 中的第一個 item,對於文字檔案,就是第一行內容
  4. // res1: String = # Apache Spark

scala

接著演示 transformation,通過 filter transformation 來返回一個新的 RDD,程式碼如下:

  1. val linesWithSpark = textFile.filter(line => line.contains("Spark")) // 篩選出包含 Spark 的行
  2. linesWithSpark.count() // 統計行數
  3. // res4: Long = 17

scala

可以看到一共有 17 行內容包含 Spark,這與通過 Linux 命令 cat ./README.md | grep "Spark" -c 得到的結果一致,說明是正確的。action 和 transformation 可以用鏈式操作的方式結合使用,使程式碼更為簡潔:

  1. textFile.filter(line => line.contains("Spark")).count() // 統計包含 Spark 的行數
  2. // res4: Long = 17

scala

RDD的更多操作

RDD 的 actions 和 transformations 可用在更復雜的計算中,例如通過如下程式碼可以找到包含單詞最多的那一行內容共有幾個單詞:

  1. textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
  2. // res5: Int = 14

scala

程式碼首先將每一行內容 map 為一個整數,這將建立一個新的 RDD,並在這個 RDD 中執行 reduce 操作,找到最大的數。map()、reduce() 中的引數是 Scala 的函式字面量(function literals,也稱為閉包 closures),並且可以使用語言特徵或 Scala/Java 的庫。例如,通過使用 Math.max() 函式(需要匯入 Java 的 Math 庫),可以使上述程式碼更容易理解:

  1. import java.lang.Math
  2. textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
  3. // res6: Int = 14

scala

Hadoop MapReduce 是常見的資料流模式,在 Spark 中同樣可以實現(下面這個例子也就是 WordCount):

  1. val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b) // 實現單詞統計
  2. // wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:29
  3. wordCounts.collect() // 輸出單詞統計結果
  4. // res7: Array[(String, Int)] = Array((package,1), (For,2), (Programs,1), (processing.,1), (Because,1), (The,1)...)

scala

快取

Spark 支援在叢集範圍內將資料集快取至每一個節點的記憶體中,可避免資料傳輸,當資料需要重複訪問時這個特徵非常有用,例如查詢體積小的“熱”資料集,或是執行如 PageRank 的迭代演算法。呼叫 cache(),就可以將資料集進行快取:

  1. linesWithSpark.cache()

scala

Spark SQL 和 DataFrames

Spark SQL 是 Spark 內嵌的模組,用於結構化資料。在 Spark 程式中可以使用 SQL 查詢語句或 DataFrame API。DataFrames 和 SQL 提供了通用的方式來連線多種資料來源,支援 Hive、Avro、Parquet、ORC、JSON、和 JDBC,並且可以在多種資料來源之間執行 join 操作。

Spark SQL 的功能是通過 SQLContext 類來使用的,而建立 SQLContext 是通過 SparkContext 建立的。在 Spark shell 啟動時,輸出日誌的最後有這麼幾條資訊

16/01/16 13:25:41 INFO repl.SparkILoop: Created spark context..
Spark context available as sc.
16/01/16 13:25:41 INFO repl.SparkILoop: Created sql context..
SQL context available as sqlContext.

這些資訊表明 SparkContent 和 SQLContext 都已經初始化好了,可通過對應的 sc、sqlContext 變數直接進行訪問。

使用 SQLContext 可以從現有的 RDD 或資料來源建立 DataFrames。作為示例,我們通過 Spark 提供的 JSON 格式的資料來源檔案 ./examples/src/main/resources/people.json 來進行演示,該資料來源內容如下:

  1. {"name":"Michael"}
  2. {"name":"Andy", "age":30}
  3. {"name":"Justin", "age":19}

json

執行如下命令匯入資料來源,並輸出內容:

  1. val df = sqlContext.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
  2. // df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
  3. df.show() // 輸出資料來源內容
  4. // +----+-------+
  5. // | age| name|
  6. // +----+-------+
  7. // |null|Michael|
  8. // | 30| Andy|
  9. // | 19| Justin|
  10. // +----+-------+

scala

接著,我們來演示 DataFrames 處理結構化資料的一些基本操作:

  1. df.select("name").show() // 只顯示 "name" 列
  2. // +-------+
  3. // | name|
  4. // +-------+
  5. // |Michael|
  6. // | Andy|
  7. // | Justin|
  8. // +-------+
  9. df.select(df("name"), df("age") + 1).show() // 將 "age" 加 1
  10. // +-------+---------+
  11. // | name|(age + 1)|
  12. // +-------+---------+
  13. // |Michael| null|
  14. // | Andy| 31|
  15. // | Justin| 20|
  16. // +-------+---------+
  17. df.filter(df("age") > 21).show() # 條件語句
  18. // +---+----+
  19. // |age|name|
  20. // +---+----+
  21. // | 30|Andy|
  22. // +---+----+
  23. df.groupBy("age").count().show() // groupBy 操作
  24. // +----+-----+
  25. // | age|count|
  26. // +----+-----+
  27. // |null| 1|
  28. // | 19| 1|
  29. // | 30| 1|
  30. // +----+-----+

scala

當然,我們也可以使用 SQL 語句來進行操作:

  1. df.registerTempTable("people") // 將 DataFrame 註冊為臨時表 people
  2. val result = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19") // 執行 SQL 查詢
  3. result.show() // 輸出結果
  4. // +------+---+
  5. // | name|age|
  6. // +------+---+
  7. // |Justin| 19|
  8. // +------+---+

scala

更多的功能可以檢視完整的 DataFrames API ,此外 DataFrames 也包含了豐富的 DataFrames Function 可用於字串處理、日期計算、數學計算等。

Spark Streaming

流計算除了使用 Storm 框架,使用 Spark Streaming 也是一個很好的選擇。基於 Spark Streaming,可以方便地構建可拓展、高容錯的流計算應用程式。Spark Streaming 使用 Spark API 進行流計算,這意味著在 Spark 上進行流處理與批處理的方式一樣。因此,你可以複用批處理的程式碼,使用 Spark Streaming 構建強大的互動式應用程式,而不僅僅是用於分析資料。

下面以一個簡單的 Spark Streaming 示例(基於流的單詞統計)來演示一下 Spark Streaming:本地伺服器通過 TCP 接收文字資料,實時輸出單詞統計結果。該部分內容主要參考了 Spark Streaming 程式設計指南

執行該示例需要 Netcat(在網路上通過 TCP 或 UDP 讀寫資料),CentOS 6.x 系統中預設沒有安裝,經過測試,如果通過 yum 直接安裝,執行時會有 “nc: Protocol not available” 的錯誤,需要下載較低版本的 nc 才能正常使用。我們選擇 Netcat 0.6.1 版本,在終端中執行如下命令進行安裝:

  1. wget http://downloads.sourceforge.net/project/netcat/netcat/0.6.1/netcat-0.6.1-1.i386.rpm -O ~/netcat-0.6.1-1.i386.rpm # 下載
  2. sudo rpm -iUv ~/netcat-0.6.1-1.i386.rpm # 安裝

Shell 命令

安裝好 NetCat 之後,使用如下命令建立本地資料服務,監聽 TCP 埠 9999:

  1. # 記為終端 1
  2. nc -l -p 9999

Shell 命令

啟動後,該埠就被佔用了,需要開啟另一個終端執行示例程式,執行如下命令:

  1. # 需要另外開啟一個終端,記為終端 2,然後執行如下命令
  2. /usr/local/spark/bin/run-example streaming.NetworkWordCount localhost 9999

Shell 命令

接著在終端 1 中輸入文字,在終端 2 中就可以實時看到單詞統計結果了。

Spark Streaming 的內容較多,本教程就簡單介紹到這,更詳細的內容可檢視官網教程。最後需要關掉終端 2,並按 ctrl+c 退出 終端 1 的Netcat。

獨立應用程式(Self-Contained Applications)

接著我們通過一個簡單的應用程式 SimpleApp 來演示如何通過 Spark API 編寫一個獨立應用程式。使用 Scala 編寫的程式需要使用 sbt 進行編譯打包,相應的,Java 程式使用 Maven 編譯打包,而 Python 程式通過 spark-submit 直接提交。

應用程式程式碼

在終端中執行如下命令建立一個資料夾 sparkapp 作為應用程式根目錄:

  1. cd ~ # 進入使用者主資料夾
  2. mkdir ./sparkapp # 建立應用程式根目錄
  3. mkdir -p ./sparkapp/src/main/scala # 建立所需的資料夾結構

Shell 命令

在 ./sparkapp/src/main/scala 下建立一個名為 SimpleApp.scala 的檔案(vim ./sparkapp/src/main/scala/SimpleApp.scala),新增程式碼如下:

  1. /* SimpleApp.scala */
  2. import org.apache.spark.SparkContext
  3. import org.apache.spark.SparkContext._
  4. import org.apache.spark.SparkConf
  5. object SimpleApp {
  6. def main(args: Array[String]) {
  7. val logFile = "file:///usr/local/spark/README.md" // Should be some file on your system
  8. val conf = new SparkConf().setAppName("Simple Application")
  9. val sc = new SparkContext(conf)
  10. val logData = sc.textFile(logFile, 2).cache()
  11. val numAs = logData.filter(line => line.contains("a")).count()
  12. val numBs = logData.filter(line => line.contains("b")).count()
  13. println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  14. }
  15. }

scala

該程式計算 /usr/local/spark/README 檔案中包含 “a” 的行數 和包含 “b” 的行數。程式碼第8行的 /usr/local/spark 為 Spark 的安裝目錄,如果不是該目錄請自行修改。不同於 Spark shell,獨立應用程式需要通過 val sc = new SparkContext(conf) 初始化 SparkContext,SparkContext 的引數 SparkConf 包含了應用程式的資訊。

該程式依賴 Spark API,因此我們需要通過 sbt 進行編譯打包。在 ./sparkapp 中新建檔案 simple.sbt(vim ./sparkapp/simple.sbt),新增內容如下,宣告該獨立應用程式的資訊以及與 Spark 的依賴關係:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"

檔案 simple.sbt 需要指明 Spark 和 Scala 的版本。啟動 Spark shell 的過程中,當輸出到 Spark 的符號圖形時,可以看到相關的版本資訊。

檢視 Spark 和 Scala 的版本資訊檢視 Spark 和 Scala 的版本資訊

安裝 sbt

Spark 中沒有自帶 sbt,需要手動安裝 sbt,我們選擇安裝在 /usr/local/sbt 中:

  1. sudo mkdir /usr/local/sbt
  2. sudo chown -R hadoop /usr/local/sbt # 此處的 hadoop 為你的使用者名稱
  3. cd /usr/local/sbt

Shell 命令

經筆者測試,按官網教程安裝 sbt 0.13.9 後,使用時可能存在網路問題,無法下載依賴包,導致 sbt 無法正常使用,需要進行一定的修改。為方便,請使用筆者修改後的版本,下載地址:http://pan.baidu.com/s/1eRyFddw

下載後,執行如下命令拷貝至 /usr/local/sbt 中:

  1. cp ~/下載/sbt-launch.jar .

Shell 命令

接著在 /usr/local/sbt 中建立 sbt 指令碼(vim ./sbt),新增如下內容:

  1. #!/bin/bash
  2. SBT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M"
  3. java $SBT_OPTS -jar `dirname $0`/sbt-launch.jar "[email protected]"

Shell 命令

儲存後,為 ./sbt 指令碼增加可執行許可權:

  1. chmod u+x ./sbt

Shell 命令

最後檢驗 sbt 是否可用(首次執行會處於 “Getting org.scala-sbt sbt 0.13.9 …” 的下載狀態,請耐心等待。筆者等待了 7 分鐘才出現第一條下載提示):

  1. ./sbt sbt-version

Shell 命令

下載過程中可能會類似 “Server access Error: java.security.ProviderException: java.security.KeyException url=https://jcenter.bintray.com/org/scala-sbt/precompiled-2_9_3/0.13.9/precompiled-2_9_3-0.13.9.jar” 的錯誤,可以忽略。可再執行一次 ./sbt sbt-version,只要能得到如下圖的版本資訊就沒問題:

驗證 sbt 是否可用驗證 sbt 是否可用

如果由於網路問題無法下載依賴,導致 sbt 無法正確執行的話,可以下載筆者提供的離線依賴包 sbt-0.13.9-repo.tar.gz 到本地中(依賴包的本地位置為 ~/.sbt 和 ~/.ivy2,檢查依賴關係時,首先檢查本地,本地未找到,再從網路中下載),下載地址:http://pan.baidu.com/s/1sjTQ8yD。下載後,執行如下命令解壓依賴包:

  1. tar -zxf ~/下載/sbt-0.13.9-local-repo.tar.gz ~

Shell 命令

通過這個方式,一般可以解決依賴包缺失的問題(讀者提供的依賴包僅適合於 Spark 1.6 版本,不同版本依賴關係不一樣)。

如果對 sbt 存在的網路問題以及如何解決感興趣,請點選下方檢視。

點選檢視:解決 sbt 無法下載依賴包的問題

使用 sbt 打包 Scala 程式

為保證 sbt 能正常執行,先執行如下命令檢查整個應用程式的檔案結構:

  1. cd ~/sparkapp
  2. find .

Shell 命令

檔案結構應如下圖所示:

SimpleApp的檔案結構SimpleApp的檔案結構

接著,我們就可以通過如下程式碼將整個應用程式打包成 JAR(首次運行同樣需要下載依賴包,如果這邊遇到網路問題無法成功,也請下載上述安裝 sbt 提到的離線依賴包 sbt-0.13.9-repo.tar.gz ):

  1. /usr/local/sbt/sbt package

Shell 命令

打包成功的話,會輸出如下圖內容:

SimpleApp的檔案結構SimpleApp的檔案結構

生成的 jar 包的位置為 ~/sparkapp/target/scala-2.10/simple-project_2.10-1.0.jar。

通過 spark-submit 執行程式

最後,我們就可以將生成的 jar 包通過 spark-submit 提交到 Spark 中運行了,命令如下:

  1. /usr/local/spark/bin/spark-submit --class "SimpleApp" ~/sparkapp/target/scala-2.10/simple-project_2.10-1.0.jar
  2. # 輸出資訊太多,可以通過如下命令過濾直接檢視結果
  3. /usr/local/spark/bin/spark-submit --class "SimpleApp" ~/sparkapp/target/scala-2.10/simple-project_2.10-1.0.jar 2>&1 | grep "Lines with a:"

Shell 命令

最終得到的結果如下:

Lines with a: 58, Lines with b: 26

自此,你就完成了你的第一個 Spark 應用程式了。

進階學習

Spark 官網提供了完善的學習文件(許多技術文件都只有英文版本,因此學會檢視英文文件也是學習大資料技術的必備技能):