1. 程式人生 > >彈性分散式資料集:基於記憶體叢集計算的容錯抽象

彈性分散式資料集:基於記憶體叢集計算的容錯抽象

摘要

       我們提出的彈性分散式資料集(RDDs),是一個讓程式設計師在大型叢集上以容錯的方式執行基於記憶體計算的分散式記憶體抽象。RDDs受啟發於兩類使用當前計算框架處理不高效的應用:迭代演算法和互動式資料探勘工具。這二者在記憶體中儲存資料效能能夠提高一個數量級。為了有效容錯,RDDs提供一種受限的共享記憶體,基於粗粒度轉換(transformations)而非細粒度地更新共享狀態。儘管如此,RDDs仍足以表達許多型別的計算,包括最近專門用於迭代作業的程式設計模型(如Pregel)以及這些模型無法表示的新應用。我們已經在Spark系統中實現了RDDs。

1 引言

       像MapReduce和Dryad這樣的叢集計算框架已經廣泛應用於大規模資料分析。這些系統讓使用者使用一系列高階運算元寫平行計算,而不用擔心工作分配和容錯。 
       雖然現有框架為訪問叢集計算資源提供了大量抽象,但是他們缺少利用分散式記憶體的抽象。這使得它們對那些跨多個計算重用中間結果的新興應用效率不高。資料重用普遍存在於很多迭代的機器學習和影象演算法中,包括PageRank,K-means聚類和logistic迴歸。另一個引人注目的用例是互動式資料探勘,即使用者在同樣的資料子集上執行多個即席查詢。不幸的是,大多數現有框架中,在計算之間(比如在兩個MapReduce作業之間)重用資料的唯一方式是把它寫到外部穩定儲存系統(如分散式檔案系統)。這種方式會因為資料複製,磁碟I/O和序列化而帶來巨大開銷,這直接影響了應用的執行時間。 
       由於認識到這個問題,研究人員為一些需要資料重用的應用開發出了專門的框架。例如,Pregel是為迭代的影象計算而在記憶體中儲存中間資料的系統;而HaLoop則提供一個迭代的MapReduce介面。但這些框架只提供了特定的計算模型,並對這些模式隱式地進行資料共享。它們沒有為更普遍的重用提供抽象,如讓使用者把幾個資料集載入到記憶體中,在它們上執行即席查詢。 
       這篇論文中,我們提出一個新的抽象,叫彈性分散式資料集(RDDs),它讓資料重用在廣泛的應用上都是高效的。RDDs是容錯的並行資料結構,它讓使用者顯示地在記憶體中儲存中間結果,控制它們的分割槽以優化資料佈局,用豐富的運算元去操作它們。(注:這裡的它們就是中間結果)

 
       在設計RDDs時主要的挑戰是:設計一個能有效提供容錯性的程式設計介面。現有的對叢集上基於記憶體儲存的抽象(如分散式共享記憶體、key-value stores、資料庫、Piccolo)提供了基於細粒度更新可變狀態(比如表中的元素)的介面。這個介面提供容錯的僅有的方式是跨機器複製資料或跨機器記錄更新。這兩個方法對資料密集型工作負載代價很大,因為它們需要在叢集網路上拷貝大量資料,它的頻寬遠比RAM的要小,並且導致嚴重的儲存開銷。 
       和這些系統對比,RDDs提供基於粗粒度轉換(map、filter和join等)的介面。這允許它們通過記錄用於構建資料集(它的lineage
)的轉換(transformations)去有效地提供容錯性。如果RDD的分割槽丟失了,那個RDD會有足夠的資訊從其他RDDs衍生去重新計算剛才那個分割槽。從而,丟失的資料通常可以很快被恢復,且不需要花銷很大的複製。(注:粗粒度是應用於整個資料集,如上述map、filter;細粒度是指應用於單行,如get(index)和set(index)。) 
       即便是基於粗粒度轉換的介面在最初也會受限,RDDs很適合很多並行化應用,因為這些應用天然適合在多個數據項上應用相同的操作。RDDs能高效地表示很多已經提出的作為獨立系統的叢集程式設計模型,包括MapReduce、Haloop等,也包括這些系統不能很好地應用的新應用程式,如互動式資料探勘。 
       我們已經在Spark的系統中實現了RDDs,Spark系統在UC Berkeley和個別公司被用於研究、生產應用。Spark提供一個方便語言整合的程式設計介面,介面是Scala語言編寫的。另外,Spark可以用來從Scala直譯器上互動式地查詢大資料集。Spark允許通用程式語言以互動式速度用於叢集上基於記憶體的資料探勘,我們相信Spark是第一個這樣的系統。 
       我們通過微基準和使用者應用程式的測試評估RDDs。發現,對於迭代應用,Spark比Hadoop快20倍;實際資料分析類報表速度提升40倍;Spark用於互動式查詢1TB的資料集只有5至7秒的延遲。為了說明RDDs的普適性,我們已經在Spark之上實現了Pregel和HaLoop程式設計模型,包括以相對較小的庫的形式提供了它們使用的位置優化策略(每個200行程式碼)。 
       本文先分別在2和3節概述RDDs和Spark。然後,在第4節討論RDDs的內部表示,第5節是RDDs內部的實現,第6節是實驗結果。最後,在第7節討論RDDs是怎樣引起幾個現有的叢集程式設計模型興趣的,第8節研究相關工作,最後是結論。

2 彈性分散式資料集(RDDs)

       這節對RDDs進行概述。先在2.1節給RDDs下定義,2.2節闡述Spark中RDDs的程式設計介面,然後在2.3節比較RDDs和細粒度共享記憶體抽象,最後在2.4節討論RDD模型的侷限性。

