1. 程式人生 > >Spark分散式計算和RDD模型研究

Spark分散式計算和RDD模型研究

1背景介紹

現今分散式計算框架像MapReduceDryad都提供了高層次的原語,使使用者不用操心任務分發和錯誤容忍,非常容易地編寫出平行計算程式。然而這些框架都缺乏對分散式記憶體的抽象和支援,使其在某些應用場景下不夠高效和強大。RDD(Resilient Distributed Datasets彈性分散式資料集)模型的產生動機主要來源於兩種主流的應用場景:

Ø迭代式演算法:迭代式機器學習、圖演算法,包括PageRankK-means聚類和邏輯迴歸(logistic regression)

Ø互動式資料探勘工具:使用者在同一資料子集上執行多個Adhoc查詢。

不難看出,這兩種場景的共同之處是:在多個計算或計算的多個階段間,重用中間結果

。不幸的是,在目前框架如MapReduce中,要想在計算之間重用資料,唯一的辦法就是把資料儲存到外部儲存系統中,例如分散式檔案系統。這就導致了巨大的資料複製、磁碟I/O、序列化的開銷,甚至會佔據整個應用執行時間的一大部分。

為了解決這種問題,研究人員為有這種資料重用需要的應用開發了特殊的框架。例如將中間結果儲存在記憶體中的迭代式圖計算框架Pregel。然而這些框架只支援一些特定的計算模式,而沒有提供一種通用的資料重用的抽象。於是,RDD橫空出世,它的主要功能有:

Ø高效的錯誤容忍

Ø中間結果持久化到記憶體的並行資料結構

Ø可控制資料分割槽來優化資料儲存

Ø豐富的操作方法

對於設計RDD來說,最大的挑戰在於如何提供高效的錯誤容忍(fault-tolerance)

。現有的叢集上的記憶體儲存抽象,如分散式共享記憶體、key-value儲存、記憶體資料庫以及Piccolo等,都提供了對可變狀態(如資料庫表裡的Cell)的細粒度更新。在這種設計下為了容錯,就必須在叢集結點間進行資料複製(data replicate)或者記錄日誌。這兩種方法對於資料密集型的任務來說開銷都非常大,因為需要在結點間拷貝大量的資料,而網路頻寬遠遠低於RAM

與這些框架不同,RDD提供基於粗粒度轉換(coarse-grained transformation)的介面,例如mapfilterjoin,能夠將同一操作施加到許多資料項上。於是通過記錄這些構建資料集(lineage世族)

的粗粒度轉換的日誌,而非實際資料,就能夠實現高效的容錯。當某個RDD丟失時,RDD有充足的關於丟失的那個RDD是如何從其他RDD產生的資訊,從而通過重新計算來還原丟失的資料,避免了資料複製的高開銷。

儘管基於粗粒度轉換的介面第一眼看起來有些受限、不夠強大,但實際上RDD卻能很好地用於許多平行計算應用,因為這些應用本身自然而然地就是在多個數據項上運用相同的操作。事實上,RDD能夠高效地表達許多框架的程式設計模型,如MapReduceDryadLINQSQLPregelHaLoop,以及它們處理不了的互動式資料探勘應用。

2 RDD簡介

2.1概念

RDD是一種只讀的、分割槽的記錄集合。具體來說,RDD具有以下一些特點:

Ø建立:只能通過轉換(transformation,如map/filter/groupBy/join等,區別於動作action)從兩種資料來源中建立RDD1)穩定儲存中的資料;2)其他RDD

Ø只讀:狀態不可變,不能修改

Ø分割槽:支援使RDD中的元素根據那個key來分割槽(partitioning),儲存到多個結點上。還原時只會重新計算丟失分割槽的資料,而不會影響整個系統。

Ø路徑:在RDD中叫世族或血統(lineage),即RDD有充足的資訊關於它是如何從其他RDD產生而來的。

Ø持久化:支援將會·被重用的RDD快取(in-memory或溢位到磁碟)

Ø延遲計算:像DryadLINQ一樣,Spark也會延遲計算RDD,使其能夠將轉換管道化(pipeline transformation)

Ø操作:豐富的動作(action)count/reduce/collect/save等。

