1. 程式人生 > >Storm入門之第四章Spouts

Storm入門之第四章Spouts

你將在本章瞭解到spout作為拓撲入口和它的容錯機制相關的最常見的設計策略。

在設計拓撲結構時,始終在頭腦中記著的一件重要事情就是訊息的可靠性。當有無法處理的訊息時,你就要決定該怎麼辦,以及作為一個整體的拓撲結構該做些什麼。舉個例子,在處理銀行存款時,不要丟失任何事務報文就是很重要的事情。但是如果你要統計分析數以百萬的tweeter訊息,即使有一條丟失了,仍然可以認為你的結果是準確的。

對於Storm來說,根據每個拓撲的需要擔保訊息的可靠性是開發者的責任。這就涉及到訊息可靠性和資源消耗之間的權衡。高可靠性的拓撲必須管理丟失的訊息,必然消耗更多資源;可靠性較低的拓撲可能會丟失一些訊息,佔用的資源也相應更少。不論選擇什麼樣的可靠性策略,Storm都提供了不同的工具來實現它。

要在spout中管理可靠性,你可以在分發時包含一個元組的訊息ID(collector.emit(new Values(…),tupleId))。在一個元組被正確的處理時呼叫ack方法,而在失敗時呼叫fail方法。當一個元組被所有的靶bolt和錨bolt處理過,即可判定元組處理成功(你將在第5章學到更多錨bolt知識)。

發生下列情況之一時為元組處理失敗:

  • 提供資料的spout呼叫collector.fail(tuple)
  • 處理時間超過配置的超時時間

讓我們來看一個例子。想象你正在處理銀行事務,需求如下:

  • 如果事務失敗了,重新發送訊息
  • 如果失敗了太多次,終結拓撲執行

建立一個spout

和一個boltspout隨機發送100個事務ID,有80%的元組不會被bolt收到(你可以在例子ch04-spout檢視完整程式碼)。實現spout時利用Map分發事務訊息元組,這樣就比較容易實現重發訊息。

public void nextTuple() {
    if(!toSend.isEmpty()){
        for(Map.Entry<Integer, String> transactionEntry : toSend.entrySet()){
            Integer transactionId = transactionEntry.getKey();
            String transactionMessage = transactionEntry.getValue();
            collector.emit(new Values(transactionMessage),transactionId);
        }
        toSend.clear();
    }
}

如果有未傳送的訊息,得到每條事務訊息和它的關聯ID,把它們作為一個元組傳送出去,最後清空訊息佇列。值得一提的是,呼叫map的clear是安全的,因為nextTuple失敗時,只有ack方法會修改map,而它們都執行在一個執行緒內。

維護兩個map用來跟蹤待發送的事務訊息和每個事務的失敗次數。ack方法只是簡單的把事務從每個列表中刪除。

public void ack(Object msgId) {
    messages.remove(msgId);
    failCounterMessages.remove(msgId);
}

fail方法決定應該重新發送一條訊息,還是已經失敗太多次而放棄它。

NOTE:如果你使用全部資料流組,而拓撲裡的所有bolt都失敗了,spoutfail方法才會被呼叫。

public void fail(Object msgId) {
    Integer transactionId = (Integer) msgId;
    //檢查事務失敗次數
    Integer failures = transactionFailureCount.get(transactionId) + 1;

    if(failes >= MAX_FAILS){
        //失敗數太高了,終止拓撲
        throw new RuntimeException("錯誤, transaction id 【"+

         transactionId+"】 已失敗太多次了 【"+failures+"】");
    }

    //失敗次數沒有達到最大數,儲存這個數字並重發此訊息
    transactionFailureCount.put(transactionId, failures);
    toSend.put(transactionId, messages.get(transactionId));
    LOG.info("重發訊息【"+msgId+"】");
}

首先,檢查事務失敗次數。如果一個事務失敗次數太多,通過丟擲RuntimeException終止傳送此條訊息的工人。否則,儲存失敗次數,並把訊息放入待發送佇列(toSend),它就會再次呼叫nextTuple時得以重新發送。
NOTE:Storm節點不維護狀態,因此如果你在記憶體儲存資訊(就像本例做的那樣),而節點又不幸掛了,你就會丟失所有快取的訊息。
Storm是一個快速失敗的系統。拓撲會在丟擲異常時掛掉,然後再由Storm重啟,恢復到丟擲異常前的狀態。