2.1 RDD抽象

       RDD是一個只讀的、分割槽記錄的集合。RDDs能通過通過以下兩種方式建立: 
       (1)穩定儲存中的資料注:等動手寫程式時就知道原因,如從HDFS中用sc.textFile(path)讀檔案就能建立) 
       (2)其他RDDs 
我們稱這些操作為轉換(transformations),以把它們和其他RDDs上的操作(actions)區分開。map、filter、join都屬於transformations。 
       RDDs任何時候都不需要物化(真正執行操作後存入穩定儲存)。相反,一個RDD有足夠多的關於它是怎樣從其他資料集衍生出(它的Lineage)的資訊,據此從穩定儲存中的資料計算出它的分割槽。這是一個強有力的特性:從本質上說,如果故障後無法重建RDD,程式就不能引用該RDD。(有點繞,其實就是說RDD能夠被重建是個很重要的特性,RDD有很強大的容錯機制) 
       最終,使用者能控制RDDs的兩個其他方面:持久化和分割槽。使用者可以表明他們將重用那個RDDs併為它們選擇儲存策略(如in-memory 儲存)。他們也可以讓一個RDD的元素基於每個記錄的key跨機器分割槽。這對位置優化是有用的,如確定兩個將要join的資料集是以相同方式進行雜湊分割槽的。

2.2 Spark程式設計介面

       Spark通過一個語言整合API暴露RDD,API中每個資料集被表示為一個物件,在這些物件上transformations被使用方法呼叫。 
       程式設計師程式設計的第一步是通過在穩定儲存中的資料呼叫transformations來定義RDDs(如map和filter)。然後他們可以對這些RDDs呼叫actions,actions是給應用程式返回值或者把資料匯出到儲存系統的操作。count、collect、save都屬於actions。Spark只有在第一次呼叫action時才會真正計算RDDs,在此之前進行的transformations都是惰性計算,這樣能對transformations進行並行流水線化(pipeline)。 
       另外,程式設計師可以呼叫persist方法表明在以後的操作中他們想用哪個RDDs。Spark預設在記憶體中持久化RDDs,但是如果沒有足夠的RAM將會把多餘的存入磁碟。使用者也可以請求其它的持久化策略,如只在磁碟上儲存RDD,或跨叢集複製RDD,通過flags進行persist。最後,使用者可以在每個RDD上設定持久化優先順序去指定哪些記憶體的資料應該優先被存入磁碟。

2.2.1 示例:控制檯日誌挖掘

       假設有一個web服務出錯了,操作員想從儲存在HDFS中的TB級的日誌中找出原因。通過使用Spark,操作員只需從日誌中把剛才那個錯誤的資訊載入到一組節點的RAM,並互動式地查詢它們。她將先寫出下面的Scala程式碼:

lines =spark.textFile("hdfs://...")
errors = lines.filter(_.startWith("ERROR"))
errors.persist()
 

   

 

 

 

 

   第一行通過HDFS檔案定義了一個RDD(即是文字形式的lines的集合),第二行對lines進

行過濾得到一個過濾後的RDD,第三行將errors存入記憶體以便查詢中共享。值得注意的是,在Scala語法中filter的引數是一個閉包。 
       此時,叢集上並沒有執行任何工作。但是,使用者可以對該RDD執行動作(actions),如統計資訊條數:

errors.count()

       使用者也可以在該RDD上進一步執行transformations,並使用轉換後的結果,如下:

//統計errors中涉及MySQL的行數:

//統計errors中涉及MySQL的行數:
errors.filter(_.contains("MySQL")).count()

//以陣列的形式返回errors中涉及HDFS的時間欄位
//(假設時間是'\t'分隔的number為3的欄位)
errors.filter(_.contains("HDFS"))
      .map(_split('\t')(3))
      .collect()
 

 

 

 

 

 

 

 

   在errors的第一個action執行後,Spark將在記憶體中儲存errors的分割槽,極大地加速了後續計算。值得注意的是,最初的RDD(lines)沒有被快取。這很合理,因為錯誤資訊可能只是資料的一小部分(小到足以存入記憶體)。 
       最後,為了說明我們的模型怎樣容錯,我們在圖1中展示了第三次查詢中的RDDs的血緣(lineage)圖。這次查詢,在lines上進行filter得到errors,然後在errors上進一步應用filter和map,之後是collect。Spark排程器將並行流水線化後兩個transformations,給擁有errors的分割槽快取的節點發送任務集去計算。另外,如果errors的一個分割槽丟失了,Spark可以僅在lines相應的分割槽上應用過濾器來重建該分割槽。 


 
方框代表RDDs,箭頭代表transformations 

2.3 RDD模型的優勢

       為了明白RDDs作為分散式記憶體抽象的好處,我們在表1中列出了RDDs與分散式共享記憶體(DSM)的對比。DSM系統中,應用在全域性地址空間任意位置讀寫。值得注意的是,在這種定義下,DSM不僅包括傳統的共享記憶體系統,還包括其他採用細粒度寫共享狀態的系統,提供共享的DHT的Piccolo和分散式資料庫。DSM是非常通用的抽象,這種通用性使它難以在商業叢集上實現高效率和容錯性。

方面

RDDs

分散式共享記憶體

粗/細粒度

細粒度

粗粒度

細粒度

一致性

不重要(不可變)

取決於app/執行時

故障恢復

細粒度,使用Lineage的的開銷

需要檢查點和程式回滾

落後任務降災

可以使用任務備份

難以實現

任務安排

基於資料本地化自動分配

取決於app(執行時實現透明性)

記憶體不足時行為

類似於已有資料流系統

效能差(交換?)

