1. 程式人生 > >《Spark 官方文件》Spark程式設計指南

《Spark 官方文件》Spark程式設計指南

概述

總體上來說,每個Spark應用都包含一個驅動器(driver)程式,驅動器執行使用者的main函式,並在叢集上執行各種並行操作。

Spark最重要的一個抽象概念就是彈性分散式資料集(resilient distributed dataset – RDD),RDD是一個可分割槽的元素集合,其包含的元素可以分佈在叢集各個節點上,並且可以執行一些分散式並行操作。RDD通常是通過,HDFS(或者其他Hadoop支援的檔案系統)上的檔案,或者驅動器中的Scala集合物件,來建立或轉換得到;其次,使用者也可以請求Spark將RDD持久化到記憶體裡,以便在不同的並行操作裡複用之;最後,RDD具備容錯性,可以從節點失敗中自動恢復資料。

Spark第二個重要抽象概念是共享變數,共享變數是一種可以在並行操作之間共享使用的變數。預設情況下,當Spark把一系列任務排程到不同節點上執行時,Spark會同時把每個變數的副本和任務程式碼一起傳送給各個節點。但有時候,我們需要在任務之間,或者任務和驅動器之間共享一些變數。Spark提供了兩種型別的共享變數:廣播變數累加器,廣播變數可以用來在各個節點上快取資料,而累加器則是用來執行跨節點的“累加”操作,例如:計數和求和。

本文將會使用Spark所支援的所有語言來展示Spark的特性。如果你能啟動Spark的互動式shell動手實驗一下,效果會更好(對scala請使用bin/spark-shell,而對於python,請使用bin/pyspark)。

連結Spark

Spark 1.6.0 使用了Scala 2.10。用Scala寫應用的話,你需要使用一個相容的Scala版本(如:2.10.X)

同時,如果你需要在maven中依賴Spark,可以用如下maven工件標識:

groupId = org.apache.spark
artifactId = spark-core_2.10
version = 1.6.0

另外,如果你需要訪問特定版本的HDFS,那麼你可能需要增加相應版本的hadoop-client依賴項,其maven工件標識如下:

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

最後,你需要如下,在你的程式碼裡匯入一些Spark class:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

(在Spark 1.3.0之前,你需要顯示的 import org.apache.spark.SparkContext._ 來啟用這些重要的隱式轉換)

初始化Spark

Spark應用程式需要做的第一件事就是建立一個 SparkContext 物件,SparkContext物件決定了Spark如何訪問叢集。而要新建一個SparkContext物件,你還得需要構造一個 SparkConf 物件,SparkConf物件包含了你的應用程式的配置資訊。

每個JVM程序中,只能有一個活躍(active)的SparkContext物件。如果你非要再新建一個,那首先必須將之前那個活躍的SparkContext 物件stop()掉。

val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)

appName引數值是你的應用展示在叢集UI上的應用名稱。master引數值是Spark, Mesos or YARN cluster URL 或者特殊的“local”(本地模式)。實際上,一般不應該將master引數值硬編碼到程式碼中,而是應該用spark-submit指令碼的引數來設定。然而,如果是本地測試或單元測試中,你可以直接在程式碼裡給master引數寫死一個”local”值。

使用shell

在Spark shell中,預設已經為你新建了一個SparkContext物件,變數名為sc。所以spark-shell裡不能自建SparkContext物件。你可以通過–master引數設定要連線到哪個叢集,而且可以給–jars引數傳一個逗號分隔的jar包列表,以便將這些jar包加到classpath中。你還可以通過–packages設定逗號分隔的maven工件列表,以便增加額外的依賴項。同樣,還可以通過–repositories引數增加maven repository地址。下面是一個示例,在本地4個CPU core上執行的例項:

$ ./bin/spark-shell –master local[4]

或者,將code.jar新增到classpath下:

$ ./bin/spark-shell --master local[4] --jars code.jar

通過maven標識新增依賴:

$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"

spark-shell –help可以檢視完整的選項列表。實際上,spark-shell是在後臺呼叫spark-submit來實現其功能的(spark-submit script.)

彈性分散式資料集(RDD)

Spark的核心概念是彈性分散式資料集(RDD),RDD是一個可容錯、可並行操作的分散式元素集合。總體上有兩種方法可以建立RDD物件:由驅動程式中的集合物件通過並行化操作建立,或者從外部儲存系統中資料集載入(如:共享檔案系統、HDFS、HBase或者其他Hadoop支援的資料來源)。

並行化集合

並行化集合是以一個已有的集合物件(例如:Scala Seq)為引數,呼叫 SparkContext.parallelize() 方法建立得到的RDD。集合物件中所有的元素都將被複制到一個可並行操作的分散式資料集中。例如,以下程式碼將一個1到5組成的陣列並行化成一個RDD:

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

一旦建立成功,該分散式資料集(上例中的distData)就可以執行一些並行操作。如,distData.reduce((a, b) => a + b),這段程式碼會將集合中所有元素加和。後面我們還會繼續討論分散式資料集上的各種操作。

