1. 程式人生 > >Storm之路-WordCount-實例

Storm之路-WordCount-實例

ade debug clas boolean gif import oot 遠程 lex

初學storm,有不足的地方還請糾正。

網上看了很多wordcount實例,發現都不是我想要的。

實現場景:統計shengjing.txt的詞頻並匯總到一個集合中,每個窗口周期整體打印一次結果。

● 消息源Spout
  繼承BaseRichSpout類 / 實現IRichSpout接口
    open,初始化動作;
    nextTuple,消息接入,執行數據發射;
    ack,tuple成功處理後調用;
    fail,tuple處理失敗後調用;
    declareOutputFields,聲明輸出字段;

● 處理單元Bolt
  繼承BaseBasicBolt類 / BaseWindowedBolt / 實現IRichBolt接口
    prepare,worker啟動時初始化;
    execute,接收一個tuple / TupleWindow並執行邏輯處理,再將結果發射出去;
    cleanup,關閉前調用;
    declareOutputFields,聲明輸出字段;

● 項目結構

技術分享

● pom.xml文件,配置項目jar依賴

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.scps.storm</groupId>
    <artifactId>storm-example</artifactId>
    <version>0.0.1</version>
    <name>storm.example</name>

    <properties>
        <!-- Read sources as UTF-8 regardless of the platform default encoding (e.g. GBK on Chinese Windows). -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.1.0</version>
        </dependency>
    </dependencies>
</project>

● WordTopology.java文件,入口類,實例Topology、Spout、Bolt,配置等

技術分享
 1 package com.scps.storm.helloword;
 2 
 3 import java.util.concurrent.TimeUnit;
 4 
 5 import org.apache.storm.Config;
 6 import org.apache.storm.LocalCluster;
 7 import org.apache.storm.StormSubmitter;
 8 import org.apache.storm.generated.AlreadyAliveException;
 9 import org.apache.storm.generated.AuthorizationException;
10 import org.apache.storm.generated.InvalidTopologyException;
11 import org.apache.storm.topology.TopologyBuilder;
12 import org.apache.storm.topology.base.BaseWindowedBolt.Duration;
13 import org.apache.storm.tuple.Fields;
14 
15 import com.scps.storm.helloword.bolt.SlidingWindowBolt;
16 import com.scps.storm.helloword.bolt.WordCountBolt;
17 import com.scps.storm.helloword.bolt.WordFinalBolt;
18 import com.scps.storm.helloword.bolt.WordSplitBolt;
19 import com.scps.storm.helloword.spout.WordReaderSpout;
20 
21 public class WordTopology {
22 
23     public static void main(String[] args) {
24 
25         TopologyBuilder builder = new TopologyBuilder();
26 
27         // 1個task去讀文件
28         builder.setSpout("word-reader", new WordReaderSpout(), 1);
29         
30         // 2個task分割行
31         builder.setBolt("word-split", new WordSplitBolt(), 2).shuffleGrouping("word-reader");
32         
33         // 2個task分批統計,並發送相同的word到同一個task
34         builder.setBolt("word-count", new WordCountBolt(), 2).fieldsGrouping("word-split", new Fields("word"));
35 
36         // 1個task匯總,每隔3秒統計最近5秒的tuple,SlidingWindow滑動窗口(間隔)
37         // builder.setBolt("sliding-window-bolt", new SlidingWindowBolt().withWindow(new Duration(5, TimeUnit.SECONDS), new Duration(3, TimeUnit.SECONDS)), 1).shuffleGrouping("word-count");
38         // 1個task匯總,統計5秒內的tuple,不能超過15秒?提示超時錯誤,TumblingWindow滾動窗口
39         builder.setBolt("sliding-window-bolt", new SlidingWindowBolt().withTumblingWindow(new Duration(5, TimeUnit.SECONDS)), 1).shuffleGrouping("word-count");
40         
41         // 1個task輸出
42         builder.setBolt("word-final", new WordFinalBolt(), 1).shuffleGrouping("sliding-window-bolt");
43 
44         Config conf = new Config();
45 
46         conf.setDebug(false);
47 
48         if (args != null && args.length > 0) {
49             
50             // 在集群運行,需要mvn package編譯
51             // bin/storm jar "/root/storm-example-0.0.1.jar" com.scps.storm.helloword.WordTopology "http://nimbus:8080/uploads/shengjing.txt" wordcount
52             
53             try {
54 
55                 String file = args[0];
56                 String name = args[1];
57 
58                 conf.put("file", file);
59                 // conf.setNumWorkers(2);
60 
61                 StormSubmitter.submitTopology(name, conf, builder.createTopology());
62 
63             } catch (AlreadyAliveException e) {
64 
65                 e.printStackTrace();
66 
67             } catch (InvalidTopologyException e) {
68 
69                 e.printStackTrace();
70 
71             } catch (AuthorizationException e) {
72 
73                 e.printStackTrace();
74             }
75 
76         } else {
77 
78             // 直接在eclipse中運行
79             
80             conf.put("file", "C:\\Users\\Administrator\\Downloads\\shengjing1.txt");
81             // conf.put("file", "http://192.168.100.170:8080/uploads/shengjing.txt");
82             // conf.setMaxTaskParallelism(2); // 設置最大task數
83             LocalCluster cluster = new LocalCluster();
84             cluster.submitTopology("wordcount", conf, builder.createTopology());
85         }
86     }
87 }
View Code