表1:RDDs與DSM的對比


       RDDs和DSM主要的區別在於RDDs只能通過粗粒度transformations建立(“written”),而DSM允許讀寫每個儲存單元。這雖然限制了RDDs只讀,但允許RDDs有更高效的容錯。尤其,RDDs不需要檢查點的開銷,因為它們可以使用lineage恢復。此外,只有RDD丟失的分割槽才需要重新計算,並且它們可以在不同節點上平行計算,而不用回滾整個程式。 
       RDDs的第二個好處是它們的不可變性使系統能夠執行類似MapReduce的備份任務來緩和慢節點。用DSM難以實現備份任務,因為一個任務的兩個副本將訪問相同的記憶體位置,相互干擾更新。 
       RDDs相比於DSM還提供了另外兩個好處。第一, 在對RDDs的批量操作中,執行時會基於資料本地化去排程任務以提高效能。第二,僅僅當它們用於基於掃描的操作時,RDDs在記憶體不足以儲存它們時會優雅降級,記憶體存不下的分割槽會存在磁碟中,此時與現有的資料並行系統性能相當。

2.4 不適合RDDs的應用

       就像引言中討論的,RDDs最適合批處理應用,批處理應用就是對一個數據集的所有元素執行相同的操作。這種情況下,RDDs能夠有效地記住每個transformation,每個transformation是lineage圖中的一個步驟,並且,不需要記錄大量資料就能夠恢復丟失分割槽。RDDs不太適合那些對共享狀態進行非同步的細粒度的更新,如web應用的儲存系統或web爬蟲增量抓取器。對於這些應用,執行傳統的更新日誌和資料檢查點操作更加高效,例如資料庫等。我們的目標是給批處理分析提供高效程式設計模型,把這些非同步應用程式留給專門的系統。

3 Spark程式設計介面

       Spark給RDD抽象提供了一個用Scala編寫了語言整合API。Scala是在JVM上的靜態型別函數語言程式設計語言。我們選擇Scala是因為它簡潔的組合(便於互動式使用)和效率(由於靜態型別)。然而,RDD抽象並不是一定要用函式式語言。 
       為了使用Spark,開發者寫一個驅動程式(driver program)連線wokers叢集,如圖2所示。driver定義一或多個RDDs,並在RDDs上呼叫action。在driver上的Spark程式碼還會追蹤這些RDDs的lineage。wokers是長期執行的進行,能在記憶體中儲存RDD分割槽。 



Spark執行態。使用者的驅動程式啟動多個worker,worker從分散式檔案系統讀取資料塊,並在記憶體中儲存已計算的RDD分割槽* 


       就像在2.2.1節日誌挖掘例項中所展現的,使用者給RDD操作(如map)傳參是傳遞閉包(字面函式)。Scala用一個java物件代表每個閉包,這些物件可以被序列化,可以通過網路中傳遞閉包載入到另一個節點上。Scala也把閉包中的變數約束儲存為Java物件中的欄位。例如,一個人可以寫下面的程式碼去對RDD中每個元素加5。

var x = 5
rdd.map(_ + x)

 

 

 

       RDDs本身是元素型別引數化的靜態型別物件。例如,RDD[Int]是整數的RDD。由於Scala支援型別推斷,所以大多數例子中都省略了型別。 
       雖然用Scala實現RDDs在概念上很簡單,但是我們必須使用反射解決Scala閉包物件的問題。通過Scala直譯器去使用用Spark,我們還需要做很多的工作,我們將在5.2節討論。但我們並不需要去修改Scala編譯器。

3.1 Spark中的RDD操作

       表2列出了Spark中主要的RDD的transformations和actions操作。每個操作都給出了表示,中括號表示型別引數。前面說過transformations是定義新RDD的惰性操作,而actions開始真正的計算並返回一個值給程式或是把資料寫到外部儲存。 


 
Seq[T]是型別為T的元素序列 

3.2 應用例項

       我們對2.21節中的資料探勘示例補充兩個迭代應用:logistic迴歸和網頁排名。之後說明怎樣控制RDDs的分割槽能提高效能。

3.2.1 logistic迴歸

       很多機器學習演算法都具有迭代特性,因為他們需要執行迭代優化程式去最大化一個函式,如梯度下降法。它們的資料儲存在記憶體中會讓它們執行更快。 
       下面的程式實現了logistic迴歸。logistic迴歸是一個常見的用於尋找一個能最佳分割兩組點(如垃圾郵件和非垃圾郵件)的超平面w的經典演算法。演算法使用梯度下降法:w開始時是隨機值,每一次迭代,對w的函式求和,使w朝著優化的方向移動。

val points = spark.textFile(...)
                  .map(parsePoint).persist()
var w = //隨機初始向量for(i <- 1to ITERATIONS){
  val gradient = points.map{p =>
    p.x * (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y
  }.reduce((a,b) => a+b)
  w -= gradient
}
 

 

 

 

 

 

 

       我們一開始定義了一個快取RDD——points,作為在一個文字檔案上呼叫map轉換的結果,即把文字的每行都解析為Point物件。然後在points上執行mapreduce來計算梯度,在每一步對當前w的函式求和。把points儲存在記憶體中迭代能提高20倍的速度,我們將在6.1節可以看到。

3.2.2 網頁排名

       在網頁排名中會出現更加複雜的資料共享模式。演算法通過把鏈向每個頁面的所有頁面的貢獻值(contributions)加起來,迭代地更新每個頁面的rank。在每次迭代過程中,每個頁面給周圍頁面傳送r/n個contribution,這裡r是它的秩,n是周圍檔案的個數。接下來,它把它的秩更新為αN+(1−α)ciαN+(1−α)∑ci,其中求和是對它接收到的貢獻值,N是頁面個數。在Spark中我們可以把網頁排名寫成如下程式碼:

