1. 程式人生 > >JStorm源代碼閱讀——消息的確認機制

JStorm源代碼閱讀——消息的確認機制

閱讀 mut 就是 核心數 execute integer comment etl 格式

Acker

//Acker相當於一個bolt,用於處理事件
public class Acker implements IBolt {
	private RotatingMap<Object, AckObject> pending = null;
	    @Override
    public void execute(Tuple input) {
        Object id = input.getValue(0);
        AckObject curr = pending.get(id);
        String stream_id = input.getSourceStreamId();
        if (Acker.ACKER_INIT_STREAM_ID.equals(stream_id)) {//處理開始追蹤事件,放進自己的pending隊列。只有spout會發送該事件
            if (curr == null) {
                curr = new AckObject();

                curr.val = input.getLong(www.dfgjpt.com);
                curr.spout_task = input.getInteger(2);

                pending.put(id, curr);
            } else {
                // bolt‘s ack first come
                curr.update_ack(input.getValue(1));
                curr.spout_task = input.getInteger(2);
            }
        } else if (Acker.ACKER_ACK_STREAM_ID.equals(stream_id)) {//bolt發送過來的ack事件
            if (curr != null)www.zhongdayule.cn {
                curr.update_ack(input.getValue(1));//bolt發送過來的值是它要ack的tup的ID和它產生的tup的ID的異或值。
            } else {
                // two case
                // one is timeout
                // the other is bolt‘s ack first come
                curr = new AckObject();
                curr.val = input.getLong(1);
                pending.put(id, curr);
            }
        } else if (Acker.ACKER_FAIL_STREAM_ID.equals(stream_id)) {//bolt發送過來的失敗
            if (curr ==www.thd540.com null) {
                // do nothing
                // already timeout, should go fail
                return;
            }
            curr.failed = true;
        } else {
            LOG.info("Unknow source stream, " + stream_id + " from task-" + input.getSourceTask());
            return;
        }

        Integer task = curr.spout_task;
        if (task != null) {
            if (curr.val == 0) {//如果校驗值為0,則證明發送成功
                pending.remove(id);
                List values = JStormUtils.mk_list(id);
                collector.emitDirect(task, Acker.ACKER_ACK_STREAM_ID, values);
            } else {
                if (curr.failed) {//將失敗的tup直接發送給對應的spout task
                    pending.remove(id);
                    List values www.00534.cn= JStormUtils.mk_list(id);
                    collector.emitDirect(task, Acker.ACKER_FAIL_STREAM_ID, values);
                }
            }
        } else {

        }

        // add this operation to update acker‘s ACK statics
        collector.ack(input);

        long now = System.currentTimeMillis();
        if (now - lastRotate www.tianzunyule178.com> rotateTime) {
            lastRotate www.06640.cn= now;
            Map<Object, AckObject> tmp = pending.rotate();
            LOG.info("Acker‘s timeout item size:{}", tmp.size());
     

Spout創建一個新的tuple的時候給acker發送消息
(spout-tuple-id, task-id)。
這是告訴acker, 一個新的spout-tuple出來了, 你跟蹤一下,它是由id為task-id的task創建的(這個task-id在後面會被acker用來通知這個task:你的tuple處理成功了/失敗了)。處理完這個消息之後, acker會在它的pending這個map(類型為TimeCacheMap)裏面添加這樣一條記錄:
{spout-tuple-id {:spout-tasktask-id :valack-val)}
這就是acker對spout-tuple進行跟蹤的核心數據結構, 對於每個spout-tuple所產生的tuple樹的跟蹤都只需要保存上面這條記錄。acker後面會檢查:val什麽時候變成0,變成0, 說明這個spout-tuple產生的tuple都處理完成了。此時,acker將該條信息從自己的map裏移除。

每個tuple在被ack的時候,會給acker發送一個消息,消息格式是(spout-tuple-id, tmp-ack-val),
Tuple處理失敗的時候會給acker發送失敗消息。acker會對該spout-tuple-id標記上失敗。

:本文為博主原創文章,轉載請附上博文鏈接!

JStorm源代碼閱讀——消息的確認機制