● WordReaderSpout.java文件,讀取txt文件,發送行

技術分享
  1 package com.scps.storm.helloword.spout;
  2 
  3 import java.io.BufferedReader;
  4 import java.io.FileInputStream;
  5 import java.io.FileNotFoundException;
  6 import java.io.IOException;
  7 import java.io.InputStream;
  8 import java.io.InputStreamReader;
  9 import java.io.UnsupportedEncodingException;
 10 import java.net.MalformedURLException;
 11 import java.net.URL;
 12 import java.net.URLConnection;
 13 import java.text.SimpleDateFormat;
 14 import java.util.Date;
 15 import java.util.Map;
 16 
 17 import org.apache.storm.spout.SpoutOutputCollector;
 18 import org.apache.storm.task.TopologyContext;
 19 import org.apache.storm.topology.IRichSpout;
 20 import org.apache.storm.topology.OutputFieldsDeclarer;
 21 import org.apache.storm.tuple.Fields;
 22 import org.apache.storm.tuple.Values;
 23 import org.apache.storm.utils.Utils;
 24 
 25 public class WordReaderSpout implements IRichSpout {
 26 
 27     private static final long serialVersionUID = 1L;
 28     private SpoutOutputCollector outputCollector;
 29     private String filePath;
 30     private boolean completed = false;
 31 
 32     public void ack(Object arg0) {
 33 
 34     }
 35 
 36     public void activate() {
 37 
 38     }
 39 
 40     public void close() {
 41 
 42     }
 43 
 44     public void deactivate() {
 45 
 46     }
 47 
 48     public void fail(Object arg0) {
 49 
 50     }
 51 
 52     @SuppressWarnings("rawtypes")
 53     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
 54 
 55         filePath = conf.get("file").toString();
 56         outputCollector = collector;
 57     }
 58 
 59     public void nextTuple() {
 60 
 61         if (!completed) {
 62 
 63             String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
 64             System.out.println("WordReaderSpout nextTuple, " + time);
 65 
 66             String line = "";
 67             InputStream inputStream = null;
 68             InputStreamReader inputStreamReader = null;
 69             BufferedReader reader = null;
 70 
 71             try {
 72 
 73                 // filePath = "http://192.168.100.170:8080/uploads/shengjing.txt";
 74                 // filePath = "C:\\Users\\Administrator\\Downloads\\shengjing.txt";
 75 
 76                 if (filePath.startsWith("http://")) { // 遠程文件
 77                     URL url = new URL(filePath);
 78                     URLConnection urlConn = url.openConnection();
 79                     inputStream = urlConn.getInputStream();
 80                 } else { // 本地文件
 81                     inputStream = new FileInputStream(filePath);
 82                 }
 83 
 84                 inputStreamReader = new InputStreamReader(inputStream, "utf-8");
 85                 reader = new BufferedReader(inputStreamReader);
 86                 while ((line = reader.readLine()) != null) {
 87                     outputCollector.emit(new Values(line));
 88                 }
 89 
 90             } catch (MalformedURLException e) {
 91                 e.printStackTrace();
 92             } catch (FileNotFoundException e) {
 93                 e.printStackTrace();
 94             } catch (UnsupportedEncodingException e) {
 95                 e.printStackTrace();
 96             } catch (IOException e) {
 97                 e.printStackTrace();
 98             } finally {
 99                 completed = true;
100                 try {
101                     if (reader != null) {
102                         reader.close();
103                     }
104                     if (inputStreamReader != null) {
105                         inputStreamReader.close();
106                     }
107                     if (inputStream != null) {
108                         inputStream.close();
109                     }
110                 } catch (IOException e) {
111                     e.printStackTrace();
112                 }
113             }
114         }
115 
116         Utils.sleep(20000);
117     }
118 
119     public void declareOutputFields(OutputFieldsDeclarer declarer) {
120 
121         declarer.declare(new Fields("line"));
122     }
123 
124     public Map<String, Object> getComponentConfiguration() {
125 
126         return null;
127     }
128 }
View Code

