1. 程式人生 > >Storm叢集模式下cleanup解決方法

Storm叢集模式下cleanup解決方法

  1. 背景
    由於cleanup方法並不可靠,它只在local mode下生效,Storm叢集模式下cleanup不會被呼叫執行。很多資源得不到釋放

  2. 解決方案
    在kill topology之前,先deactivate相應的topology。在spout中實現deactivate()方法,deactivate()方法中給bolt emit特殊的資料(如:emit “shutDown”字串給bolt),bolt中判斷接收的資料為”shutDown”就呼叫cleanup()方法。在cleanup()方法中釋放需要釋放的資源。

  3. 流程圖
    這裡寫圖片描述

  4. 相關程式碼

    spout重寫deactivate()方法程式碼如下:

    @Override
    public void deactivate(){
        LOGGER.info("deaactivate to spout and bolt");  
        try {
           //storm deactivate時發一條訊息給bolt
           collector.emit(new Values("shutDown"));
        } catch (Exception e) {  
           e.printStackTrace();  
        }
    }
   bolt中execute()方法程式碼如下:
    @Override
    public void execute(Tuple input) {
        String message = input.getStringByField("loan_message");
        //判斷是不是spout的deactivate傳過來的訊息
        if("shutDown".equals(message )) {
            cleanup();
        }
    }

注意事項: deactivate topology時,建議等待時間儘量長,時間過短訊息來不及處理,會導致資料丟失