1. 程式人生 > >Spark流式程式設計介紹 - 程式設計模型

Spark流式程式設計介紹 - 程式設計模型

來源Spark官方文件
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#programming-model

程式設計模型

結構化流中的核心概念就是將活動資料流當作一個會不斷增長的表。這是一個新的流處理模型,但是與批處理模型很相似。你在做流式計算就像是標準針對靜態表的批查詢,Spark會在一個無限輸入的表上進行增量查詢。我們來從更多詳細內容來理解這個模型。

基本概念

將輸入的資料流理解為“寫入表”,每個流中到達的資料就像是寫入表中新增的一行。

針對輸入的查詢會生成“結果表”。每個觸發間隔之間(比如1秒鐘),就會有新的行新增到“寫入表”,最終更新結果表。當結果表變更後,我們能夠將變更的結果行寫入外部儲存。

“輸出(Output)”定義為寫入外部儲存的內容。輸出存在幾種模式:

  • 完全模式(Complete Mode) :整個更新後的結果表會全部寫入外部儲存。具體的全表寫入方式取決於與儲存的底層連線。
  • 增量模式(Append Mode) :從上次觸發後的新增結果表資料才會寫入外部儲存。這個模式只適用於那些預期結果表中的存量資料不會變化的查詢。
  • 更新模式(Update Mode) : 從上次觸發後的更新結果表資料才會寫入外部儲存(從Spark 2.1.1開始生效)。注意本模式和完全模式的差異,本模式下只會輸出上次觸發後的變更行。如果查詢不包含聚合,基本會和增量模式相同。

要注意每個模式都有確定的適配的查詢,這個會在稍後討論。
為了解釋這個模型的使用方式,我們用上面的快速示例來輔助理解模型。第一個DataFrame型別的變數 line

就是寫入表,而最後DataFrame型別的變數 wordCounts 就是結果表。注意針對流的查詢方法,從 line 生成 wordCounts 和一個靜態的DataFrame完全相同。當查詢開始之後,Spark會持續檢查從socket連結傳入的新資料。如果存在新資料,Spark會執行“增量”查詢,並且針對新資料計算更新的計數,整合之前執行的計數,如下圖所示。

注意結構化流並沒有儲存整張表。從資料來源讀取最近有效的資料,增量的處理並且更新結果資料,然後丟棄源資料。Spark只保留最小中間狀態資料,用於更新結果(例如上面例子中的中間統計結果計數)。
這個模型明顯和其他的流處理引擎不同。許多流處理系統要求使用者自行維護執行聚合,因為有諸如容錯性(fault-tolerance)、資料一致性(data consistency:at-least-once, at-most-once, exactly-once)。在這個模型中,當有新資料時,由Spark負責更新結果表,因此解放了使用者無需關注。我們以模型處理事件時間和延遲資料作為例子來看下。

處理事件時間和延遲資料

事件時間是包含在資料本身的。很多應用都希望基於事件時間操作。例如你的想要獲取物聯網裝置每分鐘產生事件數量,然後你可能希望使用資料生成的時間(這就是事件時間),而不是Spark接收到他們的時間。事件時間在這個模型中是很自然的,因為每個裝置產生事件都是都是表中的一行資料,而事件時間就是一行資料中的一列。這樣基於視窗的聚合(例如每分鐘的事件數量)可以作為基於事件時間列做的特別的分組和聚合。每個時間視窗都是一個分組,每行資料也可以屬於多個視窗或分組。因此類似這種基於事件時間的聚合查詢能夠在靜態資料集(例如收集的裝置事件日誌)和動態資料流,能夠是使用者的使用比較簡單。
此外模型天然的能夠基於事件時間處理延遲到達的資料。當Spark更新結果表時,他仍然能夠針對延遲資料來更新歷史聚合的結果,也同時可以清除歷史聚合資料,從而限制中間狀態資料的大小。從Spark2.1開始,我們支援水位線概念(watermarking),允許使用者指定延遲資料的閾值,系統也能夠清理舊狀態資料。稍後會在視窗操作章節介紹。

容錯性

保證唯一投送端到端是結構化流的設計中的關鍵目標之一。為了達成這樣的目標,我們設計了結構化流的源(Source)、匯(Sink)以及執行引擎能夠可靠的跟蹤處理進度,從而能夠重啟/重新處理來應對各種故障。每個資料流的源應該都有偏移量概念(類似Kafka的偏移量,或者Amazon Kinesis 的序列編號)來跟蹤流中的讀取位置。引擎使用儲存點和先寫日誌來記錄每次處理的資料偏移邊界。流的匯設計天然就支援重新處理的冪等性。整合起來,使用可重放的源與冪等的匯,結構化流在面對任何故障時都能保證端對端嚴格一致性(end-to-end exactly-once semantics)