1. 程式人生 > >Apache Storm 官方文件 —— 訊息的可靠性保障

Apache Storm 官方文件 —— 訊息的可靠性保障

原文連結    譯者:魏勇

Storm 能夠保證每一個由 Spout 傳送的訊息都能夠得到完整地處理。本文詳細解釋了 Storm 如何實現這種保障機制,以及作為使用者如何使用好 Storm 的可靠性機制。

訊息的“完整性處理”是什麼意思

一個從 spout 中傳送出的 tuple 會產生上千個基於它建立的 tuples。例如,有這樣一個 word-count 拓撲:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new KestrelSpout("kestrel.backtype.com"
, 22133, "sentence_queue", new StringScheme())); builder.setBolt("split", new SplitSentence(), 10) .shuffleGrouping("sentences"); builder.setBolt("
count"
, new WordCount(), 20) .fieldsGrouping("split", new Fields("word"));

這個拓撲從一個 Kestrel 佇列中讀取句子,然後將句子分解成若干個單詞,然後將它每個單詞和該單詞的數量傳送出去。這種情況下,從 spout 中發出的 tuple 就會產生很多基於它建立的新 tuple:包括句子中單詞的 tuple 和 每個單詞的個數的 tuple。這些訊息構成了這樣一棵樹:

Tuple tree

如果這棵 tuple 樹傳送完成,並且樹中的每一條訊息都得到了正確的處理,就表明傳送 tuple 的 spout 已經得到了“完整性處理”。對應的,如果在指定的超時時間內 tuple 樹中有訊息沒有完成處理就意味著這個 tuple 失敗了。這個超時時間可以使用

Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 引數在構造拓撲時進行配置,如果不配置,則預設時間為 30 秒。

在訊息得到完整性處理後或者處理失敗後會發生什麼

為了理解這個問題,讓我們先了解一下 tuple 的生命週期。下面是定義 spout 的介面(可以在 Javadoc 中檢視更多細節資訊):

public interface ISpout extends Serializable {
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
    void close();
    void nextTuple();
    void ack(Object msgId);
    void fail(Object msgId);
}

首先,通過呼叫 SpoutnextTuple 方法,Storm 向 Spout 請求一個 tuple。Spout 會使用 open 方法中提供的SpoutOutputCollector 向它的一個輸出資料流中傳送一個 tuple。在傳送 tuple 的時候,Spout 會提供一個 “訊息 id”,這個 id 會在後續過程中用於識別 tuple。例如,上面的 KestrelSpout 就是從一個 kestrel 佇列中讀取一條訊息,然後再發送一條帶有“訊息 id”的訊息,這個 id 是由 Kestrel 提供的。使用 SpoutOutputCollector 傳送訊息一般是這樣的形式:

_collector.emit(new Values("field1", "field2", 3) , msgId);

隨後,tuple 會被髮送到對應的 bolt 中去,在這個過程中,Storm 會很小心地跟蹤建立的訊息樹。如果 Storm 檢測到某個 tuple 被完整處理, Storm 會根據 Spout 提供的“訊息 id”呼叫最初發送 tuple 的 Spout 任務的 ack 方法。對應的,Storm 在檢測到 tuple 超時之後就會呼叫 fail 方法。注意,對於一個特定的 tuple,響應(ack)和失敗處理(fail)都只會由最初建立這個 tuple 的任務執行。也就是說,及時 Spout 在叢集中有很多個任務,某個特定的 tuple 也只會由建立它的那個任務——而不是其他的任務——來處理成功或失敗的結果。

我們再以 KestrlSpout 為例來看看在訊息的可靠性處理中 Spout 做了什麼。在 KestrlSpout 從 Kestrel 佇列中取出一條訊息時,可以看作它“開啟”了這條訊息。也就是說,這條訊息實際上並沒有從佇列中真正地取出來,而是保持著一個“掛起”狀態,等待訊息處理完成的訊號。在掛起狀態的訊息不回被髮送到其他的消費者中。另外,如果消費者(客戶端)斷開了連線,所有處於掛起狀態的訊息都會重新放回到佇列中。在訊息“開啟”的時候 Kestrel 會給客戶端同時提供訊息體資料和一個唯一的 id。KestrelSpout 在使用 SpoutOutputCollector 傳送 tuple 的時候就會把這個唯一的 id 當作“訊息 id”。一段時間之後,在 KestrelSpoutack 或者 fail 方法被呼叫的時候,KestrelSpout 就會通過這個訊息 id 向 Kestrel 請求將訊息從佇列中移除(對應 ack 的情況)或者將訊息重新放回佇列(對應 fail 的情況)。

