1. 程式人生 > >storm原始碼分析之acker工作流程

storm原始碼分析之acker工作流程

我們知道storm一個很重要的特性是它能夠保證你發出的每條訊息都會被完整處理,完整處理的意思是指:

一個tuple以及這個tuple所導致的所有的tuple都會被成功處理。而一個tuple會被認為處理失敗瞭如果這個訊息在timeout所指定的時間內沒有成功處理。

也就是說對於任何一個spout-tuple以及它的所有的子孫到底處理成功失敗與否我們都會得到通知。關於如果做到這一點的原理,先前以及講過。從那篇文章可以看出storm裡面有個撞門的acker來跟蹤過所有tuple的完成情況。這篇文章就來討論acker的詳細工作流程。

演算法簡介

acker對於tuple的跟蹤演算法是storm的主要突破之一,這個演算法使得對於任意大的一個tuple樹,它只需要恆定的20個位元組就可以跟蹤了。原理很簡單:acker對於每個spout-tuple儲存一個ack-val的校驗值,它的初始值為0,然後沒發射一個tuple/ack一個tuple,那麼tuple的id都要跟這個校驗值異或一下,並且得到的值更新為ack-val的新值。那麼假設每個發射出去的tuple都被ack了,那麼最後ack-val一定是0(因為一個數字跟自己異或得到的值是0)。

進入正題

那麼下面我們從原始碼層面來看看那些元件在哪些時候會給acker傳送什麼樣的訊息來共同完成這個演算法的。acker對訊息進行處理的主要是下面這塊程式碼:

(let [id (.getValue tuple 0)
   ^TimeCacheMap pending @pending
   curr (.get pending id)
   curr (condp = (.getSourceStreamId tuple)
        ACKER-INIT-STREAM-ID (-> curr
               (update-ack id)
               (assoc :spout-task (.getValue tuple 1)))
        ACKER-ACK-STREAM-ID (update-ack
                         curr (.getValue tuple 1))
        ACKER-FAIL-STREAM-ID (assoc curr :failed true))]
            ...)
spout建立一個新的tuple的時候給acker傳送訊息

訊息格式(看上面程式碼的第一行和第七行對於tuple.getValue()的呼叫)

(spout-tuple-id, task-id)
訊息的streamId是_ack_init(ACKER-INIT-STREAM-ID)

這是告訴acker,一個新的spout-tuple出來了,你跟蹤一下,它是由id為task-id的task建立的(這個task-id會在後面用來通知這個task,你的tuple處理成功了/失敗了)。處理完這個訊息之後,acker會在它的pending這個map(型別為TimeCacheMap)裡面新增這樣一條記錄:

{spout-tuple-id {:spout-task task-id :val ack-val)}
這就是acker對spout-tuple進行跟蹤的核心資料結構,對於每個spout-tuple所產生的tuple樹的跟蹤都只需要儲存上面這條記錄。acker後面會檢查:val什麼時候變成0,變成0,說明這個spout-tuple產生的tuple都處理完成了。

Bolt發射一個新的tuple的時候會給acker傳送訊息嗎?

任何一個bolt在發射一個新的tuple的時候,是不會直接通知acker的,如果這樣做的話,那麼沒發射一個訊息都會有三條訊息了:

1.Bolt建立這個tuple的時候,把它發給下一個bolt的訊息

2.Bolt建立這個tuple的時候,傳送給acker的訊息

3.ack tuple的時候傳送的ack訊息
事實上storm裡面只有第一條和第三條訊息,他把第二條訊息省掉了,怎麼做到的呢?

storm這點做得挺巧妙的,bolt在發射一個新的bolt的時候會把這個新的tuple跟它的父tuple的關係儲存起來。然後在ack每個tuple的時候,storm會把要ack的tuple的id,以及這個tuple新建立的所有的tuple的id的異或值傳送給acker.這樣每個tuple就省掉了一個訊息。

tuple被ack的時候給acker傳送訊息

每個tuple在被ack的時候,會給acker傳送一個訊息,訊息格式為:

