1. 程式人生 > >Storm1.0.6中的Trident

Storm1.0.6中的Trident

http://storm.apache.org/releases/1.0.6/Trident-tutorial.html
Trident是Storm做實時計算的一個高層次抽象,實現無縫的高吞吐量、有狀態的、低延時的分散式查詢。Trident操作有join、aggregation、grouping、function、filters.
以對流資料的單詞進行計數為例:
為演示流處理,先生成一個源源不斷地測試資料

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
               new Values
("the cow jumped over the moon"), new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"), new Values("how many apples can you eat")); spout.setCycle(true);//源源不斷地傳送流

然後生成一個TridentTopology,來處理生產的spout。

TridentTopology topology =
new TridentTopology(); TridentState wordCounts = topology.newStream("spout1", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")) .
parallelismHint(6);

以上每一步都是流處理,newStream讀取輸入源的資料生成一個流,輸入源可以是Kestrel或者Kafka。Trident將哪些資料被處理的狀態元資料儲存在zookeeper中,這裡的spout1指定了元資料的名稱。這些流資料被分成小的tuple批次來處理。Trident提供了豐富的API來處理這些tuple批次。
each()函式中new Split()函式應用到流中的每個tuple。取"sentence" field的資料分解成單詞,每個sentence產生出多個tuple,命名為新的Field “word”.

public class Split extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
       String sentence = tuple.getString(0);
       for(String word: sentence.split(" ")) {
           collector.emit(new Values(word));                
       }
   }
}

接下來對word進行計數,並持久化。首先對"word"進行group,然後每個group用Count()聚合。Trident保證高容錯,並且僅處理一次