關於轉換(transformation)與動作(action)的區別,前者會生成新的RDD,而後者只是將RDD上某項操作的結果返回給程式,而不會生成新的RDD


2.2例子

假設網站中的一個WebService出現錯誤,我們想要從數以TBHDFS日誌檔案中找到問題的原因,此時我們就可以用Spark載入日誌檔案到一組結點組成叢集的RAM中,並互動式地進行查詢。以下是程式碼示例:


首先行1HDFS檔案中創建出一個RDD,而行2則衍生出一個經過某些條件過濾後的RDD。行3將這個RDD errors快取到記憶體中,然而第一個RDD lines不會駐留在記憶體中。這樣做很有必要,因為errors可能非常小,足以全部裝進記憶體,而原始資料則會非常龐大。經過快取後,現在就可以反覆重用errors資料了。我們這裡做了兩個操作,第一個是統計errors中包含MySQL字樣的總行數,第二個則是取出包含HDFS字樣的行的第三列時間,並儲存成一個集合。


這裡要注意的是前面曾經提到過的Spark的延遲處理。Spark排程器會將filtermap這兩個轉換儲存到管道,然後一起傳送給結點去計算。

2.3優勢

RDDDSM(distributed shared memory)的最大不同是:RDD只能通過粗粒度轉換來建立,而DSM則允許對每個記憶體位置上資料的讀和寫。在這種定義下,DSM不僅包括了傳統的共享記憶體系統,也包括了像提供了共享DHT(distributed hash table)Piccolo以及分散式資料庫等。所以RDD相比DSM有著下面這些優勢:

Ø高效的容錯機制:沒有檢查點(checkpoint)開銷,能夠通過世族關係還原。而且還原只涉及了丟失資料分割槽的重計算,並且重算過程可以在不同結點並行進行,而無需回滾整個系統。

Ø結點落後問題的緩和(mitigate straggler)RDD的不可變性使得系統能夠執行類似MapReduce備份任務,來緩和慢結點。這在DSM系統中卻難以實現,因為多個相同任務一起執行會訪問同樣的記憶體資料而相互干擾。

Ø批量操作:任務能夠根據資料本地性(data locality)被分配,從而提高效能。

Ø優雅降級(degrade gracefully):當記憶體不足時,大分割槽會被溢位到磁碟,提供與其他現今的資料平行計算系統類似的效能。

2.4應用場景

RDD最適合那種在資料集上的所有元素都執行相同操作的批處理式應用。在這種情況下,RDD只需記錄世族圖譜中的每個轉換就能還原丟失的資料分割槽,而無需記錄大量的資料操作日誌。所以RDD不適合那些需要非同步、細粒度更新狀態的應用,比如Web應用的儲存系統,或增量式的Web爬蟲等。對於這些應用,使用具有事務更新日誌和資料檢查點的資料庫系統更為高效。

3 RDD表現形式

3.1深入RDD

使用RDD作為抽象的一個挑戰就是:選擇一種合適的表現形式,來追蹤橫跨眾多轉換的RDD世族關係。在Spark中,我們使用一種簡單的、基於圖的表現形式,使得Spark在無需為每個轉換都增加特殊的處理邏輯的情況下,就能支援大量的轉換型別,這大大簡化了系統的設計。

總的來說,對於每個RDD都包含五部分資訊,即資料分割槽的集合,能根據本地性快速訪問到資料的偏好位置,依賴關係,計算方法,是否是雜湊/範圍分割槽的元資料:


Spark中內建的幾個RDD舉例來說:

資訊/RDD

HadoopRDD

FilteredRDD

JoinedRDD

Partitions

每個HDFS塊一個分割槽,組成集合

與父RDD相同

每個Reduce任務一個分割槽

PreferredLoc

HDFS塊位置

(或詢問父RDD)

Dependencies

(RDD)

與父RDD一對一

對每個RDD進行混排

Iterator

讀取對應的塊資料

過濾

聯接混排的資料

Partitioner

HashPartitioner

3.2工作原理

在瞭解了RDD的概念和內部表現形式之後,那麼RDD是如何執行的呢?總高層次來看,主要分為三步:建立RDD物件,DAG排程器建立執行計劃,Task排程器分配任務並排程Worker開始執行。


