1. 程式人生 > >【Big Data 每日一題20180926】Structured Streaming 之狀態儲存解析

【Big Data 每日一題20180926】Structured Streaming 之狀態儲存解析

Structured Streaming 之狀態儲存解析

[酷玩 Spark] Structured Streaming 原始碼解析系列 ,返回目錄請 猛戳這裡

本文內容適用範圍:
* 2017.07.11 update, Spark 2.2 全系列 √ (已釋出:2.2.0)
* 2017.10.02 update, Spark 2.1 全系列 √ (已釋出:2.1.0, 2.1.1, 2.1.2)

閱讀本文前,請一定先閱讀 Structured Streaming 實現思路與實現概述 一文,其中概述了 Structured Streaming 的實現思路(包括 StreamExecution, StateStore 等在 Structured Streaming 裡的作用),有了全域性概念後再看本文的細節解釋。

引言

我們知道,持續查詢的驅動引擎 StreamExecution 會持續不斷地驅動每個批次的執行。

對於不需要跨批次的持續查詢,如 map()filter() 等,每個批次之間的執行相互獨立,不需要狀態支援。而比如類似 count() 的聚合式持續查詢,則需要跨批次的狀態支援,這樣本批次的執行只需依賴上一個批次的結果,而不需要依賴之前所有批次的結果。這也即增量式持續查詢,能夠將每個批次的執行時間穩定下來,避免越後面的批次執行時間越長的情形。

這個增量式持續查詢的思路和實現,我們在 [Structured Streaming 實現思路與實現概述](1.1 Structured Streaming 實現思路與實現概述.md) 解析過:

而在這裡面的 StateStore,即是 Structured Streaming 用於儲存跨批次狀態結果的模組元件。本文解析 StateStore 模組。

StateStore 模組的總體思路

StateStore 模組的總體思路:

  • 分散式實現
    • 跑在現有 Spark 的 driver-executors 架構上
    • driver 端是輕量級的 coordinator,只做協調工作
    • executor 端負責狀態的實際分片的讀寫
  • 狀態分片
    • 因為一個應用裡可能會包含多個需要狀態的 operator,而且 operator 本身也是分 partition 執行的,所以狀態儲存的分片以 operatorId
      +partitionId 為切分依據
    • 以分片為基本單位進行狀態的讀入和寫出
    • 每個分片裡是一個 key-value 的 store,key 和 value 的型別都是 UnsafeRow(可以理解為 SparkSQL 裡的 Object 通用型別),可以按 key 查詢、或更新
  • 狀態分版本
    • 因為 StreamExection 會持續不斷地執行批次,因而同一個 operator 同一個 partition 的狀態也是隨著時間不斷更新、產生新版本的資料
    • 狀態的版本是與 StreamExecution 的進展一致,比如 StreamExection 的批次 id = 7 完成時,那麼所有 version = 7 的狀態即已經持久化
  • 批量讀入和寫出分片
    • 對於每個分片,讀入時
      • 根據 operator + partition + version, 從 HDFS 讀入資料,並快取在記憶體裡
    • 對於每個分片,寫出時
      • 累計當前版本(即 StreamExecution 的當前批次)的多行的狀態修改,一次性寫出到 HDFS 一個修改的流水 log,流水 log 寫完即標誌本批次的狀態修改完成
      • 同時應用修改到記憶體中的狀態快取

關於 StateStore 的 operator, partiton, version 有一個圖片可幫助理解:

StateStore:(a)遷移、(b)更新和查詢、(c)維護、(d)故障恢復

(a) StateStore 在不同的節點之間如何遷移

在 StreamExecution 執行過程中,隨時在 operator 實際執行的 executor 節點上喚起一個狀態儲存分片、並讀入前一個版本的資料即可(如果 executor 上已經存在一個分片,那麼就直接重用,不用喚起分片、也不用讀入資料了)。

