1. 程式人生 > >Storm設計一個Topology用來統計單詞的TopN的實例

Storm設計一個Topology用來統計單詞的TopN的實例

osi was 對象 turn col rms nds owin collect

Storm的單詞統計設計

一:Storm的wordCount和Hadoop的wordCount實例對比

技術分享圖片

二:Storm的wordCount的方案實例設計

技術分享圖片

三:建立maven項目,添加maven相關依賴包
(1)輸入:search.maven.org網址,在其中找到storm的核心依賴
(2)將核心依賴添加到pom.xml文件中

<dependency>
<groupId>com.github.aloomaio</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.2-incubating</version>
</dependency>

四:代碼實現

一:WordCountTopology源碼【啟動topology的入口】

技術分享圖片
 1 package com.yeepay.sxf.helloword;
 2 
 3 import backtype.storm.Config;
 4 import backtype.storm.LocalCluster;
 5 import backtype.storm.StormSubmitter;
 6 import backtype.storm.topology.TopologyBuilder;
 7 import backtype.storm.tuple.Fields;
 8 
 9 import com.yeepay.sxf.helloword.bolt.PrintBolt;
10 import com.yeepay.sxf.helloword.bolt.WordCountBolt;
11 import com.yeepay.sxf.helloword.bolt.WordNormalizerBolt;
12 import com.yeepay.sxf.helloword.spout.RandomSentenceSpout;
13 
14 public class WordCountTopology {
15 
16     private static TopologyBuilder builder=new TopologyBuilder();
17 
18     public static void main(String[] args) throws InterruptedException {
19         
20         
21         Config config=new Config();
22         
23         builder.setSpout("RandomSentence", new RandomSentenceSpout(),2);
24         builder.setBolt("WordNormalizer", new WordNormalizerBolt(),2).shuffleGrouping("RandomSentence");
25         builder.setBolt("WordCount", new WordCountBolt(),2).fieldsGrouping("WordNormalizer", new Fields("wordd"));
26         builder.setBolt("Print", new PrintBolt(),1).shuffleGrouping("WordCount");
27         
28         config.setDebug(false);//調試模式,會把所有的log都打印出來
29         
30         //通過是否有參數來判斷是否啟動集群,或者本地模式執行
31         if(args!=null&&args.length>0){
32             System.out.print("集群模式------------------------------------------------");
33             try {
34                 config.setNumWorkers(1);
35                 StormSubmitter.submitTopology(args[0], config, builder.createTopology());
36             } catch (Exception e) {
37                 // TODO: handle exception
38             }
39         }else{
40             System.out.print("本地模式------------------------------------------------");
41             //本地模式
42             config.setMaxTaskParallelism(1);
43             LocalCluster cluster=new LocalCluster();
44             cluster.submitTopology("wordcount",config,builder.createTopology() );
45             
46             Thread.sleep(10000L);
47             //關閉本地集群
48             cluster.shutdown();
49         }
50     }
51 }
View Code

二:RandomSentenceSpout源碼【發送英語句子的消息源頭】