val links = spark.textFile(...).map(...).persist()
var ranks = //(URL, rank)對的RDDfor(i <- 1 to ITERATIONS){
//根據每個頁面傳送過來的貢獻值建立(targetURL, float)對的RDD,
val contibs = links.join(ranks).flatMap{
  (url, (links, rank)) =>
  links.map(dest => (dest, rank/links.size))
  }
  //根據URL對貢獻值求和,並獲取新的秩
  ranks = contibs.reduceByKey((x,y) => x+y)
                 .mapValues(sum => a/N+(1-a)*sum)
}
 















 

 

 


       這個程式會生成如圖3中的RDD血緣圖。每一步迭代過程,我們都會基於上一個迭代器的contribs、ranks和靜態的links集合建立一個新秩集合(秩更新了)。這個圖有一個特徵:隨著迭代次數的增加,圖會變得越來越長。因此,在一個擁有很多迭代步驟的作業中,有必要可靠複製ranks的一些版本以減少故障恢復次數。使用者可以呼叫帶可靠標識的persist來達到目的。值得注意的是,links集合不需要被複制,因為它的分割槽可以通過在輸入檔案塊上重新執行map操作來重建。這個資料集將比ranks大,因為每個檔案有很多連結但只有一定數量會作為它的秩,以致於在系統上使用lineage比檢查點檢查程式整個記憶體狀態來恢復要更快。 
       最後,我們可以通過控制RDDs的分割槽來優化網頁排名中的通訊。如果我們為links指定一個分割槽方式(如根據URL對link列表跨節點雜湊分割槽),我們則可以以相同的方式對ranks分割槽,並確保links和ranks的join操作不需要通訊(因為每個URL的秩將會與它的link列表在相同的機器上)。我們也可以寫一個Partitioner類把相互連線的頁面聚在一起(比如按域名對URL分割槽)。這兩種優化可以表示為在定義links時呼叫partitionBy:

links = spark.textFile(...).map(...)
.partitionBy(myPartFunc).persist()​​​​​​​
  •        該初始化呼叫後,links和ranks的join操作將自動把每個URL的貢獻值聚合到它link列表所在的機器上,在那計算它的新秩,並join它和它的links。這類多次迭代的一致性分割槽是指定框架(Pregel)中主要優化方式之一。RDDs讓使用者直接表達他的目的。

