1. 程式人生 > >Storm解讀之路(二、基本 Java-API 篇)

Storm解讀之路(二、基本 Java-API 篇)

寫這些東西其實本質上是記錄因工作接觸 Storm 之後的學習進度,既然是工作,當然要敲程式碼,所以這一篇就分享下基本 Java-API 吧。

首先看下面的圖(畫圖不行見諒),這是 Storm API 使用中最基本的介面和抽象類關係。

OK,這裡我們可以清楚的看到,IComponent 是 API 核心介面,那麼其是怎麼的構成呢?


public interface IComponent extends Serializable {

    /**
     * @param declarer this is used to declare output stream ids, output fields, and whether or not each output stream is a direct stream
     */
void declareOutputFields(OutputFieldsDeclarer declarer); Map<String, Object> getComponentConfiguration(); }

這兩個方法很簡單,declareOutputFields 是申明 topology 中流的輸出模式(具體講 Stream 模式的時候再說),而 getComponentConfiguration 是獲取 Storm 配置資訊的。

其實在 Visio 圖中是有兩個基礎介面我沒畫出來的,分別是 ISpout 和 Ibolt,為什麼呢?因為我們可以理解為 IRichSpout 和 IRichBolt 就是兩者與 IComponent 的合體(繼承)。接著一個個來,先說 Spout:


void open(Map conf, TopologyContext context, SpoutOutputCollector collector);

//Spout 終止的時候呼叫(不保證一定被呼叫)
void close();

//Spout 啟用的時候呼叫
void activate();

//Spout 失活(可能重新啟用)時呼叫,呼叫此方法時不會呼叫 nextTuple
void deactivate();

void nextTuple();

void ack(Object msgId);

void fail(Object msgId);

簡單容易理解的方法在這裡就不提了(做了註釋),說一下比較重要的方法。
先說 open,當 Spout 初始化的那時候呼叫,一共接收了三個物件,一個配置物件、一個 Topology 上下文物件,還有一個 輸出控制器物件。重點提一下 SpoutOutputCollector 這個類,這個是控制整個 Spout 關於元組傳輸的類,很重要,主要關注下面幾個方法:

    List<Integer> emit(String streamId, List<Object> tuple, Object messageId);
    void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);
    long getPendingCount();

前兩個方法,都是將 tuple 提交到 stream 中去的,區別在於後者是定向提交的。其可以傳遞引數

int taskId, String streamId, List<Object> tuple, Object messageId

前三個引數分別的意義就是字面上意思(論取名的規範性),而 messageId 是作錨定所用(之後談)的。

然後說下 nextTuple 方法,這是個 non-blocking 的方法,也就是說當沒有 tuple 來 emit 的時候,其是立即返回的(非阻塞的)。好像 Storm 是在 0.8.1 版本之後 emit 空的話 nextTuple 就預設 sleep 1秒鐘(可配置,SleepSpoutWaitStrategy 介面),主要為了 cpu 資源的合理分配。總之你的 topology 活著(除了某些特例情況),你的 nextTuple 方法就是不斷被呼叫的,一直請求 tuple,一般我們也在這裡呼叫 SpoutOutputCollector 物件的 emit 方法傳送資料。

最後說下 ack、fail 方法,連帶 messageId 一起提。講之前先說下,Storm Spout 的 nextTuple、ack、fail 好像是一個執行緒的,所以才設計為非阻塞模式,具體底層我也看不了,哎(據說 JStorm 是分了多執行緒的)。所以可以根據實際情況把 nextTuple 的業務執行緒單出來。OK,迴歸正題,ack 方法是 Storm 錨定機制,要說簡單點的話可以這要講:Spout emit 一個 tuple,如果攜帶了 messageId(別告訴我你忘記這東西了),這個 tuple 的傳遞過程就將被追蹤,一直到其傳送成功或者失敗呼叫 fail 方法。關於 fail 方法,預設是 tuple 失敗後重新進入 queue,重發。具體的重發配置我還沒研究,有研究的朋友可以交流下,另外 getPendingCount 方法我也沒搞懂什麼作用,懂的朋友一樣歡迎指教,開源萬歲!

