1. 程式人生 > >Spark2.1.0文件:Spark程式設計指南-Spark Programming Guide

Spark2.1.0文件:Spark程式設計指南-Spark Programming Guide

1 概述

從一個較高的層次來看,每一個 Spark 應用程式由兩部分組成:driver program(驅動程式)端執行的 main 函式以及在整個叢集中被執行的各種並行操作。Spark 提供的主要抽象是一個彈性分散式資料集(RDD),它是可以被並行處理且跨節點分佈的元素的集合。我們可以通過三種方式得到一個RDD

1、 可以從一個 Hadoop 檔案系統(或者任何其它 Hadoop 支援的檔案系統)建立RDD;

2、 並行化一個在 driver program端已存在的 Scala 集合;

3、 通過 transforming(轉換)運算元從一個已存在的RDD建立一個 新的RDD。

使用者為了讓RDD在整個並行操作中更高效的重用,可以將一個 RDD 持久化到記憶體中。而且,即使某些節點發生故障,RDD也能夠自動的恢復丟失的資料分割槽。

 Spark 中的第二個抽象是能夠用於並行操作的 shared variables(共享變數),預設情況下,當 Spark 中的方法作為一組任務執行在不同節點上時,它會為每一個變數建立副本併發送到各個任務中去。有時候,需要在任務之間或任務和驅動程式之間共享一個變數。Spark支援兩種型別的共享變數broadcast variables(廣播變數),它可以用於在所有節點上快取一個值,和 accumulators(累加器),他是一個只能被added(增加)的變數,例如 counters  sums

您可以啟動Spark的互動式shell來完成本篇指南的所有示例。

2 連線到Spark

Spark 2.1.0預設基於Scala 2.11來構建和分發。 (Spark可以基於其他版本的Scala來構建。)要在Scala中編寫應用程式,您將需要使用相容的Scala版本(例如2.11.X)。(譯者注:對於直接使用官網預編譯包的同學請務必安裝scala2.11.X)

要編寫Spark應用程式,您需要新增Spark的Maven依賴。 Spark可通過MavenCentral獲得:

groupId = org.apache.spark
artifactId = spark-core_2.11
version = 2.1.0

另外,如果您希望訪問HDFS叢集,則需要為您的HDFS版本的hadoop-client新增依賴關係。
groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

最後,您需要將一些Spark類匯入到程式中。 新增以下行:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

(在Spark 1.3.0之前,您需要顯式匯入org.apache.spark.SparkContext._才能啟用基本的隱式轉換。)

3 初始化Spark

在Spark程式中必須做的第一件事是建立一個SparkContext物件,該物件告訴Spark如何訪問叢集。 要建立SparkContext,您首先需要構建一個包含有關應用程式資訊的SparkConf物件。

每個JVM程序中只能有一個SparkContext是活動的。 在建立新的SparkContext之前,必須呼叫SparkContext.stop()將原有的SparkContext例項停掉。

val conf=new SparkConf().setAppName(appName).setMaster(master)
val sc = new SparkContext(conf)

appName引數是應用程式在叢集UI上顯示的名稱。 master是Spark,Mesos或YARN叢集的URL,或以本地模式執行的特殊“local”字串。 實際上,當在叢集上執行時,您不需要在程式中指定master的地址,而是使用spark-submit啟動應用程式並指定相關引數。 但是,對於本地測試和單元測試,您可以通過“local”來以單機模式執行Spark程序。

3.1使用shell

在Spark shell中,將預設建立一個名為sc的SparkContext例項。您自己建立的SparkContext例項將無法正常工作。 您可以通過--master引數設定context連線的主機URL,並且可以通過將逗號分隔的列表傳遞給--jars引數來將JAR新增到classpath。 您還可以通過向--packages引數提供逗號分隔的maven座標列表,將依賴關係(例如SparkPackages)新增到shell會話。 對於可能存在依賴關係的其他儲存庫(例如Sonatype)可以通過--repositories引數指定。 例如,要在四個核心上執行bin / spark-shell,請使用:

$ ./bin/spark-shell --master local[4]

或者,還要將code.jar新增到其classpath中,請使用:

$ ./bin/spark-shell --master local[4] --jars code.jar

使用maven座標來新增依賴關係:

$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"

有關引數選項的完整列表,請執行spark-shell --help。 在幕後,spark-shell實際上也是通過spark-submit指令碼來提交執行的。

4 彈性分散式資料集RDD

Spark對資料的處理是圍繞著彈性分佈資料集(RDD)的概念建立起來的,RDD是可以並行操作的可容錯的元素集合。 建立RDD有兩種方法:並行化驅動程式中的現有集合,或者引用外部儲存系統(如共享檔案系統,HDFS,HBase或提供Hadoop InputFormat的任何資料來源)中的資料集。

4.1 並行化一個集合