技術分享圖片
  1 package com.yeepay.sxf.helloword.spout;
  2 import java.util.Map;
  3 import java.util.Random;
  4 
  5 import backtype.storm.spout.SpoutOutputCollector;
  6 import backtype.storm.task.TopologyContext;
  7 import backtype.storm.topology.OutputFieldsDeclarer;
  8 import backtype.storm.topology.base.BaseRichSpout;
  9 import backtype.storm.tuple.Fields;
 10 import backtype.storm.tuple.Values;
 11 import backtype.storm.utils.Utils;
 12 /**
 13  * 內存中隨機選取待定的英文語句,作為數據源發射出去
 14  * @author sxf
 15  *隨機發送一條內置消息,該spout繼承BaseRichSpout/IRichSpout
 16  *
 17  *Storm的兩個主要抽象是Spout和Bolt,Storm的第三個更強大的抽象是StateSpout
 18  */
 19 @SuppressWarnings("serial")
 20 public class RandomSentenceSpout extends BaseRichSpout {
 21     //發射消息
 22     SpoutOutputCollector spoutOutputCollector;
 23     Random random;
 24     
 25     /**
 26      * 【1】IComponent接口中的open()方法
 27      * 進行spout的一些初始化工作,包括參數傳遞。open()方法在該組件的一個任務在集群的工作進程內被初始化時被調用。提供了Spout執行的所需的環境。
 28      * Map:是這個Spout的Storm配置,提供給拓撲與這臺主機上的集群配置一起進行合並。
 29      * TopologyContext:可以用來獲取關於這個任務在拓撲中的位置信息,包括該任務的id,該任務的組件id,輸入和輸出信息等。
 30      * SpoutOutputCollector:是收集器,用於從這個Spout發射元組,元組可以隨時被發射,包括open()和colse()方法。收集器是線程安全的,應該作為這個Spout對象的實例變量進行保存
 31      * 
 32      * 
 33      * 【2】IComponent接口中的colse()方法
 34      * 當一個Ispout即將關閉時被調用。不能保證colse()方法一定會被調用。因為Supervisor可以對集群的工作進程使用Kill -9命令強制殺死進程命令
 35      * 本地模式,當拓撲被殺死事,一定調用colse()方法
 36      * 
 37      * 【3】IComponent接口中的activate()方法
 38      * Activate()方法當Spout已經從失效模式中激活時被調用。該Spout的nextTuple()方法很快就會被調用。當使用Storm客戶端操作拓撲時,Spout可以在失效狀態之後變成激活模式。
 39      * 
 40      * 【4】
 41      */
 42     @Override
 43     public void open(Map arg0, TopologyContext arg01, SpoutOutputCollector arg2) {
 44         this.spoutOutputCollector=arg2;
 45         this.random=new Random();
 46     }
 47     
 48     /**
 49      * 進行Tuple處理的主要方法
 50      * 【4】IComponent的nextTuple()方法
 51      * 當調用nextTuple()方法時,Storm要求Spout發射元組到輸出收集器(OutputCollector)
 52      * nextTuple()方法應該是非阻塞的,所以,如果Spout沒有元組可以發射,該方法應該返回。
 53      * nextTuple(),ack()和fail()方法都在Spout任務的單一線程內緊密循環被調用。當沒有元組可以發射時,
 54      * 可以讓nextTuple()去sleep很短時間,例如1毫秒,這樣不會浪費太多cpu資源。
 55      */
 56     @Override
 57     public void nextTuple() {
 58         //每兩秒種發送一條消息
 59         Utils.sleep(2000);
 60         //自定義內置數組
 61         String[] sentences=new String[]{
 62                 "or 420 million US dollars",
 63                 "What happened is that a group",
 64                 "fight lasted hours overnight between",
 65                 "the air according to the residents",
 66                 "told me that one Malian soldier",
 67                 "military spokesman says security forces",
 68                 "that thousands of people who prayed",
 69                 "continuing to receive treatment for",
 70                 "freezing temperatures currently gripping",
 71                 "Central African Republic Michel Djotodia",
 72                 "freezing temperatures currently gripping",
 73                 "former opposition will make up most",
 74                 "The Syrian government has accused",
 75                 "Doctors in South Africa reporting",
 76                 "military spokesman says security forces",
 77                 "Late on Monday, Ms Yingluck invoked special powers allowing officials to impose curfews",
 78                 "Those who took up exercise were three times more likely to remain healthy over the next eight",
 79                 "The space dream, a source of national pride and inspiration",
 80                 "There was no time to launch the lifeboats because the ferry capsized with such alarming speed"
 81                 };
 82         
 83         //從sentences數組中,隨機獲取一條語句,作為這次spout發送的消息
 84         String sentence=sentences[random.nextInt(sentences.length)];
 85         //使用emit方法進行Tuple發布會,參數用Values申明
 86         spoutOutputCollector.emit(new Values(sentence.trim().toLowerCase()));
 87     }
 88 
 89     
 90     
 91     /**
 92      * 【5】IComponent的ack()方法
 93      * Storm已經斷定該Spout發射的標識符為msgId的元組已經被完全處理時,會調用ack方法。
 94      * 通常情況下,ack()方法會將該消息移除隊列以防止它被重發
 95      */
 96     @Override
 97     public void ack(Object msgId) {
 98         
 99     }
100 
101     
102     /**
103      * 【6】IComponent接口的fail()方法
104      * 該Spout發射的標識為msgId的元組未能被完全處理時,會調用fail()方法。
105      * 通常情況下,fail方法會將消息放回隊列中,並在稍後重發消息
106      */
107     @Override
108     public void fail(Object msgId) {
109         
110     }
111 
112     //字段聲明
113     @Override
114     public void declareOutputFields(OutputFieldsDeclarer arg0) {
115         arg0.declare(new Fields("wordd"));
116     }
117 
118     
119 }
View Code