以下面一個按A-Z首字母分類,查詢相同首字母下不同姓名總個數的例子來看一下RDD是如何執行起來的。


步驟1:建立RDD上面的例子除去最後一個collect是個動作,不會建立RDD之外,前面四個轉換都會創建出新的RDD。因此第一步就是建立好所有RDD(內部的五項資訊)

步驟2:建立執行計劃。Spark會盡可能地管道化,並基於是否要重新組織資料來劃分階段(stage),例如本例中的groupBy()轉換就會將整個執行計劃劃分成兩階段執行。最終會產生一個DAG(directed acyclic graph,有向無環圖)作為邏輯執行計劃。


步驟3:排程任務。將各階段劃分成不同的任務(task),每個任務都是資料和計算的合體。在進行下一階段前,當前階段的所有任務都要執行完成。因為下一階段的第一個轉換一定是重新組織資料的,所以必須等當前階段所有結果資料都計算出來了才能繼續。

假設本例中的hdfs://names下有四個檔案塊,那麼HadoopRDDpartitions就會有四個分割槽對應這四個塊資料,同時preferedLocations會指明這四個塊的最佳位置。現在,就可以創建出四個任務,並排程到合適的叢集結點上。


3.3混排

(待補充:關於混排(Shuffle)是如何執行的)

3.4寬窄依賴

在設計RDD的介面時,一個有意思的問題是如何表現RDD之間的依賴。在RDD中將依賴劃分成了兩種型別:窄依賴(narrow dependencies)和寬依賴(wide dependencies)。窄依賴是指RDD的每個分割槽都只被子RDD的一個分割槽所使用。相應的,那麼寬依賴就是指父RDD的分割槽被多個子RDD的分割槽所依賴。例如,map就是一種窄依賴,而join則會導致寬依賴(除非父RDDhash-partitioned,見下圖)


這種劃分有兩個用處。首先,窄依賴支援在一個結點上管道化執行。例如基於一對一的關係,可以在filter之後執行map。其次,窄依賴支援更高效的故障還原。因為對於窄依賴,只有丟失的父RDD的分割槽需要重新計算。而對於寬依賴,一個結點的故障可能導致來自所有父RDD的分割槽丟失,因此就需要完全重新執行。因此對於寬依賴,Spark會在持有各個父分割槽的結點上,將中間資料持久化來簡化故障還原,就像MapReduce會持久化map的輸出一樣。

4內部實現

4.1排程器

Spark的排程器類似於Dryad的,但是增加了對持久化RDD分割槽是否在記憶體裡的考慮。重溫一下前面例子裡介紹過的:排程器會根據RDD的族譜創建出分階段的DAG;每個階段都包含儘可能多的具有窄依賴的變換;具有寬依賴的混排操作是階段的邊界;排程器根據資料本地性分派任務到叢集結點上。

4.2直譯器整合

(待補充)

4.3記憶體管理

Spark支援三種記憶體管理方式:Java物件的記憶體儲存,序列化資料的記憶體儲存,磁碟儲存。第一種能提供最快的效能,因為JVM能夠直接訪問每個RDD物件。第二種使使用者在記憶體空間有限時,能選擇一種比Java物件圖更加高效的儲存方式。第三種則對大到無法放進記憶體,但每次重新計算又很耗時的RDD非常有用。

同時,當有新的RDD分割槽被計算出來而記憶體空間又不足時,Spark使用LRU策略將老分割槽移除到磁碟上。

4.4檢查點支援

儘管RDDLineage可以用來還原資料,但這通常會非常耗時。所以將某些RDD持久化到磁碟上會非常有用,例如前面提到過的,寬依賴的中間資料。對於Spark來說,對檢查點的支援非常簡單,因為RDD都是不可變的。所以完全可以在後臺持久化RDD,而無需暫停整個系統。

5高階特性

(待補充:Broadcast…)

6參考資料

本文內容主要來源於:1RDD論文《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》;2Spark峰會ppt資料:《A-Deeper-Understanding-of-Spark-Internals》和《Introduction to Spark Internals》。感興趣的可以自行查詢。