通過呼叫SparkContext的parallelize方法來將驅動程式端的Scala集合並行化。 集合的元素被複制以形成可以並行操作的分散式資料集。例如,下面是如何建立一個包含數字1到5的並行集合:

val data=Array(1,2,3,4,5)
val distData=sc.parallelize(data)

一旦建立完成,分散式資料集(distData)就可以被並行化處理。例如,我們可以呼叫distData.reduce((a,b)=> a + b)來對陣列的元素進行求和。我們稍後介紹將分散式資料集的操作。

並行集合的一個重要引數是將資料集被切分的分割槽數。Spark將為叢集的每個分割槽執行一個任務。通常,您需要為叢集中的每個CPU分配2-4個分割槽。 通常,Spark會根據您的群集配置自動設定分割槽數。但是,您也可以通過將其作為第二個引數手動新增,以設定並行度(例如sc.parallelize(data, 10))。注意:程式碼中的某些地方使用術語“片”(分割槽的同義詞)來保持向後相容性。

4.2 使用外部資料來源

Spark可以從任何Hadoop支援的持久層建立分散式資料集,包括本地檔案系統,HDFS,Cassandra,HBase,Amazon S3等。Spark支援文字檔案,SequenceFiles(譯者注:SequenceFile檔案是Hadoop用來儲存二進位制形式的key-value對而設計的一種平面檔案(Flat File))和任何其他Hadoop InputFormat。

