1. 程式人生 > >Storm的BaseBasicBolt原始碼解析ack機制

Storm的BaseBasicBolt原始碼解析ack機制

http://www.cnblogs.com/intsmaze/p/5924873.html

我們在學習ack機制的時候,我們知道Storm的Bolt有BaseBasicBolt和BaseRichBolt。
在BaseBasicBolt中,BasicOutputCollector在emit資料的時候,會自動和輸入的tuple相關聯,而在execute方法結束的時候那個輸入tuple會被自動ack。
在使用BaseRichBolt需要在emit資料的時候,顯示指定該資料的源tuple要加上第二個引數anchor tuple,以保持tracker鏈路,即collector.emit(oldTuple, newTuple);並且需要在execute執行成功後呼叫OutputCollector.ack(tuple), 當失敗處理時,執行OutputCollector.fail(tuple);



那麼我們來看看BasicBolt的原始碼是不是這樣的,不能因為看到別人的帖子說是這樣的,我們就這樣任務,以訛傳訛,我們要To see is to believe。

為了方便看原始碼,我先上我們的繼承類:

複製程式碼
public class SplitSentenceBolt extends BaseBasicBolt {  public void prepare(Map stormConf, TopologyContext context) {
        super.prepare(stormConf, context);
    }
    
  //5:執行我們自己的邏輯處理方法,接收傳入的引數。
  public void execute(Tuple input, BasicOutputCollector collector) { String sentence = (String)input.getValueByField("sentence"); String[] words = sentence.split(" "); for (String word : words) { word = word.trim(); word = word.toLowerCase(); collector.emit(
new Values(word,1));//這個地方就是呼叫OutputCollector的包裝類,來發訊息 } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word","num")); } }
複製程式碼



通過打斷點,我們發現,bolt的task會建立這個類下面會標準執行順序

複製程式碼
public class BasicBoltExecutor implements IRichBolt {
    public static Logger LOG = LoggerFactory.getLogger(BasicBoltExecutor.class);    
    
    private IBasicBolt _bolt;
    private transient BasicOutputCollector _collector;
    //1:建立該物件,然後把我們寫的SplitSentenceBolt物件賦給父類IBasicBolt。
    public BasicBoltExecutor(IBasicBolt bolt) {
        _bolt = bolt;
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        _bolt.declareOutputFields(declarer);//這裡就是呼叫SplitSentenceBolt物件的方法了。
    }
   //2:給BasicOutputCollector _collector欄位賦值,BasicOutputCollector就是對OutputCollector類的包裝。
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        _bolt.prepare(stormConf, context);
        _collector = new BasicOutputCollector(collector);
    }
  //3:然後程式執行該方法,input的值source: spout1:4, stream: default, id: {}, [+ - * % /]
    public void execute(Tuple input) {
        _collector.setContext(input);//把接收到的tuple值設定給BasicOutputCollector中inputTuple欄位。
        try {
            _bolt.execute(input, _collector);//這個地方是呼叫我們實現類SplitSentenceBolt的ececute方法。
            _collector.getOutputter().ack(input);//這個地方就是響應
        } catch(FailedException e) {
            if(e instanceof ReportedFailedException) {
                _collector.reportError(e);
            }
            _collector.getOutputter().fail(input);//這個地方就是響應
        }
    }
    public void cleanup() {
        _bolt.cleanup();
    }
    public Map<String, Object> getComponentConfiguration() {
        return _bolt.getComponentConfiguration();
    }
}
複製程式碼 複製程式碼
public class BasicOutputCollector implements IBasicOutputCollector {
    private OutputCollector out;
    private Tuple inputTuple;
    public BasicOutputCollector(OutputCollector out) {
        this.out = out;
    }
    //4:把收到的tuple資料賦值給inputTuple,這個時候BasicOutputCollector物件的欄位都具有值了。
   public void setContext(Tuple inputTuple) {
        this.inputTuple = inputTuple;
    }
   //6:這裡我們傳送新的(轉換後的)tuple資料,看他內部的呼叫,其實他也會發送一個anchor tuple來保持tracker鏈路,
而這個anchor tuple就是bolt接收到轉換前的源tuple資料。
   public List<Integer> emit(List<Object> tuple) { 
      return emit(Utils.DEFAULT_STREAM_ID, tuple);  
   }
    public List<Integer> emit(String streamId, List<Object> tuple) {
        return out.emit(streamId, inputTuple, tuple);
    }
   public void emitDirect(int taskId, String streamId, List<Object> tuple) {
        out.emitDirect(taskId, streamId, inputTuple, tuple);
    }
    public void emitDirect(int taskId, List<Object> tuple) {
        emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple);
    }
    protected IOutputCollector getOutputter() {
        return out;
    }
    public void reportError(Throwable t) {
        out.reportError(t);
    }
}
複製程式碼

這裡大家不要糾結bolt的啟動時從哪裡開始的,我後面會講的,這裡我們關注的是,BasicBoltExecutor物件建立後的執行過程,以這我們來看執行的過程。在BasicBoltExecutor的execute方法中,我們看到了ack和fail方法會被自動呼叫的,當我們的程式丟擲異常則會執行fail方法的。