1. 程式人生 > >Storm消息可靠處理機制

Storm消息可靠處理機制

字符串 cti put 計算 size pan pen bolt blog

在很多應用場景中,分布式系統的可靠性保障尤其重要。比如電商平臺中,客戶的購買請求需要可靠處理,不能因為節點故障等原因丟失請求;比如告警系統中,產生的核心告警必須及時完整的知會監控人員,不能因為網絡故障而丟失數據。

Storm消息可靠性保障是Storm核心特性之一,其中消息樹的跟蹤管理機制是Storm核心算法之一,本文將詳細介紹Storm消息可靠處理機制。我們從Storm初探中的例子入手。

技術分享

一、消息處理流程

1、 Spout節點

(1) Spout接收到一個文本消息;

msg1

劉備 關羽 張飛

曹操 郭嘉 荀彧

(2) Spout把文本消息拆分為2個行字符串消息,並把2個消息發送給NamesSplit Bolt節點。

2、 NamesSplit Bolt節點

(1) NamesSplit Bolt接收到兩個行字符串消息;

msg2 劉備 關羽 張飛

msg3 曹操 郭嘉 荀彧

(2) NamesSplit Bolt把2個行字符串消息拆分為6個名字消息,發送給HelloWorld Bolt節點;

(3) NamesSplit Bolt確認,msg2、msg3處理完成。

3、 HelloWorld Bolt節點

(1) HelloWorld Bolt接收到6個名字消息;

msg4 劉備

msg5 關羽

msg6 張飛

msg7 曹操

msg8 郭嘉

msg9 荀彧

(2) HelloWorld Bolt SayHello;

(3) HelloWorld Bolt確認,msg4、msg5、msg6、msg7、msg8、msg9處理完成。

二、關鍵代碼

1、 Spout

下面代碼表示Spout節點發送消息,消息綁定到messageId上,這裏的messageId可以看做上述例子中的msg1,tuple可以看做上述例子中的msg2或msg3。

public void nextTuple() 
{
    this.collector.emit(List<Object> tuple, Object messageId);
}

下面代碼會在消息處理成功或失敗後調用。

public void ack(Object msgId) 
{
}

public void fail(Object msgId) { }

2、 Bolt

這段代碼是Bolt消息處理發送代碼,我們詳細看一下標紅代碼。

public void execute(Tuple input) 
{
    String[] nameArray = names.split(" ");
    for(String name : nameArray)
    {
        List<Object> splitList = new ArrayList<Object>();
        splitList.add(name);
        collector.emit(inputList, splitList);
    }
    collector.ack(input);
}
OutputCollector.emit(Collection<Tuple> anchors, List<Object> tuple) 中tuple表示發送的子消息,anchors表示子消息的父節點。
這段代碼既發送了子消息,有把子消息錨定到了消息樹上。上述例子中,相當於把消息msg4 劉備錨定到消息msg2 劉備 關羽 張飛上。
OutputCollector.ack(Tuple input)表示回答消息處理完成。上述例子中,相當於確認msg4 劉備處理完成。

下面代碼會在消息處理成功或失敗後調用。
public void ack(Object msgId) 
{
}

public void fail(Object msgId) 
{
}

三、消息重發機制

可以看到,一條消息從Spout發送後,會產生一棵消息樹,只有當消息樹中的所有消息都被確認後(ack),Storm才認為消息處理完成。

代碼上可以輕易看出,我們只需要指定根節點消息ID(即Spout接收到的消息ID),其他消息ID系統會自動生成。同時,我們只需要確認非根節點消息處理完成。

實際上,Spout或者Bolt沒發送一條消息,消息便會存儲到kestrel隊列中,Bolt每接收到一條消息,kestrel便會標記這條消息在處理中(pengding),知道該條消息被確認處理完成,kestrel才把它移除出隊列。

Bolt消息處理過程中,發生異常或者超時,kestrel會把該條消息從處理中狀態重新置為待處理狀態,等待Storm下一次調度處理。

四、消息樹管理算法

可以看到,Spout每處理一個消息,就會生成一棵消息樹,如果Storm存儲每個消息樹每個節點的狀態,內存很快便會耗盡,顯然是不可取的。

實際上,Storm僅僅采用20字節管理一棵消息樹,數據結構如下:

treeId|{64bit}

treeId用於區分不同的消息樹(實際上和代碼中指定的根節點ID一一對應),{64bit}則用於消息樹節非根點異或計算。

每生成一個64位msgid,則與{64bit}異或計算一次,直到該消息確認處理完成後,再與{64bit}異或計算一次。(異或計算結果為新的{64bit}值)

{64bit}==0時,表示消息處理完畢。(一目了然,在次不再證明)

Storm消息可靠處理機制