可以使用SparkContext的TextFile方法讀取文字檔案並建立RDD,該方法的引數為檔案的URI(機器上的本地路徑,或hdfs://,s3n://,等等),並將其作為行的集合讀取,下面是一個示例:

scala>val distFile=sc.textFile("data.txt")
distFile:org.apache.spark.rdd.RDD[String]=data.txtMapPartitionsRDD[10]attextFileat<console>:26

一旦RDD建立完成,就可以在distFile上執行資料集操作。例如,我們可以使用map和reduce操作將所有行的長度相加,像這樣:distFile.map(s=> s.length).reduce((a, b) => a + b).

有關Spark讀取檔案的一些注意事項:

1、 如果使用本地檔案系統上的檔案路徑,則檔案也必須在worker node上的相同路徑上可訪問。要麼將該檔案複製到所有worker節點,要麼使用網路掛載的共享檔案系統。

2、 Spark的所有基於檔案的讀入方法,包括textFile,支援目錄讀取,壓縮檔案讀取和包含萬用字元的檔名的讀取。例如,您可以使用 textFile("/my/directory")textFile("/my/directory/*.txt"),textFile("/my/directory/*.gz").

3、 textFile還支援通過一個可選的第二個引數來控制分割槽的數量。預設情況下,Spark為檔案的每個塊建立一個分割槽(HDFS中預設為128MB),但也可以通過傳遞較大的值來請求更高數量的分割槽。請注意,分割槽的數量不能比資料塊的數量少。

除了文字檔案,Spark的Scala API還支援其他幾種資料格式:

1、 SparkContext.wholeTextFiles允許您讀取包含多個小文字檔案的目錄,並將它們作為(檔名,內容)對返回。 這與textFile方法不同,textFile將為每個檔案中每行生成一條記錄。

2、 對於SequenceFiles,使用SparkContext的sequenceFile [K,V]方法,其中K和V是檔案中的鍵和值的型別。K和V應該是Hadoop的Writable介面的子類,如IntWritable和Text。此外,Spark允許您使用本地型別來替代幾個常見的Writable子類; 例如,sequenceFile [Int,String]相當於sequenceFile [IntWritable,Text]。

3、 對於其他Hadoop InputFormats,您可以使用SparkContext.hadoopRDD方法讀取資料,該方法接受任意的JobConf和輸入格式類,key類和value類。這些設定與使用相同輸入源的Hadoop作業相同。 您還可以使用SparkContext.newAPIHadoopRDD方法,該方法會呼叫“新的”MapReduce API(org.apache.hadoop.mapreduce)作為InputFormats。

4、 RDD.saveAsObjectFile和SparkContext.objectFile方法支援以包含  “序列化Java物件”的簡單格式儲存RDD。 雖然比專用格式Avro效率低,但它提供了一種簡單的方式來儲存任何RDD。

4.3 RDD運算元

RDD支援兩種型別的操作:transformations用於從已存在的資料集建立新的資料集,和actions在執行完資料集上的計算後會返回一個值給driver program。 例如,map操作是一個transformation操作,它將函式傳遞給資料集中的每個元素並返回包含所有計算結果的新RDD。 另一方面,reduce是一個action操作,它通過一些函式對RDD的所有元素進行聚合並將最終結果返回給driver program(儘管還有一個返回分散式資料集的並行的reduceByKey操作)。

Spark中的所有的transformation操作都是lazy的,因為它們不會立即計算結果。 相反,他們只是記住對於基本資料集(例如檔案)的一系列轉換操作。只有當某個動作需要將結果返回給driver program時,才會執行轉換操作。此設計使Spark能夠更高效地執行。例如,一個通過map建立然後被reduce操作聚合的資料集僅需要將reduce的結果返回給驅動程式,而無需返回較大的map結果資料集。

預設情況下,每次通過transformation操作得來的RDD可能會在每次對其進行操作時重新執行之前的transformation操作。但是,您也可以使用persist(或cache)方法在記憶體中持久化RDD,在這種情況下,Spark將在下次查詢時直接訪問叢集記憶體上的資料,效率可以大大提升。Spark還支援在磁碟上持久儲存RDD,或跨多個節點進行復制。

4.3.1 基本操作

為了說明RDD的基本操作,請考慮以下基本程式:

val lines=sc.textFile("data.txt")
val lineLengths=lines.map(s=>s.length)
val totalLength=lineLengths.reduce((a,b)=>a+b)

第一行定義了一個指向外部檔案的基本RDD。 此時資料集未載入到記憶體中或者有其它操作:lines只是指向檔案的指標。 第二行定義lineLengths為map轉換的結果。由於惰性計算的特性,仍然不會立即計算出lineLengths。 最後,我們對lineLengths進行reduce操作,這是一個action操作。在這個時候,Spark才會將計算分解為一個個task並在分散式的機器上執行,每臺機器都執行部分資料的map和進行本地的聚合操作,僅將計算結果返回給驅動程式(driverprogram)。

如果我們希望接下來再次使用lineLengths,最好在reduce操作之前加上:

lineLengths.persist()

如此一來lineLengths在第一次計算之後,其結果會被快取在記憶體中。

4.3.2 向Spark傳遞方法

Spark的API很大程度上依賴於在驅動程式中傳遞函式然後在叢集上去執行。有兩種推薦方式來實現這一點:

1、 匿名函式,當函式體比較短時推薦這種用法。

2、 全域性單例物件中的靜態方法。例如,你可以定義object MyFunctions並且像下面這樣傳遞MyFunctions.func1方法:

object MyFunctions{
  deffunc1(s:String):String={...}
}
myRdd.map(MyFunctions.func1)

需要注意的是,雖然你也可以傳遞對某個類的例項(而不是單例物件)中的方法的引用,但這需要傳送整個物件以及物件中的所有方法。例如,考慮下面的情況:
class MyClass{
  deffunc1(s:String):String={...}
  defdoStuff(rdd:RDD[String]):RDD[String]={rdd.map(func1)}
}

在這裡,如果我們建立一個新的MyClass例項並呼叫doStuff方法,那麼裡面的map會引用MyClass例項的func1方法,所以整個物件需要被髮送到叢集。它類似於寫rdd.map(x=> this.func1(x))。

與此類似,訪問外部物件中的欄位將會導致整個物件被引用:

class MyClass{
  valfield="Hello"
  defdoStuff(rdd:RDD[String]):RDD[String]={rdd.map(x=>field+x)}
}


相當於這種寫法:rdd.map(x => this.field + x),這會引用這個物件的所有東西(方法、欄位)。為了避免這個問題,最簡單的方法是將field(域)拷貝一份作為本地變數而不是直接引用外部的欄位:

def doStuff(rdd:RDD[String]):RDD[String]={
val field_=this.field
rdd.map(x=>field_+x)
}

4.3.3 理解閉包

使用Spark的一個難點是在跨叢集執行程式碼時需要了解變數和方法的範圍和生命週期。在RDD運算元中對其範圍之外的變數進行修改是常見的導致混亂的原因。在下面的例子中,我們將嘗試使用foreach()操作來增加計數器並觀察結果,對於其他操作也可能會出現類似的問題。

4.3.3.1 例子

下面的例子試圖對一個簡單RDD中的所有元素進行求和,根據操作是否發生在同一個JVM中,可能會出現不同的行為。一個常見的例子是以本地模式(--master= local [n])執行Spark應用,以及將Spark應用部署到叢集中(例如通過Spark-submit將程式提交到yarn叢集):

var counter=0
var rdd=sc.parallelize(data)
// Wrong: Don't do this!!
rdd.foreach(x=>counter+=x)
println("Counter value: "+counter)

4.3.3.2 本地模式 vs 叢集模式

上述程式碼的行為是不確定的,可能不會以預期的方式工作。為了執行任務,Spark將RDD的處理操作分解為一個個tasks。在執行之前,Spark會計算task的closure(閉包)。closure是對於executor在RDD上執行計算必須可見的那些變數和方法(在上述例子中為foreach())。 該closure被序列化併發送給每個executor。

傳送給每個executor的閉包中的變數現在只是一個副本,因此,當在foreach()方法中引用計數器時,它不再是驅動程式節點上的那個計數器了。在驅動程式節點的記憶體中仍然有一個計數器,但這對executors來說已經不再可見了! executor只能看到已經序列化的閉包的副本。因此,計數器的最終值仍然為零,因為對計數器的所有操作都是在序列化閉包中的值之上進行的。

在本地模式(local)下,在某些情況下,foreach() 函式可能在與驅動程式相同的JVM內執行,並引用相同的原始計數器,所以在本地模式中計數器可能會被更新。

為了確保在這些場景下程式能擁有確定的行為,你應該使用累加器(Accumulator)。 Spark中的累加器專門用於提供一種機制,這種機制使得在群集中的worker節點上執行切分的任務時能夠安全的更新變數。本指南的“累加器”部分將會有更詳細的討論。

一般來說,閉包 - 構造如迴圈或本地定義的方法不應該用於突變某些全域性狀態。 Spark不定義或保證從關閉外部引用的物件的突變行為。 這樣做的一些程式碼可能在本地模式下工作,但這只是意外,並且這種程式碼在分散式模式下將不會按預期的方式執行。如果需要進行全域性聚合,則使用累加器。

一般來說像閉包(closures)這種類似於迴圈或者本地定義方法不應該被用於改變一些全域性的狀態。Spark不能保證在閉包內修改閉包之外的變數這種行為的確定性,有這樣行為的程式碼可能在本地模式下能夠輸出正確結果,但是這只是意外,這種行為在分散式模式下將不會按照預期的方式執行,如何需要全域性的聚合操作,請務必使用累加器。

4.3.3.3 列印RDD中的元素

另一個常見的慣用語法是試圖使用rdd.foreach(println)或者rdd.map(println)來列印RDD的元素。在單個機器上,這將產生預期的輸出並列印RDD中所有的元素。但是,在cluster模式下,stdout被executor呼叫而且輸出會被寫入executor的stdout,而不是驅動程式上的那個,所以在驅動程式上的stdout不會顯示這些元素!要在驅動程式端列印所有元素,可以使用collect()方法先將RDD提取到驅動程式節點:rdd.collect().foreach(println)。不過這可能會導致驅動程式端的記憶體資源被耗盡,因為collect()會將整個RDD提取到一臺機器上; 如果只需要列印RDD中的幾個元素,一個更安全的方法是使用take():rdd.take(100).foreach(println)。

4.3.4 使用鍵值對

雖然大多數Spark操作適用於包含任何型別物件的RDD,但是幾個特殊操作只能在包含鍵值對的RDD上使用。 最常見的是分散式“shuffle”操作,例如按鍵對元素進行分組或聚合。

在Scala中,這些操作在包含Tuple2物件的RDD(Scala語言中的內建元組,通過簡單寫入(a,b)建立)中自動可用。鍵值對操作均定義在PairRDDFunctions類中,當在包含二元組的RDD上執行鍵值對操作時會自動應用該類中的方法。

例如,以下程式碼使用鍵值對上的reduceByKey操作來計算每行文字在檔案中的出現次數:

val lines=sc.textFile("data.txt")
val pairs=lines.map(s=>(s,1))
val counts=pairs.reduceByKey((a,b)=>a+b)

我們也可以使用counts.sortByKey()來按字母順序對鍵值對排序,最後用counts.collect()將計算結果作為物件陣列返回到驅動程式端。

注意:當使用自定義物件作為key時,必須確保自定義的equals()方法有一個與之相匹配的hashCode()方法。有關完整的詳細資訊,請參閱Object.hashCode()文件。

4.3.5 變換運算元(transformations)

下表列出了Spark支援的一些常見的轉換(transformation)操作。有關詳細資訊,請參閱RDD API文件(Scala,Java,Python,R)和pair RDD函式文件(Scala,Java)。

Transformation

Meaning

map(func)

對源資料中的每個元素分別執行函式func,將結果形成一個新的分散式資料集並返回。

filter(func)

選擇經func函式運算返回true的元素形成新的資料集並返回。

flatMap(func)

類似於map操作,但是每個元素可以被對映成0個或多個輸出(所以func的返回型別應該是Seq)

mapPartitions(func)

類似於map操作,但是是分別運行於RDD的每個分割槽(塊)之上,所以當在元素型別為T的RDD上執行時,func的型別必須為:Iterator<T> => Iterator<U> 

mapPartitionsWithIndex(func)

與mapPartitions類似,但func也提供了一個表示分割槽索引的整數型引數,所以當在元素型別為T的RDD上執行時,func必須是型別(Int, Iterator<T>) => Iterator<U> 

sample(withReplacementfractionseed)

用於從RDD中隨機提取一定比例(fraction)的樣本,作為一個子集返回。Seed為隨機數發生器的種子,withReplacement引數用於指定同一個元素是否可以被提取多次。

union(otherDataset)

返回一個包含源資料集和引數資料集中元素的並集組成的新資料集。

intersection(otherDataset)

返回源資料集和引數資料集的交集

distinct([numTasks]))

