1. 程式人生 > >Spark (Python版) 零基礎學習筆記(五)—— Spark RDDs程式設計

Spark (Python版) 零基礎學習筆記(五)—— Spark RDDs程式設計

RDD基礎概念

建立RDD
建立RDD的方法:
1.載入外部資料集
2.分佈一個物件的集合

前邊幾次的筆記已經提到過多次了,因此,這裡只列出幾個注意事項:
1.利用sc.parallelize建立RDD一般只適用於在測試的時候使用,因為這需要我們將整個資料集放入一臺機器的記憶體中。因此,除了我們學習使或者測試時,很少使用。
2.更通用的方法是從外部儲存系統上載入資料建立RDD

Spark支援兩種RDDs操作:
Transformations:從已有資料集建立一個新的資料集(注意:並不是改變現有的RDD,而是返回一個新的RDD的指標。)
Actions:在資料集上進行計算後,向驅動程式返回一個值或者將資料寫入外部儲存系統。
注意,transformations和actions是兩個完全不同的概念,因為這兩種操作中,Spark對RDDs進行計算的方式不同。Transformations是惰性的,也就是說當我們使用一個transformation操作後,這個transformation的動作不會立即執行,只有在我們需要進行一個action的時候才會執行。這在大資料中十分重要,使得Spark十分的高效!
到底這樣的設計有什麼用呢?舉個例子,比如說,我們在一個很大的資料集中進行map和reduce的操作,我們只有需要執行reduce(是一個action)在驅動程式中得到最後的結果時,map(是一個transformation)這一操作才會執行。這樣最後的結果,我們只會在驅動程式中得到一個,二不需要返回中間過程中執行map後的這個巨大的資料。
再比如,我們需要定義一個文字檔案並過濾其中含有“Python”字串的行。我們利用lines = sc.textFile(…),Spark並不會馬上載入和儲存這些檔案,因為這樣會十分浪費記憶體,因為我們實際需要的只是一小部分含有“Python”的語句。而只有當我們需要執行filter這個action時,Spark此時才會載入檔案並進行filter,最終只會返回filter的結果。更有趣的時,比如我們需要使用first()這個action,Spark在掃描檔案時,只要找到了第一行符合要求的文字,他就不會再對整個檔案進行讀取了。這也是為什麼Spark會如此高效!
預設情況下,我們每次執行一個action時,每個transformed RDD都會被重新進算一次。但是,Spark允許我們使用persist(或者cache)方法在記憶體中存留RDD,Spark將其中的元素儲存在叢集上,這樣我們在下次需要訪問它時,就會進一步加快速度。除了在記憶體中保留RDD之外,也可以將RDD暫存在硬碟上。這對於大資料來說又是一個非常關鍵的特性!因為我們如果不再使用RDD,Spark可以通過資料流訪問資料計算出結果,沒有必要浪費儲存空間。

注意:當我們利用transformations命令建立了新的RDDs時,Spark會追蹤儲存不同RDDs之間的依賴關係,稱為lineage graph普系統。Spark可以利用這些資訊在需要的時候計算每個RDD,如果一部分固有RDD丟失,Spark能夠利用這些資訊回覆丟失的資料。這也是一種故障修復的機制。
注意:在學習給過程中,經常使用action中的collect()或者take()將結果返回給驅動程式。但是,collect()在使用的時候需要考慮資料集的大小是否適合一臺單機的記憶體。通常情況才,在大的資料集上是不會使用collect()的。普遍的做法是將資料寫入分散式儲存系統,例如HDFS或者Amazon S3。 可以使用saveAsFile()、saveAsSequenceFile()或者其他的一些actions儲存RDD的內容,

總結以下Spark程式和shell會話的工作流程:
1.從外部資料建立RDDs
2.利用transformations將他們轉移,定義新的RDDs
3.使用persist()方法使Spark將中間結果的RDDs暫存,以便再次使用
4.部署actions進行平行計算

傳遞函式

在Python中,有三種方式可以將函式傳遞給Spark。
1.使用匿名函式lambda
2.利用def區域性定義的函式
3.模組中的高階函式

>>> lineRDD.filter(lambda x: 'Python' in x).collect()  # 匿名函式
['high-level APIs in Scala, Java, Python, and R, and an optimized engine that'
, '## Interactive Python Shell', 'Alternatively, if you prefer Python, you can use the Python shell:'] >>> def pythonLine(x): ... return "Python" in x ... >>> lineRDD.filter(pythonLine).collect() # 區域性定義的函式 ['high-level APIs in Scala, Java, Python, and R, and an optimized engine that', '## Interactive Python Shell', 'Alternatively, if you prefer Python, you can use the Python shell:']

需要注意的是,當傳遞函式的時候,如果函式是一個物件的成員,或者包含物件中欄位的引用(比如self.field),Spark會將整個物件都傳遞到工作節點,這要會導致傳遞的資訊遠大於我們需要的資訊。有時,如果你的類中含有Python無法pickle的物件,就會導致程式無法執行。
下面給出一個錯誤的例子:

class SearchFunctions(object):
    def __init__(self, query):
        self.query = query
    def isMatch(self, s):
        return self.query in s
    def getMatchesFunctionReference(self, rdd):
        return rdd.filter(self.isMatch)
    def getMatchesMemberReference(self, rdd):
        return rdd.filter(lambda x: self.query in x)

一個正確的例子:

class WordFunctions(object):
    ...
def getMatchesNoReference(self, rdd):
    query = self.query  # 使用區域性變數,避免傳遞整個物件
    return rdd.filter(lambda x: query in x) 

理解閉包

在Spark中,一個難點就是當跨越叢集執行工作節點時,需要正確理解變數和方法的作用域和生命週期。RDD的一些操作可以修改作用域之外的變數,很容易讓人困惑。

下面結合一個例子進行理解。下面這個例子本意是要實現sum的功能,但是當命令不再相同的JVM中執行時,就會出現不一樣的結果。

#這是一個錯誤的例子,不要這樣寫!!!!
>>> data = list(range(10))
>>> counter = 0
>>> rdd = sc.parallelize(data)
>>> def increment_counter(x):
...     global counter
...     counter += x
... 
>>> rdd.foreach(increment_counter)
>>> print("Counter value: ", counter)                                         
Counter value:  0

本地(local)模式 vs. 叢集(cluster)模式:
Spark的本地模式:–master = local[n]
釋出到叢集上的Spark應用: 例如spark-submit to YARN
Spark會將RDD操作的過程分割成不同的tasks,每個task由一個executor執行。在執行之前,Spark會計算task的閉包。閉包就是executor在執行RDD上的計算時可見的變數和方法。閉包會被序列化並傳遞給每個executor。
閉包中傳遞給executor的每個變數會被複制,因此當上述程式中counter在每個foreach函式中被引用後,此時的counter就不再是驅動節點上的counter了。在驅動節點上仍然存在一個counter,但是它是對executors不可見的。Executors只能見到從序列化的閉包中複製的counter。因此,counter最後的值仍然為0。
在本地模式中,如果foreach函式確實是在一個和驅動節點相同的JVM中執行的,那麼將會引用到同一個counter,並能夠真的更新counter的值。
為了確保在上述情況下正確定義RDD的行為,可以使用一個accumulator。Spark中,accumulators能夠提供一個機制,確保命令給分割在不同的工作節點上執行時,變數能夠安全的更新。一般來講,閉包的結構類似於迴圈(loop)和本地定義的方法,不能用於改變全域性變數。

列印RDD的元素:
另一個常見的用法是希望列印RDD中的每個元素。如果使用rdd.foreach(print)或者rdd.map(print)進行列印,在一個計算機上執行時,能夠產生我們希望的結果。然而,在叢集模式下,被executor呼叫的標準輸出會被executor自己的標準輸出代替,和驅動節點上的標準輸出不再相同。為了實現列印功能,可以使用collect()方法,現將RDD傳遞給驅動節點,然後才進行列印,也就是rdd.collect().foreach(print)。但這樣做可能會導致記憶體溢位,因為需要將整個RDD放在一個機器上,如果只需要列印RDD中的部分元素,更為安全的方法是使用take()方法:rdd.take(100).foreach(print)

Shuffle操作

Spark中的操作會觸發一個事件,稱為shuffle。Shuffle是spark中的一個用於資料再分佈的機制,從而實現資料在不同的partitions之間重新分組。這一操作通常需要在不同的executors上覆制資料,因此造成shuffle操作非常複雜和耗時。
應用背景:
為了理解shuffle的工作過程,我們可以結合reduceByKey進行學習。reduceByKey這一操作會將key值相同的值組合成一個tuple,生成一個新的RDD,一個key所有的value都會執行一個reduce函式。這個過程存在的一併不需要個難點和挑戰是,對於一個key,它的所有values並不需要存放在同一個partition中,甚至不需要在同一個machines中,但是必須都能夠被訪問從而計算結果。
在Spark中,要進行一個具體操作時,一般要求資料不要分佈在不同的partitions中。在計算過程中,一個task會在一個partition上執行,因此,為了執行reduceByKey這一單一的reduce操作,Spark需要執行一個all-to-all的操作。Spark會從所有partitions中讀取所有鍵對應的數值,並把他們從不同的partitions中合併到一起,計算每個鍵最後的值,這一過程就叫做shuffle。
雖然經過shuffle後,partition中元素的集合會被唯一確定,而且partitions的順序也會被唯一確定,但是其中的元素的順序並不確定。如果我們希望知道經過shuffle後資料的順序可以預知,則可以使用以下操作:
利用mapPartitions對每個partition進行排列,例如sorted
利用repartitionAndSortWithPartitions,在進行repartitioning的同時,高效的進行排列
利用sortBy,產生一個全域性順序的RDD
能夠引發shuffle的操作包括repartition操作(例如repartition和coalesce),ByKey操作(例如groupByKey和reduceByKey,但是不包括counting),和join操作(例如cogroup和join)。