Spout 講完接著咱說 Bolt,老樣子,先看看原始碼


void prepare(Map stormConf, TopologyContext context, OutputCollector collector);

void execute(Tuple input);

//Bolt 終止時呼叫(不保證一定被呼叫)
void cleanup();

同 Spout,cleanup 就不解釋了,這裡說說 prepare 和 execute。先說 prepare 方法:
這是 Bolt 的初始化方法,三個物件和 Spout 不一樣的只有 OutputCollector:

    List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple);
    void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple);
    void ack(Tuple input);
    void fail(Tuple input);
    void resetTimeout(Tuple input);

其實也 OutputCollector 只是把 ack 和 fail 方法囊括進去了,多了個超時重置配置,用法和 SpoutOutputCollector 基本相同。

然後重點看的是 execute 方法,這是個用作邏輯處理方法,你可以在這裡取得從 Spout 傳遞過來的 tuple,然後在 execute 中對其作你需要的業務實現。當然,如果你還想要向下繼續傳輸你的 tuple,那就得呼叫你在 prepare 方法中初始化好的 OutputCollector 物件,emit 你的 tuple(至於是否錨定還是看業務是否注重資料可靠)。

剛發現漏了說個重要的東西,Tuple,嗨呀好氣啊,補上補上:
Tuple 這個類,包含了你要傳輸的元組元資訊、內容以及操作方法,繼承自 ITuple,以下放一些方法(實在太多)

 public GlobalStreamId getSourceGlobalStreamId();

 public String getSourceComponent();

 public int getSourceTask();

 public MessageId getMessageId();
 /**
 * 判斷 tuple 是否包含該命名的 field
 */
 public boolean contains(String field);

 /**
 * 通過位置引數返回 tuples 的 field(動態型別)
 */
 public Object getValue(int i);

 /**
 * 通過位置引數返回 tuples 的 field(String 型別)
 */
 public String getString(int i);

 /**
 * 通過命名返回 tuples 的 field(String 型別)
 */
 public String getStringByField(String field);

這裡只是 Tuple 的一部分方法,很多實現其實都大同小異,可以返回各種上下文資訊,可以通過 tuples 的位置和命名(具體講 Stream 模式的時候再說)返回動態或已知型別的 field,也就是你傳遞的實際資料,順便說下,所謂 Value 其實就是個封裝 ArrayList 的類

public class Values extends ArrayList<Object>{
    public Values() {

    }

    public Values(Object... vals) {
        super(vals.length);
        for(Object o: vals) {
            add(o);
        }
    }
}

那麼 Spout 和 Bolt 基本的 API 介面分析就到這裡,接著說一個 Bolt 的擴充套件介面 IBasicBolt


public interface IBasicBolt extends IComponent {
    void prepare(Map stormConf, TopologyContext context);
    void execute(Tuple input, BasicOutputCollector collector);
    void cleanup();
}

其實之前看懂了朋友這裡應該是很容易看明白的,BasicOutputCollector,這就是關鍵


public interface IBasicOutputCollector extends IErrorReporter{
    List<Integer> emit(String streamId, List<Object> tuple);
    void emitDirect(int taskId, String streamId, List<Object> tuple);
    void resetTimeout(Tuple tuple);
}

IBasicOutputCollector 自己幫助你實現了 ack 機制的 emit,不需要你自己去寫,對於一些要求可靠性而且不復雜的業務 IBasicBolt 非常實用。

OK,本篇就到這裡,抽象類這裡我就不說了(沒啥說的)。其實 Spout 和 Bolt 的 API 還有一些功能性的封裝,像 ITransactionSpout、KafkaSpout之類的(本次專案時所用),各位可以自己去檢視原始碼,其實同樣是我說道的這些方法加上其各自的功能點,最多實現邏輯複雜些,還是能看明白的。