返回源資料集中的元素去重後的新資料集。

groupByKey([numTasks])

當對包含(k,v)對的資料集呼叫時,返回一個包含(k, Iterable<V>)對的新資料集。

注意:如果要進行分組以便按key執行聚合操作(如求和或平均值),則使用reduceByKey或aggregateByKey將獲得更好的效能。

注意:預設情況下,輸出中的並行級別取決於父RDD的分割槽數。 您可以傳遞一個可選的numTasks引數來自定義分割槽的數量。

reduceByKey(func, [numTasks])

當對(K, V)對的資料集進行呼叫時,返回(K, V)對的資料集,其中使用給定的函式func以reduce的方式聚合每個鍵的值,該函式型別必須為:(V,V) => V。像groupByKey一樣,可以通過可選的第二個引數來配置reduce任務的數量。

aggregateByKey(zeroValue)(seqOpcombOp, [numTasks])

當在包含(K,V)對的資料集上呼叫該方法,返回值是一個包含(K,U)的資料集,該方法使用給定的組合函式和“零值”對相同key的value進行聚合。允許聚合結果的值的型別和輸入資料的型別不同,同時也避免了不必要的記憶體資源分配。像groupByKey一樣,reduce任務的數量可以通過可選的第二個引數進行配置。

sortByKey([ascending], [numTasks])