三:WordNormalizerBolt源碼【將英語句子的切割成單詞的處理邏輯單元】

技術分享圖片
  1 package com.yeepay.sxf.helloword.bolt;
  2 
  3 import java.util.Map;
  4 
  5 import backtype.storm.task.OutputCollector;
  6 import backtype.storm.task.TopologyContext;
  7 import backtype.storm.topology.IRichBolt;
  8 import backtype.storm.topology.OutputFieldsDeclarer;
  9 import backtype.storm.tuple.Fields;
 10 import backtype.storm.tuple.Tuple;
 11 import backtype.storm.tuple.Values;
 12 /**
 13  * 消息預處理的bolt,將消息按單詞切分
 14  * @author sxf
 15  *Bolt的生命周期如下:
 16  *在客戶端主機上創建IBolt對象,IBolt被序列化到拓撲並提交到集群的主控節點(Nimbus)然後Nimbus啟動工作進程(Worker)反序列化對象,
 17  *調用對象上的prepare()方法,然後開始處理元組。
 18  *如果你希望參數化一個IBolt,應該通過其構造函數設置參數並作為實例變量保存參數化狀態。然後,實例變量會序列化,
 19  *並發送給跨集群的每個任務來執行這個Bolt
 20  *如果使用java來定義Bolt,應該使用IRichBolt接口,IRichBolt接口添加了使用java TopologyBuilder API的必要方法
 21  *
 22  * 
 23  * IBasicBolt與IRichBolt具有一樣的同名方法,唯一不同,IBasicBolt的execute()方法會自動處理Acking機制,
 24  * 如果在execute中想讓元組失敗,可以顯示拋出一個FailedException異常
 25  */
 26 @SuppressWarnings("serial")
 27 public class WordNormalizerBolt implements IRichBolt {
 28 
 29     //發射消息
 30     private OutputCollector outputCollector;
 31     
 32     /**
 33      * bolt的初始化方法
 34      *(配置的參數,上下文,發送器)
 35      *【1】IBolt接口的prepare()方法
 36      *在該組建的一個任務在集群的工作進程內被初始化時被調用,提供了Bolt執行的所需環境
 37      *Map參數:是這個Bolt的Storm配置,提供給拓撲與這臺主機上的集群配置一起進行合並。
 38      *TopologyContext參數:可以用來獲取關於這個任務在拓撲中的位置信息,比如任務的id,該任務的組件id,輸入輸出信息等。
 39      *OutputCollector參數:是收集器皿,用於從這個Bolt發射元組。元組隨時被發射,包括prepare()和cleanup()方法。
 40      *收集器是線程安全,應該作為這個Bolt對象的實例變量進行保存。
 41      */
 42     @Override
 43     public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
 44         this.outputCollector=arg2;
 45     }
 46     
 47     
 48     /**
 49      * 執行訂閱的Tuple邏輯過程的方法
 50      * 【2】IBolt接口的execute()方法
 51      * 用於處理一個輸入元組,元組對象包含元組來自哪個組件/流/任務的元數據。
 52      * 元組的值可以使用tuple.getValue()進行訪問。Ibolt沒有立即處理元組,而是完全地捕獲一個元組在以後進行處理。
 53      * 元組應該使用prepare方法提供的OutputCollector進行發射。使用OutputCollector在某種程度上要求所有輸入元組是ack或者fail
 54      * 否則Storm將無法確定來自Spout的元組什麽時候處理完成。
 55      * 常見做法是:在execute()方法結束時對輸入的元組調用ack方法,而IBasicBolt會自動處理該部分。
 56      * Tuple參數:為被處理的輸入元組
 57      * 
 58      */
 59     @Override
 60     public void execute(Tuple tuple) {
 61         //獲取訂閱的tuple的內容
 62         String sentence=tuple.getString(0);
 63         //獲取元組來自那個bolt或spout。返回它們的名字
 64         String ad=tuple.getSourceComponent();
 65         System.out.print(ad);//RandomSentence
 66         //進行單詞分割
 67         String[] words=sentence.split(" ");
 68         //將單詞發送出去
 69         for(String word:words){
 70             outputCollector.emit(new Values(word));
 71         }
 72     }
 73     
 74     /**
 75      * 此方法,在當前Bolt被關閉時,調用此方法來清理任何已經打開的資源,但不能保證這個方法會被集群調用
 76      * 【2】IBolt接口的cleanup()方法
 77      * 當一個Bolt即將關閉時被調用。不能保證cleanup()方法一定會被調用,
 78      * 因為Supervisor可以對集群的工作進程使用Kill -9命令強制殺死進程命令
 79      * 如果在本地模式下運行storm,當拓撲被殺死時一定會調用該方法
 80      *
 81      */
 82     @Override
 83     public void cleanup() {
 84         
 85     }
 86 
 87     
 88     /**
 89      * 此方法,用於聲明當前Bolt類發射一個字段名為"wordd"的一個元組
 90      * 為該拓撲的所有流聲明輸出模式
 91      */
 92     @Override
 93     public void declareOutputFields(OutputFieldsDeclarer declarer) {
 94         declarer.declare(new Fields("wordd"));
 95     }
 96 
 97     /**
 98      * 此方法孕育你配置關於當前這個組件如何運行的很多參數,會被運行中調用
 99      */
