Spark Streaming 設計原理
最近兩年流式計算又開始逐漸火了起來,說到流式計算主要分兩種:continuous-based 和 micro-batch。最近在使用基於 micro-batch 模式的 Spark Streaming,正好結合論文介紹一下。這裡說的論文是 2013 年釋出的 《Discretized Streams: Fault-Tolerant Streaming Computation at Scala》,雖然是 2013 年發表的論文,但是系統的核心邏輯基本沒怎麼變化,對於理解 Spark Streaming 的系統設計、工作方式還是很有幫助的。注:Spark 在 2016 年推出了 continuous-based 的 Structure Streaming,基於 micro-batch 這種模式的 Spark Streaming 可能將逐漸淡出吧。
0. abstract
當前(2013 年)分散式流式系統的主要問題在於錯誤恢復的代價非常高:熱備份或者恢復時間長,而且不處理 straggler,這裡的 straggler 是分散式系統某一個成員/成分執行滯後於其他組成部分,比如某個 task 節點的執行時間要明顯長於其他節點。相比之下,DStream (Spark Streaming 的流式處理模式,全稱 discretized streams) 錯誤恢復更快,而且會處理 straggler。除此之外,還有其他優點包括:提供豐富的運算元、高吞吐、可以線性擴充套件到 100 個節點的叢集規模、亞秒級延遲和壓秒級故障恢復。最後,DStream 還可以和批處理、互動式查詢結合使用。
1. Overview
分散式計算主要分兩種:批(batch)處理和流式(streaming)計算,流式計算的主要優勢在於其時效性和低延遲。而大規模流式計算系統設計的兩個主要問題是錯誤處理和 straggler 處理。由於流式系統的實時性,錯誤之後如何快速恢復顯得極其重要。
不幸的是,現存的流式系統在這兩個點的設計上都不夠好。比如 Storm 和 TimeStream 等(這個時候 flink 還沒有大規模流行起來) 都是基於 continuous operator 模式,由一個持續執行、有狀態的節點負責接收和處理資料。在這種模式下的錯誤恢復主要由兩個方式:一個是 replication,也就是每個 operator 節點有一個 replication 節點;另一個是上游在某個節點失敗之後對新的節點提供 replay。在大規模叢集模式下,這兩種方式都不太可取:replication 會耗費一倍的資源;上游 replay 會耗費一定時間。而且這兩種模式都沒處理 straggler: 第一種模式會由於 straggler 的存在導致 replication 的過程變慢;第二種模式會將 straggler 當成失敗節點,然後進行恢復,代價比較大。
Spark Streaming 的模式是 discretized streams (D-Streams),這種模式不存在一直執行的 operator,而是將每一個時間間隔的資料通過一系列無狀態、確定性(deterministic)的批處理來處理。比如對每一秒的資料通過 MapReduce 計算 count。類似的,也可以疊加計算多個批次的資料的 count。簡而言之,DStream 模式下,一旦 input 給定,輸出狀態就是確定的;下面我們會詳細說明為什麼 DStream 的失敗恢復模式要優於前面兩種模式。
DStream 的實現難點主要由兩個:低延遲和快速錯誤恢復(包括 straggler)。傳統的批處理系統,比如 Hadoop,一般執行的比較慢,主要是因為中間結果要進行持久化(注:這種也代表容錯性比較好)。DStream 使用彈性分散式資料集(Resilient Distributed Datasets),也就是 RDD,來進行批處理(注:RDD 可以將資料儲存到記憶體中,然後通過 RDD 之間的依賴關係快速計算)。這個過程一般是亞秒級的,對於大部分場景都是可以滿足的。
快速錯誤恢復主要是通過 DStream 的確定性來提供一種新的恢復機制:par- allel recovery。當一個節點失敗之後,我們通過叢集的其他節點來一起快速重建出失敗節點的 RDD 資料。這種恢復模式相比之前的 replication 和 upstream replay 都要快。這裡 straggler 的處理因為我們可以獲取到一個批處理 task 的執行時間,所以我們可以通過推測 task 的執行時間判斷是不是 straggler。
DStream 的實現系統是 Spark Streaming,基於 Spark 引擎。這個系統可以在 100 個節點的叢集上每秒處理 6kw 條資料,並保證亞秒級的延遲,錯誤恢復也可以保證在亞秒級。當然這些評測資料都是 2013 年,也就是 5 年前。論文繼續列舉了一些對比資料,這裡就不贅述了,總之結論就是 Spark Streaming 的吞吐和線性擴充套件要優於時下的其他流式計算系統。
最後值得一提的是,因為 Spark Streaming 使用的 RDD model 和批處理相同,所以使用者可以將 Spark Streaming 和批處理和互動式出現結合起來。或者將歷史 RDD 資料結合 Spark Streaming 一起來用(注:這裡的一個場景是線下訓練模型,然後通過 Spark Streaming 運用到實時資料上)。
2. Backgroud
很多分散式流式計算系統使用的是 continuous operator 模式,這種模式下會有多個持續執行的 operator,每個 operator 接收自己的資料然後並更新狀態。儘管這種方式可以減小延遲,但是因為 operator 本身是有狀態的,導致錯誤恢復起來特別麻煩,如前所述,要麼通過 replication,要麼通過 upstream backup 和 replay。而這兩種方式的缺點也很明顯:資源浪費;恢復時間長。
replication 除了成本問題還有資料一致性的問題,如何保證兩個節點收到的資料是一致,所以還需要引入分散式協議,比如 Flux 或者 Borealis’s DPC。
upstream backup 模式下,當某個 operator 節點 fail 之後,upstream 將之前傳送給失敗 operator 節點的資料從某個 checkpoint 重新發送給新的替代節點,這樣就會導致恢復時間比較長。論文這裡沒有說 operator 的狀態儲存問題,實際上 operator 的狀態也是要儲存的,而且 checkpoint 要和 upstream 的 checkpoint 一致。
除此之後,straggler 不管在 replication 模式還是 upstream backup 模式都不能很好的處理。
3. DStream
如上所述,DStream 通過一系列小的批處理作業來代替 operator 從而達到快速錯誤恢復的目的。
3.1 computation model
DStream 將批處理分解成多個一定時間間隔的批處理。在每個時間間隔的資料會被儲存成一系列的 RDD,然後通過一系列的運算元,比如 map, reduce, group 等進行平行計算,最後將結果輸出成新的 RDD 或者輸出到系統外(比如 stdout, 檔案系統,網路等)。