Storm 的可靠性 API

使用 Storm 的可靠性機制的時候你需要注意兩件事:首先,在 tuple 樹中建立新節點連線時務必通知 Storm;其次,在每個 tuple 處理結束的時候也必須向 Storm 發出通知。通過這兩個操作,Storm 就能夠檢測到 tuple 樹會在何時完成處理,並適時地呼叫 ack 或者 fail 方法。Storm 的 API 提供了一種非常精確的方式來實現著兩個操作。

Storm 中指定 tuple 樹中的一個連線稱為“錨定”(anchoring)。錨定是在傳送新 tuple 的同時發生的。讓我們以下面的 Bolt 為例說明這一點,這個 Bolt 將一個包含句子的 tuple 分割成若干個單詞 tuple:

public class SplitSentence extends BaseRichBolt {
        OutputCollector _collector;

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        public void execute(Tuple tuple) {
            String sentence = tuple.getString(0);
            for(String word: sentence.split(" ")) {
                _collector.emit(tuple, new Values(word));
            }
            _collector.ack(tuple);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
}

通過將輸入 tuple 指定為 emit 方法的第一個引數,每個單詞 tuple 都被“錨定”了。這樣,如果單詞 tuple 在後續處理過程中失敗了,作為這棵 tuple 樹的根節點的原始 Spout tuple 就會被重新處理。相對應的,如果這樣傳送 tuple:

_collector.emit(new Values(word));

就稱為“非錨定”。在這種情況下,下游的 tuple 處理失敗不會觸發原始 tuple 的任何處理操作。有時候傳送這種“非錨定” tuple 也是必要的,這取決於你的拓撲的容錯性要求。

一個輸出 tuple 可以被錨定到多個輸入 tuple 上,這在流式連線或者聚合操作時很有用。顯然,一個多錨定的 tuple 失敗會導致 Spout 中多個 tuple 的重新處理。多錨定操作是通過指定一個 tuple 列表而不是單一的 tuple 來實現的,如下面的例子所示:

List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));

多錨定操作會把輸出 tuple 新增到多個 tuple 樹中。注意,多錨定也可能會打破樹的結構從而建立一個 tuple 的有向無環圖(DAG),如下圖所示:

Tuple DAG

Storm 的程式實現既支援對樹的處理,同樣也支援對 DAG 的處理(由於早期的 Storm 版本僅僅對樹有效,所以“tuple 樹”的這個糟糕的概念就一直沿襲下來了)。

錨定其實可以看作是將 tuple 樹具象化的過程 —— 在結束對一棵 tuple 樹中一個單獨 tuple 的處理的時候,後續以及最終的 tuple 都會在 Storm 可靠性 API 的作用下得到標定。這是通過 OutputCollectorackfail 方法實現的。如果你再回過頭看一下 SplitSentence 的例子,你就會發現輸入 tuple 是在所有的單詞 tuple 傳送出去之後被 ack 的。

你可以使用 OutputCollectorfail 方法來使得位於 tuple 樹根節點的 Spout tuple 立即失敗。例如,你的應用可以在建立資料庫連線的時候抓取異常,並且在異常出現的時候立即讓輸入 tuple 失敗。通過這種立即失敗的方式,原始 Spout tuple 就會比等待 tuple 超時的方式響應更快。

每個待處理的 tuple 都必須顯式地應答(ack)或者失效(fail)。因為 Storm 是使用記憶體來跟蹤每個 tuple 的,所以,如果你不對每個 tuple 進行應答或者失效,那麼負責跟蹤的任務很快就會發生記憶體溢位。

Bolt 處理 tuple 的一種通用模式是在 execute 方法中讀取輸入 tuple、傳送出基於輸入 tuple 的新 tuple,然後在方法末尾對 tuple 進行應答。大部分 Bolt 都會使用這樣的過程。這些 Bolt 大多屬於過濾器或者簡單的處理函式一類。Storm 有一個可以簡化這種操作的簡便介面,稱為 BasicBolt。例如,如果使用 BasicBoltSplitSentence 的例子可以這樣寫:

