1. 程式人生 > >使用storm trident消費kafka訊息

使用storm trident消費kafka訊息

一、前言
   storm通過保證資料至少被處理一次來保證資料的完整性,由於元祖可以重發,對於一些需要資料精確的場景,可以考慮用storm trident實現。
    傳統的事物型拓撲中存在幾種bolt:
 1.1 BasicBolt
  這是最基本的Bolt,BasicBolt每次只能處理一個tuple,而且必須等前一個tuple成功處理後下一個tuple才能繼續處理,顯然效率不高。
 1.2 BatchBolt
  storm的一個優勢就是能夠批量處理tuple,BatchBolt支援批量處理tuple,每一個batch中的tuple都會呼叫execute(),處理完成後呼叫finishBatch方法。


 1.3 Committer BatchBolt
   標記為Committer的BatchBolt和基本的BasicBolt的區別在於二者呼叫finishBatch()的時機不同,標記為Committer的BatchBolt在提交階段就會呼叫finishBatch()。

二、storm trident的使用
   storm目前的版本下載已經將事物拓撲的實現封裝trident,trident目前支援3種不同的事物介面,一種是非事物型的(不介紹,因為基本不用),一種是事務性的TransactionalTridentKafkaSpout,而我們比較常用的是透明型事物OpaqueTridentKafkaSpout(事務型應用最重要的一點是要判斷一批訊息是新的還是已來過的)。


 2.1 TransactionalTridentKafkaSpout  

   原理是每次在資料庫中存了txid,IPartitionedTransactionalSpout的每一個tuple都會繫結在固定的批次(batch)中。
   一個批次無論重發多少次,它也只有一個唯一且相同的事務ID,它所包含的內容都是完全一致的,而一個tuple無論被重發多少次只會在同一個批次裡。
使用方式下載如下:

Java程式碼 收藏程式碼
  1. TridentTopology topology = new TridentTopology();  
  2.     TridentKafkaConfig tridentKafkaConfig = new
     TridentKafkaConfig(zkHosts, topic, spoutId);  
  3.     tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new ConvertStringScheme());  
  4.     /** 
  5.      * 支援事物,支援失敗重發 
  6.      * 
  7.      */
  8.     TransactionalTridentKafkaSpout transactionalTridentKafkaSpout = new TransactionalTridentKafkaSpout(  
  9.             tridentKafkaConfig);  
  10.     topology.newStream("name",transactionalTridentKafkaSpout)  
  11.             .shuffle()  
  12.             .each(new Fields("msg"), new SpilterFunction(), new Fields("sentence"))  
  13.             .groupBy(new Fields("sentence"))  
  14.             .aggregate(new Fields("sentence"), new SumWord(),new Fields("sum"))  
  15.             .parallelismHint(5)  
  16.             .each(new Fields("sum"), new PrintFilter_partition());  
  17.     Config config = new Config();  
  18.     StormSubmitter.submitTopology("XXX", config,  
  19.             topology.build());  

   但貌似目前TransactionalTridentKafkaSpout有個bug,啟動會報:classCastException(非程式碼問題)
   然而我們可以想到的是,IPartitionedTransactionalSpout會有一個問題,假設一批訊息在被bolt消費過程中失敗了,需要spout重發,此時如果正巧遇到訊息傳送中介軟體故障,
例如某一個分割槽不可讀,spout為了保證重發時每一批次包含的tuple一致,它只能等待訊息中介軟體恢復,也就是卡在那裡無法再繼續傳送給bolt訊息了,直至訊息中介軟體恢復(因為它必須傳送一樣的Batch)。
 2.2 OpaqueTridentKafkaSpout
     IOpaquePartitionedTransactionalSpout不保證每次重發一個批次的訊息所包含的tuple完全一致。也就是說某個tuple可能第一次在txid=1的批次中出現,後面有可能在txid=3的批次中出現。這種情況只出現在當某一批次訊息消費失敗需要重發且恰巧訊息中介軟體故障時。這時,IOpaquePartitionedTransactionalSpout不是等待訊息中介軟體故障恢復,而是先讀取可讀的partition。例如txid=1的批次在消費過程中失敗了,需要重發,恰巧訊息中介軟體的16個分割槽有1個分割槽(partition=3)因為故障不可讀了。這時候IOpaquePartitionedTransactionalSpout會先讀另外的15個分割槽,完成txid=1這個批次的傳送,這時候同樣的批次其實包含的tuple已經少了。假設在txid=3時訊息中介軟體的故障恢復了,那之前在txid=1且在分割槽partition=3的還沒有被髮送的tuple會被重新發送,包含在txid=3的批次中,所以其不保證每批次的batch包含的tuple是一樣的。
 2.2.1 實戰
  首先搭建下載好zk,kafka,storm的分散式環境,先起zk,然後kafka然後storm.分別啟動後效果jps看一下

master機器:

 slave1機器:

 slave2機器:

 hosts裡面配置

 2.2.1.1 建立topic

 2.2.1.2 寫storm消費端

  main方法下載

Java程式碼 收藏程式碼
  1. publicstaticvoid main(String[] args) throws AlreadyAliveException,  
  2.           InvalidTopologyException, AuthorizationException {  
  3.       TridentTopology topology = new TridentTopology();  
  4.       TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHosts, topic);  
  5.       kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());  
  6.       OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(  
  7.               kafkaConfig);  
  8.       topology.newStream("test_kafka2storm_opaqueTrident",  
  9.               opaqueTridentKafkaSpout)  
  10.               .parallelismHint(3)  
  11.               .shuffle()  
  12.               .each(new Fields("str"), new SpilterFunction(), new Fields("sentence"))  
  13.               .groupBy(new Fields("sentence"))  
  14.               .aggregate(new Fields("sentence"), new SumWord(),  
  15.                       new Fields("sum")).parallelismHint(5)  
  16.               .each(new Fields("sum"), new PrintFilter_partition());  
  17.       Config config = new Config();  
  18.       config.setDebug(false);  
  19.       config.setNumWorkers(2);  
  20.       StormSubmitter.submitTopology("test_kafka2storm_opaqueTrident_topology", config,  
  21.               topology.build());  
  22.   }  

  SpilterFunction下載

Java程式碼 收藏程式碼
  1. import org.apache.storm.trident.operation.BaseFunction;  
  2. import org.apache.storm.trident.operation.TridentCollector;  
  3. import org.apache.storm.trident.tuple.TridentTuple;  
  4. import org.apache.storm.tuple.Values;  
  5. publicclass SpilterFunction extends BaseFunction {  
  6.     /** 
  7.      *  
  8.      */
  9.     privatestaticfinallong serialVersionUID = 1L;  
  10.     @Override
  11.     publicvoid execute(TridentTuple tuple, TridentCollector collector) {  
  12.         String sentens = tuple.getString(0);  
  13.         String[] array = sentens.split("\\s+");  
  14.         for(int i=0;i<array.length;i++){  
  15.             System.out.println("spilter emit:" + array[i]);  
  16.             collector.emit(new Values(array[i]));  
  17.         }  
  18.     }  
  19. }  
  SumWord: