1. 程式人生 > >Spark官方文件-快速入門

Spark官方文件-快速入門

本文為使用Spark提供了簡明介紹。我們首先會通過Spark的互動shell(使用Python或者Scala語言)介紹API,然後展示如何用Java,Scala和Python寫應用程式。更加完整的介紹請看程式設計指南

為了能跟隨本文進行實踐,首先需要搭建Spark執行環境(可以參考搭建Spark單節點本地執行環境)。由於我們不會用到HDFS,你可以下載任何版本的Hadoop包。

使用Spark Shell進行互動的分析

基礎

Spark的shell提供了一種學習API的簡單方式,也是一種互動地分析資料的強大工具。既適用於Scala(Scala執行在Java虛擬機器上,因此可以很好的使用已有的Java庫),也適用於Python。在Spark目錄下執行如下命令來啟動shell:

./bin/spark-shell

Spark的基本抽象是叫做RDD的彈性分散式資料集合。RDDs可以從Hadoop InputFormats(例如HDFS檔案)建立或者從其它RDDs轉變而來。讓我們從位於Spark目錄的README檔案建立一個新的RDD:

scala> val textFile = sc.textFile("README.md")
textFile: org.apache.spark.rdd.RDD[String]= README.md MapPartitionsRDD[1] at textFile at <console>:25

RDDs具有可以返回values的actions,以及返回指向新RDDs的指標的transformations。首先從一些actions開始:

scala> textFile.count()// Number of items in this RDD
res0:Long=126// May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first()// First item in this RDD
res1:String=# Apache Spark

現在讓我們使用一個transformation。我們將會用filter transformation來返回一個新的RDD,這個新的RDD是檔案中的資料子集。

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String]=MapPartitionsRDD[2] at filter at <console>:27

我們可以將transformations和actions鏈到一起:

scala> textFile.filter(line => line.contains("Spark")).count()// How many lines contain "Spark"?
res3:Long=15

關於RDD的更多操作

RDD actions和transformations可以被用在更加複雜的計算中。例如我們想找到單詞數最多的一行:

scala> textFile.map(line => line.split(" ").size).reduce((a, b)=>if(a > b) a else b)
res4:Long=15

首先將一行對映為一個整數值,建立了一個新的RDD。在這個新RDD之上呼叫reduce,來找到最大值。map和reduce的引數是Scala函式,這裡的引數可以使用任何語言特徵或者Scala/Java庫。例如,我們可以簡單地呼叫其它地方宣告的函式。我們將會使用函式Math.max()使得程式碼更容易理解:

scala>import java.lang.Mathimport java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b)=>Math.max(a, b))
res5:Int=15

MapReduce是一種通用的資料流模式,通過Hadoop普及開來。Spark可以很容易地實現MapReduce流:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word =>(word,1)).reduceByKey((a, b)=> a + b)
wordCounts: org.apache.spark.rdd.RDD[(String,Int)]=ShuffledRDD[8] at reduceByKey at <console>:28

這裡,我們將flatMap,map和reduceByKey transformations組合在一起作為(String, Int) pairs的RDD,來計算檔案中每個單詞的個數。為了在shell中收集單詞個數,我們可以使用collect action:

scala> wordCounts.collect()
res6:Array[(String,Int)]=Array((means,1),(under,2),(this,3),(Because,1),(Python,2),(agree,1),(cluster.,1),...)

快取

Spark也支援將資料集合拉進叢集範圍內的快取中。當資料被重複訪問時,這很有用,例如當查詢一小部分熱點資料或者執行一個類似PageRank的迭代演算法時。作為一個簡單的例子,讓我們將linesWithSpark資料集合標記為快取的:

scala> linesWithSpark.cache()
res7: linesWithSpark.type =MapPartitionsRDD[2] at filter at <console>:27

scala> linesWithSpark.count()
res8:Long=15

scala> linesWithSpark.count()
res9:Long=15

使用Spark探索並快取一個100行的文字檔案可能看起來很傻。有趣的是這些函式同樣可以應用到非常大的資料集合,即使它們是分佈在幾十甚至幾百個節點上的。你也可以通過將bin/spark-shell連線到叢集上做這種互動,正如程式設計指南上所描述的那樣。

獨立的應用程式

假如我們想使用Spark API寫一個獨立的應用程式。我們將會學習一個用Scala(使用sbt管理專案),Java(使用Maven管理專案)和Python編寫的簡單的應用程式。

我們將會建立一個非常簡單的Spark應用程式,將其命名為SimpleApp.scala:

/* SimpleApp.scala */import org.apache.spark.SparkContextimport org.apache.spark.SparkContext._
import org.apache.spark.SparkConfobjectSimpleApp{def main(args:Array[String]){
    val logFile ="YOUR_SPARK_HOME/README.md"// Should be some file on your system
    val conf =newSparkConf().setAppName("Simple Application")
    val sc =newSparkContext(conf)
    val logData = sc.textFile(logFile,2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    sc.stop()}}

注意應用程式應該定義一個方法main(),而不是擴充套件scala.Appscala.App的子類可能無法正確工作。

這個程式只是統計Spark目錄下檔案README.md含有’a’的行數以及含有’b’的行數。注意你需要將YOUR_SPARK_HOME替換成Spark安裝的位置路徑。和之前使用Spark shell的例子不同,這裡我們需要在程式中初始化一個SparkContext

我們給SparkContext建構函式傳遞了一個SparkConf物件,SparkConf物件中包含了我們的應用程式的資訊。

我們的應用程式依賴Spark API,所以需要一個sbt配置檔案build.sbt。這個檔案還增加了Spark依賴的一個庫:

name :="Simple Project"

version :="1.0"

scalaVersion :="2.11.7"

libraryDependencies +="org.apache.spark"%%"spark-core"%"2.1.1"

為了使sbt工作正常,我們需要將SimpleApp.scala和build.sbt放到正確的目錄結構中。一旦放好之後,我們就可以建立一個包含了應用程式程式碼的JAR包,然後使用指令碼spark-submit執行我們的程式。

# Your directory layout should look like this
$ find .../build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

# Package a jar containing your application
$ sbt package...[info]Packaging{..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class"SimpleApp" \
  --master local[4] \
  target/scala-2.11/simple-project_2.11-1.0.jar
...Lineswith a:46,Lineswith b:23

下一步學習什麼

恭喜運行了你的第一個Spark應用程式!

為了深入地學習API,從Spark程式設計指南開始學習,或者檢視程式設計指南選單學習其它組成部分。

為了在叢集上執行應用程式,可以學習叢集模式

最後,Spark在目錄examples中提供了一些例子(ScalaJavaPythonR)。你可以按如下方式執行它們:

# For Scala and Java, use run-example:./bin/run-example SparkPi# For Python examples, use spark-submit directly:./bin/spark-submit examples/src/main/python/pi.py

# For R examples, use spark-submit directly:./bin/spark-submit examples/src/main/r/dataframe.R