public class SplitSentence extends BaseBasicBolt {
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String sentence = tuple.getString(0);
            for(String word: sentence.split(" ")) {
                collector.emit(new Values(word));
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
}

這個實現方式比之前的方式要簡單許多,而且在語義上有著完全一致的效果。傳送到 BasicOutputCollector 的 tuple 會被自動錨定到輸入 tuple 上,而且輸入 tuple 會在 execute 方法結束的時候自動應答。

相對應的,執行聚合或者聯結操作的 Bolt 可能需要延遲應答 tuple,因為它需要等待一批 tuple 來完成某種結果計算。聚合和聯結操作一般也會需要對他們的輸出 tuple 進行多錨定。這個過程已經超出了 IBasicBolt 的應用範圍。

在 tuple 可以被重新處理的前提下,如何讓我的應用可以得到正確的執行?

按照軟體設計的一般思路,這個問題的答案是“取決於實際情況”。Storm 0.7.0 版本引入了“事務拓撲”的特性,它能夠保證大多數計算過程都能夠滿足恰好一次(exactly-once)的訊息語義的容錯性要求。想要了解“事務拓撲”的更多內容可以參考這篇文章

Storm 是以怎樣一種高效的方式實現可靠性的?

Storm 的拓撲有一些特殊的稱為“acker”的任務,這些任務負責跟蹤每個 Spout 發出的 tuple 的 DAG。當一個 acker 發現一個 DAG 結束了,它就會給建立 spout tuple 的 Spout 任務傳送一條訊息,讓這個任務來應答這個訊息。你可以使用Config.TOPOLOGY_ACKERS 來配置拓撲的 acker 數量。Storm 預設會將 acker 的數量設定為一,不過如果你有大量訊息的處理需求,你可能需要增加這個數量。

理解 Storm 的可靠性實現的最好方式還是通過了解 tuple 和 tuple DAG 的生命週期。當一個 tuple 在拓撲中被創建出來的時候 —— 不管是在 Spout 中還是在 Bolt 中建立的 —— 這個 tuple 都會被配置一個隨機的 64 位 id。acker 就是使用這些 id 來跟蹤每個 spout tuple 的 tuple DAG 的。

Spout tuple 的 tuple 樹中的每個 tuple 都知道 spout tuple 的 id。當你在 bolt 中傳送一個新 tuple 的時候,輸入 tuple 中的所有 spout tuple 的 id 都會被複制到新的 tuple 中。在 tuple 被 ack 的時候,它會通過回掉函式向合適的 acker 傳送一條訊息,這條訊息顯示了 tuple 樹中發生的變化。也就是說,它會告訴 acker 這樣一條訊息:“在這個 tuple 樹中,我的處理已經結束了,接下來這個就是被我標記的新 tuple”。

以下圖為例,如果 D tuple 和 E tuple 是由 C tuple 建立的,那麼在 C 應答的時候 tuple 樹就會發生變化:

What happens on an ack

由於在 D 和 E 新增到 tuple 樹中的時候 C 已經從樹中移除了,所以這個樹並不會被過早地結束。

關於 Storm 如何跟蹤 tuple 樹還有更多的細節。正如上面所提到的,你可以隨意設定拓撲中 acker 的數量。這就會引起下面的問題:當 tuple 在拓撲中被 ack 的時候,它是怎麼知道向那個 acker 任務傳送資訊的?

對於這個問題,Storm 實際上是使用雜湊演算法來將 spout tuple 匹配到 acker 任務上的。由於每個 tuple 都會包含原始的 spout tuple id,所以他們會知道需要與哪個 acker 任務通訊。

關於 Storm 的另一個問題是 acker 是如何知道它所跟蹤的 spout tuple 是由哪個 Spout 任務處理的。實際上,在 Spout 任務傳送新 tuple 的時候,它也會給對應的 acker 傳送一條訊息,告訴 acker 這個 spout tuple 是與它的任務 id 相關聯的。隨後,在 acker 觀察到 tuple 樹結束處理的時候,它就會知道向哪個 Spout 任務傳送結束訊息。

Acker 實際上並不會直接跟蹤 tuple 樹。對於一棵包含數萬個 tuple 節點的樹,如果直接跟蹤其中的每個 tuple,顯然會很快把這個 acker 的記憶體撐爆。所以,這裡 acker 使用一個特殊的策略來實現跟蹤的功能,使用這個方法對於每個 spout tuple 只需要佔用固定的記憶體空間(大約 20 位元組)。這個跟蹤演算法是 Storm 執行的關鍵,也是 Storm 的一個突破性技術。

在 acker 任務中儲存了一個表,用於將 spout tuple 的 id 和一對值相對映。其中第一個值是建立這個 tuple 的任務 id,這個 id 主要用於在後續操作中傳送結束訊息。第二個值是一個 64 位元的數字,稱為“應答值”(ack val)。這個應答值是整個 tuple 樹的一個完整的狀態表述,而且它與樹的大小無關。因為這個值僅僅是這棵樹中所有被建立的或者被應答的 tuple 的 tuple id 進行異或運算的結果值。

當一個 acker 任務觀察到“應答值”變為 0 的時候,它就知道這個 tuple 樹已經完成處理了。因為 tuple id 實際上是隨機生成的 64 位元數值,所以“應答值”碰巧為 0 是一種極小概率的事件。理論計算得以得出,在每秒應答一萬次的情況下,需要 5000 萬年才會發生一次錯誤。而且即使是這樣,也僅僅會在 tuple 碰巧在拓撲中失敗的時候才會發生資料丟失的情況。

假設你現在已經理解了這個可靠性演算法,讓我們再分析一下所有失敗的情形,看看這些情形下 Storm 是如何避免資料缺失的:

  • 由於任務(執行緒)掛掉導致 tuple 沒有被應答(ack)的情況:這時位於 tuple 樹根節點的 spout tuple 會在任務超時後得到重新處理。
  • Acker 任務掛掉的情形:這種情況下 acker 所跟蹤的所有 spout tuple 都會由於超時被重新處理。
  • Spout 任務掛掉的情形:這種情況下 Spout 任務的來源就會負責重新處理訊息。例如,對於像 Kestrel 和 RabbitMQ 這樣的訊息佇列就會在客戶端斷開連線時將所有的掛起狀態的訊息放回佇列(關於掛起狀態的概念可以參考Storm 的容錯性——譯者注)。

綜上所述,Storm 的可靠性機制完全具備分佈的、可伸縮的、容錯的特徵。

調整可靠性

由於 acker 任務是輕量級的,在拓撲中你並不需要很多 acker 任務。你可以通過 Storm UI 監控他們的效能(acker 任務的 id 為“__acker”)。如果發現觀察結果存在問題,你可能就需要增加更多的 acker 任務。

如果你不關注訊息的可靠性 —— 也就是說你不關心在失敗情形下發生的 tuple 丟失 —— 那麼你就可以通過不跟蹤 tuple 樹的處理來提升拓撲的效能。由於 tuple 樹中的每個 tuple 都會帶有一個應答訊息,不追蹤 tuple 樹會使得傳輸的訊息的數量減半。同時,下游資料流中的 id 也會變少,這樣可以降低網路頻寬的消耗。

有三種方法可以移除 Storm 的可靠性機制。第一種方法是將 Config.TOPOLOGY_ACKERS 設定為0,在這種情況下,Storm 會在 Spout 傳送 tuple 之後立即呼叫 ack 方法,tuple 樹葉就不會被跟蹤了。

第二種方法是基於訊息本身移除可靠性。你可以通過在 SpoutOutputCollector.emit 方法中省略訊息 id 來關閉 spout tuple 的跟蹤功能。

最後,如果你不關心拓撲中的下游 tuple 是否會失敗,你可以在傳送 tuple 的時候選擇傳送“非錨定”的(unanchored)tuple。由於這些 tuple 不會被標記到任何一個 spout tuple 中,顯然在他們處理失敗的時候不會引起任何 spout tuple 的重新處理(注意,在使用這種方法時,如果上游有 spout 或 bolt 仍然保持可靠性機制,那麼需要在 execute 方法之初呼叫OutputCollector.ack 來立即響應上游的訊息,否則上游元件會誤認為訊息沒有傳送成功導致所有的訊息會被反覆傳送——譯者注)。