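Storm Basic Concepts and a WordCount Example
This article continues the introductory Storm series: it introduces the basic concepts in Storm and then walks through a simple example, WordCount.
Storm Basic Concepts
Stream
A sequence of message tuples (Tuple).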

Spout
The source of a stream, for example one that reads from Kafka.

Bolt
Processes input streams and can emit new streams.

Topology
A network made up of Spouts and Bolts.

Task
The work of Spouts and Bolts is carried out by Tasks, which are distributed across the Storm cluster.

Stream Grouping
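Determines which Task a Tuple is routed to for execution. Commonly used grouping strategies are:
- Shuffle: pick one Task at random
- Fields: hash on the given field(s)
- All: send to every Task
- Global: all tuples arrive at the same Task (the one with the lowest ID)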
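
To make the four strategies concrete, here is a minimal sketch in plain Java of how each one could pick target tasks. It is a conceptual model only, not Storm's internal code: the class GroupingSketch, its method names, the zero-based task indices, and the use of hashCode() modulo the task count are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Conceptual model of task selection (hypothetical, not Storm's implementation). */
final class GroupingSketch {

    /** Shuffle: pick any one of the target tasks at random. */
    static int shuffleTask(Random rnd, int numTasks) {
        return rnd.nextInt(numTasks);
    }

    /** Fields: hash the grouping value so that equal values map to the same task. */
    static int fieldsTask(Object fieldValue, int numTasks) {
        // floorMod keeps the index non-negative even for negative hash codes.
        return Math.floorMod(fieldValue.hashCode(), numTasks);
    }

    /** All: every target task receives a copy of the tuple. */
    static List<Integer> allTasks(int numTasks) {
        List<Integer> targets = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            targets.add(i);
        }
        return targets;
    }

    /** Global: every tuple goes to the task with the lowest ID (index 0 in this model). */
    static int globalTask() {
        return 0;
    }

    public static void main(String[] args) {
        int numTasks = 4; // arbitrary task count for the demo
        Random rnd = new Random();
        for (String word : new String[]{"the", "cow", "the"}) {
            System.out.println(word
                    + " shuffle=" + shuffleTask(rnd, numTasks)
                    + " fields=" + fieldsTask(word, numTasks)
                    + " all=" + allTasks(numTasks)
                    + " global=" + globalTask());
        }
    }
}
```

In this model the Fields strategy is deterministic, so both occurrences of "the" map to the same task index, while the Shuffle result may differ between them.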
WordCount Example

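In this example, RandomSentenceSpout emits sentences one at a time, SplitSentence splits each sentence into words and emits them one by one, and WordCount keeps a running count for each word.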
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WordCountTopology {
    private static final Logger LOG = LoggerFactory.getLogger(WordCountTopology.class);

    // Spout: emits a randomly chosen sentence on every call to nextTuple().
    public static class RandomSentenceSpout extends BaseRichSpout {
        SpoutOutputCollector _collector;
        Random _rand;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            _collector = collector;
            _rand = new Random();
        }

        @Override
        public void nextTuple() {
            String[] sentences = new String[]{
                "the cow jumped over the moon",
                "an apple a day keeps the doctor away",
                "four score and seven years ago",
                "snow white and the seven dwarfs",
                "i am at two with nature"
            };
            String sentence = sentences[_rand.nextInt(sentences.length)];
            _collector.emit(new Values(sentence));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // The field is named "word" although each value is a whole sentence.
            declarer.declare(new Fields("word"));
        }
    }

    // Bolt: splits each sentence on spaces and emits one tuple per word.
    public static class SplitSentence extends BaseBasicBolt {
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            String sentence = input.getString(0);
            System.out.println(sentence);
            String[] words = sentence.split(" ");
            for (String w : words) {
                collector.emit(new Values(w));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    // Bolt: keeps an in-memory count per word and emits (word, count).
    public static class WordCount extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null)
                count = 0;
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 5);
        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
        // Fields grouping on "word" sends the same word to the same counting task.
        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {
            // Cluster mode: args[0] is the topology name.
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        } else {
            // Local mode: run in-process.
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
            // Sleep a little longer, otherwise the output may not show up before shutdown.
            Thread.sleep(20000);
            cluster.shutdown();
        }
    }
}
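
The per-tuple logic of the two bolts can be tried without a Storm cluster. The following sketch is plain Java and uses no Storm classes; it mirrors what SplitSentence and WordCount do to each tuple. The class WordCountLogicSketch and its method names are hypothetical and exist only for this illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-alone model of the split and count steps (hypothetical helper, no Storm dependency). */
final class WordCountLogicSketch {
    // Same role as the counts map inside the WordCount bolt.
    private final Map<String, Integer> counts = new HashMap<>();

    /** Mirrors SplitSentence: split a sentence on single spaces. */
    static String[] split(String sentence) {
        return sentence.split(" ");
    }

    /** Mirrors WordCount: increment the running count for the word and return it. */
    int count(String word) {
        return counts.merge(word, 1, Integer::sum);
    }

    public static void main(String[] args) {
        WordCountLogicSketch counter = new WordCountLogicSketch();
        // "the" occurs twice in this sentence, so its second occurrence prints a count of 2.
        for (String w : split("the cow jumped over the moon")) {
            System.out.println(w + ", " + counter.count(w));
        }
    }
}
```

Because each instance holds its own map, the count for a word is only correct if every occurrence of that word reaches the same instance, which is what the fields grouping on "word" arranges in the topology.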
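Next we submit this Topology in each of the two modes and look at the results.
Local Mode
After it starts, the console log output shows how the topology is running.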
09:39:26.338 [Thread-44-count-executor[2 2]] INFO org.apache.storm.daemon.task - Emitting: count default [keeps, 16018]
09:39:26.323 [Thread-52-split-executor[6 6]] INFO org.apache.storm.daemon.executor - TRANSFERING tuple [dest: 2 tuple: source: split:6, stream: default, id: {}, [seven]]
09:39:26.338 [Thread-52-split-executor[6 6]] INFO org.apache.storm.daemon.task - Emitting: split default [dwarfs]
09:39:26.338 [Thread-52-split-executor[6 6]] INFO org.apache.storm.daemon.executor - TRANSFERING tuple [dest: 4 tuple: source: split:6, stream: default, id: {}, [dwarfs]]
09:39:26.338 [Thread-52-split-executor[6 6]] INFO org.apache.storm.daemon.executor - BOLT ack TASK: 6 TIME: -1 TUPLE: source: spout:10, stream: default, id: {}, [snow white and the seven dwarfs]
Cluster Mode
Submit the topology with storm jar, specifying the main class and the topology name.
➜ storm-examples ~/dev/apache-storm-1.0.6/bin/storm jar target/storm-examples-1.0-SNAPSHOT-jar-with-dependencies.jar com.vonzhou.examples.wordcount.WordCountTopology word-count
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/storm/dev/apache-storm-1.0.6/lib/log4j-slf4j-impl-2.8.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/storm/GitHub/storm-examples/target/storm-examples-1.0-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Running: /usr/lib/jvm/java-8-openjdk-amd64/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/home/storm/dev/apache-storm-1.0.6 -Dstorm.log.dir=/home/storm/dev/apache-storm-1.0.6/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /home/storm/dev/apache-storm-1.0.6/lib/log4j-over-slf4j-1.6.6.jar:/home/storm/dev/apache-storm-1.0.6/lib/log4j-api-2.8.jar:/home/storm/dev/apache-storm-1.0.6/lib/objenesis-2.1.jar:/home/storm/dev/apache-storm-1.0.6/lib/clojure-1.7.0.jar:/home/storm/dev/apache-storm-1.0.6/lib/servlet-api-2.5.jar:/home/storm/dev/apache-storm-1.0.6/lib/minlog-1.3.0.jar:/home/storm/dev/apache-storm-1.0.6/lib/kryo-3.0.3.jar:/home/storm/dev/apache-storm-1.0.6/lib/storm-rename-hack-1.0.6.jar:/home/storm/dev/apache-storm-1.0.6/lib/log4j-core-2.8.jar:/home/storm/dev/apache-storm-1.0.6/lib/asm-5.0.3.jar:/home/storm/dev/apache-storm-1.0.6/lib/reflectasm-1.10.1.jar:/home/storm/dev/apache-storm-1.0.6/lib/log4j-slf4j-impl-2.8.jar:/home/storm/dev/apache-storm-1.0.6/lib/storm-core-1.0.6.jar:/home/storm/dev/apache-storm-1.0.6/lib/disruptor-3.3.2.jar:/home/storm/dev/apache-storm-1.0.6/lib/slf4j-api-1.7.21.jar:target/storm-examples-1.0-SNAPSHOT-jar-with-dependencies.jar:/home/storm/dev/apache-storm-1.0.6/conf:/home/storm/dev/apache-storm-1.0.6/bin -Dstorm.jar=target/storm-examples-1.0-SNAPSHOT-jar-with-dependencies.jar com.vonzhou.examples.wordcount.WordCountTopology word-count
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/storm/dev/apache-storm-1.0.6/lib/log4j-slf4j-impl-2.8.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/storm/GitHub/storm-examples/target/storm-examples-1.0-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
527 [main] WARN o.a.s.u.Utils - STORM-VERSION new 1.0.6 old null
550 [main] INFO o.a.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -5846175772011635424:-4935736075314039348
601 [main] WARN o.a.s.u.Utils - STORM-VERSION new 1.0.6 old 1.0.6
604 [main] INFO o.a.s.s.a.AuthUtils - Got AutoCreds []
694 [main] INFO o.a.s.u.NimbusClient - Found leader nimbus : ubuntu:6627
709 [main] INFO o.a.s.u.NimbusClient - Found leader nimbus : ubuntu:6627
721 [main] INFO o.a.s.StormSubmitter - Uploading topology jar target/storm-examples-1.0-SNAPSHOT-jar-with-dependencies.jar to assigned location: /home/storm/dev/apache-storm-1.0.6/storm-local/nimbus/inbox/stormjar-950ce762-4fdc-4dcd-bea5-b362be0c32eb.jar
Start uploading file 'target/storm-examples-1.0-SNAPSHOT-jar-with-dependencies.jar' to '/home/storm/dev/apache-storm-1.0.6/storm-local/nimbus/inbox/stormjar-950ce762-4fdc-4dcd-bea5-b362be0c32eb.jar' (34468236 bytes)
[==================================================] 34468236 / 34468236
File 'target/storm-examples-1.0-SNAPSHOT-jar-with-dependencies.jar' uploaded to '/home/storm/dev/apache-storm-1.0.6/storm-local/nimbus/inbox/stormjar-950ce762-4fdc-4dcd-bea5-b362be0c32eb.jar' (34468236 bytes)
1303 [main] INFO o.a.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /home/storm/dev/apache-storm-1.0.6/storm-local/nimbus/inbox/stormjar-950ce762-4fdc-4dcd-bea5-b362be0c32eb.jar
1318 [main] INFO o.a.s.u.NimbusClient - Found leader nimbus : ubuntu:6627
1318 [main] INFO o.a.s.StormSubmitter - Submitting topology word-count in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-5846175772011635424:-4935736075314039348","topology.workers":3,"topology.debug":true}
2075 [main] INFO o.a.s.StormSubmitter - Finished submitting topology: word-count
Which processes were started?
➜ storm-examples jps -lm
15712 org.apache.storm.LogWriter /usr/lib/jvm/java-8-openjdk-amd64/bin/java -server -Dlogging.sensitivity=S3 -Dlogfile.name=worker.log -Dstorm.home=/home/storm/dev/apache-storm-1.0.6 -Dworkers.artifacts=/home/storm/dev/apache-storm-1.0.6/logs/workers-artifacts -Dstorm.id=word-count-1-1539312747 -Dworker.id=2065eadf-38dc-4018-8a02-d56a98f069a4 -Dworker.port=6702 -Dstorm.log.dir=/home/storm/dev/apache-storm-1.0.6/logs -Dlog4j.configurationFile=/home/storm/dev/apache-storm-1.0.6/log4j2/worker.xml -DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector -Dstorm.local.dir=storm-local -Xmx768m -XX:+PrintGCDetails -Xloggc:artifacts/gc.log -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=artifacts/heapdump -Djava.library.path=/home/storm/dev/apache-storm-1.0.6/storm-local/supervisor/stormdist/word-count-1-1539312747/resources/Linux-amd64:/home/storm/dev/apache-storm-1.0.6/storm-lo
15713 org.apache.storm.LogWriter /usr/lib/jvm/java-8-openjdk-amd64/bin/java -server -Dlogging.sensitivity=S3 -Dlogfile.name=worker.log -Dstorm.home=/home/storm/dev/apache-storm-1.0.6 -Dworkers.artifacts=/home/storm/dev/apache-storm-1.0.6/logs/workers-artifacts -Dstorm.id=word-count-1-1539312747 -Dworker.id=ad93b93e-e2d1-4497-8b97-f18b36336725 -Dworker.port=6700 -Dstorm.log.dir=/home/storm/dev/apache-storm-1.0.6/logs -Dlog4j.configurationFile=/home/storm/dev/apache-storm-1.0.6/log4j2/worker.xml -DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector -Dstorm.local.dir=storm-local -Xmx768m -XX:+PrintGCDetails -Xloggc:artifacts/gc.log -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=artifacts/heapdump -Djava.library.path=/home/storm/dev/apache-storm-1.0.6/storm-local/supervisor/stormdist/word-count-1-1539312747/resources/Linux-amd64:/home/storm/dev/apache-storm-1.0.6/storm-lo
1250 org.apache.storm.ui.core
15779 org.apache.storm.daemon.worker word-count-1-1539312747 0d1b7bc1-df35-42e7-be40-7219087e19b4 6702 2065eadf-38dc-4018-8a02-d56a98f069a4
15715 org.apache.storm.LogWriter /usr/lib/jvm/java-8-openjdk-amd64/bin/java -server -Dlogging.sensitivity=S3 -Dlogfile.name=worker.log -Dstorm.home=/home/storm/dev/apache-storm-1.0.6 -Dworkers.artifacts=/home/storm/dev/apache-storm-1.0.6/logs/workers-artifacts -Dstorm.id=word-count-1-1539312747 -Dworker.id=85b1908a-449c-45bc-93ed-1a0efaa8ab54 -Dworker.port=6701 -Dstorm.log.dir=/home/storm/dev/apache-storm-1.0.6/logs -Dlog4j.configurationFile=/home/storm/dev/apache-storm-1.0.6/log4j2/worker.xml -DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector -Dstorm.local.dir=storm-local -Xmx768m -XX:+PrintGCDetails -Xloggc:artifacts/gc.log -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=artifacts/heapdump -Djava.library.path=/home/storm/dev/apache-storm-1.0.6/storm-local/supervisor/stormdist/word-count-1-1539312747/resources/Linux-amd64:/home/storm/dev/apache-storm-1.0.6/storm-lo
15780 org.apache.storm.daemon.worker word-count-1-1539312747 0d1b7bc1-df35-42e7-be40-7219087e19b4 6700 ad93b93e-e2d1-4497-8b97-f18b36336725
17286 com.intellij.idea.Main
19432 org.apache.storm.command.dev_zookeeper
15818 org.apache.storm.command.config_value storm.log4j2.conf.dir
24811 org.apache.storm.daemon.supervisor.Supervisor
21138 org.jetbrains.jps.cmdline.Launcher /home/storm/dev/idea-IU-182.3684.101/lib/netty-buffer-4.1.25.Final.jar:/home/storm/dev/idea-IU-182.3684.101/lib/nanoxml-2.2.3.jar:/home/storm/dev/idea-IU-182.3684.101/lib/aether-transport-http-1.1.0.jar:/home/storm/dev/idea-IU-182.3684.101/lib/httpclient-4.5.5.jar:/home/storm/dev/idea-IU-182.3684.101/lib/commons-codec-1.10.jar:/home/storm/dev/idea-IU-182.3684.101/lib/javac2.jar:/home/storm/dev/idea-IU-182.3684.101/lib/maven-artifact-3.3.9.jar:/home/storm/dev/idea-IU-182.3684.101/lib/aether-util-1.1.0.jar:/home/storm/dev/idea-IU-182.3684.101/lib/asm-all.jar:/home/storm/dev/idea-IU-182.3684.101/lib/aether-api-1.1.0.jar:/home/storm/dev/idea-IU-182.3684.101/lib/oro-2.0.8.jar:/home/storm/dev/idea-IU-182.3684.101/lib/platform-api.jar:/home/storm/dev/idea-IU-182.3684.101/lib/aether-spi-1.1.0.jar:/home/storm/dev/idea-IU-182.3684.101/lib/jps-builders.jar:/home/storm/dev/idea-IU-182.3684.101/lib/maven-repository-metadata-3.3.9.jar:/home/storm/dev/idea-IU-182.3684.101/lib/jna-platform.
15763 org.apache.storm.daemon.worker word-count-1-1539312747 0d1b7bc1-df35-42e7-be40-7219087e19b4 6701 85b1908a-449c-45bc-93ed-1a0efaa8ab54
22295 org.apache.zookeeper.ZooKeeperMain
15831 sun.tools.jps.Jps -lm
26584 org.apache.storm.ui.core
15548 org.apache.storm.daemon.nimbus
21244 org.jetbrains.idea.maven.server.RemoteMavenServer
23165 org.apache.storm.daemon.nimbus
The worker.log file shows how the topology is running.
2018-10-12 11:00:39.736 o.a.s.d.executor Thread-7-split-executor[22 22] [INFO] Processing received message FOR 22 TUPLE: source: spout:25, stream: default, id: {}, [i am at two with nature]
2018-10-12 11:00:39.736 STDIO Thread-7-split-executor[22 22] [INFO] i am at two with nature
2018-10-12 11:00:39.736 o.a.s.d.task Thread-7-split-executor[22 22] [INFO] Emitting: split default [i]
2018-10-12 11:00:39.736 o.a.s.d.executor Thread-7-split-executor[22 22] [INFO] TRANSFERING tuple [dest: 8 tuple: source: split:22, stream: default, id: {}, [i]]
2018-10-12 11:00:39.738 o.a.s.d.task Thread-7-split-executor[22 22] [INFO] Emitting: split default [am]
2018-10-12 11:00:39.689 o.a.s.d.task Thread-23-count-executor[4 4] [INFO] Emitting: count default [jumped, 65428]
The Storm UI also shows some statistics.


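The Coordinator: ZooKeeper

ZooKeeper (ZK) coordinates information in Storm, for example the cooperation between Nimbus and the Supervisors.
While the topology runs, the information stored in ZK can be inspected.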
[zk: localhost:2181(CONNECTED) 2] ls /storm
[assignments, backpressure, nimbuses, logconfigs, leader-lock, storms, errors, supervisors, workerbeats, blobstore]
[zk: localhost:2181(CONNECTED) 3] ls /storm/nimbuses
[ubuntu:6627]
[zk: localhost:2181(CONNECTED) 4] ls /storm/supervisors
[]
[zk: localhost:2181(CONNECTED) 9] ls /storm/supervisors
[0d1b7bc1-df35-42e7-be40-7219087e19b4]
[zk: localhost:2181(CONNECTED) 10] ls /storm/assignments
[word-count-1-1539312747]
After the Topology is killed:
[zk: localhost:2181(CONNECTED) 15] ls /storm/assignments
[word-count-1-1539312747]
[zk: localhost:2181(CONNECTED) 16] ls /storm/assignments
[]
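
The node name under /storm/assignments is the same topology ID that appears as -Dstorm.id in the worker processes. As a small illustration, the sketch below splits such an ID into its parts. It assumes the ID has the shape name-counter-epochSeconds, which is how the value word-count-1-1539312747 reads; the class TopologyIdSketch and its methods are hypothetical helpers, not part of Storm.

```java
import java.time.Instant;

/** Splits a topology ID of the assumed form <name>-<counter>-<epoch seconds>. */
final class TopologyIdSketch {

    /** Everything before the last two dash-separated parts (the name may itself contain dashes). */
    static String name(String id) {
        int last = id.lastIndexOf('-');
        int prev = id.lastIndexOf('-', last - 1);
        return id.substring(0, prev);
    }

    /** The second-to-last part, assumed to be a submission counter. */
    static int counter(String id) {
        int last = id.lastIndexOf('-');
        int prev = id.lastIndexOf('-', last - 1);
        return Integer.parseInt(id.substring(prev + 1, last));
    }

    /** The last part, assumed to be the submission time in seconds since the epoch. */
    static long submittedEpochSeconds(String id) {
        return Long.parseLong(id.substring(id.lastIndexOf('-') + 1));
    }

    public static void main(String[] args) {
        String id = "word-count-1-1539312747"; // value taken from the ZK listing
        System.out.println("name      = " + name(id));
        System.out.println("counter   = " + counter(id));
        System.out.println("submitted = " + Instant.ofEpochSecond(submittedEpochSeconds(id)));
    }
}
```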
Related reading
Slides: storm-distributed-and-faulttolerant-realtime-computation
https://zhuanlan.zhihu.com/p/46420407