1. 程式人生 > >《Spark 官方文件》Spark快速入門

《Spark 官方文件》Spark快速入門

快速入門

本教程是對Spark的一個快速簡介。首先,我們通過Spark的互動式shell介紹一下API(主要是Python或Scala),然後展示一下如何用Java、Scala、Python寫一個Spark應用。更完整參考看這裡:programming guide

首先,請到Spark website下載一個Spark釋出版本,以便後續方便學習。我們暫時還不會用到HDFS,所以你可以使用任何版本的Hadoop。

使用Spark shell互動式分析

基礎

利用Spark shell 很容易學習Spark API,同時也Spark shell也是強大的互動式資料分析工具。Spark shell既支援Scala(Scala版本的shell在Java虛擬機器中執行,所以在這個shell中可以引用現有的Java庫),也支援Python。在Spark目錄下執行下面的命令可以啟動一個Spark shell:

./bin/spark-shell

Spark最主要的抽象概念是個分散式集合,也叫作彈性分散式資料集(Resilient Distributed Dataset – RDD)。RDD可以由Hadoop InputFormats讀取HDFS檔案建立得來,或者從其他RDD轉換得到。下面我們就先利用Spark原始碼目錄下的README檔案來新建一個RDD:

scala> val textFile = sc.textFile("README.md")
textFile: spark.RDD[String] = [email protected]

RDD有兩種運算元,action運算元(

actions)返回結果,transformation運算元()返回一個新RDD。我們先來看一下action運算元:

scala> textFile.count() // Number of items in this RDD
res0: Long = 126

scala> textFile.first() // First item in this RDD
res1: String = # Apache Spark

再來看下如何使用transformation運算元。我們利用filter這個transformation運算元返回一個只包含原始檔案子集的新RDD。

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: spark.RDD[String] = 
[email protected]

把這兩個例子串起來,我們可以這樣寫:

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15

更多RDD運算元

RDD action 和 transformation 運算元可以做更加複雜的計算。下面的程式碼中,我們將找出檔案中包含單詞數最多的行有多少個單詞:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15

首先,用一個map運算元將每一行對映為一個整數,返回一個新RDD。然後,用reduce運算元找出這個RDD中最大的單詞數。map和reduce算組的引數都是scala 函式體(閉包),且函式體內可以使用任意的語言特性,或引用scala/java庫。例如,我們可以呼叫其他函式。為了好理解,下面我們用Math.max作為例子:

scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15

Hadoop上的MapReduce是大家耳熟能詳的一種通用資料流模式。而Spark能夠輕鬆地實現MapReduce流程:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: spark.RDD[(String, Int)] = [email protected]

這個例子裡,我使用了flatMapmap, and reduceByKey 這幾個transformation運算元,把每個單詞及其在檔案中出現的次數轉成一個包含(String,int)鍵值對的RDD,計算出每個單詞在檔案中出現的次數

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

快取

Spark同樣支援把資料集拉到叢集範圍的記憶體快取中。這對於需要重複訪問的資料非常有用,比如:查詢一些小而”熱“(頻繁訪問)的資料集 或者 執行一些迭代演算法(如 PageRank)。作為一個簡單的示例,我們把 linesWithSpark 這個資料集快取一下:

scala> linesWithSpark.cache()
res7: spark.RDD[String] = [email protected]

scala> linesWithSpark.count()
res8: Long = 19

scala> linesWithSpark.count()
res9: Long = 19

用Spark來快取一個100行左右的檔案,看起來確實有點傻。但有趣的是,同樣的程式碼可以用於快取非常大的資料集,即使這些資料集可能分佈在數十或數百個節點,也是一樣。你可以用 bin/spark-shell 連到一個叢集上來驗證一下,更詳細的請參考:programming guide.

獨立的應用程式

假設我們想寫一個獨立的Spark應用程式。我們將快速的過一下一個簡單的應用程式,分別用Scala(sbt編譯),Java(maven編譯)和Python。

首先用Scala新建一個簡單的Spark應用 – 簡單到連名字都叫SimpleApp.scala

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

注意,應用程式需要定義一個main方法,而不是繼承scala.App。scala.App的子類可能不能正常工作。

這個程式,統計了Spark README檔案中分別包含‘a’和’b’的行數。注意,你需要把YOUR_SPARK_HOME換成你的Spark安裝目錄。與之前用spark-shell不同,這個程式有一個單獨的SparkContext物件,我們初始化了這個SparkContext物件並將其作為程式的一部分。

我們把一個 SparkConf 物件傳給SparkContext的建構函式,SparkConf物件包含了我們這個應用程式的基本資訊和配置。

我們的程式需要依賴Spark API,所以我們需要包含一個sbt配置檔案,simple.sbt,在這個檔案裡,我們可以配置Spark依賴項。這個檔案同時也添加了Spark本身的依賴庫:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"

為了讓sbt能正常工作,我們需要一個典型的目錄結構來放SimpleApp.scala和simple.sbt程式。一旦建立好目錄,我們就可以建立一個jar包,然後用spark-submit指令碼執行我們的程式碼。

# Your directory layout should look like this
$ find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.10/simple-project_2.10-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/scala-2.10/simple-project_2.10-1.0.jar
...
Lines with a: 46, Lines with b: 23

下一步

恭喜你!你的首個Spark應用已經跑起來了!

  • 進一步的API參考,請看這裡:Spark programming guide,或者在其他頁面上點選 “Programming Guides”選單
  • 最後,Spark examples子目錄下包含了多個示例,你可以這樣來執行這些例子:
# For Scala and Java, use run-example:
./bin/run-example SparkPi

# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py

# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R