100     @Override
101     public Map<String, Object> getComponentConfiguration() {
102         return null;
103     }
104 
105 }
View Code

四:WordCountBolt源碼【進行單詞統計的處理邏輯單元】

技術分享圖片
  1 package com.yeepay.sxf.helloword.bolt;
  2 
  3 import java.util.HashMap;
  4 import java.util.Map;
  5 
  6 import backtype.storm.task.OutputCollector;
  7 import backtype.storm.task.TopologyContext;
  8 import backtype.storm.topology.IRichBolt;
  9 import backtype.storm.topology.OutputFieldsDeclarer;
 10 import backtype.storm.tuple.Fields;
 11 import backtype.storm.tuple.Tuple;
 12 import backtype.storm.tuple.Values;
 13 
 14 import com.yeepay.sxf.helloword.util.MapSort;
 15 /**
 16  * 單詞統計,並且實時獲取詞頻前N的發射出去
 17  * @author sxf
 18  *
 19  */
 20 @SuppressWarnings("serial")
 21 public class WordCountBolt implements IRichBolt{
 22 
 23     //單詞統計結果
 24     private Map<String, Integer> counters;
 25     //消息發射器皿
 26     private OutputCollector outputCollector;
 27     
 28     //bolt的初始化的方法
 29     @Override
 30     public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
 31         this.outputCollector=arg2;
 32         this.counters=new HashMap<String, Integer>();
 33     }
 34     
 35     //執行方法,進行該bolt的邏輯處理
 36     @Override
 37     public void execute(Tuple tuple) {
 38         //獲得tuple
 39         String str=tuple.getString(0);
 40         //獲取當前的tuple來自那個Bolt或spout的,返回他們的名字
 41         String ad=tuple.getSourceComponent();
 42         System.out.print(ad);//WordNormalizer
 43         //邏輯處理
 44         if(!counters.containsKey(str)){
 45             //如果不包含,則進行初始化統計
 46             counters.put(str, 1);
 47         }else{
 48             //如果包含,則數值加1
 49             Integer c=counters.get(str)+1;
 50             counters.put(str, c);
 51         }
 52         //我們取前n個的單詞詞頻
 53         int num=8;
 54         int length=0;
 55         //使用工具類MapSort對map進行排序
 56         counters=MapSort.sortByValue(counters);
 57         
 58         if(num<counters.keySet().size()){
 59             length=num;
 60         }else{
 61             length=counters.keySet().size();
 62         }
 63         
 64         String word=null;
 65         StringBuffer st=new StringBuffer();
 66         //增量統計
 67         int count=0;
 68         for(String key:counters.keySet()){
 69             
 70             //獲取前n個
 71             if(count>=length){
 72                 break;
 73             }
 74             
 75             if(count==0){
 76                 st.append("The first ").append(length).append("==>");
 77                 st.append("[").append(key).append(":").append(counters.get(key)).append("]");
 78             }else{
 79                 st.append(",[").append(key).append(":").append(counters.get(key)).append("]");
 80             }
 81             count++;
 82         }
 83         
 84         //將消息發射出去
 85         outputCollector.emit(new Values(st.toString()));
 86                 
 87     }
 88 
 89     
 90     /**
 91      * 此方法,用於聲明當前Bolt類發射一個字段名為"oneword"的一個元組
 92      * 為該拓撲的所有流聲明輸出模式
 93      */
 94     @Override
 95     public void declareOutputFields(OutputFieldsDeclarer arg0) {
 96         
 97     arg0.declare(new Fields("oneword"));
 98     }
 99 
