1. 程式人生 > >Spark程式設計指引(三)-----------------RDD操作,shuffle和持久化

Spark程式設計指引(三)-----------------RDD操作,shuffle和持久化

處理鍵-值對

儘管Spark的大部操作支援包含所有物件型別的RDDs,但是還有一些操作只支援鍵-值對的的RDDs.最常見的是類似"洗牌"的操作,比如以鍵值來分組或聚合所有的元素。

在Scala裡,這些操作對包含2元組的RDD是自動可用的。(Scala語言內建的元組,通過(a,b)這樣的形式建立)。對鍵-值對可用的操作在PairRDDFunctions類裡,將自動包含在含有2元組的RDD裡。

比如,下面的程式碼使用在鍵-值對上使用reduceByKey操作來計算檔案中同一行出現的次數。

val lines = sc.textFile("data.txt")
val pairs = lines
.map(s => (s, 1)) val counts = pairs.reduceByKey((a, b) => a + b)

我們也可以使用counts.sortByKey().例如,將鍵-值對以字典序排序,然後用counts.collect()將結果以物件陣列的方式帶回驅動程式。

注:當使用自定義的物件作為鍵-值對的鍵值時,你必須保證自定義物件的equals()方法伴隨有一個匹配的hashcode()方法。

轉換:

下表列出了一些Spark中常用的轉換操作。

轉換 含義
map(func) 通過將源資料中的每個元素傳遞給func函式來返回一個新的RDD
filter
(func)
從源資料中選出那些傳遞給func函式後返回true的元素構成的新RDD
flatMap(func) 與map類似,但是每個輸入元素可以被對映為0或更多的輸出元素(所以func應該返回一個序列而不是單個元素)
mapPartitions(func) 與map類似, 但是在RDD的每個分割槽上分開運算,所以函式func 必須是型別 Iterator<T> => Iterator<U>,當RDD的型別是RDD[T]時。
mapPartitionsWithIndex(func) 與mapPartitions類似, 但是提供了一個整型引數給函式func,代表了分割槽的索引。所以函式func
的型別必須為 (Int, Iterator<T>) => Iterator<U> ,當RDD的型別是RDD[T].
sample(withReplacement, fraction, seed) 對資料的一部分進行取樣,可選的替換引數, 使用一個給定的隨機數發生器種子。
union(otherDataset) 返回一個新的資料集,包含了引數與源資料的並集。
intersection(otherDataset) 返回一個新的RDD與引數的交集。
distinct([numTasks])) 返回一個新的資料集包含了與源資料集的不同元素
groupByKey([numTasks]) 當在一個 (K, V)對構成的資料集上呼叫時, 返回一個新的(K, Iterable<V>) 對構成的資料集.
注意: 如果你grouping是為了執行一個聚合操作(如求和,求平均值)在每個鍵上,使用reduceByKey oraggregateByKey和會獲得更好的效能
注意: 預設情況下, 進行輸出操作的並行度依懶於父RDD的分割槽數.你可以通過numTasks引數來設定不同的任務數
reduceByKey(func, [numTasks]) 在一個 (K, V) 對構成的資料集上呼叫時, 返回一個 (K, V) 對構成的資料集。新資料集由每個鍵對應的所有資料通過呼叫傳入的func函式聚合而成, 這個函式的型別必須是 (V,V) => V. 與groupByKey一樣。reduce任務的個數由每二個可選的引數設定。
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
sortByKey([ascending], [numTasks]) When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the booleanascending argument.
join(otherDataset, [numTasks]) When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported throughleftOuterJoin,rightOuterJoin, and fullOuterJoin.
cogroup(otherDataset, [numTasks]) When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also calledgroupWith.
cartesian(otherDataset) When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
pipe(command, [envVars]) Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.
coalesce(numPartitions) Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
repartition(numPartitions) Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
repartitionAndSortWithinPartitions(partitioner) Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than callingrepartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.

動作

下表是Spark中常用的動作操作:

動作 含義
reduce(func) Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
collect() Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
count() Return the number of elements in the dataset.
first() Return the first element of the dataset (similar to take(1)).
take(n) Return an array with the first n elements of the dataset.
takeSample(withReplacement, num, [seed]) Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
takeOrdered(n, [ordering]) Return the first n elements of the RDD using either their natural order or a custom comparator.
saveAsTextFile(path) Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
saveAsSequenceFile(path)
(Java and Scala)
Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
saveAsObjectFile(path)
(Java and Scala)
Write the elements of the dataset in a simple format using Java serialization, which can then be loaded usingSparkContext.objectFile().
countByKey() Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
foreach(func) Run a function func on each element of the dataset. This is usually done for side effects such as updating anAccumulator or interacting with external storage systems.
Note: modifying variables other than Accumulators outside of theforeach() may result in undefined behavior. SeeUnderstanding closures for more details.

shuffle 操作

Spark的一些特定操作會觸發shuffle事件。shuffle是Spark的一種機制,用來將資料重新分配,使它的不同分組跨分割槽。這通常涉及複製資料到不同的執行體和機器,所以shuffle操作被認為是一種複雜且昂貴的操作。

背景

為了瞭解shuffle過程中發生了什麼,我們用reduceByKey操作來說明。reduceByKey操作產生了一個新的RDD,原RDD中所有對應同一個鍵的值組合成一個元組---鍵和在該鍵對應的所有元素上執行一個reduce函式後的結果。該操作的挑戰是同一個鍵所對應的所有元素不一定在同一分割槽,甚至不在同一個機器上,但是它們必須被統一定位被計算結果。

在Spark裡,資料通常並不是跨分割槽分佈,而是分佈在便於進行特定操作的地方。在計算的過程中,單一任務將會在單一的分割槽上進行操作。這樣,為了為reduceByKey這個reduce任務的執行組織所有的資料,Spark需要執行一個所有對所有的操作,它必須從所有的分割槽中讀取所有鍵值對應的所有值,然後跨分割槽彙集值來計算每個鍵對應的最終結果-----這就是shuffle.如下圖所示,


儘管shuffle操作後每個分割槽上的元素集合是確定的,但是分割槽的順序以及分割槽中元素的順序是不確定的。如果你希望預測shuffle操作後資料的順序,可以使用如下操作:

1.mapPartions用來對使用的每個分割槽排序,如.sorted

2.repartitionAndSortWithinPartitions高效地對分割槽排序並重新分割槽

3.sortBy來建立一個全域性有序的RDD

會引起shuffle的操作包括“重分割槽”操作,如repartition和coalesce."ByKey"操作(除了counting)比如groupByKey和reduceByKey,和"join"操作比如cogroup和join.

效能影響

Shuffle操作屬於昂貴的操作,因為它涉及磁碟I/O,資料序列化,和網路I/O。為是組織Shuffle的資料,Spark生成一個map任務集合來組織資料,和一個reduce任務集合來聚合資料,這種概念來自於MapReduce,和Spark自身的map,reduce操作沒有直接關係。

在實現上,每個單獨的map任務將結果儲存在記憶體中,直到無法分配記憶體。然後,這些結果以目標分割槽排序並存儲在單一的檔案中。在reduce中,reduce任務讀取已排序的相關塊.

某些shuffle操作會消耗大量的堆記憶體,因為它們使用基於記憶體的資料結構來組織記錄,在傳遞它們之前或之後。特別地, reduceByKey和aggregateByKey在map中建立這些結構然後'ByKey'操作在reduce中生成這些結構. 當沒有足夠的記憶體時, Spark將這些資料寫到磁碟上, 引起額外的磁碟I/O和垃圾回收處理。

shuffle也會產生大量的中間檔案。在Spark1.3.1中,這些檔案在Spark停止之前不會清除,這意味著長時間執行的Spark會消耗可用的磁碟空間。
這樣做是為了使shuffle操作不用重新計算如果需要使用上次的結果。臨時檔案目錄在配置sparkContext時spark.local.dir配置指定。

shuffle操作可以被一些配置引數調整,這些將在spark的配置指導中詳細說明。

RDD持久化

將資料通過操作持久化(或快取)在記憶體中是Spark的重要能力之一。當你快取了一個RDD,每個節點都快取了RDD的所有分割槽。這樣就可以在記憶體中進行計算,或從其它的動作中還原RDD(或派生自其的RDD)。這樣可以使以後在RDD上的動作更快(通常可以提高10倍)。快取對於遍歷演算法和更快的互動體驗至關重要。