如果某個包含(K,V)對的資料集中Key的型別實現了Ordered介面,那麼可以在該資料集上呼叫這個方法對鍵值對按key排序。Ascending引數指定升序(true)還是降序(false)。

join(otherDataset, [numTasks])

源RDD的元素型別為(K,V),引數中RDD的元素型別為(K,W)那麼該方法會返回元素型別為(K,(V,W))的資料集(譯者注:類似於求笛卡爾積)。可以通過leftOuterJoinrightOuterJoin,fullOuterJoin.實現外連線。

cogroup(otherDataset, [numTasks])

源RDD的元素型別為(K,V),引數中RDD的元素型別為(K,W)那麼該方法會返回元素型別為(K,(Iterable<V>, Iterable<W>)) 元組的資料集。該方法和groupWith功能一樣。

cartesian(otherDataset)

當對包含型別T和U的資料集進行呼叫時,返回包含(T,U)對(所有元素對)的資料集(譯者注:即笛卡爾積,慎用!)。

pipe(command[envVars])

通過pipe執行外部程式,每個分割槽中的元素作為外部程式入參執行一次外部程式,而外部程式的輸出有建立一個新的RDD。

coalesce(numPartitions)

將RDD中的分割槽數減少到numPartitions。 過濾大型資料集後,執行該操作可以提高接下來的資料處理效率。

repartition(numPartitions)

重新對RDD中的資料shuffle並分割槽,該操作總會通過網路進行資料shuffle(即機器之間會有資料交換)。

repartitionAndSortWithinPartitions(partitioner)

根據給定的partitioner重新對RDD分割槽,並且在每個生成的分割槽中,根據key對分割槽中的記錄進行排序。 這比呼叫repartion,然後在每個分割槽中排序更有效,因為它可以在shuffle的時候完成排序動作。

4.3.6 行動運算元(actions)

下面的表格列出了Spark支援的一些常用的Action操作。有關詳細資訊,請參閱RDD API文件和pair RDD functions文件。

Action

Meaning

reduce(func)

使用函式func(接受兩個引數並返回一個值)來聚合資料集的元素。該函式應該是commutative和associative,以便它可以平行計算。

collect()

將RDD當做陣列返回到驅動程式端,在執行過濾或者其他能夠返回一個足夠小的資料集的操作後使用collect通常會使一個不錯的選擇。

count()

返回資料集中元素的個數

first()

返回資料集中的第一個元素(和take(1)類似)

take(n)

返回資料集中前n個元素。

takeSample(withReplacementnum, [seed])

從資料集中隨機取樣num個元素,並作為陣列返回,withReplacement用於指定是否可以重複取樣,seed是一個可選的隨機數發生器種子。

takeOrdered(n[ordering])

根據自然順序或者自定義的比較器確定的順序,比較返回排名前n個元素。

saveAsTextFile(path)

將資料集中的所有元素以文字檔案的形式寫入本地檔案系統、HDFS或其他hadoop檔案系統中的指定目錄,Spark將在每個元素上呼叫toString方法將其轉換為文字檔案上的一行文字(每個分割槽生成一個檔案)。