論文給了一個計算網站 pv 的例子,虛擬碼如下
pageViews = readStream("http://...", "1s") ones = pageViews.map(event => (event.url, 1)) counts = ones.runningReduce((a, b) => a + b)
對應的資料流如下圖

執行過程簡單描述如下:
- Spark Streaming 持續不斷接收 http url 的 view 資料 pageViews
- 將 pageViews 按 1s 間隔拆分成一系列的 RDD 資料(每個時間間隔也會包含多個 RDD 資料)
- 對 2 中的資料進行 map, reduce 等處理。
系統錯誤處理通過 DStream 和 RDD 的依賴關係來恢復。依賴關係的維度是 partition,也就是說每個 RDD 可能會分成多個 partition 然後分佈在叢集的不同機器上,這樣當某個機器上的 RDD 資料丟失的時候就可以通過 RDD 的依賴關係從多個機器上來並行的回覆資料了。上圖中的 DStream 就表示有 3 個 partition。除此之後,如果各個時間間隔沒有時序關係,那麼每個時間間隔的 RDD 資料也可並行恢復。這正是 DStream 快速錯誤恢復的關鍵所在。
3.2 Consistency Semantics
基於 continuous operator 的流處理系統當多個 operator 由於各自的負載不同可能導致某些 operator 滯後,這樣整個系統的某個時間點的 snapshot 資料就是不準確的。針對這個問題,Borealis 對不同節點進行同步來避免這個問題;而 storm 直接忽略這個問題。
對於 DStream,由於時間被自然的離散化,而每個時間 interval 對應的 RDDs 都是容錯不可變且計算確定性的,所以 DStream 是滿足 exactly-once 語義的。
感覺這裡有一個前提,論文沒有點出來,就是 upstream 的資料是可靠的。
4. System Architecture
系統架構的主題變化不大,但是實現細節再討論感覺意義不大。Spark Streaming 主要包括三個部分:
- master: 負責記錄 DStream 的依賴圖(lineage graph)和 task 的排程。我們現在也叫 driver。
- worker: 負責接收資料,儲存資料以及執行 task。我們現在也叫 executor。
- client library。

Spark Streaming 的無狀態的 task 可以執行在任意節點,相比於傳統的流式系統的固定拓撲結構(注:不太確定目前還是不是都是這樣),擴充套件起來會更加的容易。
Spark Streaming 的所有狀態都儲存在 RDD 中,同時 RDD 的 partition 可以儲存到任意節點或者通過多個節點計算。task 計算會考慮資料區域性性,比如處理 parition A 的 task 會優先分配到 partition A 所在的節點執行。
下面的實現的一些細節部分不再討論。
5. Fault and Straggler Recovery
Parallel Recovery 前面已經說過了,這裡就不贅述了。這裡在補充一下 straggler 的處理。
straggler 的判斷非常簡單,由於很多 task 都是很快結束,如果一個 task 明顯比其他 task 長就可以認為是 straggler。straggler 可以進行遷移,也就是將 task 遷移到其他機器上。
這篇論文雖然很久了,但是對於理解 Spark Streaming 的設計初衷或者設計思路還是很有幫助的。最後,對於論文中其他部分,或者略顯陳舊,或者啟發意義不大,這裡就不再贅述了。對於文章中有失偏頗的地方,還希望多多指教。