使用集群測試時,先把txt文件上傳到nimbus的UI服務上(例如 http://nimbus:8080/uploads/shengjing.txt)。spout會被隨機指派到某個supervisor上運行,再通過HTTP遠程讀取該文件。

● WordSplitBolt.java文件,接收行,分割行,發送詞

技術分享
 1 package com.scps.storm.helloword.bolt;
 2 
 3 import java.util.Map;
 4 
 5 import org.apache.storm.task.OutputCollector;
 6 import org.apache.storm.task.TopologyContext;
 7 import org.apache.storm.topology.IRichBolt;
 8 import org.apache.storm.topology.OutputFieldsDeclarer;
 9 import org.apache.storm.tuple.Fields;
10 import org.apache.storm.tuple.Tuple;
11 import org.apache.storm.tuple.Values;
12 
13 public class WordSplitBolt implements IRichBolt {
14 
15     private static final long serialVersionUID = 1L;
16     private OutputCollector outputCollector;
17 
18     @SuppressWarnings("rawtypes")
19     public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
20 
21         outputCollector = collector;
22     }
23 
24     public void execute(Tuple input) {
25 
26         String line = input.getStringByField("line");
27 
28         line = line.trim();
29         line = line.replace(",", " ");
30         line = line.replace(".", " ");
31         line = line.replace(":", " ");
32         line = line.replace(";", " ");
33         line = line.replace("?", " ");
34         line = line.replace("!", " ");
35         line = line.replace("(", " ");
36         line = line.replace(")", " ");
37         line = line.replace("[", " ");
38         line = line.replace("]", " ");
39         line = line.trim();
40         
41         String[] words = line.split(" ");
42         for (String word : words) {
43             word = word.trim();
44             if (!"".equals(word)) {
45                 outputCollector.emit(new Values(word));
46             }
47         }
48     }
49 
50     public void declareOutputFields(OutputFieldsDeclarer declarer) {
51 
52         declarer.declare(new Fields("word"));
53     }
54 
55     public void cleanup() {
56 
57     }
58 
59     public Map<String, Object> getComponentConfiguration() {
60 
61         return null;
62     }
63 }
View Code

● WordCountBolt.java文件,接收詞,統計詞,發送集合

技術分享
 1 package com.scps.storm.helloword.bolt;
 2 
 3 import java.util.HashMap;
 4 import java.util.Map;
 5 
 6 import org.apache.storm.task.OutputCollector;
 7 import org.apache.storm.task.TopologyContext;
 8 import org.apache.storm.topology.IRichBolt;
 9 import org.apache.storm.topology.OutputFieldsDeclarer;