效能影響Performance Impact
Shuffle是一種非常昂貴的操作,因為它涉及到硬碟的I/O,資料序列化和network I/O。為了組織資料進行Shuffle,Spark需要產生一系列的tasks,包括map操作進行組織,一系列reduce操作進行聚集。

Shuffle操作會消耗很多堆記憶體空間,因為這一過程需要利用記憶體中的資料結果,從而在資料transfer之前或者之後進行記錄。如果資料超出記憶體容量,Spark會將表格溢位到硬碟上,造成額外的硬碟I/O操作和垃圾收集。Shuffle還會在硬碟上生成大量的中間檔案。在Spark 1.3中,這些檔案會一直被儲存,知道RDD不再使用並且垃圾已經被收集。垃圾收集只有在很長一段時間後才會發生,如果應用還需要訪問這些RDDs,這意味著長時間執行的Spark工作需要消耗大量的磁碟空間。在配置Spark Context過程中,臨時儲存目錄儲存在spark.local.dir這一配置引數中。可以通過調整配置引數來調製shuffle行為。

RDD持久化(RDD Persistence)

Spark一個重要的功能是可以在進行不同操作過程中,在記憶體中持久化(persisting或者caching)資料。當持久化一個RDD後,每個節點都會將記憶體中計算得到的partitions進行儲存,需要進行其他操作時再次使用。這樣能夠提升操作的速度。Caching是迭代演算法的一個重要工具。
可以利用persist()或者cache()方法進行RDD的持久化操作。當RDD第一次執行action時,就會被儲存在node中。Spark的cache方法可以容錯,如果RDD的一個partition丟失了,它能夠自動利用當初建立它的transformations重新計算。
此外,每個持久化的RDD能夠被儲存在不同的儲存層,比如,循序我們在硬碟上暫存一個RDD,也可以在記憶體中暫存一個RDD,在其他節點複製RDD。這些不同的儲存層次可以通過向persist()傳遞一個StorageLevel的物件進行設定。而cache方法只能夠使用預設的儲存層次,為StorageLevel.MEMORY_ONLY。儲存層次分別有以下幾種:

儲存層次 含義
MEMORY_ONLY 將RDD以反序列化的Java物件儲存在JVM中。如果RDD的大小與記憶體大小不匹配,一部分partitions將不會被快取,而是在需要使用它們的時候重新進行計算。這是預設的儲存層次。
MEMORY_AND_DISK 將RDD以反序列化的Java物件儲存在JVM中。如果RDD的大小與記憶體大小不匹配,將超出記憶體容量的partitions儲存在硬碟上,在需要使用它們時從硬碟上讀取。
MEMORY_ONLY_SER(Java和Scala) 將RDD以序列化的Java物件儲存(每個partition為1byte的陣列)。這種形式相比於反序列化的儲存方式更加節省空間,特別是在使用fast serializer時。但是需要佔用更多的CPU進行讀取。
MEMORY_AND_DISK_SER(Java和Scala) 和MEMORY_ONLY_SER相似,但是會將溢位記憶體的部分partitions儲存在硬碟上,而不是在需要使用的時候重新計算。
DISK_ONLY 將RDD僅儲存在硬碟上。
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. 和上述的儲存層次相同,但是會將每個partition複製在兩個叢集節點上。
OFF_HEAP(試驗階段) 和MEMORY_ONLY_SER相似,但是資料儲存在外堆記憶體中。這需要系統對外堆儲存的支援。

注意在python中,被儲存的物件總會通過pickle library進行序列化,因此無論是否選擇了序列化的儲存層次都沒有關係。在python中支援的儲存層次包括MEMORY_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK, MEMORY_AND_DISK_2, DISK_ONLY, and DISK_ONLY_2。

及時使用者沒有呼叫persist(),Spark也會在Shuffle操作(例如reduceByKey)中自動快取一些中間資料。這一機制可以確保shuffle過程中,在某個節點出現故障時無需對整個輸入進行重新計算。

儲存層次的選擇依據:
Spark的不同儲存層次的選擇需要權衡記憶體佔用和CPU的執行效率。可以根據以下步驟對儲存層次進行選擇:
1.如果RDD適合按照預設儲存層次(MEMORY_ONLY)進行儲存,則不做改變。這一方式的CPU執行效率最高。
2.如果不適合,嘗試使用 MEMORY_ONLY_SER,並選擇一個快速序列化的庫(fast serialization library),從而更加節省儲存空間,前提保證合理的讀取速度。(適用於Java和Scala)
3.儘量不要使用硬碟,除非計算資料集的函式非常複雜,或者他們需要過濾大量的資料。否則,從新計算partitions的速度還是要比從硬碟讀取RDD更快。
4.如果希望具備快速的故障恢復功能(比如需要使用Spark進行網頁應用層序的請求服務),可以使用複製儲存的方式。Spark的所有儲存層次都具備容錯功能,但是複製RDDs能夠使我們無需等待重新計算丟失的partitions就可以繼續在RDD上執行任務。

刪除快取出RDD
Spark能夠自動監測內個節點上cache的使用狀況,從而按照least-recently-used模式自動丟棄舊的資料分割槽。如果希望手動刪除RDD,可以使用RDD.unpersist()方法。