(spout-tuple-id, tmp-ack-val)
訊息的streamId是_ack_ack(ACKER-ACK-STRANM-ID)

注意,這裡的tmp-ack-val是要ack的tuple的id與它新建立的所有的tuple的id異或的結果

tuple-id ^ (child-tuple-id1 ^ child-tuple-id2 ... )
我們可以從task.clj裡面的send-ack方法看出這一點:
(defn- send-ack [^TopologyContext topology-context
                          ^Tuple input-tuple
                          ^List generated-ids send-fn]
  (let [ack-val (bit-xor-vals generated-ids)]
    (doseq [
      [anchor id] (.. input-tuple
                      getMessageId
                      getAnchorsToIds)]
      (send-fn (Tuple. topology-context
                 [anchor (bit-xor ack-val id)]
                 (.getThisTaskId topology-context)
                 ACKER-ACK-STREAM-ID))
      )))
這裡面的generated-ids引數就是這個input-tuple的所有子tuple的id,從程式碼可以看出storm會給這個tuple的每一個spout-tuple傳送一個ack訊息

為什麼說這裡的generated-ids是input-tuple的子tuple呢?這個send-ack是被OutputCollectorImpl裡面的ack方法呼叫的:

public void ack(Tuple input) {
    List generated = getExistingOutput(input);
    // don't just do this directly in case
    // there was no output
    _pendingAcks.remove(input);
    _collector.ack(input, generated);
}
_pendingAcks裡面存的是什麼東西呢?
private Tuple anchorTuple(Collection< Tuple > anchors, String streamId, List< Object > tuple) {
    // The simple algorithm in this function is the key
    // to Storm. It is what enables Storm to guarantee
    // message processing.
    // 這個map存的東西是 spout-tuple-id到ack-val的對映
    Map< Long, Long > anchorsToIds = new HashMap<Long, Long>();
    // anchors 其實就是它的所有父親:spout-tuple
    if(anchors!=null) {
        for(Tuple anchor: anchors) {
            long newId = MessageId.generateId();
            // 告訴每一個父親,你們又多了一個兒子了。
            getExistingOutput(anchor).add(newId);
            for(long root: anchor.getMessageId().getAnchorsToIds().keySet()) {
                Long curr = anchorsToIds.get(root);
                if(curr == null) curr = 0L;
                // 更新spout-tuple-id的ack-val
                anchorsToIds.put(root, curr ^ newId);
            }
        }
    }
    return new Tuple(_context, tuple, _context.getThisTaskId(), streamId, MessageId.makeId(anchorsToIds));
}
從上面程式碼裡面的紅色部分我們可以看出,_pendingAcks裡面維護的其實就是tuple到自己兒子的對應關係。

Tuple處理失敗的時候會給acker傳送失敗訊息

acker會忽略掉這種訊息的訊息內容,直接將對應的spout-tuple標記為失敗

最後acker傳送訊息通知spout-tuple對應的worker

(when (and curr
          (:spout-task curr))
 (cond (= 0 (:val curr))
       ;; ack-val == 0 說明這個tuple的所有子孫都
       ;; 處理成功了(都發送ack訊息了)
       ;; 那麼傳送成功訊息通知建立這個spout-tuple的task.
       (do
         (.remove pending id)
         (acker-emit-direct @output-collector
                            (:spout-task curr)
                            ACKER-ACK-STREAM-ID
                            [id]
                            ))
       ;; 如果這個spout-tuple處理失敗了
       ;; 傳送失敗訊息給建立這個spout-tuple的task
       (:failed curr)
       (do
         (.remove pending id)
         (acker-emit-direct @output-collector
                            (:spout-task curr)
                            ACKER-FAIL-STREAM-ID
                            [id]
                            ))
       ))

上面原始碼分析,自己理解為如下:

當tuple從spout被髮送出來時,會初始化task-id,ack-val(初始值為0),spout發射值的時候,會將tuple-id給acker,bolt的tuple進行ack時,會將tuple-id與其子的tuple-id進行異或傳給acker,這樣如果bolt不產生新的tuple,那麼ack是ack-val就是0,就完全處理了,如果是fail,ack-val就不是0.