1. 程式人生 > >storm 消息的可靠處理機制——Ack整個tuple樹異或

storm 消息的可靠處理機制——Ack整個tuple樹異或

實現 out 異或 orm 我們 保存 通知 數據 collect

消息的可靠處理機制
Storm內部通過一種巧妙的異或算法判讀每個tuple是否被正確完整的處理。

Spout的一個Task創建一個Tuple時,即在Spout的nextTuple()方法中實現從特定數據源讀取數據的處理邏輯中,會與Acker進行通信,向Acker發送消息,Acker保存該Tuple對應信息:{:spout-task task-id :val ack-val)}。
Bolt在emit一個新的子Tuple時,會保存子Tuple與父Tuple的關系。
在Bolt中進行ack時,會計算出父Tuple與由該父Tuple新生成的所有子Tuple的一個異或值,將該值發送給Acker(計算異或值:tuple-id ^ (child-tuple-id1 ^ child-tuple-id2 … ^ child-tuple-idN))。可見,這裏Bolt並沒有把所有生成的子Tuple發送給Acker,這要比發送一個異或值大得多了,只發送一個異或值大大降低了Bolt與Acker之間網絡通信的開銷。
Acker收到Bolt發送的異或值,與當前保存的task-id對應的初始ack-val做異或,tuple-id與ack-val相同,異或結果為0,但是子Tuple的child-tuple-id等並不互相相同,只有等所有的子Tuple的child-tuple-id都執行ack回來,最後ack-val就為0,表示整個Tuple樹處理成功。無論成功與失敗,最後都要從Acker維護的隊列中移除。
最後,Acker會向產生該原始父Tuple的Spout對應的Task發送通知,成功或者失敗,回調Spout的ack或fail方法。如果我們在實現Spout時,重寫了ack和fail方法,處理回調就會執行這裏的邏輯。

當然這種異或算法存在1/2^64概率的誤差,可以忽略不計。
在開發中,對於那些不允許丟失的消息我們在發送消息時要對tuple指定messageID並進行錨定,告訴tuple tree這裏增加了一個新的節點,保證消息的可靠性。

collector.emit(tuple,messageId)//可靠消息
collector.emit(tuple)//不可靠的消息

collector.emit(tuple, new Values(word));//錨定發送,可靠的消息
collector.emit(new Values(word)));//非錨定發送,不可靠的消息
註意:繼承BaseBasicBolt實現的API本是就是可靠性的,不需要自己進行錨定發送和調用ack以及fail方法

storm 消息的可靠處理機制——Ack整個tuple樹異或