並行化集合的一個重要引數是分割槽(partition),即這個分散式資料集可以分割為多少片。Spark中每個任務(task)都是基於分割槽的,每個分割槽一個對應的任務(task)。典型場景下,一般每個CPU對應2~4個分割槽。並且一般而言,Spark會基於叢集的情況,自動設定這個分割槽數。當然,你還是可以手動控制這個分割槽數,只需給parallelize方法再傳一個引數即可(如:sc.parallelize(data, 10) )。注意:Spark程式碼裡有些地方仍然使用分片(slice)這個術語,這只不過是分割槽的一個別名,主要為了保持向後相容。

外部資料集

Spark 可以通過Hadoop所支援的任何資料來源來建立分散式資料集,包括:本地檔案系統、HDFS、Cassandra、HBase、Amazon S3 等。Spark 支援的檔案格式包括:文字檔案(text files)、SequenceFiles,以及其他 Hadoop 支援的輸入格式(InputFormat)。

文字檔案建立RDD可以用 SparkContext.textFile 方法。這個方法輸入引數是一個檔案的URI(本地路徑,或者 hdfs://,s3n:// 等),其輸出RDD是一個文字行集合。以下是一個簡單示例:

scala> val distFile = sc.textFile("data.txt")
distFile: RDD[String] = [email protected]

建立後,distFile 就可以執行資料集的一些操作。比如,我們可以把所有文字行的長度加和:distFile.map(s => s.length).reduce((a, b) => a + b)

以下是一些Spark讀取檔案的要點:

  • 如果是本地檔案系統,那麼這個檔案必須在所有的worker節點上能夠以相同的路徑訪問到。所以要麼把檔案複製到所有worker節點上同一路徑下,要麼掛載一個共享檔案系統。
  • 所有Spark基於檔案輸入的方法(包括textFile)都支援輸入引數為:目錄,壓縮檔案,以及萬用字元。例如:textFile(“/my/directory”), textFile(“/my/directory/*.txt”), 以及 textFile(“/my/directory/*.gz”)
  • textFile方法同時還支援一個可選引數,用以控制資料的分割槽個數。預設地,Spark會為檔案的每一個block建立一個分割槽(HDFS上預設block大小為64MB),你可以通過調整這個引數來控制資料的分割槽數。注意,分割槽數不能少於block個數。

除了文字檔案之外,Spark的Scala API還支援其他幾種資料格式:

  • SparkContext.wholeTextFiles 可以讀取一個包含很多小文字檔案的目錄,並且以 (filename, content) 鍵值對的形式返回結果。這與textFile 不同,textFile只返回檔案的內容,每行作為一個元素。
  • 對於SequenceFiles,可以呼叫 SparkContext.sequenceFile[K, V],其中 K 和 V 分別是檔案中key和value的型別。這些型別都應該是 Writable 介面的子類, 如:IntWritable and Text 等。另外,Spark 允許你為一些常用Writable指定原生型別,例如:sequenceFile[Int, String] 將自動讀取 IntWritable 和 Text。
  • 對於其他的Hadoop InputFormat,你可以用 SparkContext.hadoopRDD 方法,並傳入任意的JobConf 物件和 InputFormat,以及key class、value class。這和設定Hadoop job的輸入源是同樣的方法。你還可以使用 SparkContext.newAPIHadoopRDD,該方法接收一個基於新版Hadoop MapReduce API (org.apache.hadoop.mapreduce)的InputFormat作為引數。
  • RDD.saveAsObjectFile 和 SparkContext.objectFile 支援將RDD中元素以Java物件序列化的格式儲存成檔案。雖然這種序列化方式不如Avro效率高,卻為儲存RDD提供了一種簡便方式。

RDD運算元

RDD支援兩種型別的運算元(operation):transformation運算元 和 action運算元;transformation運算元可以將已有RDD轉換得到一個新的RDD,而action運算元則是基於資料集計算,並將結果返回給驅動器(driver)。例如,map是一個transformation運算元,它將資料集中每個元素傳給一個指定的函式,並將該函式返回結果構建為一個新的RDD;而 reduce是一個action運算元,它可以將RDD中所有元素傳給指定的聚合函式,並將最終的聚合結果返回給驅動器(還有一個reduceByKey運算元,其返回的聚合結果是一個數據集)。

Spark中所有transformation運算元都是懶惰的,也就是說,這些運算元並不立即計算結果,而是記錄下對基礎資料集(如:一個數據檔案)的轉換操作。只有等到某個action運算元需要計算一個結果返回給驅動器的時候,transformation運算元所記錄的操作才會被計算。這種設計使Spark可以執行得更加高效 – 例如,map運算元建立了一個數據集,同時該資料集下一步會呼叫reduce運算元,那麼Spark將只會返回reduce的最終聚合結果(單獨的一個數據)給驅動器,而不是將map所產生的資料集整個返回給驅動器。

預設情況下,每次呼叫action運算元的時候,每個由transformation轉換得到的RDD都會被重新計算。然而,你也可以通過呼叫persist(或者cache)操作來持久化一個RDD,這意味著Spark將會把RDD的元素都儲存在叢集中,因此下一次訪問這些元素的速度將大大提高。同時,Spark還支援將RDD元素持久化到記憶體或者磁碟上,甚至可以支援跨節點多副本。

基礎

以下簡要說明一下RDD的基本操作,參考如下程式碼:

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

其中,第一行是從外部檔案載入資料,並建立一個基礎RDD。這時候,資料集並沒有載入進記憶體除非有其他操作施加於lines,這時候的lines RDD其實可以說只是一個指向 data.txt 檔案的指標。第二行,用lines通過map轉換得到一個lineLengths RDD,同樣,lineLengths也是懶惰計算的。最後,我們使用 reduce運算元計算長度之和,reduce是一個action運算元。此時,Spark將會把計算分割為一些小的任務,分別在不同的機器上執行,每臺機器上都執行相關的一部分map任務,並在本地進行reduce,並將這些reduce結果都返回給驅動器。

如果我們後續需要重複用到 lineLengths RDD,我們可以增加一行:

lineLengths.persist()

這一行加在呼叫 reduce 之前,則 lineLengths RDD 首次計算後,Spark會將其資料儲存到記憶體中。

將函式傳給Spark

Spark的API 很多都依賴於在驅動程式中向叢集傳遞操作函式。以下是兩種建議的實現方式:

  • 全域性單件中的靜態方法。例如,你可以按如下方式定義一個 object MyFunctions 並傳遞其靜態成員函式 MyFunctions.func1:
object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)

注意,技術上來說,你也可以傳遞一個類物件例項上的方法(不是單件物件),不過這回導致傳遞函式的同時,需要把相應的物件也傳送到叢集中各節點上。例如,我們定義一個MyClass如下:

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

如果我們 new MyClass 建立一個例項,並呼叫其 doStuff 方法,同時doStuff中的 map運算元引用了該MyClass例項上的 func1 方法,那麼接下來,這個MyClass物件將被髮送到叢集中所有節點上。rdd.map(x => this.func1(x)) 也會有類似的效果。

類似地,如果應用外部物件的成員變數,也會導致對整個物件例項的引用:

class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}

上面的程式碼對field的引用等價於 rdd.map(x => this.field + x),這將導致應用整個this物件。為了避免類似問題,最簡單的方式就是,將field固執到一個本地臨時變數中,而不是從外部直接訪問之,如下:

def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}

理解閉包

Spark裡一個比較難的事情就是,理解在整個叢集上跨節點執行的變數和方法的作用域以及生命週期。Spark裡一個頻繁出現的問題就是RDD運算元在變數作用域之外修改了其值。下面的例子,我們將會以foreach() 運算元為例,來遞增一個計數器counter,不過類似的問題在其他運算元上也會出現。

示例

考慮如下例子,我們將會計算RDD中原生元素的總和,如果不是在同一個JVM中執行,其表現將有很大不同。例如,這段程式碼如果使用Spark本地模式(–master=local[n])執行,和在叢集上執行(例如,用spark-submit提交到YARN上)結果完全不同。

var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)

本地模式 v.s. 叢集模式

上面這段程式碼其行為是不確定的。在本地模式下執行,所有程式碼都在運行於單個JVM中,所以RDD的元素都能夠被累加並儲存到counter變數中,這是因為本地模式下,counter變數和驅動器節點在同一個記憶體空間中。

然而,在叢集模式下,情況會更復雜,以上程式碼的執行結果就不是所預期的結果了。為了執行這個作業,Spark會將RDD運算元的計算過程分割成多個獨立的任務(task)- 每個任務分發給不同的執行器(executor)去執行。而執行之前,Spark需要計算閉包閉包是由執行器執行RDD運算元(本例中的foreach())時所需要的變數和方法組成的。閉包將會被序列化,併發送給每個執行器。由於本地模式下,只有一個執行器,所有任務都共享同樣的閉包。而在其他模式下,情況則有所不同,每個執行器都運行於不同的worker節點,並且都擁有獨立的閉包副本。

在上面的例子中,閉包中的變數會跟隨不同的閉包副本,傳送到不同的執行器上,所以等到foreach真正在執行器上執行時,其引用的counter已經不再是驅動器上所定義的那個counter副本了,驅動器記憶體中仍然會有一個counter變數副本,但是這個副本對執行器是不可見的!執行器只能看到其所收到的序列化閉包中包含的counter副本。因此,最終驅動器上得到的counter將會是0。

為了確保類似這樣的場景下,程式碼能有確定的行為,這裡應該使用累加器Accumulator。累加器是Spark中專門用於叢集跨節點分散式執行計算中,安全地更新同一變數的機制。本指南中專門有一節詳細說明累加器。

通常來說,閉包(由迴圈或本地方法組成),不應該改寫全域性狀態。Spark中改寫閉包之外物件的行為是未定義的。這種程式碼,有可能在本地模式下能正常工作,但這只是偶然情況,同樣的程式碼在分散式模式下其行為很可能不是你想要的。所以,如果需要全域性聚合,請記得使用累加器(

列印RDD中的元素

另一種常見習慣是,試圖用 rdd.foreach(println) 或者 rdd.map(println) 來列印RDD中所有的元素。如果是在單機上,這種寫法能夠如預期一樣,打印出RDD所有元素。然後,在叢集模式下,這些輸出將會被列印到執行器的標準輸出(stdout)上,因此驅動器的標準輸出(stdout)上神馬也看不到!如果真要在驅動器上把所有RDD元素都打印出來,你可以先呼叫collect運算元,把RDD元素先拉倒驅動器上來,程式碼可能是這樣:rdd.collect().foreach(println)。不過如果RDD很大的話,有可能導致驅動器記憶體溢位,因為collect會把整個RDD都弄到驅動器所在單機上來;如果你只是需要列印一部分元素,那麼take是不更安全的選擇:rdd.take(100).foreach(println)

使用鍵值對

大部分Spark運算元都能在包含任意型別物件的RDD上工作,但也有一部分特殊的運算元要求RDD包含的元素必須是鍵值對(key-value pair)。這種運算元常見於做分散式混洗(shuffle)操作,如:以key分組或聚合。

在Scala中,這種操作在包含 Tuple2 (內建與scala語言,可以這樣建立:(a, b) )型別物件的RDD上自動可用。鍵值對操作是在 PairRDDFunctions 類上可用,這個型別也會自動包裝到包含tuples的RDD上。

例如,以下程式碼將使用reduceByKey運算元來計算檔案中每行文字出現的次數:

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

同樣,我們還可以用 counts.sortByKey() 來對這些鍵值對按字母排序,最後再用 counts.collect() 將資料以物件資料組的形式拉到驅動器記憶體中。

注意:如果使用自定義型別物件做鍵值對中的key的話,你需要確保自定義型別實現了 equals() 方法(通常需要同時也實現hashCode()方法)。完整的細節可以參考:Object.hashCode() documentation

轉換運算元 – transformation

以下是Spark支援的一些常用transformation運算元。詳細請參考 RDD API doc (ScalaJavaPythonR) 以及 鍵值對 RDD 函式 (ScalaJava) 。

transformation運算元 作用
map(func) 返回一個新的分散式資料集,其中每個元素都是由源RDD中一個元素經func轉換得到的。
filter(func) 返回一個新的資料集,其中包含的元素來自源RDD中元素經func過濾後(func返回true時才選中)的結果
flatMap(func) 類似於map,但每個輸入元素可以對映到0到n個輸出元素(所以要求func必須返回一個Seq而不是單個元素)
mapPartitions(func) 類似於map,但基於每個RDD分割槽(或者資料block)獨立執行,所以如果RDD包含元素型別為T,則 func 必須是 Iterator<T> => Iterator<U> 的對映函式。
mapPartitionsWithIndex(func) 類似於 mapPartitions,只是func 多了一個整型的分割槽索引值,因此如果RDD包含元素型別為T,則 func 必須是 Iterator<T> => Iterator<U> 的對映函式。
sample(withReplacementfractionseed) 取樣部分(比例取決於 fraction )資料,同時可以指定是否使用回置取樣(withReplacement),以及隨機數種子(seed)
union(otherDataset) 返回源資料集和引數資料集(otherDataset)的並集
intersection(otherDataset) 返回源資料集和引數資料集(otherDataset)的交集
distinct([numTasks])) 返回對源資料集做元素去重後的新資料集
groupByKey([numTasks]) 只對包含鍵值對的RDD有效,如源RDD包含 (K, V) 對,則該運算元返回一個新的資料集包含 (K, Iterable<V>) 對。
注意:如果你需要按key分組聚合的話(如sum或average),推薦使用 reduceByKey或者 aggregateByKey 以獲得更好的效能。
注意:預設情況下,輸出計算的並行度取決於源RDD的分割槽個數。當然,你也可以通過設定可選引數 numTasks 來指定並行任務的個數。
reduceByKey(func, [numTasks]) 如果源RDD包含元素型別 (K, V) 對,則該運算元也返回包含(K, V) 對的RDD,只不過每個key對應的value是經過func聚合後的結果,而func本身是一個 (V, V) => V 的對映函式。
另外,和 groupByKey 類似,可以通過可選引數 numTasks 指定reduce任務的個數。
aggregateByKey(zeroValue)(seqOpcombOp, [numTasks]) 如果源RDD包含 (K, V) 對,則返回新RDD包含 (K, U) 對,其中每個key對應的value都是由 combOp 函式 和 一個“0”值zeroValue 聚合得到。允許聚合後value型別和輸入value型別不同,避免了不必要的開銷。和 groupByKey 類似,可以通過可選引數 numTasks 指定reduce任務的個數。
sortByKey([ascending], [numTasks]) 如果源RDD包含元素型別 (K, V) 對,其中K可排序,則返回新的RDD包含 (K, V) 對,並按照 K 排序(升序還是降序取決於 ascending 引數)
join(otherDataset, [numTasks]) 如果源RDD包含元素型別 (K, V) 且引數RDD(otherDataset)包含元素型別(K, W),則返回的新RDD中將包含內關聯後key對應的 (K, (V, W)) 對。外關聯(Outer joins)操作請參考 leftOuterJoin、rightOuterJoin 以及 fullOuterJoin 運算元。
cogroup(otherDataset, [numTasks]) 如果源RDD包含元素型別 (K, V) 且引數RDD(otherDataset)包含元素型別(K, W),則返回的新RDD中包含 (K, (Iterable<V>, Iterable<W>))。該運算元還有個別名:groupWith
cartesian(otherDataset) 如果源RDD包含元素型別 T 且引數RDD(otherDataset)包含元素型別 U,則返回的新RDD包含前二者的笛卡爾積,其元素型別為 (T, U) 對。
pipe(command[envVars]) 以shell命令列管道處理RDD的每個分割槽,如:Perl 或者 bash 指令碼。
RDD中每個元素都將依次寫入程序的標準輸入(stdin),然後按行輸出到標準輸出(stdout),每一行輸出字串即成為一個新的RDD元素。
coalesce(numPartitions) 將RDD的分割槽數減少到numPartitions。當以後大資料集被過濾成小資料集後,減少分割槽數,可以提升效率。
repartition(numPartitions) 將RDD資料重新混洗(reshuffle)並隨機分佈到新的分割槽中,使資料分佈更均衡,新的分割槽個數取決於numPartitions。該運算元總是需要通過網路混洗所有資料。
repartitionAndSortWithinPartitions(partitioner) 根據partitioner(spark自帶有HashPartitioner和RangePartitioner等)重新分割槽RDD,並且在每個結果分割槽中按key做排序。這是一個組合運算元,功能上等價於先 repartition 再在每個分割槽內排序,但這個運算元內部做了優化(將排序過程下推到混洗同時進行),因此效能更好。

動作運算元 – action

以下是Spark支援的一些常用action運算元。詳細請參考 RDD API doc (ScalaJavaPythonR) 以及 鍵值對 RDD 函式 (ScalaJava) 。

Action運算元 作用
reduce(func) 將RDD中元素按func進行聚合(func是一個 (T,T) => T 的對映函式,其中T為源RDD元素型別,並且func需要滿足 交換律 和 結合律 以便支援平行計算)
collect() 將資料集中所有元素以陣列形式返回驅動器(driver)程式。通常用於,在RDD進行了filter或其他過濾操作後,將一個足夠小的資料子集返回到驅動器記憶體中。
count() 返回資料集中元素個數
first() 返回資料集中首個元素(類似於 take(1) )
take(n) 返回資料集中前 個元素
takeSample(withReplacement,num, [seed]) 返回資料集的隨機取樣子集,最多包含 num 個元素,withReplacement 表示是否使用回置取樣,最後一個引數為可選引數seed,隨機數生成器的種子。
takeOrdered(n[ordering]) 按元素排序(可以通過 ordering 自定義排序規則)後,返回前 n 個元素
saveAsTextFile(path) 將資料集中元素儲存到指定目錄下的文字檔案中(或者多個文字檔案),支援本地檔案系統、HDFS 或者其他任何Hadoop支援的檔案系統。
儲存過程中,Spark會呼叫每個元素的toString方法,並將結果儲存成檔案中的一行。
saveAsSequenceFile(path)
(Java and Scala)
將資料集中元素儲存到指定目錄下的Hadoop Sequence檔案中,支援本地檔案系統、HDFS 或者其他任何Hadoop支援的檔案系統。適用於實現了Writable介面的鍵值對RDD。在Scala中,同樣也適用於能夠被隱式轉換為Writable的型別(Spark實現了所有基本型別的隱式轉換,如:Int,Double,String 等)
saveAsObjectFile(path)
(Java and Scala)
將RDD元素以Java序列化的格式儲存成檔案,儲存結果檔案可以使用 SparkContext.objectFile 來讀取。
countByKey() 只適用於包含鍵值對(K, V)的RDD,並返回一個雜湊表,包含 (K, Int) 對,表示每個key的個數。
foreach(func) 在RDD的每個元素上執行 func 函式。通常被用於累加操作,如:更新一個累加器(Accumulator ) 或者 和外部儲存系統互操作。
注意:用 foreach 操作出累加器之外的變數可能導致未定義的行為。更詳細請參考前面的“理解閉包”(Understanding closures )這一小節。

混洗操作

有一些Spark運算元會觸發眾所周知的混洗(Shuffle)事件。Spark中的混洗機制是用於將資料重新分佈,其結果是所有資料將在各個分割槽間重新分組。一般情況下,混洗需要跨執行器(executor)或跨機器複製資料,這也是混洗操作一般都比較複雜而且開銷大的原因。

背景

為了理解混洗階段都發生了哪些事,我首先以 reduceByKey 運算元為例來看一下。reduceByKey運算元會生成一個新的RDD,將源RDD中一個key對應的多個value組合進一個tuple - 然後將這些values輸入給reduce函式,得到的result再和key關聯放入新的RDD中。這個運算元的難點在於對於某一個key來說,並非其對應的所有values都在同一個分割槽(partition)中,甚至有可能都不在同一臺機器上,但是這些values又必須放到一起計算reduce結果。

在Spark中,通常是由於為了進行某種計算操作,而將資料分佈到所需要的各個分割槽當中。而在計算階段,單個任務(task)只會操作單個分割槽中的資料 – 因此,為了組織好每個reduceByKey中reduce任務執行時所需的資料,Spark需要執行一個多對多操作。即,Spark需要讀取RDD的所有分割槽,並找到所有key對應的所有values,然後跨分割槽傳輸這些values,並將每個key對應的所有values放到同一分割槽,以便後續計算各個key對應values的reduce結果 – 這個過程就叫做混洗(Shuffle)。

雖然混洗好後,各個分割槽中的元素和分割槽自身的順序都是確定的,但是分割槽中元素的順序並非確定的。如果需要混洗後分區內的元素有序,可以參考使用以下混洗操作:

  • mapPartitions 使用 .sorted 對每個分割槽排序 
  • repartitionAndSortWithinPartitions 重分割槽的同時,對分割槽進行排序,比自行組合repartition和sort更高效
  • sortBy 建立一個全域性有序的RDD

會導致混洗的運算元有:重分割槽(repartition)類運算元,如: repartition 和 ByKey 類運算元(除了計數類的,如 countByKey) 如:groupByKey 和 cogroup 和 join.

效能影響

混洗(Shuffle)之所以開銷大,是因為混洗操作需要引入磁碟I/O,資料序列化以及網路I/O等操作。為了組織好混洗資料,Spark需要生成對應的任務集 – 一系列map任務用於組織資料,再用一系列reduce任務來聚合資料。注意這裡的map、reduce是來自MapReduce的術語,和Spark的map、reduce運算元並沒有直接關係。

在Spark內部,單個map任務的輸出會盡量儲存在記憶體中,直至放不下為止。然後,這些輸出會基於目標分割槽重新排序,並寫到一個檔案裡。在reduce端,reduce任務只讀取與之相關的並已經排序好的blocks。

某些混洗運算元會導致非常明顯的記憶體開銷增長,因為這些運算元需要在資料傳輸前後,在記憶體中維護組織資料記錄的各種資料結構。特別地,reduceByKey和aggregateByKey都會在map端建立這些資料結構,而ByKey系列運算元都會在reduce端建立這些資料結構。如果資料在記憶體中存不下,Spark會把資料吐到磁碟上,當然這回導致額外的磁碟I/O以及垃圾回收的開銷。

混洗還會再磁碟上生成很多臨時檔案。以Spark-1.3來說,這些臨時檔案會一直保留到其對應的RDD被垃圾回收才刪除。之所以這樣做,是因為如果血統資訊需要重新計算的時候,這些混洗檔案可以不必重新生成。如果程式持續引用這些RDD或者垃圾回收啟動頻率較低,那麼這些垃圾回收可能需要等較長的一段時間。這就意味著,長時間執行的Spark作業可能會消耗大量的磁碟。Spark的臨時儲存目錄,是由spark.local.dir 配置引數指定的。

混洗行為可以由一系列配置引數來調優。參考Spark配置指南(Spark Configuration Guide)中“混洗行為”這一小節。

RDD持久化

Spark的一項關鍵能力就是它可以持久化(或者快取)資料集在記憶體中,從而跨操作複用這些資料集。如果你持久化了一個RDD,那麼每個節點上都會儲存該RDD的一些分割槽,這些分割槽是由對應的節點計算出來並保持在記憶體中,後續可以在其他施加在該RDD上的action運算元中複用(或者從這些資料集派生新的RDD)。這使得後續動作的速度提高很多(通常高於10倍)。因此,快取對於迭代演算法和快速互動式分析是一個很關鍵的工具。

你可以用persist() 或者 cache() 來標記一下需要持久化的RDD。等到該RDD首次被施加action運算元的時候,其對應的資料分割槽就會被保留在記憶體裡。同時,Spark的快取具備一定的容錯性 – 如果RDD的任何一個分割槽丟失了,Spark將自動根據其原來的血統資訊重新計算這個分割槽。

另外,每個持久化的RDD可以使用不同的儲存級別,比如,你可以把RDD儲存在磁碟上,或者以java序列化物件儲存到記憶體裡(為了省空間),或者跨節點多副本,或者使用 Tachyon 存到虛擬機器以外的記憶體裡。這些儲存級別都可以由persist()的引數StorageLevel物件來控制。cache() 方法本身就是一個使用預設儲存級別做持久化的快捷方式,預設儲存級別是 StorageLevel.MEMORY_ONLY(以java序列化方式存到記憶體裡)。完整的儲存級別列表如下:

儲存級別 含義
MEMORY_ONLY 以未序列化的Java物件形式將RDD儲存在JVM記憶體中。如果RDD不能全部裝進記憶體,那麼將一部分分割槽快取,而另一部分分割槽將每次用到時重新計算。這個是Spark的RDD的預設儲存級別。
MEMORY_AND_DISK 以未序列化的Java物件形式儲存RDD在JVM中。如果RDD不能全部裝進記憶體,則將不能裝進記憶體的分割槽放到磁碟上,然後每次用到的時候從磁碟上讀取。
MEMORY_ONLY_SER 以序列化形式儲存RDD(每個分割槽一個位元組陣列)。通常這種方式比未序列化儲存方式要更省空間,尤其是如果你選用了一個比較好的序列化協議(fast serializer),但是這種方式也相應的會消耗更多的CPU來讀取資料。
MEMORY_AND_DISK_SER 和MEMORY_ONLY_SER類似,只是當記憶體裝不下的時候,會將分割槽的資料吐到磁碟上,而不是每次用到都重新計算。
DISK_ONLY RDD資料只儲存於磁碟上。
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. 和上面沒有”_2″的級別相對應,只不過每個分割槽資料會在兩個節點上儲存兩份副本。
OFF_HEAP (實驗性的) 將RDD以序列化格式儲存到Tachyon。與MEMORY_ONLY_SER相比,OFF_HEAP減少了垃圾回收開銷,並且使執行器(executor)程序更小且可以共用同一個記憶體池,這一特性在需要大量消耗記憶體和多Spark應用併發的場景下比較吸引人。而且,因為RDD儲存於Tachyon中,所以一個執行器掛了並不會導致資料快取的丟失。這種模式下Tachyon 的記憶體是可丟棄的。因此,Tachyon並不會重建一個它逐出記憶體的block。如果你打算用Tachyon做為堆外儲存,Spark和Tachyon具有開箱即用的相容性。請參考這裡,有建議使用的Spark和Tachyon的匹配版本對:page

注意:在Python中儲存的物件總是會使用 Pickle 做序列化,所以這時是否選擇一個序列化級別已經無關緊要了。

Spark會自動持久化一些混洗操作(如:reduceByKey)的中間資料,即便使用者根本沒有呼叫persist。這麼做是為了避免一旦有一個節點在混洗過程中失敗,就要重算整個輸入資料。當然,我們還是建議對需要重複使用的RDD呼叫其persist運算元。

如何選擇儲存級別?

Spark的儲存級別主要可於在記憶體使用和CPU佔用之間做一些權衡。建議根據以下步驟來選擇一個合適的儲存級別:

  • 如果RDD能使用預設儲存級別(MEMORY_ONLY),那就儘量使用預設級別。這是CPU效率最高的方式,所有RDD運算元都能以最快的速度執行。
  • 如果步驟1的答案是否(不適用預設級別),那麼可以嘗試MEMORY_ONLY_SER級別,並選擇一個高效的序列化協議(selecting a fast serialization library),這回大大節省資料物件的儲存空間,同時速度也還不錯。
  • 儘量不要把資料吐到磁碟上,除非:1.你的資料集重新計算的代價很大;2.你的資料集是從一個很大的資料來源中過濾得到的結果。否則的話,重算一個分割槽的速度很可能和從磁碟上讀取差不多。
  • 如果需要支援容錯,可以考慮使用帶副本的儲存級別(例如:用Spark來服務web請求)。所有的儲存級別都能夠以重算丟失資料的方式來提供容錯性,但是帶副本的儲存級別可以讓你的應用持續的執行,而不必等待重算丟失的分割槽。
  • 在一些需要大量記憶體或者並行多個應用的場景下,實驗性的OFF_HEAP會有以下幾個優勢:
    • 這個級別下,可以允許多個執行器共享同一個Tachyon中記憶體池。
    • 可以有效地減少垃圾回收的開銷。
    • 即使單個執行器掛了,快取資料也不會丟失。

刪除資料

Spark能夠自動監控各個節點上快取使用率,並且以LRU(最近經常使用)的方式將老資料逐出記憶體。如果你更喜歡手動控制的話,可以用RDD.unpersist() 方法來刪除無用的快取。

共享變數

一般而言,當我們給Spark運算元(如 map 或 reduce)傳遞一個函式時,這些函式將會在遠端的叢集節點上執行,並且這些函式所引用的變數都是各個節點上的獨立副本。這些變數都會以副本的形式複製到各個機器節點上,如果更新這些變數副本的話,這些更新並不會傳回到驅動器(driver)程式。通常來說,支援跨任務的可讀寫共享變數是比較低效的。不過,Spark還是提供了兩種比較通用的共享變數:廣播變數和累加器。

廣播變數

廣播變數提供了一種只讀的共享變數,它是把在每個機器節點上儲存一個快取,而不是每個任務儲存一份副本。通常可以用來在每個節點上儲存一個較大的輸入資料集,這要比常規的變數副本更高效(一般的變數是每個任務一個副本,一個節點上可能有多個任務)。Spark還會嘗試使用高效的廣播演算法來分發廣播變數,以減少通訊開銷。

Spark的操作有時會有多個階段(stage),不同階段之間的分割線就是混洗操作。Spark會自動廣播各個階段用到的公共資料。這些方式廣播的資料都是序列化過的,並且在執行各個任務前需要反序列化。這也意味著,顯示地建立廣播變數,只有在跨多個階段(stage)的任務需要同樣的資料 或者 快取資料的序列化和反序列化格式很重要的情況下 才是必須的。

廣播變數可以通過一個變數v來建立,只需呼叫 SparkContext.broadcast(v)即可。這個廣播變數是對變數v的一個包裝,要訪問其值,可以呼叫廣播變數的 value 方法。程式碼示例如下:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

廣播變數建立之後,叢集中任何函式都不應該再使用原始變數v,這樣才能保證v不會被多次複製到同一個節點上。另外,物件v在廣播後不應該再被更新,這樣才能保證所有節點上拿到同樣的值(例如,更新後,廣播變數又被同步到另一新節點,新節點有可能得到的值和其他節點不一樣)。

累加器

累加器是一種只支援滿足結合律的“累加”操作的變數,因此它可以很高效地支援平行計算。利用累加器可以實現計數(類似MapReduce中的計數器)或者求和。Spark原生支援了數字型別的累加器,開發者也可以自定義新的累加器。如果建立累加器的時候給了一個名字,那麼這個名字會展示在Spark UI上,這對於瞭解程式執行處於哪個階段非常有幫助(注意:Python尚不支援該功能)。

創捷累加器時需要賦一個初始值v,呼叫 SparkContext.accumulator(v) 可以建立一個累加器。後續叢集中執行的任務可以使用 add 方法 或者 += 操作符 (僅Scala和Python支援)來進行累加操作。不過,任務本身並不能讀取累加器的值,只有驅動器程式可以用 value 方法訪問累加器的值。

以下程式碼展示瞭如何使用累加器對一個元素陣列求和:

scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Int = 10

以上程式碼使用了Spark內建支援的Int型累加器,開發者也可以通過子類化 AccumulatorParam 來自定義累加器。累加器介面(AccumulatorParam )主要有兩個方法:1. zero:這個方法為累加器提供一個“零值”,2.addInPlace 將收到的兩個引數值進行累加。例如,假設我們需要為Vector提供一個累加機制,那麼可能的實現方式如下:

object VectorAccumulatorParam extends AccumulatorParam[Vector] {
  def zero(initialValue: Vector): Vector = {
    Vector.zeros(initialValue.size)
  }
  def addInPlace(v1: Vector, v2: Vector): Vector = {
    v1 += v2
  }
}

// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)

如果使用Scala,Spark還支援幾種更通用的介面:1.Accumulable,這個介面可以支援所累加的資料型別與結果型別不同(如:構建一個收集元素的list);2.SparkContext.accumulableCollection 方法可以支援常用的Scala集合型別。

對於在action運算元中更新的累加器,Spark保證每個任務對累加器的更新只會被應用一次,例如,某些任務如果重啟過,則不會再次更新累加器。而如果在transformation運算元中更新累加器,那麼使用者需要注意,一旦某個任務因為失敗被重新執行,那麼其對累加器的更新可能會實施多次。

累加器並不會改變Spark懶惰求值的運算模型。如果在RDD運算元中更新累加器,那麼其值只會在RDD做action運算元計算的時候被更新一次。因此,在transformation運算元(如:map)中更新累加器,其值並不能保證一定被更新。以下程式碼片段說明了這一特性:

val accum = sc.accumulator(0)
data.map { x => accum += x; f(x) }
// 這裡,accum任然是0,因為沒有action運算元,所以map也不會進行實際的計算

部署到叢集

應用提交指南(application submission guide)中描述瞭如何向叢集提交應用。換句話說,就是你需要把你的應用打包成 JAR檔案(Java/Scala)或者一系列 .py 或 .zip 檔案(Python),然後再用 bin/spark-submit 指令碼將其提交給Spark所支援的叢集管理器。

從Java/Scala中啟動Spark作業

單元測試

Spark對所有常見的單元測試框架提供友好的支援。你只需要在測試中建立一個SparkContext物件,然後吧master URL設為local,執行測試操作,最後呼叫 SparkContext.stop() 來停止測試。注意,一定要在 finally 程式碼塊或者單元測試框架的 tearDown方法裡呼叫SparkContext.stop(),因為Spark不支援同一程式中有多個SparkContext物件同時執行。

從1.0之前版本遷移過來

Spark 1.0 凍結了Spark Core 1.x 系列的核心API,只要是沒有標記為 “experimental” 或者 “developer API”的API,在未來的版本中會一直支援。對於Scala使用者來說,唯一的變化就是分組相關的運算元,如:groupByKey, cogroup, join,這些運算元的返回型別由 (Key, Seq[Value]) 變為 (Key, Iterable[Value])。

下一步

你可以去Spark的官網上看看示例程式(example Spark programs)。另外,Spark程式碼目錄下也自帶了不少例子,見 examples 目錄(Scala,JavaPythonR)。你可以把示例中的類名傳給 bin/run-example 指令碼來執行這些例子;例如:

./bin/run-example SparkPi

如果需要執行Python示例,則需要使用 spark-submit 指令碼:

./bin/spark-submit examples/src/main/python/pi.py

對R語言,同樣也需要使用 spark-submit:

./bin/spark-submit examples/src/main/r/dataframe.R

配置(configuration)和調優(tuning)指南提供了不少最佳實踐的資訊,可以幫助你優化程式,特別是這些資訊可以幫助你確保資料以一種高效的格式儲存在記憶體裡。叢集模式概覽(cluster mode overview)這篇文章描述了分散式操作中相關的元件,以及Spark所支援的各種叢集管理器。

最後,完整的API檔案見:ScalaJavaPython 以及 R.