4 代表RDDs

       把RDDs作為一個抽象會有一個問題:為它們選擇一個代表在廣泛的transformations中能追蹤血緣。理想情況下,一個實現RDDs的系統應該提供儘可能豐富的轉換運算元,並讓使用者以任意方式組合它們。我們為RDDs提出一個簡單的基於圖的代表以達到這些目的。我們已經在Spark中使用這個代表去支援廣泛的transformations,而且不會為任何transformation去給排程器新增特殊邏輯,這極大地簡化了系統設計。 
       簡而言之,我們了一個通用介面去代表每個RDD,該介面表達五種資訊:

  • 一組分割槽(partitons),資料集的原子組成
  • 一組父RDDs上的依賴(dependencies
  • 一個基於父資料集計算的函式
  • 分割槽策略的元資料
  • 資料位置策略

例如,一個RDD表示一個HDFS檔案的每個塊都有一個分割槽,並且知道每個塊在哪臺機器上。同時,在這個RDD上執行map操作後的結果分割槽不變。我們把這個介面總結在表3中。

操作

含義

partitions()

返回Partiton物件的列表

preferredLocations(p)

列出p分割槽由於資料區域性性可以被快速訪問的節點

dependencies()

返回依賴列表

iterator(p, parentIters)

根據為父分割槽指定的迭代器,逐個計算p分割槽的元素

partitioner()

返回RDD是否是hash/range分割槽的元資料資訊

表3:Spark中用於表示RDDs的介面

       在設計這個介面時最有趣的問題是怎樣在RDDs間表示依賴。我們發現把依賴分成兩類足夠了,並且很有用。

  • 一類是窄依賴。父RDD的每個分割槽被子RDD的至多一個分割槽使用。
  • 一類是寬依賴。多個子分割槽依賴於一個父分割槽。

例如,map操作會發生窄依賴,join操作發生寬依賴(除非父RDD是雜湊分割槽)。圖4顯示了其他例子。 


 
寬窄依賴的例子。每個空心框代表一個RDD,裡面的陰影矩陣代表分割槽。 


       這個區別是有用的,有兩個原因。第一,窄依賴考慮到了一個節點上的流水線執行。例如,在一個個元素上先應用filter然後map。相反,寬依賴需要父分割槽的所有資料可用,並用一個類似於MapReduce的操作在節點間shuffle。第二,窄依賴使節點故障後恢復更有效,因為只有丟失的父分割槽需要被重新計算,並且可能在不同節點上平行計算。相反,在寬依賴的血緣圖中,單一的故障節點可能導致一個RDD所有祖先的一些分割槽丟失,需要一個完全的重新執行。 
       這個RDDs通用介面在Spark中實現的大部分transformations都少於20行程式碼。甚至Spark新使用者在不知道排程器的細節的情況下都能夠實現新的transformations(如sampling和各種join)。下面寫了一些RDD實現。

HDFS files:我們取樣的輸入RDDs都是來自於HDFS中的檔案。對於這些RDDs,partitions返回檔案每個塊的分割槽(塊的偏移量存在於每個Partition物件中)。preferredLocations給出塊所在節點和讀塊的迭代器。

map:在任何RDD上呼叫map都會返回MappedRDD物件。這個物件和它的父RDD有相同的分割槽和首選位置。map的引數是一個函式,對於父RDD中的所有記錄,將以iterator方法的方式執行這個函式。

union:對兩個RDD呼叫union會返回一個分割槽為父RDD分割槽的聯合的RDD。每個子分割槽都是在相應的父分割槽上進行窄依賴計算得到的。

sample:取樣類似於對映,除了RDD會為每個分割槽儲存一個隨機數生成種子以確定性地取樣父分割槽記錄。

join:join兩個RDD可能會產生兩個窄依賴(如果他們具有相同的hash/range分割槽),可能是兩個寬依賴,也可能都有(如果一個父RDD有分割槽,另一個沒有)。

5 實現

       我們用大約14000行Scala程式碼實現了Spark。系統執行在Mesos叢集管理器上允許通Hadoop,MPI和其他應用分享資源。每個Spark程式都作為獨立的Mesos應用執行,它有自己dirver(master)和workers,並且在這些被Mesos處理的應用中共享資源。 
       Spark可以從任何Hadoop輸入源讀取資料(比如HDFS、HBase),只需要使用Hadoop已存在的外掛的API,在Scala的未修改版本上執行。 
       我們大致說幾個該系統技術上比較有趣的部分:任務排程(5.1節),Spark直譯器允許互動式使用(5.2節),記憶體管理(5.3節),支援檢查點(5.4節)。

5.1 工作排程

       Spark的排程器使用了RDDs的表示,在第4節已經描述過了。 
       總而言之,我們的排程器與Dryad的類似,但它另外還考慮了持久化的RDDs的哪些分割槽在記憶體中可用。無論何時使用者執行在RDD上執行action,排程器都會檢查RDD的血緣圖,建立由stages組成的DAG,然後執行,如圖5的插圖。每個階段(stage)包含儘可能多的窄依賴流水線轉換(transformations)。stages的邊界是寬依賴shuffle操作,或者任何已經計算過的分割槽,它可以截斷父RDD的計算。然後,排程器在每個stage中執行任務去計算丟失的分割槽,知道計算出想要的RDD。 
       排程器基於資料存放位置使用延遲排程給機器指派任務。如果一個任務需要處理節點記憶體中可用的分割槽,我們就把它傳送給那個節點。但是,如果處理的分割槽位於多個可能的位置(如HDFS檔案),則把任務傳送給這些節點。 
       對於寬依賴(即shuffle依賴),我們一般把中間結果物化在持有父分割槽的節點上,以簡化故障恢復,很像MapReduce物化map的輸出結果。 
       如果一個任務失敗,只要它的stage的父分割槽還可用,我們將在另一個節點上重新執行它。如果一些stages已經不可用(例如,由於一個shuffle的map輸出結果丟失了),我們會重新提交任務去並行地計算丟失分割槽。我們還不能對排程器故障容錯,但是複製RDD血緣圖是直截了當的做法。 
       最後,雖然Spark中所有當前執行的計算都對驅動程式中呼叫的actions響應,我們也會試驗讓叢集上的任務(如maps)呼叫lookup操作,該操作允許按關鍵字隨機訪問雜湊分割槽的RDDs的元素。在這種情況下,任務需要告訴排程器去計算哪些丟失的分割槽。

5.2 直譯器的整合

       Scala包含一個類似於Ruby和Python的互動式shell。考慮到實現記憶體中資料帶來的低延遲,我們希望讓使用者在直譯器上互動式地執行Spark,查詢大資料集。 
       Scala直譯器通常把使用者輸入的程式碼行編譯成一個類,然後載入到JVM,之後呼叫類的函式。這個類包括一個單例物件(單例物件包含那行程式碼的變數或方法),並且在一個初始化函式中執行那行程式碼。例如,如果使用者寫入
var x = 5,接下來println(x),直譯器會定義包含x的Line1類,並讓第二行編譯成println(Line1.getInstance().x)。 
       我們對Spark中的編譯器做了兩點改變:

  • 類傳輸:為了讓工作節點獲取每行程式碼上建立的類的位元組碼,我們使直譯器基於HTTP傳輸這些類。
  • 改進的程式碼生成邏輯:一般地,每行程式碼生成的單例物件是通過相應的類上的一個靜態方法去訪問的。這意味著當我們序列化一個引用了前一行定義的變數的閉包時(如上面例子中的Line1.x),Java將不會通過物件圖跟蹤而傳輸包裝xLine1例項。因此,工作節點將不接受x。我們把程式碼生成邏輯改成了直接引用每行物件的例項。

       圖6顯示在做了上面兩個改變後,直譯器怎樣把使用者寫的一系列程式碼行翻譯成Java物件。 


 
顯示直譯器怎樣把使用者輸入的兩行程式碼翻譯成Java物件 


       我們發現Spark直譯器便於處理大量跟蹤關係,也便於研究HDFS中儲存的資料集。我們也計劃互動式地執行更高階的查詢語言,如SQL。

5.3 記憶體管理

       Spark對於持久化RDDs提供了三個選項:

  • 序列化成Java物件儲存在記憶體中
  • 作為序列化資料儲存在記憶體中
  • 儲存在磁碟上

第一個選項效能最快,因為JVM能在本機訪問每個RDD元素。第二個選項讓使用者在記憶體空間有限時,選擇比Java物件圖更加有效的儲存方式,效能會差一點。第三個選項對於那些太大而無法存入記憶體的RDDs是有用的,但是每次使用都需要重新計算,這很耗時。 
       為了管理有限的可用記憶體,我們基於RDDs的級別使用LRU淘汰策略。當計算一個新的RDD分割槽,而又沒有足夠的空間儲存它時,我們淘汰一個最近最少訪問RDD的分割槽,除非這個RDD和新分割槽的一樣。在這種情況下,我們把舊分割槽存入記憶體以防止相同RDD的分割槽迴圈讀入和寫出。這很重要,因為大多數操作將在整個RDD上執行任務,所以很可能以後會需要用到一直在記憶體中的分割槽。我們發現,目前為止這個預設的策略在我們的應用中很好地工作,我們通過每個RDD的“持久化優先順序“,讓使用者進一步控制RDD。 
       目前,叢集上Spark的每個例項都有自己獨立的記憶體空間。未來,我們打算研究通過統一的記憶體管理實現在Spark例項間共享RDDs。

5.4 支援檢查點(Checkpointing)

       雖然血緣圖總是被用於在故障後恢復RDDs,但這樣的恢復在血lineage鏈很長的時候會很耗時。因此,把一些RDD執行檢查點操作存入穩定記憶體十分有用。 
       一般地,檢查點對於Lineage圖長、寬依賴的RDDs很有用,如3.2.2節中PageRank例子中的rank集合。在這些情況下,叢集中節點故障可能導致會每個父RDD的一些資料分片丟失,這就需要完全重新計算,檢查點操作在這裡就可以避免完全重新計算。相反,對於穩定儲存資料上的窄依賴RDDs,檢查點沒什麼價值。如果節點故障,這些RDDs丟失的分割槽可以並行地在其他節點上計算,成本比複製整個RDD要少得多。 
       Spark目前為檢查點提供了一個API(給persist傳入REPLICATE標識),把checkpoint哪個資料的決定權留給了使用者。然而,我們也研究了怎樣實現自動的檢查點。因為我們的排程器知道每個資料集的大小,也知道第一次計算花費的時間,所以它應該能選擇一個優化的RDDs集合執行檢查點操作,最小化系統恢復時間。 
       最後,值得注意的是RDDs的只讀屬性使檢查點操作比常見的共享記憶體更簡單。因為不需要關注一致性,RDDs可以在不需要程式中斷或分散式快照方案的情況下在後臺寫出。

6 評測

       我們再Amazon EC2上做了一系列實驗,以此來評估Spark和RDDs,並與其他使用者應用程式的基準做了對比。總之,我們的結果如下:

  • Spark在迭代機器學習和影象應用方面效能比Hadoop高20倍。速度的提升的原因是,把資料作為Java物件存入記憶體中避免了I/O和反序列化的成本。
  • 使用者所寫應用執行效果好。我們使用Spark對原來在Hadoop上執行的分析報告提升了40倍。
  • 當節點故障,Spark可以通過只重建丟失的RDD分割槽來快速恢復。
  • Spark用於互動式地查詢1TB資料集,只有5-7s的延遲。

       我們將在6.1節呈現與Hadoop對比的迭代機器學習應用的基準,6.2節呈現對比後的PageRank。然後,6.3節評估Spark中故障恢復,6.4節呈現資料集在記憶體不足時的行為。最後,6.5節討論使用者應用程式的結果,6.6節互動式資料探勘。 
       除非另有說明,我們的測試使用m1.xlarge EC2節點,4核,15G記憶體。我們使用HDFS儲存,塊大小256M。在每個測試之前,為了準確地測量IO成本,我們清理了作業系統快取。

6.1 迭代機器學習應用

       我們實現了兩個迭代機器學習應用,邏輯迴歸和k-means,為了比較下面系統的效能:

  • Hadoop:Hadoop0.20.0穩定版本
  • HadoopBinMem:一個Hadoop部署。第一次迭代時,把輸入資料轉換成低開銷二進位制格式,以消除在之後迭代過程的文字解析,並把它存入記憶體中的HDFS例項。
  • Spark:RDDs的實現。

       我們用25-100臺機器在100GB的資料集上對這兩個演算法進行了10次迭代。兩個應用程式的主要區別在於他們執行資料每個位元組的計算的數量。k-means的迭代時間主要取決於計算,但邏輯迴歸不是計算密集型的,因此時間更多地花費在反序列化和I/O上。 
       由於經典的學習演算法需要幾十次迭代才能收斂,所以我們在報告時間時,把首輪迭代的時間與後續迭代的時間分開。我們發現經RDDs共享資料極大地加快了後面的迭代。 


 
圖7:圖表示了邏輯迴歸和k-means兩種演算法分別在Hadoop、HadoopBinMem和Spark三種叢集中的首輪迭代和後續迭代的時長。實驗是在有100個節點的叢集上對100GB資料進行的。 
圖8:Hadoop、HadoopBinMem和Spark上後續迭代的執行時間。 


首輪迭代 三個系統在首輪迭代時都從HDFS中讀取文字輸入。如圖7中的淺色長方形所示,實驗中Spark比Hadoop更快。這個不同是因為在Hadoop的master和workers之間的心跳協議中的通訊開銷。HadoopBinMem是最慢的,因為它運行了一個額外的MapReduce工作去把資料轉成二進位制的,它必須通過網路把這個資料寫向一個複製的記憶體HDFS例項。 
後續迭代 圖7也顯示了後續迭代的平均執行時間,圖8顯示了隨著叢集大小的變化是執行時間的分佈情況。對於邏輯迴歸,在100臺機器上,Spark比Hadoop快25.3倍,比HadoopBinMem快20.7倍。對於更加計算密集型的k-means應用,Spark有1.9到3.2倍的提速。 
理解速度 我們驚訝地發現,Spark甚至都超過了基於記憶體儲存二進位制資料的Hadoop(HadoopBinMem)20多倍。在HadoopBinMem中,我們使用了Hadoop的標準二進位制格式(SequenceFile)和256MB大的塊,我們還強制HDFS的資料直接存放在記憶體檔案系統。然而Hadoop仍然執行緩慢,有以下幾個因素:

  1. Hadoop軟體棧的最小開銷
  2. 提供資料時HDFS的開銷
  3. 把二進位制記錄轉換成可用的記憶體Java物件的反序列化成本

       我們來依次研究這幾個因素。為了衡量(1),我們執行沒有操作的Hadoop作業,然後發現,僅僅完成作業設定,啟動任何和清理工作的最小需求就需要花費至少25s的開銷。對於(2),我們發現HDFS為每個塊提供多次記憶體拷貝和計算校驗和的操作。 
       最後,為了測量(3),我們在單機上執行微基準程式,在256MB多種格式的輸入上執行邏輯迴歸計算。尤其,我們比較了來源於HDFS(這裡將體現出HDFS棧的開銷)和記憶體本地檔案(核心能夠非常有效地把資料傳遞給程式)的文字輸入和二進位制輸入的處理時間。 
       我們在圖9中展示了這些實驗的結果。記憶體中的HDFS和本地檔案的區別顯示,從HDFS讀取資料會多花費2s的開銷,甚至資料就在本地機器的記憶體中。文字輸入和二進位制輸入的區別表明,解析的開銷相差7s。甚至當從記憶體檔案讀取資料時,把預解析的二進位制資料轉成Java物件都要花費3s,這幾乎和邏輯迴歸本身的成本一樣。而Spark通過在記憶體中把RDD元素直接存成Java物件,避免了以上所有開銷。

6.2 PageRank

       我們使用了54GB維基百科資料,比較分別在Spark和Hadoop上實現PageRank的效能。我們執行10輪PageRank演算法去處理大約4百萬文章的連結圖。圖10顯示在30臺節點上,只基於記憶體儲存時,Spark比Hadoop快2.4倍。另外,控制RDDs的分割槽方式使整個迭代過程保持一致,如3.2.2節所討論的,提升速度至7.4倍。擴充套件到60臺機器,結果也隨之近於線性地減少。 


 
圖10: 在Hadoop和Spark上PageRank的效能 


       我們也評估了PageRank的兩一個版本——用Pregel在Spark上實現,我們再7.1節進行描述。迭代時間與圖10類似,但是長了4s,因為Pregel會在每次迭代額外執行一個操作,這個操作讓頂點“投票”是否結束作業。

6.3 故障恢復

       我們對k-means應用評估了其在單點故障後,使用lineage重建RDD分割槽的成本。圖11比較了在75個節點的叢集上k-means10輪迭代在正常情況下和一個節點在第六輪迭代的開始時故障的情況下的執行時間。沒有任何故障,每輪迭代會執行400各任務處理100GB資料。 


 
k-means存在故障時的迭代時間。在第六輪迭代開始時kill掉一臺機器導致使用lineage部分重構RDD。 


       直到第五輪迭代結束,迭代時間都是大約58s。在第六輪迭代,一臺機器被kill掉,導致執行在該機器上的任務和儲存該機器上的RDD分割槽的丟失。Spark在其他機器上並行地重新執行這些任務,他們在這些機器上重新讀取相應的輸入資料並通過lineage重構RDDs,這會讓迭代時間增至80s。一旦丟失的RDD分割槽被重建,迭代時間將降回到58s。 
       值得注意的是,基於檢查點的故障恢復機制,恢復將可能需要執行至少幾輪迭代,取決於檢查點操作的頻率。更進一步說,系統將需要通過網路複製應用的100GB工作集(文字輸入資料轉成二進位制),要麼消費兩次Spark記憶體去複製它到記憶體中,要麼將不得不等到100GB寫入磁碟。相反地,我們例子中RDDs的血緣圖都是小於10KB的。

6.4 記憶體不足時的行為

       目前為止,我們確保每個叢集中每臺機器有足夠記憶體儲存迭代中所有的RDDs。一個自然的問題是,如果沒有足夠的記憶體去儲存作業的資料時Spark怎樣執行。在這個實驗中,我們配置Spark在每臺機器上不是用超過一定百分比的記憶體去儲存RDDs。我們在圖12中展示邏輯迴歸在多種百分比記憶體空間下的執行結果。我們看見隨著空間變小效能緩慢下降。 


6.5 基於Spark建立的使用者應用

記憶體分析 Conviva Inc(一家視訊發行公司)使用Spark加快了資料分析報告的資料,以前是基於Hadoop上執行的。舉個例子,一個報告被作為一系列Hive查詢執行為客戶計算多種統計。這些查詢全都是基於資料的相同子集(記錄匹配使用者提供的過濾器),但在不同分組的欄位上執行聚合(averages, precentiles和COUNT DISTINCT)操作需要獨立的MapReduce作業。在Spark中實現上述查詢,並把資料子集一次載入到RDD中,該公司能夠對報告提速40倍。一個基於200G壓縮資料的報告在Hadoop叢集上執行要花費20小時,現在僅僅只需要2臺Spark機器就可以執行在30分鐘以內。更進一步,Spark程式只需要96GB的RAM,因為它只把匹配使用者過濾器的行和列存入RDD,而不是所有解壓檔案。 
交通建模 在Berkeley的Mobile Millennium專案中,研究人員基於分散的汽車GPS測量,並行化一個學習演算法去預測道路交通阻塞情況。源資料是城市的10000個互聯的道路網,還有600,000由裝備GPS的汽車採集到的點到點的行駛時間的樣本(每條路線的形式時間可能包括多條互聯的道路)。使用交通模型,系統可以估計跨交通網花費的時間。研究人員用一個期望最大化演算法訓練這個模型,這個演算法迭代地重複兩次mapreduceByKey步驟。這個應用近乎線性地從20個節點擴充套件到80個節點,每個節點4核,如圖13(a)所示。 


 
圖13:用Spark實現的兩個使用者應用程式每次執行時間。錯誤條顯示標準差。 


Twitter垃圾分類 伯克利的Monarch專案用Spark識別Twitter資訊中的垃圾連結。他們在Spark之上實現了一個邏輯迴歸分類器,和6.1節中例子類似,但他們使用分散式的reduceByKey對並行的梯度向量求和。在圖13(b)中,我們展示了在超過50GB資料上訓練一個分類器的擴充套件結果,資料包括250000URLs和10^7與網路相關的特徵/維度和在每個URL的頁面的內容屬性。縮放不是接近線性的原因是每次迭代都會有更高的固定通訊成本。

6.6 互動式資料探勘

       為了證明Spark在互動查詢大資料集方面的能力,我們用它去分析1TB的維基百科頁面訪問日誌(2年的資料)。這個實驗,我們用8核、68GB記憶體的100m2.3xlarge EC2例項。我們執行查詢以獲得以下內容總訪問次數(1)所有頁面,(2)標題能精確匹配給定關鍵字的頁面,(3)標題部分匹配關鍵字的頁面。每個查詢會掃描整個輸入資料。 
       圖14顯示的是在整個資料集、一半資料和十分之一資料的查詢響應時間。甚至在1TB的資料上,Spark上的查詢只需要花費5-7s。這比查詢磁碟上的資料的速度快一個數量級以上。例如,從磁碟上查詢1TB的檔案花費了170s。這證明了RDDs使Spark更適用於互動式資料探勘。 


 
圖14:在Spark上互動式查詢的響應時間,在100臺機器上掃描持續增大的輸入資料集 

7 討論

       雖然,由於RDDs的不可變的性質和粗粒度轉換,它們提供了一個限制的程式設計介面,但我們發現它們適用於廣泛類別的應用。尤其,RDDs可以表達的叢集程式設計模型數量驚人,這些叢集程式設計模型目前為止都被作為獨立框架提出,允許使用者在一個專案(例如,執行一個MapReduce操作圖建立一個圖,然後在其上執行Pregel)中compose這些模型,並在他們之間分享資料。在這一節,我們將在7.1小節討論RDDs可以表達哪些程式設計模型和為什麼它們應用這麼廣泛。另外,我們在7.2小節討論RDDs中lineage資訊的另一個好處,它是為了方便在這些模型上除錯。

7.1 表達已有的程式設計模型

       RDDs可以有效地表達一些目前已經獨立提出的叢集程式設計模型。這裡說有效地,我們的意思是RDDs不僅可以產生和這些模型相同的結果,還可以捕獲這些框架執行的優化,如把指定的資料儲存在記憶體中,對它進行分割槽以最小化通訊並且高效地從故障中恢復。可以使用RDDs表達的模型包括: 
MapReduce:這個模型可以通過在Spark使用flatMapgroupByKey操作,或者在有結合器時使用reduceByKey操作來表達。 
DryadLINQ:DryadLINQ系統提供了比在更普通的Dryad執行時的MapReduce更廣泛的運算元。但是這些是直接對應於Spark中可用的RDD轉換的所有大型運算元(如map, groupByKey,join,etc)。 
SQL:類似於DryadLINQ表示式,SQL需要在記錄集上執行資料並行化操作。 
Pregel:谷歌的Pregel是一個用於迭代影象應用的特殊化模型。最初看起來和其他系統的面向集合程式設計模型十分不同。在Pregel中,一個程式作為

7.2 利用RDDs進行除錯

       雖然我們最初把RDDs設計成容錯的確定性重新計算,但這個屬性也方便除錯。

8 相關工作

叢集程式設計模型: 叢集程式設計模型相關工作主要有幾類。 
       第一,資料流模型,如MapReduce、Drayad和Ciel,支援豐富的運算元去處理資料,但通過穩定外部系統共享資料。RDDs表示的是一個比穩定儲存更高效的資料共享抽象,因為它們避免了資料複製、I/O和序列化的成本。 
       第二,資料流系統的高級別程式設計介面,包括DryadLINQ和FlumeJava,提供語言整合APIs,它們可以讓使用者通過像mapjoin的運算元操作“並行的集合”。然而,在這些系統,並行的集合要麼代表磁碟上的檔案,要麼代表用來表達查詢計劃的臨時資料集。即使,系統將在相同的查詢中的運算元間流水線化資料(如一個map接著另一個map),它們不能在查詢間高效地共享資料。我們把Spark的API基於並行集合模型是因為它的便利,並且不主張新奇的語言整合介面,但是通過這個介面的背後提供RDDs作為儲存抽象,我們允許它支援更廣泛型別的應用。 
       第三類系統為需要資料共享的特別型別的應用提供高級別介面。例如,Pregel支援迭代影象應用,而Twister和HaLoop迭代的MapReduce執行時系統。然而,這些框架對它們支援的計算型別隱式地執行資料共享,也不提供一個普遍地抽象讓使用者可以在他選擇的操作間去共享他們選擇的資料。例如,一個使用者不能用Pregel或者Twister去把資料集載入到記憶體中,然後決定在它上面執行去查詢什麼。RDDs顯示地提供一個分散式儲存抽象,並可以支援這些特定系統不包括的應用,如互動式資料探勘。 
       最後,一些系統暴露出共享可變狀態以允許使用者執行記憶體中的計算。例如,Picco讓使用者執行變形的函式讀和更新分散式雜湊表中的單元分散式共享。分散式共享記憶體系統和鍵值儲存提供相似的模型。RDDs以兩種方式區別這些系統。第一:RDDs提供一個高級別程式設計介面,介面基於如map/sort這些運算元。而Piccolo和DSM中的介面僅僅是讀和更新到表的單元中。第二,Piccolo和DSM系統通過檢查點和回滾實現恢復,這比RDDs基於lineage的策略更耗費效能。最終,如2.3節所討論的,RDDs也提供其他優點。 
快取系