JStorm 原始碼解析:ACK 機制
Ack 機制是 storm 能夠保證訊息至少被處理一次(at least once)的核心,從而保證訊息不丟失。在 topology 有向無環圖中,spout 向 bolt 發射訊息,上游 bolt 也會向下遊 bolt 發射訊息,storm 設定了一類 acker 型別的系統 bolt 用於接收所有元件傳送的 ack 訊息,監控資料在 topology 中的處理情況。如果處理成功則傳送 __acker_ack
訊息給 spout,否則傳送 __acker_fail
訊息給 spout,然後 spout 依據相應的訊息型別採取一定的應對措施(例如訊息重發等)。
Ack 演算法利用了數學上的異或操作來實現對整個 tuple tree 的執行狀況的判斷。在一個由一條訊息構成的 tuple tree 中,所有的訊息都有一個 MessageId,本質上就是一個 map:
public class MessageId { /* [anchor, anchor_value] */ private Map<Long, Long> _anchorsToIds; }
欄位 _anchorsToIds
儲存的是 anchor 和 anchor value 的對映,其中 anchor 就是 rootId,它在 spout 中生成並且一路透傳到 tuple tree 對應的所有下游 bolt 中,同一個 tuple tree 中的訊息都具有相同的 rootId,用以唯一標識 spout 發出來的這條訊息(以及從下游 bolt 根據這個 tuple 衍生髮出的訊息)。
一. 演算法介紹
示例引用自官網: ofollow,noindex">http://jstorm.io/ProgrammingGuide_cn/AdvancedUsage/Theory/Acker.html
Ack 機制演算法執行流程如下:
- spout 傳送訊息時生成 root_id。
- 同時對每一個目標 bolt task 生成
<root_id, random_long>
(即為這個 root_id 對應一個隨機 long 數值),然後隨著訊息本身傳送到下游 bolt 中。假設有 2 個 bolt,生成的隨機數對分別為:<root_id, r1>
和<root_id, r2>
。 - spout 向 acker 傳送 ack_init 訊息,它的 message_id 為
<root_id, r1 ^ r2>
(即所有 task 產生的隨機數列表的異或值)。 - bolt 收到 spout 或上游 bolt 傳送過來的 tuple 之後,首先會向 acker 傳送 ack 訊息,message_id 即為收到的值。如果 bolt 下游還有 bolt,則與步驟 2 類似對每一個 bolt 生成隨機數對(root_id 不變,但是值變為與當前值亦或新生成的隨機數)。
- acker 收到訊息後會對 root_id 下所有的值做異或操作,亦或結果為 0 則表示整個 tuple tree 被成功處理,否則就會一直等待直到超時,對應 tuple tree 處理失敗。
- acker 向 spout 傳送最終處理成功或失敗的訊息。
我們以一個稍微複雜一點的 topology 為例描述一下它的整個過程。假設 topology 結構為 spout -> bolt1/bolt2 -> bolt3
,即 spout 同時向 bolt1 和 bolt2 傳送訊息,它們處理完後都向 bolt3 傳送訊息,bolt3 沒有後續處理節點。對於這樣一個 topology 而言,ack 機制的執行流程如下:
- spout 發射一條訊息生成 root_id,由於這個值不變我們就用 root_id 來標識:
<root_id, 1> <root_id, 2> <root_id, 1^2>
- bolt1 收到訊息後生成如下訊息:
<root_id, 3> <root_id, 1^3>
- bolt2 收到訊息後生成如下訊息:
<root_id, 4> <root_id, 2^4>
- bolt3 收到訊息後生成如下訊息:
<root_id, 3> <root_id, 4>
- acker 總共收到以下訊息:
- <root_id, 1^2> - <root_id, 1^3> - <root_id, 2^4> - <root_id, 3> - <root_id, 4>
所有的值進行異或之後即為 1^2^1^3^2^4^3^4 = 0
。
二. 實現分析
下面來看一下原始碼層面對於上面描述的演算法的實現,相應邏輯分別位於 SpoutCollector、BoltCollector 和
Acker 中。先來看一下 spout 的邏輯,spout 在 emit 訊息的時候順帶會向 acker 傳送一條 ack_init
訊息,相應實現位於 SpoutCollector#sendMsg
方法中,實現如下:
public List<Integer> sendMsg( String out_stream_id, List<Object> values, Object message_id, Integer out_task_id, ICollectorCallback callback) { final long startTime = emitTotalTimer.getTime(); try { boolean needAck = (message_id != null) && (ackerNum > 0); // 生成隨機的 rootId (隨機 long 數值),需要確保在當前 spout 唯一,否則無法保證 ack 的準確性 Long root_id = this.getRootId(message_id); List<Integer> outTasks; // 獲取目標 taskId 列表 if (out_task_id != null) { outTasks = sendTargets.get(out_task_id, out_stream_id, values, null, root_id); } else { outTasks = sendTargets.get(out_stream_id, values, null, root_id); } List<Long> ackSeq = new ArrayList<>(); // 存放為所有 bolt task 生成的隨機數值 /* * 遍歷所有的目標 task: * 1. 為每個目標 task 生成 messageId: <root_id, 隨機數值> * 2. 向所有目標 task 發射 tuple 訊息 */ for (Integer taskId : outTasks) { MessageId msgId; if (needAck) { long as = MessageId.generateId(random); // 生成隨機的 long 數值 msgId = MessageId.makeRootId(root_id, as); // <root_id, 隨機數值> ackSeq.add(as); // 新增到 ackSeq 中,用於後面亦或計算 } else { msgId = null; } // 獲取當前 tuple 對應的目標 task 的傳輸佇列,然後將 tuple 投遞給該佇列 TupleImplExt tuple = new TupleImplExt(topology_context, values, task_id, out_stream_id, msgId); tuple.setTargetTaskId(taskId); transfer_fn.transfer(tuple); } // 向 Acker 傳送 ack_init 訊息 this.sendMsgToAck(out_stream_id, values, message_id, root_id, ackSeq, needAck); if (callback != null) { callback.execute(out_stream_id, outTasks, values); } return outTasks; } finally { emitTotalTimer.updateTime(startTime); } }
整個方法的執行步驟可以概括為:
- 為當前 tuple tree 生成在當前 spout 範圍內唯一的 root_id。
- 獲取下游目標 task 集合,併為每一個 task 生成對應的 message_id。
- 向所有下游目標 task 傳送 tuple 訊息
- 向 acker 傳送 ack_init 訊息
上面的步驟中 1 和 2 都是在做準備工作,步驟 3 則是發射 tuple 的主體流程,步驟 4 才是 spout 真正執行 ack 的過程。對於一個 tuple 而言,spout 會向所有下游目標 task 逐一發送該 tuple,同時在發射完成之後向 acker 傳送一條 ack_init
訊息。spout ack 的實現位於 SpoutCollector#sendMsgToAck
方法中:
protected void sendMsgToAck( String outStreamId, List<Object> values, Object messageId, Long rootId, List<Long> ackSeq, boolean needAck) { if (needAck) { TupleInfo info = TupleInfo.buildTupleInfo(outStreamId, messageId, values, System.currentTimeMillis(), isCacheTuple); pending.putHead(rootId, info); // ackerTuple = <root_id, 所有目標 task 的 messageId 隨機數值的異或, task_id> List<Object> ackerTuple = JStormUtils.mk_list((Object) rootId, JStormUtils.bit_xor_vals(ackSeq), task_id); // 向 Acker 發射 ack_init 訊息,依據 __ack_init 這個 stream 直接找到目標 task 進行傳送 this.unanchoredSend(topology_context, sendTargets, transfer_fn, Acker.ACKER_INIT_STREAM_ID, ackerTuple); } else if (messageId != null) { // 不需要 ack 但是仍然實現了 IAckValueSpout 介面,需要為這種 spout 回撥 ack 方法 TupleInfo info = TupleInfo.buildTupleInfo(outStreamId, messageId, values, 0, isCacheTuple); AckSpoutMsg ack = new AckSpoutMsg(rootId, spout, null, info, task_stats); ack.run(); } }
對於開啟了 ack 機制的 topology 來說,方法會對所有下游目標 task 的 message_id 的隨機數值執行亦或運算,將結果與 root_id 和當前 spout 的 task_id 一起封裝成 tuple 傳送給 acker。
下面再來看一下 bolt 的 ack 執行過程。bolt 將 emit 和 ack 分成兩個方法,這主要也是為了方便開發者程式設計實現對於訊息消費執行的控制。先來看一下 emit 過程,位於 BoltCollector#sendMsg
方法中:
public List<Integer> sendMsg(String out_stream_id, List<Object> values, Collection<Tuple> anchors, Integer out_task_id, ICollectorCallback callback) { final long start = emitTimer.getTime(); List<Integer> outTasks = null; try { // 獲取所有目標 task 列表 if (out_task_id != null) { outTasks = sendTargets.get(out_task_id, out_stream_id, values, anchors, null); } else { outTasks = sendTargets.get(out_stream_id, values, anchors, null); } // 提前刪除可能超時的 tuple this.tryRotate(); /* * 遍歷所有的目標 task: * 1. 為每一個 task 生成 messageId:<root_id, 隨機數值> * 2. 向所有下游 bolt 發射 tuple 訊息 */ for (Integer taskId : outTasks) { // 計算目標 task 的 messageId MessageId msgId = this.getMessageId(anchors); TupleImplExt tuple = new TupleImplExt(topologyContext, values, this.taskId, out_stream_id, msgId); tuple.setTargetTaskId(taskId); taskTransfer.transfer(tuple); } } catch (Exception e) { LOG.error("bolt emit error:", e); } finally { // 省略 finally 邏輯 } return outTasks; }
實際上一個 bolt 也可以看做是一個特殊的 spout,因為這個時候它相當於是當前 tuple tree 中一個 sub tuple tree 的訊息起始點,所以在執行邏輯上與 spout emit 訊息基本上相同。上面的方法實現整體與 SpoutCollector#sendMsg
也基本類似,這裡我們主要看一下計算下游 task 的 message_id 的邏輯,位於 BoltCollector#getMessageId
方法中:
protected MessageId getMessageId(Collection<Tuple> anchors) { MessageId ret = null; if (anchors != null && ackerNum > 0) { Map<Long, Long> anchors_to_ids = new HashMap<>(); for (Tuple tuple : anchors) { if (tuple.getMessageId() != null) { Long edge_id = MessageId.generateId(random); // 更新當前 inputTuple 的 edge_id 亦或值到 pending_acks put_xor(pendingAcks, tuple, edge_id); MessageId messageId = tuple.getMessageId(); if (messageId != null) { // 這裡將每一對 <root_id, edge_id> 放入 anchors_to_ids(一般情況下也只有一對), // 由於 anchors_to_ids 是一個空 map,因此 put_xor 裡面相當於將 <root_id, edge_id> 放入 anchors_to_ids for (Long root_id : messageId.getAnchorsToIds().keySet()) { put_xor(anchors_to_ids, root_id, edge_id); } } } } // new MessageId ret = MessageId.makeId(anchors_to_ids); } return ret; }
方法 getMessageId 的入參 anchors 是傳送給當前 bolt 的 tuple,大部分時候只有一個,所以上面的方法實現我們可以簡化一下:
protected MessageId getMessageId(Tuple tuple) { Map<Long, Long> anchors_to_ids = new HashMap<>(); if (tuple.getMessageId() != null) { Long edge_id = MessageId.generateId(random); // 放置 <inputTuple, edge_id> 到 pending_acks put_xor(pendingAcks, tuple, edge_id); // <tuple, edge_id>, pendingAcks 會在 bolt 執行 ack 時用到 MessageId messageId = tuple.getMessageId(); if (messageId != null) { // 這裡將每一對 <root_id, edge_id> 放入 anchors_to_ids(一般情況下也只有一對), // 由於 anchors_to_ids 是一個空 map,因此 put_xor 裡面相當於將 <root_id, edge_id> 放入 anchors_to_ids for (Long root_id : messageId.getAnchorsToIds().keySet()) { put_xor(anchors_to_ids, root_id, edge_id); } } } return MessageId.makeId(anchors_to_ids); // <root_id, edge_id> }
簡化之後應該更加清晰一些,實際上邏輯很簡單,就是為下游 bolt 生成一個隨機 long 數值作為 edge_id,然後將 edge_id 和 root_id 一起生成下游 bolt 的 message_id,也就是 <root_id, edge_id>
。
下面我們再來看一下 bolt 的 ack 過程,位於 BoltCollector#ack
方法中:
public void ack(Tuple input) { if (input.getMessageId() != null) { Long ack_val = 0L; // 取出當前 inputTuple 對應的 edge_id 值 Object pend_val = pendingAcks.remove(input); // <tuple, edge_id>,getMessageId 時寫入的, if (pend_val != null) { ack_val = (Long) (pend_val); } // 向 Acker 傳送 ack 訊息 for (Entry<Long, Long> entry : input.getMessageId().getAnchorsToIds().entrySet()) { this.unanchoredSend(topologyContext, sendTargets, taskTransfer, Acker.ACKER_ACK_STREAM_ID, // __ack_ack // 當前 task 的 egge_id 與目標 task 的 edge_id 進行亦或 JStormUtils.mk_list((Object) entry.getKey(), JStormUtils.bit_xor(entry.getValue(), ack_val))); } } // 省略狀態統計邏輯 }
整個方法的邏輯就是拿到當前 task 的 edge_id 與目標 task 的 edge_id 進行亦或運算,然後將結果與 root_id( <root_id, egde_id1 ^ edge_id2>
)一起傳送給 acker。
最後我們來看一下 acker 的執行流程,acker 本質上也是一個 bolt,只不過是由系統建立,所以相應的處理邏輯位於 Acker#execute
方法中,實現如下:
public void execute(Tuple input) { // root_id, random_long, task_id Object id = input.getValue(0); AckObject curr = pending.get(id); String stream_id = input.getSourceStreamId(); // __acker_init 訊息,由 spout 傳送,直接放入 pending map 中 if (Acker.ACKER_INIT_STREAM_ID.equals(stream_id)) { if (curr == null) { curr = new AckObject(); curr.val = input.getLong(1); curr.spout_task = input.getInteger(2); pending.put(id, curr); } else { // bolt's ack first come curr.update_ack(input.getValue(1)); // 進行亦或運算 curr.spout_task = input.getInteger(2); } } // __ack_ack 訊息,來自於 Bolt 傳送 else if (Acker.ACKER_ACK_STREAM_ID.equals(stream_id)) { if (curr != null) { curr.update_ack(input.getValue(1)); // 進行亦或運算 } else { // two case // one is timeout // the other is bolt's ack first come curr = new AckObject(); curr.val = input.getLong(1); pending.put(id, curr); } } // __ack_fail 訊息,來自於 Bolt 傳送 else if (Acker.ACKER_FAIL_STREAM_ID.equals(stream_id)) { if (curr == null) { // do nothing // already timeout, should go fail return; } curr.failed = true; } else { LOG.info("Unknown source stream, " + stream_id + " from task-" + input.getSourceTask()); return; } // 向 spout 發射 ack/fail 訊息 Integer task = curr.spout_task; if (task != null) { if (curr.val == 0) { // 訊息消費成功 pending.remove(id); List values = JStormUtils.mk_list(id); collector.emitDirect(task, Acker.ACKER_ACK_STREAM_ID, values); } else { if (curr.failed) { // 訊息消費失敗 pending.remove(id); List values = JStormUtils.mk_list(id); collector.emitDirect(task, Acker.ACKER_FAIL_STREAM_ID, values); } // 否則表示還未執行完成,不執行操作 } } // 更新 metrics collector.ack(input); // 檢測是否已經超時了 long now = System.currentTimeMillis(); if (now - lastRotate > rotateTime) { lastRotate = now; Map<Object, AckObject> tmp = pending.rotate(); if (tmp.size() > 0) { LOG.warn("Acker's timeout item size:{}", tmp.size()); } } }
方法的實現雖然很長,但是邏輯還是比較清晰簡單的,acker 會為每一個 spout task 建立一個 AckObject 物件,用於記錄對應 tuple tree 的執行狀態,並在每次接收到來自 spout 和 bolt 的 ack 訊息時對該物件進行相應的更新,如果所有的亦或運算結果為 0 則表示 tuple tree 被執行成功,此時 acker 會向 spout 傳送訊息消費成功的訊息,否則如果有 bolt 明確 ack fail,則向 spout 傳送訊息消費失敗的訊息,否則基於超時機制進行判定。
OK,關於 JStorm 的原始碼分析到這一篇基本上就結束了,我們分析了整個 JStorm 的執行骨架,知道我們編寫的 topology 任務在叢集上的執行過程,同時也瞭解了一個大規模分散式系統在實現上需要考慮的一些關鍵點,後面有時間會繼續完善本系列,比如繼續分析 trident 的實現機制等。
(本篇完)
轉載宣告 : 版權所有,商業轉載請聯絡作者,非商業轉載請註明出處
本部落格所有文章除特別宣告外,均採用 CC BY-NC-SA 4.0 許可協議