1. 程式人生 > >Storm入門之第五章Bolts

Storm入門之第五章Bolts

第5章 Bolts

正如你已經看到的,bolts是一個Storm叢集中的關鍵元件。你將在這一章學到bolt生命週期,一些bolt設計策略,以及幾個有關這些內容的例子。

Bolt生命週期

Bolt是這樣一種元件,它把元組作為輸入,然後產生新的元組作為輸出。實現一個bolt時,通常需要實現IRichBolt介面。Bolts物件由客戶端機器建立,序列化為拓撲,並提交給叢集中的主機。然後叢集啟動工人程序反序列化bolt,呼叫prepare,最後開始處理元組。

NOTE:要建立一個bolt物件,它通過構造器引數初始化成員屬性,bolt被提交到叢集時,這些屬性值會隨著一起序列化。

Bolt結構

Bolts

擁有如下方法:

declareOutputFields(OutputFieldsDeclarer declarer)bolt宣告輸出模式
prepare(java.util.Map stormConf, TopologyContext context, OutputCollector collector)
    僅在bolt開始處理元組之前呼叫
execute(Tuple input)
    處理輸入的單個元組
cleanup()
    在bolt即將關閉時呼叫

下面看一個例子,在這個例子中bolt把一句話分割成單詞列表:

class SplitSentence implements IRichBolt {
    private OutputCollector collector;
    publlic void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for(String word : sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }

    public void cleanup(){}

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

正如你所看到的,這是一個很簡單的bolt。值得一提的是在這個例子裡,沒有訊息擔保。這就意味著,如果bolt因為某些原因丟棄了一些訊息——不論是因為bolt掛了,還是因為程式故意丟棄的——生成這條訊息的spout不會收到任何通知,任何其它的spoutsbolts也不會收到。

然而在許多情況下,你想確保訊息在整個拓撲範圍內都被處理過了。

可靠的bolts和不可靠的bolts

正如前面所說的,Storm保證通過spout傳送的每條訊息會得到所有bolt的全面處理。基於設計上的考慮,這意味著你要自己決定你的bolts是否保證這一點。

拓撲是一個樹型結構,訊息(元組)穿過其中一條或多條分支。樹上的每個節點都會呼叫ack(tuple)

fail(tuple),Storm因此知道一條訊息是否失敗了,並通知那個/那些製造了這些訊息的spout(s)。既然一個Storm拓撲執行在高度並行化的環境裡,跟蹤始發spout例項的最好方法就是在訊息元組內包含一個始發spout引用。這一技巧稱做錨定(譯者注:原文為Anchoring)。修改一下剛剛講過的SplitSentence,使它能夠確保訊息都被處理了。

class SplitSentence implenents IRichBolt {
    private OutputCollector collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for(String word : sentence.split(" ")) {
            collector.emit(tuple, new Values(word));
        }
        collector.ack(tuple);
    }

    public void cleanup(){}

    public void declareOutputFields(OutputFieldsDeclarer declarer){
        declar.declare(new Fields("word"));
    }
}

錨定發生在呼叫collector.emit()時。正如前面提到的,Storm可以沿著元組追蹤到始發spoutcollector.ack(tuple)collector.fail(tuple)會告知spout每條訊息都發生了什麼。當樹上的每條訊息都已被處理了,Storm就認為來自spout的元組被全面的處理了。如果一個元組沒有在設定的超時時間內完成對訊息樹的處理,就認為這個元組處理失敗。預設超時時間為30秒。

NOTE:你可以通過修改Config.TOPOLOGY_MESSAGE_TIMEOUT修改拓撲的超時時間。

當然了spout需要考慮訊息的失敗情況,並相應的重試或丟棄訊息。

NOTE:你處理的每條訊息要麼是確認的(譯者注:collector.ack())要麼是失敗的(譯者注:collector.fail())。Storm使用記憶體跟蹤每個元組,所以如果你不呼叫這兩個方法,該任務最終將耗盡記憶體。

多資料流

一個bolt可以使用emit(streamId, tuple)把元組分發到多個流,其中引數streamId是一個用來標識流的字串。然後,你可以在TopologyBuilder決定由哪個流訂閱它。

多錨定

為了用bolt連線或聚合資料流,你需要藉助記憶體緩衝元組。為了在這一場景下確保訊息完成,你不得不把流錨定到多個元組上。可以向emit方法傳入一個元組列表來達成目的。

...
List anchors = new ArrayList();
anchors.add(tuple1);
anchors.add(tuple2);
collector.emit(anchors, values);
...

通過這種方式,bolt在任意時刻呼叫ackfail方法,都會通知訊息樹,而且由於流錨定了多個元組,所有相關的spout都會收到通知。

使用IBasicBolt自動確認

你可能已經注意到了,在許多情況下都需要訊息確認。簡單起見,Storm提供了另一個用來實現bolt的介面,IBasicBolt。對於該介面的實現類的物件,會在執行execute方法之後自動呼叫ack方法。

class SplitSentence extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = tuple.getString(0);
        for(String word : sentence.split(" ")) {
            collector.emit(new Values(word));
        }
}

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

NOTE:分發訊息的BasicOutputCollector自動錨定到作為引數傳入的元組。