獲取資料

接下來你會了解到一些設計spout的技巧,幫助你從多資料來源獲取資料。

直接連線

在一個直接連線的架構中,spout直接與一個訊息分發器連線(見圖4-1)。

圖4-1直接連線的spout

圖4-1 直接連線的spout

這個架構很容易實現,尤其是在訊息分發器是已知裝置或已知裝置組時。已知裝置滿足:拓撲從啟動時就已知道該裝置,並貫穿拓撲的整個生命週期保持不變。未知裝置就是在拓撲執行期新增進來的。已知裝置組就是從拓撲啟動時組內所有裝置都是已知的。

下面舉個例子說明這一點。建立一個spout使用Twitter流API讀取twitter資料流。spout把API當作訊息分發器直接連線。從資料流中得到符合track引數的公共tweets(參考twitter開發頁面)。完整的例子可以在連結https://github.com/storm-book/examples-ch04-spouts/找到。

spout從配置物件得到連線引數(track,user,password),並連線到API(在這個例子中使用ApacheDefaultHttpClient)。它一次讀一行資料,並把資料從JSON轉化成Java物件,然後釋出它。

public void nextTuple() {
    //建立http客戶端
    client = new DefaultHttpClient();
    client.setCredentialsProvider(credentialProvider);
    HttpGet get = new HttpGet(STREAMING_API_URL+track);
    HttpResponse response;
    try {
        //執行http訪問
        response = client.execute(get);
        StatusLine status = response.getStatusLine();
        if(status.getStatusCode() == 200){
            InputStream inputStream = response.getEntity().getContent();
            BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
            String in;
            //逐行讀取資料
            while((in = reader.readLine())!=null){
                try{
                    //轉化併發布訊息
                    Object json = jsonParser.parse(in);
                    collector.emit(new Values(track,json));
                }catch (ParseException e) {
                    LOG.error("Error parsing message from twitter",e);
                }
            }
        }
    } catch (IOException e) {
        LOG.error("Error in communication with twitter api ["+get.getURI().toString()+"], 
           sleeping 10s");
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e1) {}
    }
}

NOTE:在這裡你鎖定了nextTuple方法,所以你永遠也不會執行ackfail方法。在真實的應用中,我們推薦你在一個單獨的執行緒中執行鎖定,並維持一個內部佇列用來交換資料(你會在下一個例子中學到如何實現這一點:訊息佇列)。

棒極了!
現在你用一個spout讀取Twitter資料。一個明智的做法是,採用拓撲並行化,多個spout從同一個流讀取資料的不同部分。那麼如果你有多個流要讀取,你該怎麼做呢?Storm的第二個有趣的特性(譯者注:第一個有趣的特性已經出現過,這句話原文都是一樣的,不過按照中文的行文習慣還是不重複使用措詞了)是,你可以在任意元件內(spouts/bolts)訪問TopologyContext。利用這一特性,你能夠把流劃分到多個spouts讀取。

public void open(Map conf, TopologyContext context,
          SpoutOutputCollector collector) {
    //從context物件獲取spout大小
    int spoutsSize = 
context.getComponentTasks(context.getThisComponentId()).size();
    //從這個spout得到任務id
    int myIdx = context.getThisTaskIndex();
    String[] tracks = ((String) conf.get("track")).split(",");
    StringBuffer tracksBuffer = new StringBuffer();
    for(int i=0; i< tracks.length;i++){
        //Check if this spout must read the track word
        if( i % spoutsSize == myIdx){
            tracksBuffer.append(",");
            tracksBuffer.append(tracks[i]);
        }
    }
    if(tracksBuffer.length() == 0) {
        throw new RuntimeException("沒有為spout得到track配置" +
 " [spouts大小:"+spoutsSize+", tracks:"+tracks.length+"] tracks的數量必須高於spout的數量");
 this.track =tracksBuffer.substring(1).toString();
    }
 ...
 }