我們上節講過,持久化的狀態是在 HDFS 上的。那麼如上圖所示:

  • executor a, 喚起了 operator = 1, partition = 1 的狀態儲存分片,從 HDFS 裡位於本機的資料副本 load 進來 version = 5 的資料;
  • 一個 executor 節點可以執行多個 operator,那麼也就可以在一個 executor 上喚起多個狀態儲存分片(分別對應不同的 operator + partition),如圖示 executor b
  • 在一些情況下,需要從其他節點的 HDFS 資料副本上 load 狀態資料,如圖中 executor c 需要從 executor b 的硬碟上 load 資料;
  • 另外還有的情況是,同一份資料被同時 load 到不同的 executor 上,如 executor d 和 executor a 即是讀入了同一份資料 —— 推測執行時就容易產生這種情況 —— 這時也不會產生問題,因為 load 進來的是同一份資料,然後在兩個節點上各自修改,最終只會有一個節點能夠成功提交對狀態的修改。

(b) StateStore 的更新和查詢

我們前面也講過,在一個狀態儲存分片裡,是 key-value 的 store。這個 key-value 的 store 支援如下操作:

  /* == CRUD 增刪改查 =============================== */

  // 查詢一條 key-value
  def get(key: UnsafeRow): Option[UnsafeRow]
    
  // 新增、或修改一條 key-value
  def put(key: UnsafeRow, value: UnsafeRow): Unit
    
  // 刪除一條符合條件的 key-value
  def remove(condition: UnsafeRow => Boolean): Unit
  // 根據 key 刪除 key-value
  def remove(key: UnsafeRow): Unit
  
  /* == 批量操作相關 =============================== */
    
  // 提交當前執行批次的所有修改,將刷出到 HDFS,成功後版本將自增
  def commit(): Long

  // 放棄當前執行批次的所有修改
  def abort(): Unit
    
  // 當前狀態分片、當前版本的所有 key-value 狀態
  def iterator(): Iterator[(UnsafeRow, UnsafeRow)]
    
  // 當前狀態分片、當前版本比上一個版本的所有增量更新
  def updates(): Iterator[StoreUpdate]

使用 StateStore 的程式碼可以這樣寫(現在都是 Structured Streaming 內部實現在使用 StateStore,上層使用者無需面對這些細節):

  // 在最開始,獲取正確的狀態分片(按需重用已有分片或讀入新的分片)
  val store = StateStore.get(StateStoreId(checkpointLocation, operatorId, partitionId), ..., version, ...)

  // 開始進行一些更改
  store.put(...)
  store.remove(...)
    
  // 更改完成,批量提交快取在記憶體裡的更改到 HDFS
  store.commit()
    
  // 檢視當前狀態分片的所有 key-value / 剛剛更新了的 key-value
  store.iterator()
  store.updates()

(c) StateStore 的維護

我們看到,前面 StateStore 在寫出狀態的更新時,是寫出的修改流水 log。

StateStore 本身也帶了 maintainess 即維護模組,會週期性的在後臺將過去的狀態和最近若干版本的流水 log 進行合併,並把合併後的結果重新寫回到 HDFS:old_snapshot + delta_a + delta_b + … => lastest_snapshot

這個過程跟 HBase 的 major/minor compact 差不多,但還沒有區別到 major/minor 的粒度。

(d) StateStore 的故障恢復

StateStore 的所有狀態以 HDFS 為準。如果某個狀態分片在更新過程中失敗了,那麼還沒有寫出的更新會不可見。

恢復時也是從 HDFS 讀入最近可見的狀態,並配合 StreamExecution 的執行批次重做。從另一個角度說,就是大家 —— 輸入資料、及狀態儲存 —— 先統一往後會退到本執行批次剛開始時的狀態,然後重新計算。當然這裡重新計算的粒度是 Spark 的單個 task,即一個 partition 的輸入資料 + 一個 partition 的狀態儲存。

從 HDFS 讀入最近可見的狀態時,如果有最新的 snapshot,也就用最新的 snapshot,如果沒有,就讀入稍舊一點的 snapshot 和新的 deltas,先做一下最新狀態的合併。

總結

在 Structured Streaming 裡,StateStore 模組提供了 分片的分版本的可遷移的高可用 key-value store。

基於這個 StateStore 模組,StreamExecution 實現了 增量的 持續查詢、和很好的故障恢復以維護 end-to-end exactly-once guarantees

擴充套件閱讀