saveAsSequenceFile(path(Java and Scala)

將資料集的元素作為Hadoop SequenceFile寫入本地檔案系統、HDFS或其他hadoop支援的檔案系統的給定路徑中。該方法可作用於包含元素型別為:實現了Hadoop的writable介面的鍵值對的RDD。在Scala中一些基本的型別如Int、Double、String都可以隱式轉換為Writable的型別。

saveAsObjectFile(path(Java and Scala)

將資料集儲存為簡單的java序列化格式,後面你可以使用SparkContext.objectFile()方法來讀取該檔案。

countByKey()

僅適用於(K,V)型別的RDD。對每個key的個數進行統計並返回(K,Int)型別的hashmap

foreach(func)

在資料集的每個元素上執行函式func。 這通常用於產生“副作用”,例如更新累加器或與外部儲存系統進行互動。

注意:在foreach()中修改非累加器型別的變數可能會導致不確定的行為。 有關詳細資訊,請參閱瞭解閉包章節。

4.3.7 shuffle操作

Spark中的某些操作會觸發被稱之為shuffle的事件,shuffle是Spark的重新分佈資料的機制,這使得資料可以跨不同的區進行分組。這通常涉及在executor或者機器之間拷貝資料,從而使得shuffle成為複雜和代價高昂的操作。

4.3.7.1 背景

我們可以以reduceByKey操作為示例來了解在shuffle過程中會發生什麼。 reduceByKey操作生成一個新的RDD,其中對單個key對應的所有值執行reduce函式定義的操作,並將結果作為該key最終唯一對應的值。這個問題挑戰在於,並不是單個key對應的所有值都存在同一個分割槽上,或者甚至位於同一個機器上,但是它們必須位於同一位置我們才能計算出結果。

在Spark中,特定的操作往往要求某類資料不能是跨分割槽分佈的。在計算過程中,一個分割槽上執行一個task - 因此,為了重新組織資料以便執行reduceByKey中對單個key對應的values的聚合操作,Spark需要執行一個all-to-all 的操作。它讀取所有分割槽中的資料,以查詢每個key對應的所有value,然後將值跨分割槽彙總以計算每個鍵的最終結果 - 這個過程就被稱為shuffle。

儘管shuffle之後每個分割槽中的元素集合都是確定的,且分割槽本身是有序的,但是分割槽中的資料並不是有序的,如果你希望shuffle後的資料是有序的,你可以使用:

1、 mapPartitions來對每個分割槽進行排序,例如:.sorted;

2、 repartitionAndSortWithinPartitions來對每個分割槽排序,同時還可以重新分割槽;

3、 sortBy來生成一個全域性有序的RDD

會導致shuffle的操作包括:repartion、coalesce這樣的重分割槽操作和groupByKey和reduceByKey這樣的“ByKey操作”(除了按key計數),還有一些join操作,比如cogroup和join。

4.3.7.2 效能影響

Shuffle是代價高昂的操作,因為它涉及磁碟I/O,資料序列化和網路I/O。為了組織shuffle過程中的資料,Spark會生成一組任務,這些任務包括用於組織資料的map任務和用於聚合資料的reduce任務,這種任務命名方式來自於MapReduce,但是和Spark的map、reduce運算元沒有直接關係。

在內部,單個map任務的結果將被暫存到記憶體中,直到達到一定的閾值。然後,這些結果根據目標分割槽進行排序並寫入單個檔案。 在reduce端,task讀取和其相關的排序好的資料塊。

某些shuffle操作可能會佔用大量的記憶體,因為有些操作使用in-memory資料結構(譯者注:例如hashMap)來在資料傳輸之前或之後組織資料記錄。特別的,reduceByKey和aggregateByKey在map操作中建立這些資料結構,而其他的“ByKey”操作則在reduce端建立這些資料結構。當資料規模超出記憶體限制的時候,spark會將資料溢位到磁碟,這會帶來額外的磁碟I/O和垃圾回收的開銷。

Shuffle過程還會在磁碟上生成大量的中間檔案。從Spark 1.3開始,這些檔案將被保留,直到相應的RDD不再使用並被垃圾回收。這樣做的好處在於,如果需要根據lineage圖重新計算某些RDD,就不需要重新建立shuffle檔案。 如果應用程式保留對這些RDD的引用或GC不頻繁啟動,垃圾收集操作的間隔時間可能會很長。這意味著長時間執行的Spark作業可能會佔用大量的磁碟空間。在配置Spark上下文時,由spark.local.dir配置引數指定臨時儲存目錄。

可以通過調整各項配置引數來對shuffle進行調優。具體細節請參閱“Spark配置指南”中的“shuffle 行為”部分。

4.4 RDD永續性

Spark中最重要的功能之一是可以在操作中持久化(或快取)記憶體中的資料集。在持久化一個RDD之後,每個節點都會將自己負責計算的分割槽快取在記憶體中,接下來對於該資料集(或從其匯出的資料集)的操作都可以重用該資料集。這樣可以大大加快接下來的操作(通常超過10倍)。快取機制是spark適用於迭代演算法和快速互動場景的關鍵所在。

您可以使用persist()或cache()方法標記需要持久化的RDD。在action操作第一次觸發計算之後,該RDD將被儲存在叢集的記憶體中。Spark的快取是可容錯的 - 如果RDD的任何分割槽丟失,它將根據最初建立資料集的操作自動重新計算。

此外,可以使用不同的儲存級別儲存每個持久化的RDD,從而允許您將資料集儲存到磁碟上,或者儲存在記憶體中(但需要將其序列化為Java物件以節省空間),或將其在節點之間進行復制。可以通過將StorageLevel物件(Scala,Java,Python)傳遞給persist()方法來設定這些級別。Cache()方法是使用預設儲存級別的簡寫,即StorageLevel.MEMORY_ONLY(在記憶體中儲存反序列化的物件)。所有的儲存級別以及說明如下表:

Storage Level

Meaning

MEMORY_ONLY

將RDD作為反序列化的Java物件儲存在JVM中。如果記憶體存不下RDD的所有分割槽,某些分割槽將不會被快取,每次需要時都會重新計算。這是預設級別。

MEMORY_AND_DISK

將RDD作為反序列化的Java物件儲存在JVM中。如果記憶體存不下RDD的所有分割槽,會將一部分分割槽儲存到磁碟中,並在需要時從磁碟中讀取。

MEMORY_ONLY_SER 

(Java and Scala)

將RDD儲存為序列化的Java物件(每個分割槽儲存為一個位元組陣列)。這通常比反序列化物件格式節省空間,特別是在使用fast serializer(快速序列化器)的情況下,但是在讀取資料時會耗費較長的CPU時間。

MEMORY_AND_DISK_SER 

(Java and Scala)

與MEMORY_ONLY_SER類似,但是會將記憶體存不下的分割槽溢位到磁碟,而不是每次都需要重新計算這些存不下的分割槽。

DISK_ONLY

將RDD所有分割槽都儲存到磁碟上。

MEMORY_ONLY_2, MEMORY_AND_DISK_2,.

與上述的級別相同,但會將RDD的每個分割槽儲存兩份,且這兩份分散到叢集不同的兩個節點。

OFF_HEAP (experimental)

與MEMORY_ONLY_SER類似,但將資料儲存在off-heap memory(堆外記憶體)中。這需要在配置項中啟用堆外記憶體。

注意:在Python中,儲存的物件將始終使用Pickle庫進行序列化,因此是否選擇序列化級別都是無關緊要的。 Python中的可用儲存級別包括MEMORY_ONLY,MEMORY_ONLY_2,MEMORY_AND_DISK,MEMORY_AND_DISK_2,DISK_ONLY和DISK_ONLY_2。

在shuffle操作(例如reduceByKey)中,有些時候即使使用者沒有呼叫persist()方法,Spark也會自動保留一些中間資料。這樣做是為了在當遇到某個節點在執行shuffle操作時發生故障,可以避免重新計算整個輸入。如果某個RDD會被重複使用,我們仍然強烈建議使用者在生成RDD之前呼叫persist()方法。

4.4.1 如何選擇持久化級別?

Spark的提供不同的儲存級別選項旨在向使用者提供“記憶體使用”和“CPU效率”之間權衡。 我們建議您通過考慮以下幾點來確定選用哪一個:

1、 如果記憶體容量能夠容納RDD所有的分割槽,建議使用預設的持久化級別(MEMORY_ONLY),在這個級別上cpu的效率最高,且定義在RDD之上的操作都能很快被執行(譯者注:因為沒有反序列化和磁碟I/O)。

2、 如果記憶體容不下這麼多資料,可以嘗試MEMORY_ONLY_SER並且選擇一個快速的序列化庫來節省物件佔用的空間,這種方式仍可以使資料儘快被計算。(譯者注:對於計算過程而言,多了反序列化這一步,但是由於不需要磁碟I/O所以仍然是效率很高的方式)。

3、 儘量避免將資料溢位到磁碟,除非重新計算該資料的耗時很長,或者需要過濾大量的資料。否則,從磁碟讀取資料還不如重新計算這部分資料來的快。

4、 如果需要要快速的故障恢復,請使用帶有複製機制的儲存級別(例如,使用Spark來響應來自Web應用程式的請求)。所有儲存級別通過重新計算丟失的資料來提供完整的容錯能力,但複製的資料可讓您繼續在RDD上執行任務,而無需重新計算丟失的分割槽。

4.4.2 移除資料

Spark會自動監視每個節點的快取使用情況,並以最近最少使用(LRU)演算法丟棄舊的資料分割槽。如果要手動刪除RDD,而不是等待它自動被清理出快取,請使用RDD.unpersist()方法。

5 共享變數

通常,當傳遞到Spark操作(如map或reduce)的函式在遠端叢集節點上執行時,它會為函式中使用的所有變數建立獨立的副本然後在其上進行操作。這些變數被複制到每個機器,並且遠端機器上的變數的更新不會傳播回驅動程式。在任務之間讀寫共享的變數一般是低效的做法。但是,Spark為滿足兩種常用的使用模式提供了兩種有限型別的共享變數:廣播變數(broadcast variables)和累加器(accumulators)。

5.1 廣播變數

廣播變數允許程式設計師在每臺機器上保留一個只讀變數,而不是向每個任務分發一個副本。例如,可以使用廣播變數以更加有效的方式為每個節點提供大型輸入資料集的副本。Spark還嘗試使用高效的廣播演算法分發廣播變數,以降低通訊成本。

Spark中的各種操作被劃分為一組stage然後分階段執行,這些stage由分散式的“shuffle”操作隔開。在每個stage中Spark會自動廣播tasks需要的通用資料。以這種方式廣播的資料以序列化形式進行快取,並在執行每個task之前進行反序列化。這意味著,顯式建立廣播變數僅在跨多個階段的任務需要相同資料或者特別需要以反序列化格式快取資料時才有用。

廣播變數通過呼叫SparkContext.broadcast(v)從變數v建立。廣播變數實際是v的包裝器,其值可以通過呼叫value方法來訪問。下面的程式碼顯示了廣播變數的建立和使用:

scala>val broadcastVar =sc.broadcast(Array(1,2,3))
broadcastVar:org.apache.spark.broadcast.Broadcast[Array[Int]]=Broadcast(0)
scala>broadcastVar.value
res0:Array[Int]=Array(1,2,3)

建立廣播變數後,就應在群集中執行的任何函式中使用它,而不是使用原來的v,以避免v會被多次傳送到節點。另外,在建立廣播變數之後不應再修改物件v,以確保所有節點獲得的廣播變數的值是相同的(例如,如果變數可能會在修改後被髮送到新節點就會導致在叢集中的不同節點對相同的廣播變數取到不同的值的情況)。

5.2 累加器

累加器是一個變數,該變數只能通過associative(可結合的)和commutative(可交換的)型別的操作(譯者注:即數學上的滿足加法結合律和加法交換律)被“added(加)”。累加器可以被用於實現計數器(和MapReduce過程一樣)和求和操作。Spark本身支援數字型別的累加器,程式設計師可以通過實現介面實現對新型別的支援。

作為使用者,您可以建立命名或未命名的累加器。 如下圖所示,命名的累加器(在本例中為counter)將顯示在Web UI中,用於展示每個stage中累加器的修改情況。Spark在“Tasks”表中顯示由每個修改的對應累加器的值。


在UI中跟蹤累加器有助於瞭解正在執行的stage的進度(注意:Python中尚不支援該功能)。

可以通過呼叫SparkContext.longAccumulator()或SparkContext.doubleAccumulator()來分別建立Long或Double型別的數字累加器。然後可以使用add方法將在群集上正在執行的任務中的值加到這個累加器中。 但是,累加器的值對於每個task都是不可見的。只有驅動程式(driverprogram)可以使用累加器的value方法讀取累加器的值。

下面的程式碼展示了一個用累加器來將陣列的元素相加的例子:

scala>val accum=sc.longAccumulator("My Accumulator")
accum:org.apache.spark.util.LongAccumulator=LongAccumulator(id:0,name:Some(MyAccumulator),value:0)
scala>sc.parallelize(Array(1,2,3,4)).foreach(x=>accum.add(x))
...
10/09/2918:41:08INFOSparkContext:Tasksfinishedin0.317106s
scala>accum.value
res2:Long=10

雖然上面的程式碼使用的是內建支援的Long型別的累加器,但開發人員也可以通過繼承AccumulatorV2來建立自定義型別的累加器。 AccumulatorV2抽象類有幾個方法,在子類中必須將其覆蓋:將累加器重置為零的reset方法,將另一個值新增到累加器中的add方法,將另一個相同型別的累加器合併進來的merge方法。關於其他必須覆蓋的方法請參閱API文件。舉個例子,假設我們有一個代表數學向量的MyVector類,我們可以這樣寫:

class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector]{
privateval myVector:MyVector=MyVector.createZeroVector
defreset():Unit={
myVector.reset()
}
defadd(v:MyVector):Unit={
myVector.add(v)
}
...
}
// Then, create an Accumulator of this type:
val my VectorAcc=new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc,"MyVectorAcc1")

請注意,當程式設計師定義自己的AccumulatorV2型別時,結果的型別可以與新增的元素的型別不同。

對於僅在spark action型別操作內執行的累加器更新,Spark保證每個任務對累加器的更新只會被應用一次,即重新啟動的任務將不會更新該值。在transformation操作中,使用者應該注意,如果重新執行tasks或job stages,則每個task對累加器的更新可能會被多次應用。

累加器不會改變Spark的lazy計算方式。如果它們在RDD的操作中被更新,則只有在對RDD的計算真正執行的時候,才會更新其值。因此,累加器更新不能保證在像map()這樣的lazy變換中立即執行。 以下程式碼片段給出了這種特定的示例:

val accum=sc.longAccumulator
data.map{x