storm原始碼分析之acker工作流程
我們知道storm一個很重要的特性是它能夠保證你發出的每條訊息都會被完整處理,完整處理的意思是指:
一個tuple以及這個tuple所導致的所有的tuple都會被成功處理。而一個tuple會被認為處理失敗瞭如果這個訊息在timeout所指定的時間內沒有成功處理。
也就是說對於任何一個spout-tuple以及它的所有的子孫到底處理成功失敗與否我們都會得到通知。關於如果做到這一點的原理,先前以及講過。從那篇文章可以看出storm裡面有個撞門的acker來跟蹤過所有tuple的完成情況。這篇文章就來討論acker的詳細工作流程。
演算法簡介
acker對於tuple的跟蹤演算法是storm的主要突破之一,這個演算法使得對於任意大的一個tuple樹,它只需要恆定的20個位元組就可以跟蹤了。原理很簡單:acker對於每個spout-tuple儲存一個ack-val的校驗值,它的初始值為0,然後沒發射一個tuple/ack一個tuple,那麼tuple的id都要跟這個校驗值異或一下,並且得到的值更新為ack-val的新值。那麼假設每個發射出去的tuple都被ack了,那麼最後ack-val一定是0(因為一個數字跟自己異或得到的值是0)。
進入正題
那麼下面我們從原始碼層面來看看那些元件在哪些時候會給acker傳送什麼樣的訊息來共同完成這個演算法的。acker對訊息進行處理的主要是下面這塊程式碼:
spout建立一個新的tuple的時候給acker傳送訊息(let [id (.getValue tuple 0) ^TimeCacheMap pending @pending curr (.get pending id) curr (condp = (.getSourceStreamId tuple) ACKER-INIT-STREAM-ID (-> curr (update-ack id) (assoc :spout-task (.getValue tuple 1))) ACKER-ACK-STREAM-ID (update-ack curr (.getValue tuple 1)) ACKER-FAIL-STREAM-ID (assoc curr :failed true))] ...)
訊息格式(看上面程式碼的第一行和第七行對於tuple.getValue()的呼叫)
(spout-tuple-id, task-id)
訊息的streamId是_ack_init(ACKER-INIT-STREAM-ID)
這是告訴acker,一個新的spout-tuple出來了,你跟蹤一下,它是由id為task-id的task建立的(這個task-id會在後面用來通知這個task,你的tuple處理成功了/失敗了)。處理完這個訊息之後,acker會在它的pending這個map(型別為TimeCacheMap)裡面新增這樣一條記錄:
這就是acker對spout-tuple進行跟蹤的核心資料結構,對於每個spout-tuple所產生的tuple樹的跟蹤都只需要儲存上面這條記錄。acker後面會檢查:val什麼時候變成0,變成0,說明這個spout-tuple產生的tuple都處理完成了。{spout-tuple-id {:spout-task task-id :val ack-val)}
Bolt發射一個新的tuple的時候會給acker傳送訊息嗎?
任何一個bolt在發射一個新的tuple的時候,是不會直接通知acker的,如果這樣做的話,那麼沒發射一個訊息都會有三條訊息了:
1.Bolt建立這個tuple的時候,把它發給下一個bolt的訊息
2.Bolt建立這個tuple的時候,傳送給acker的訊息
3.ack tuple的時候傳送的ack訊息
事實上storm裡面只有第一條和第三條訊息,他把第二條訊息省掉了,怎麼做到的呢?
storm這點做得挺巧妙的,bolt在發射一個新的bolt的時候會把這個新的tuple跟它的父tuple的關係儲存起來。然後在ack每個tuple的時候,storm會把要ack的tuple的id,以及這個tuple新建立的所有的tuple的id的異或值傳送給acker.這樣每個tuple就省掉了一個訊息。
tuple被ack的時候給acker傳送訊息
每個tuple在被ack的時候,會給acker傳送一個訊息,訊息格式為:
(spout-tuple-id, tmp-ack-val)
訊息的streamId是_ack_ack(ACKER-ACK-STRANM-ID)
注意,這裡的tmp-ack-val是要ack的tuple的id與它新建立的所有的tuple的id異或的結果
tuple-id ^ (child-tuple-id1 ^ child-tuple-id2 ... )
我們可以從task.clj裡面的send-ack方法看出這一點:
(defn- send-ack [^TopologyContext topology-context
^Tuple input-tuple
^List generated-ids send-fn]
(let [ack-val (bit-xor-vals generated-ids)]
(doseq [
[anchor id] (.. input-tuple
getMessageId
getAnchorsToIds)]
(send-fn (Tuple. topology-context
[anchor (bit-xor ack-val id)]
(.getThisTaskId topology-context)
ACKER-ACK-STREAM-ID))
)))
這裡面的generated-ids引數就是這個input-tuple的所有子tuple的id,從程式碼可以看出storm會給這個tuple的每一個spout-tuple傳送一個ack訊息
為什麼說這裡的generated-ids是input-tuple的子tuple呢?這個send-ack是被OutputCollectorImpl裡面的ack方法呼叫的:
public void ack(Tuple input) {
List generated = getExistingOutput(input);
// don't just do this directly in case
// there was no output
_pendingAcks.remove(input);
_collector.ack(input, generated);
}
_pendingAcks裡面存的是什麼東西呢?private Tuple anchorTuple(Collection< Tuple > anchors, String streamId, List< Object > tuple) {
// The simple algorithm in this function is the key
// to Storm. It is what enables Storm to guarantee
// message processing.
// 這個map存的東西是 spout-tuple-id到ack-val的對映
Map< Long, Long > anchorsToIds = new HashMap<Long, Long>();
// anchors 其實就是它的所有父親:spout-tuple
if(anchors!=null) {
for(Tuple anchor: anchors) {
long newId = MessageId.generateId();
// 告訴每一個父親,你們又多了一個兒子了。
getExistingOutput(anchor).add(newId);
for(long root: anchor.getMessageId().getAnchorsToIds().keySet()) {
Long curr = anchorsToIds.get(root);
if(curr == null) curr = 0L;
// 更新spout-tuple-id的ack-val
anchorsToIds.put(root, curr ^ newId);
}
}
}
return new Tuple(_context, tuple, _context.getThisTaskId(), streamId, MessageId.makeId(anchorsToIds));
}
從上面程式碼裡面的紅色部分我們可以看出,_pendingAcks裡面維護的其實就是tuple到自己兒子的對應關係。
Tuple處理失敗的時候會給acker傳送失敗訊息
acker會忽略掉這種訊息的訊息內容,直接將對應的spout-tuple標記為失敗
最後acker傳送訊息通知spout-tuple對應的worker
(when (and curr
(:spout-task curr))
(cond (= 0 (:val curr))
;; ack-val == 0 說明這個tuple的所有子孫都
;; 處理成功了(都發送ack訊息了)
;; 那麼傳送成功訊息通知建立這個spout-tuple的task.
(do
(.remove pending id)
(acker-emit-direct @output-collector
(:spout-task curr)
ACKER-ACK-STREAM-ID
[id]
))
;; 如果這個spout-tuple處理失敗了
;; 傳送失敗訊息給建立這個spout-tuple的task
(:failed curr)
(do
(.remove pending id)
(acker-emit-direct @output-collector
(:spout-task curr)
ACKER-FAIL-STREAM-ID
[id]
))
))
上面原始碼分析,自己理解為如下:
當tuple從spout被髮送出來時,會初始化task-id,ack-val(初始值為0),spout發射值的時候,會將tuple-id給acker,bolt的tuple進行ack時,會將tuple-id與其子的tuple-id進行異或傳給acker,這樣如果bolt不產生新的tuple,那麼ack是ack-val就是0,就完全處理了,如果是fail,ack-val就不是0.