利用這一技巧,你可以把collector物件均勻的分配給多個數據源,當然也可以應用到其它的情形。比如說,從web伺服器收集日誌檔案見圖4-2

圖4-2直連hash

圖4-2 直連hash

通過上一個例子,你學會了從一個spout連線到已知裝置。你也可以使用相同的方法連線未知裝置,不過這時你需要藉助於一個協同系統維護的裝置列表。協同系統負責探察列表的變化,並根據變化建立或銷燬連線。比如,從web伺服器收集日誌檔案時,web伺服器列表可能隨著時間變化。當新增一臺web伺服器時,協同系統探查到變化併為它建立一個新的spout。見圖4-3

圖4-3直連協同

圖4-3 直連協同

訊息佇列

第二種方法是,通過一個佇列系統接收來自訊息分發器的訊息,並把訊息轉發給spout。更進一步的做法是,把佇列系統作為spout和資料來源之間的中介軟體,在許多情況下,你可以利用多佇列系統的重播能力增強佇列可靠性。這意味著你不需要知道有關訊息分發器的任何事情,而且新增或移除分發器的操作比直接連線簡單的多。這個架構的問題在於佇列是一個故障點,另外你還要為處理流程引入新的環節。

圖4-4展示了這一架構模型

圖4-4使用佇列系統

圖4-4 使用佇列系統

NOTE:你可以通過輪詢佇列或雜湊佇列(把佇列訊息通過雜湊傳送給spouts或建立多個佇列使佇列spouts一一對應)在多個spouts之間實現並行性。

接下來我們利用Redis和它的java庫Jedis建立一個佇列系統。在這個例子中,我們建立一個日誌處理器從一個未知的來源收集日誌,利用lpush命令把訊息插入佇列,利用blpop命令等待訊息。如果你有很多處理過程,blpop命令採用了輪詢方式獲取訊息。

我們在spoutopen方法建立一個執行緒,用來獲取訊息(使用執行緒是為了避免鎖定nextTuple在主迴圈的呼叫):

new Thread(new Runnable() {
    @Override
    public void run() {
        try{
           Jedis client= new Jedis(redisHost, redisPort);
           List res = client.blpop(Integer.MAX_VALUE, queues);
           messages.offer(res.get(1));
        }catch(Exception e){
            LOG.error("從redis讀取隊列出錯",e);
            try {
                Thread.sleep(100);
            }catch(InterruptedException e1){}
        }
    }
}).start();

這個執行緒的惟一目的就是,建立redis連線,然後執行blpop命令。每當收到了一個訊息,它就被新增到一個內部訊息佇列,然後會被nextTuple消費。對於spout來說資料來源就是redis佇列,它不知道訊息分發者在哪裡也不知道訊息的數量。

NOTE:我們不推薦你在spout建立太多執行緒,因為每個spout都執行在不同的執行緒。一個更好的替代方案是增加拓撲並行性,也就是通過Storm叢集在分散式環境建立更多執行緒。

nextTuple方法中,要做的惟一的事情就是從內部訊息佇列獲取訊息並再次分發它們。

public void nextTuple(){
    while(!messages.isEmpty()){
        collector.emit(new Values(messages.poll()));
    }
}

NOTE:你還可以藉助redis在spout實現訊息重發,從而實現可靠的拓撲。(譯者注:這裡是相對於開頭的可靠的訊息VS不可靠的訊息講的)

DRPC

DRPCSpout從DRPC伺服器接收一個函式呼叫,並執行它(見第三章的例子)。對於最常見的情況,使用backtype.storm.drpc.DRPCSpout就足夠了,不過仍然有可能利用Storm包內的DRPC類建立自己的實現。

小結

現在你已經學習了常見的spout實現模式,它們的優勢,以及如何確保訊息可靠性。不存在適用於所有拓撲的架構模式。如果你知道資料來源,並且能夠控制它們,你就可以使用直接連線;然而如果你需要新增未知資料來源或從多種資料來源接收資料,就最好使用訊息佇列。如果你要執行線上過程,你可以使用DRPCSpout或類似的實現。

你已經學習了三種常見連線方式,不過依賴於你的需求仍然有無限的可能。