你可以對希望快取的RDD通過使用persist或cache方法進行標記。它通過動作操作第一次在RDD上進行計算後,它就會被快取在節點上的記憶體中。Spark的快取具有容錯性,如果RDD的某一分割槽丟失,它會自動使用最初建立RDD時的轉換操作進行重新計算。

另外,RDD可以被持久化成不同的級別。比如,可以允許你儲存在磁碟,記憶體,甚至是序列化的JAVA物件(節省空間),備份在不同的節點上,或者儲存在基於記憶體的檔案系統Tachyon上。通過向persist()方法傳遞StorageLevel物件來設定。cache方法是使用預設級別StorageLevel.MEMORY_ONLY的方法。所有的級別如下:

儲存級別 含義
MEMORY_ONLY 以反序列化的JAVA物件的方式儲存在JVM中. 如果記憶體不夠, RDD的一些分割槽將不會被快取, 這樣當再次需要這些分割槽的時候,將會重新計算。這是預設的級別。
MEMORY_AND_DISK 以反序列化的JAVA物件的方式儲存在JVM中. 如果記憶體不夠, RDD的一些分割槽將將會快取在磁碟上,再次需要的時候從磁碟讀取。
MEMORY_ONLY_SER 以序列化JAVA物件的方式儲存 (每個分割槽一個位元組陣列). 相比於反序列化的方式,這樣更高效的利用空間, 尤其是使用快速序列化時。但是讀取是CPU操作很密集。
MEMORY_AND_DISK_SER 與MEMORY_ONLY_SER相似, 區別是但記憶體不足時,儲存在磁碟上而不是每次重新計算
DISK_ONLY 只儲存RDD在磁碟
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. 與上面的級別相同,只不過每個分割槽的副本只儲存在兩個叢集節點上。
OFF_HEAP (experimental) 將RDD以序列化的方式儲存在 Tachyon. 與 MEMORY_ONLY_SER相比, OFF_HEAP減少了垃圾回收。允許執行體更小通過共享一個記憶體池。因此對於擁有較大堆記憶體和高併發的環境有較大的吸引力。更重要的是,因為RDD儲存在Tachyon上,執行體的崩潰不會造成快取的丟失。在這種模式下.Tachyon中的記憶體是可丟棄的,這樣 Tachyon 對於從記憶體中擠出的塊不會試圖重建它。如果你打算使用Tachyon作為堆快取,Spark提供了與Tachyon相相容的版本。
注:在python裡,通過Pickle庫來儲存物件總是以序列化的方式,與儲存級別中的序列化標誌無關。

Spark也自動地持久化一些中間資料在shuffle操作中,儘管使用者沒有呼叫persist.這樣做是為了避免重新計算整個輸入當一個節點失敗時。我們建議使用者對結果RDD呼叫persist,如果計劃重複使用它們。

選擇何種儲存級別?

Spark的儲存級別是為了提供記憶體使用率和CPU效率的均衡。我們建議您通過以下方式來選擇:

1.如果你的RDD在預設的儲存級別下工作的很好,就不要用其它的級別。這是最有CPU效率的選項,允許RDD上的操作儘快的完成。

2.如果不行,試下MEMORY_ONLY_SER並使用一個能快速序列化的庫,這樣更節省空間,同時訪問速度也比較快。

3.不要儲存資料到磁碟,除非在資料集上的計算操作是昂貴的,或者過濾了大量的資料。否則重新計算可能比從磁碟中讀取更快

4.使用備份級別,如果你需要更快的恢復。(比如,使用Spark為網路應用程式提供服務)。所有的儲存級別都通過重新計算提供了全面的容錯性,但是備份級別允許你繼續在RDD上執行任務而無需重新計算丟失的分割槽。

5.在擁有大量記憶體和多應用程式的環境中,實驗下OFF_HEAP方式有以下優勢:

 允許多個執行體共享同一個記憶體池。

 顯著地減少了垃圾回收的消……

 如果某個執行體崩潰,快取資料不會丟失。

刪除資料

Spark自動地監控每個節點上的快取使用率。並通過LRU演算法刪除過時的資料。如果你想手動刪除一個RDD而不是等待它從快取中過時,使用RDD.unpersist方法。