10 import org.apache.storm.tuple.Fields;
11 import org.apache.storm.tuple.Tuple;
12 import org.apache.storm.tuple.Values;
13 
14 public class WordCountBolt implements IRichBolt {
15 
16     private static final long serialVersionUID = 1L;
17     Map<String, Integer> counter;
18     private OutputCollector outputCollector;
19 
20     @SuppressWarnings("rawtypes")
21     public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
22 
23         counter = new HashMap<String, Integer>();
24         outputCollector = collector;
25     }
26 
27     public void execute(Tuple input) {
28 
29         String word = input.getStringByField("word");
30         int count;
31 
32         if (!counter.containsKey(word)) {
33             count = 1;
34         } else {
35             count = counter.get(word) + 1;
36         }
37 
38         counter.put(word, count);
39         outputCollector.emit(new Values(word, count));
40     }
41 
42     public void declareOutputFields(OutputFieldsDeclarer declarer) {
43 
44         declarer.declare(new Fields("word", "count"));
45     }
46 
47     public void cleanup() {
48 
49     }
50 
51     public Map<String, Object> getComponentConfiguration() {
52 
53         return null;
54     }
55 }
View Code

● SlidingWindowBolt.java文件,接收集合,合並集合,發送集合

技術分享
 1 package com.scps.storm.helloword.bolt;
 2 
 3 import java.text.SimpleDateFormat;
 4 import java.util.Date;
 5 import java.util.HashMap;
 6 import java.util.Map;
 7 
 8 import org.apache.storm.task.OutputCollector;
 9 import org.apache.storm.task.TopologyContext;
10 import org.apache.storm.topology.OutputFieldsDeclarer;
11 import org.apache.storm.topology.base.BaseWindowedBolt;
12 import org.apache.storm.tuple.Fields;
13 import org.apache.storm.tuple.Tuple;
14 import org.apache.storm.tuple.Values;
15 import org.apache.storm.windowing.TupleWindow;
16 
17 public class SlidingWindowBolt extends BaseWindowedBolt {
18 
19     private static final long serialVersionUID = 1L;
20     Map<String, Integer> counter;
21     private OutputCollector outputCollector;
22 
23     @SuppressWarnings("rawtypes")
24     public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
25 
26         counter = new HashMap<String, Integer>();
27         outputCollector = collector;
28     }
29 
30     public void execute(TupleWindow inputWindow) {
31         
32         String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
33         System.out.println("SlidingWindowBolt execute, " + time);
34 
35         for (Tuple input : inputWindow.get()) {
36 
37             String word = input.getStringByField("word");
38             int count = input.getIntegerByField("count");
39             
40             counter.put(word, count);
41         }
42 
43         outputCollector.emit(new Values(counter));
44     }
45 
46     public void declareOutputFields(OutputFieldsDeclarer declarer) {
47 
48         declarer.declare(new Fields("counter"));
49     }
50 }
View Code

● WordFinalBolt.java文件,接收集合,打印集合

技術分享
 1 package com.scps.storm.helloword.bolt;
 2 
 3 import java.text.SimpleDateFormat;
 4 import java.util.ArrayList;
 5 import java.util.Collections;
 6 import java.util.Date;
 7 import java.util.List;
 8 import java.util.Map;
 9 
10 import org.apache.storm.task.OutputCollector;
11 import org.apache.storm.task.TopologyContext;
12 import org.apache.storm.topology.IRichBolt;
13 import org.apache.storm.topology.OutputFieldsDeclarer;
14 import org.apache.storm.tuple.Tuple;
15 
16 public class WordFinalBolt implements IRichBolt {
17 
18     private static final long serialVersionUID = 1L;
19 
20     @SuppressWarnings("rawtypes")
21     public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
22 
23     }
24 
25     @SuppressWarnings("unchecked")
26     public void execute(Tuple input) {
27 
28         Map<String, Integer> counter = (Map<String, Integer>) input.getValueByField("counter");
29         List<String> keys = new ArrayList<String>();
30         keys.addAll(counter.keySet());
31         Collections.sort(keys);
32         String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
33         System.out.println("-----------------begin------------------, " + time);
34         for (String key : keys) {
35             System.out.println(key + " : " + counter.get(key));
36         }
37         System.out.println("-----------------end--------------------, " + time);
38     }
39 
40     public void cleanup() {
41 
42     }
43 
44     public void declareOutputFields(OutputFieldsDeclarer declarer) {
45 
46     }
47 
48     public Map<String, Object> getComponentConfiguration() {
49 
50         return null;
51     }
52 }
View Code

● 項目源碼文件地址:https://pan.baidu.com/s/1mhZtvq4 密碼:ypbc

Storm之路-WordCount-實例