100 
101     /**
102      * 此方法孕育你配置關於當前這個組件如何運行的很多參數,會被運行中調用
103      */
104     @Override
105     public Map<String, Object> getComponentConfiguration() {
106         // TODO Auto-generated method stub
107         return null;
108     }
109 
110     
111     /**
112      * 此方法,在當前Bolt被關閉時,調用此方法來清理任何已經打開的資源,但不能保證這個方法會被集群調用
113      */
114     @Override
115     public void cleanup() {
116         // TODO Auto-generated method stub
117         
118     }
119 
120     
121 
122 
123     
124 }
View Code

五:PrintBolt源碼【進行單詞topN的打印單元】

技術分享圖片
 1 package com.yeepay.sxf.helloword.bolt;
 2 
 3 import java.text.SimpleDateFormat;
 4 import java.util.Date;
 5 
 6 import backtype.storm.topology.BasicOutputCollector;
 7 import backtype.storm.topology.OutputFieldsDeclarer;
 8 import backtype.storm.topology.base.BaseBasicBolt;
 9 import backtype.storm.tuple.Tuple;
10 /**
11  * 打印接受的數據的Bolt
12  * @author sxf
13  *
14  */
15 @SuppressWarnings("serial")
16 public class PrintBolt extends BaseBasicBolt {
17 
18     
19     @Override
20     public void execute(Tuple tuple, BasicOutputCollector arg1) {
21         //接收數據date
22         try {
23             String mesg=tuple.getString(0);
24             if(mesg!=null){
25                 System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS").format(new Date())+"===>"+mesg);
26             }
27         } catch (Exception e) {
28             e.printStackTrace();
29         }
30         
31     }
32     /**
33      * 此方法,用於聲明當前Bolt類發射什麽字段的元組,當前bolt只是打印,不再發射元組,所以不用定義
34      * 為該拓撲的所有流聲明輸出模式
35      */
36     @Override
37     public void declareOutputFields(OutputFieldsDeclarer declarer) {
38     }
39     
40     
41 
42     
43 }
View Code

六:MapSort源碼【進行單詞排序】

技術分享圖片
 1 package com.yeepay.sxf.helloword.util;
 2 
 3 import java.util.Collections;
 4 import java.util.Comparator;
 5 import java.util.HashMap;
 6 import java.util.Iterator;
 7 import java.util.LinkedHashMap;
 8 import java.util.LinkedList;
 9 import java.util.List;
10 import java.util.Map;
11 import java.util.Map.Entry;
12 /**
13  * 對map排序的工具類
14  * @author sxf
15  *
16  */
17 public class MapSort {
18 
19     
20     public static Map<String, Integer> sortByValue(Map<String, Integer> map) {
21 
22         if (map == null) {
23             return null;
24         }
25 
26         List list = new LinkedList(map.entrySet());
27 
28         Collections.sort(list, new Comparator() {
29             
30             public int compare(Object o1, Object o2) {
31                 Comparable sort1 = (Comparable) ((Map.Entry) o1).getValue();
32                 Comparable sort2 = (Comparable) ((Map.Entry) o2).getValue();
33                 return sort2.compareTo(sort1);
34             }
35             
36         });
37 
38         Map result = new LinkedHashMap();
39 
40         for (Iterator it = list.iterator(); it.hasNext();) {
41             
42             Map.Entry entry = (Map.Entry) it.next();
43             result.put(entry.getKey(), entry.getValue());
44             
45         }
46 
47         return result;
48     }
49 
50     public static void main(String[] args) {
51         
52         Map<String, Integer> map = new HashMap<String, Integer> ();
53         map.put("test", 3);
54         map.put("hcy", 1);
55         map.put("put", 2);
56         
57         map = sortByValue(map);
58         
59         for (String key : map.keySet()) {
60             System.out.println( key + " ==> " + map.get(key));
61         }
62     }
63 
64 }
View Code

Storm設計一個Topology用來統計單詞的TopN的實例