1. 程式人生 > >Trident exactly once實現原理

Trident exactly once實現原理

為了實現exactly-once,storm0.7.0開始支援transactional toplogy(事務topology),也是微批處理架構,但目前已經不再維護(基本沒有人用),功能完全被trident所替代。準確的說,trident topology是從transactional topology的基本上發展而來,包括spout和state都延用的transactional topology的思路,最大的改變就是抽象出了stream的概念。

資料流對比

流對比1.jpg

原理簡介

trident中將tuple封裝成batch,每一個batch提供一個唯一的txid,資料的傳送、提交、重發都是基於txid,以batch為單位進行。

實現exactly-once的關鍵在於狀態的儲存,就trident而言,包括:

  1. 元資料的儲存,資料處理失敗時,能知道要重發是哪些資料
  2. 中間狀態的儲存,重發資料時,能敏銳發現數據的狀態是否已經更新過

trident只在用state來做中間狀態儲存的地方確保exactly once,而資料流並不一定要在所有的地方都需要用state,以wordcount為例,最終我們關心的只是單詞的統計結果,中間的read和split並不需要關心,也就不需要儲存state,因此,選擇state的儲存時機是重點。

三類spout

Transactional spouts,也叫事務spout,提供了強一致性,有如下三點保障:

  1. 一個txid對應一個batch,如果一個batch被重發,txid不變
  2. 任意兩個batch中不會有tuple相同;
  3. 每個tuple都會被放到一個batch中,不會有tuple被漏掉

事務spout的實現是假設訊息中介軟體是可靠的,但如果在重發一個batch時,正好batch中tuple所以某個分割槽失效,則會導致spout一直卡住,因為為了保證batch完全一致,會一直去嘗試讀取去tuple。為了解決這個缺陷,所以有了Opaque Transactional spouts。

Opaque Transactional spouts,也叫不透明事務spout,提供了弱一致性,即每個tuple只在一個batch中被成功處理,但不保證同一個txid對應的batch完全一致。當重新去讀取一個batch的tuple時,不會因為讀取不到某個tuple而卡住。

No-Transactional spouts ,非事務性spout,不保證對一致性

三類state

就以上三種spout,分別有三種state來做狀態儲存

Transactional state ,儲存txid和value

Opaque transactional state ,儲存txid、value、preValue

Non-transactional state ,不保證exactly once

spout和state的組合

yes表示可以實現exactly-once的組合

spout-state-small.jpg

Trident spout

trident spout實際是一個簡單的topology結構,spout包含兩個內部介面:Coordinator(協調者)和Emitter(訊息傳送者),這兩組介面將由三個執行器來執行

MasterBatchCoordinator(MBC):主協調者,Coordinator的執行器之一,也是實際的spout,負責的流的管理,以及控制新事務的產生

TridentSpoutCoordinator(TSC):另一個協調者,也是Coordinator的執行器之一,實際為bolt,主要負責元資料的封裝

TridentSpoutExecutor(TSE):訊息的傳送者,Emitter的執行器,從資料來源里根據TSC發來的元資料讀出實際資料,傳送出去

通過下圖可以更清楚看到三個執行器的關係:

spout.png

batch大小不能精確控制

一個batch中tuple的數量並不能直接做到精確控制,主要受資料量的影響,也可以通過配置topology.max.spout.pending的值(預設是1),來增加併發。

可以增加併發,當然也可以限制batch產生的速度。在batch數小於topology.max.spout.pending的情況下,MBC至少會等待trident batch interval的時間(預設是500ms)才會產生一個新的batch,關於這個引數,官方建議設定為正常的端到端處理時間的一半左右 —— 也就是說如果需要花費 600 ms 的時間處理一個batch,那麼就可以每將此引數設定為300ms

state狀態儲存與更新

Trident 會按照txid的大小來順序更新 batch 的狀態,也就是說txid=3的batch必然在txid=2的batch之後進行更新。

主要分析一下Transactional stateOpaque transactional state 的實現原理,都是通過txid來判斷資料是否已經處理過,不同之處在於,當txid為重發資料時,Transactional state直接忽略此次value更新,而Opaque transactional state是將上次處理的值與重發後的值進行combina後更新value,以wordcount為例,分別說明兩者的儲存過程。

假如正在處理的batch的txid=3,包含tuple為:

["man"]["man"]["dog"]

庫中已存入如下結果:

man => [value=3, txid=1]
dog => [value=4, txid=3]
apple => [value=10, txid=2]

庫中單詞“man”的txid是1,但當前的txid是3,所以可以確定當前batch中的“man”還沒有更新過,可以放心的給count加2並更新txid為3.

與此同時,庫中單詞“dog”的txid和當前的txid是相同的,表明當前batch中的”dog”已經更新過,因此要可以跳過這次更新。此次更新後,資料庫中的資料如下:

man => [value=5, txid=3]
dog => [value=4, txid=3]
apple => [value=10, txid=2]

以上是Transactional state狀態更新過程,前提是每個batch重發時,所包含的tuple都是一致的,但如果這個batch在重發的過程中,讀取訊息中介軟體時,某些區分也失效,則batch可能並不完整Transactional state不能保證exactly once,但opaque transactional spout可以。

使用opaque transactional state儲存時,庫中除了儲存value和txid以外,還會存preValue(上次處理後的值),以更新單詞man為例,如設man在庫中已經儲存如下:

man => [value=5,preValue=3, txid=3]

若新的batch的txid為4,單詞man出現3次,則正常更新,將value的值覆蓋到preValue,同時value加上新增的值,更新後如下:

man => [value=8,preValue=5, txid=4]

若新的batch的txid為3,單詞man出現3次,則表示此txid之前已經處理過,但不能確認單詞man出現的次數和上次是否一致,更新時,preValue的值不變,value的值更新成preValue + 3,更新後的結果為:

man => [value=6,preValue=3, txid=3]