[Repost] Storm Getting-Started Example

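  The first step in developing with Storm is designing the Topology. To help developers get started, we begin with a simple example whose main job is to append a suffix to each word ("!!!" in the code below) and then print the result. Data flows in a straight line from TestWordSpout through ExclamationBolt to PrintBolt.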

  The Topology has three parts:

  TestWordSpout: the data source, responsible for emitting words

  ExclamationBolt: appends the suffix to each word

  PrintBolt: prints each word
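
To make the data flow concrete before looking at the Storm classes, here is a minimal plain-Java sketch of the three stages. It is only a model and does not use the Storm API: the class name WordPipelineSketch and its method names are hypothetical, while the word list, the "!!!" suffix and the "......." log suffix are copied from the code later in this article.

```java
// Plain-Java model of the three-stage data flow; it does not use the Storm API.
final class WordPipelineSketch {

    // Candidate words, copied from TestWordSpout in this article.
    static final String[] WORDS = {"fdfs", "fdfs", "ffsdfs"};

    // Stage 2 (ExclamationBolt): append the "!!!" suffix to a word.
    static String exclaim(String word) {
        return word + "!!!";
    }

    // Stage 3 (PrintBolt): build the line that the bolt writes to the log.
    static String formatForLog(String word) {
        return word + ".......";
    }

    public static void main(String[] args) {
        // Stage 1 (TestWordSpout) picks words at random; here every word is
        // visited once so that the output is deterministic.
        for (String word : WORDS) {
            System.out.println(formatForLog(exclaim(word)));
        }
    }
}
```

In the real topology each stage runs as a separate Storm component and the stages are connected with shuffleGrouping rather than by direct method calls.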

Code implementation

1. Create a Maven project in IDEA and add the Maven dependency

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ysl</groupId>
    <artifactId>storm</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.4</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.ysl.WordsTopology</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

TestWordSpout:

package com.ysl.spouts;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Random;

public class TestWordSpout extends BaseRichSpout{

    private static Logger logger = LoggerFactory.getLogger(TestWordSpout.class);
    private SpoutOutputCollector collector = null;

    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
    }

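    public void nextTuple() {
        // Throttle the spout: wait one second before each emit.
        Utils.sleep(1000);
        final String[] words = new String[]{"fdfs","fdfs","ffsdfs"};
        final Random random = new Random();
        // Pick one candidate word at random and emit it as a one-field tuple.
        final String word = words[random.nextInt(words.length)];
        collector.emit(new Values(word));
    }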

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}

ExclamationBolt:

package com.ysl.bolts;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class ExclamationBolt extends BaseRichBolt{

    private static Logger logger = LoggerFactory.getLogger(ExclamationBolt.class);

    private OutputCollector collector = null;

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

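    public void execute(Tuple tuple) {
        // Emit the word with "!!!" appended, anchored to the input tuple,
        // then acknowledge the input tuple.
        this.collector.emit(tuple,new Values(tuple.getString(0)+"!!!"));
        this.collector.ack(tuple);
    }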

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}

PrintBolt:

package com.ysl.bolts;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class PrintBolt extends BaseRichBolt{

    private static Logger logger = LoggerFactory.getLogger(PrintBolt.class);

    private OutputCollector collector = null;

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

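    public void execute(Tuple tuple) {
        // Log the received word, then acknowledge the tuple.
        logger.info(tuple.getString(0) + ".......");
        this.collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // This bolt emits no tuples, so it declares no output fields.
    }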
}

WordsTopology:

package com.ysl;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
import com.ysl.bolts.ExclamationBolt;
import com.ysl.bolts.PrintBolt;
import com.ysl.spouts.TestWordSpout;

public class WordsTopology {
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        // Wire the components: spout "word" -> bolt "exclaim" -> bolt "print",
        // each with a parallelism hint of 1.
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("word",new TestWordSpout(),1);
        topologyBuilder.setBolt("exclaim",new ExclamationBolt(),1).shuffleGrouping("word");
        topologyBuilder.setBolt("print",new PrintBolt(),1).shuffleGrouping("exclaim");
        Config config = new Config();
        config.setDebug(true);
        if(args != null && args.length > 0){
            // Cluster mode: args[0] is used as the topology name.
            config.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0],config,topologyBuilder.createTopology());
        }else{
            // Local mode: run in-process for 30 seconds, then kill the topology and shut down.
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("test",config,topologyBuilder.createTopology());
            Utils.sleep(30000);
            localCluster.killTopology("test");
            localCluster.shutdown();
        }
    }
}

2. Package and run

  Package the application with Maven using the following command:

mvn clean install

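  Storm can run in two ways. The first is local mode, which suits debugging and development: simply run the main function directly in IDEA. The local-mode code sets a sleep time, and when that time is up it kills the topology itself.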

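  The second is remote cluster mode. Cluster mode requires first building a jar that contains the program code and the packages it depends on (the Storm jars themselves need not be included; they are added to the classpath automatically on the worker nodes). If you use Maven, the Maven Assembly Plugin can do the packaging for you; see the Maven configuration above for details.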

  To run remotely, use the storm command to submit the topology to the Storm cluster:

storm jar /home/workspace/storm/target/storm-1.0-SNAPSHOT.jar com.ysl.WordsTopology testfrfr

After running the command above, log output like the following indicates that the submission succeeded:

346  [main] INFO  backtype.storm.StormSubmitter - Jar not uploaded to master yet. Submitting jar...
351  [main] INFO  backtype.storm.StormSubmitter - Uploading topology jar /home/workspace/storm/target/storm-1.0-SNAPSHOT.jar to assigned location: storm-local/nimbus/inbox/stormjar-89514681-2477-44c2-8924-7907d16e3ba1.jar
Start uploading file '/home/workspace/storm/target/storm-1.0-SNAPSHOT.jar' to 'storm-local/nimbus/inbox/stormjar-89514681-2477-44c2-8924-7907d16e3ba1.jar' (6196 bytes)
[==================================================] 6196 / 6196
File '/home/workspace/storm/target/storm-1.0-SNAPSHOT.jar' uploaded to 'storm-local/nimbus/inbox/stormjar-89514681-2477-44c2-8924-7907d16e3ba1.jar' (6196 bytes)
363  [main] INFO  backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: storm-local/nimbus/inbox/stormjar-89514681-2477-44c2-8924-7907d16e3ba1.jar
363  [main] INFO  backtype.storm.StormSubmitter - Submitting topology testfrfr in distributed mode with conf {"topology.workers":3,"topology.debug":true}
448  [main] INFO  backtype.storm.StormSubmitter - Finished submitting topology:
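
The name testfrfr in the log comes from the first command-line argument, which WordsTopology.main also uses to choose between local and cluster mode. The following plain-Java sketch isolates that decision so it can be read on its own; RunModeSketch, Mode, chooseMode and topologyName are hypothetical names introduced here for illustration, and the sketch does not call the Storm API.

```java
// Plain-Java model of the mode selection in WordsTopology.main; no Storm API is used.
final class RunModeSketch {

    enum Mode { LOCAL, CLUSTER }

    // Same condition as in WordsTopology.main: any argument means cluster mode.
    static Mode chooseMode(String[] args) {
        return (args != null && args.length > 0) ? Mode.CLUSTER : Mode.LOCAL;
    }

    // In cluster mode the first argument is the topology name;
    // in local mode the code uses the fixed name "test".
    static String topologyName(String[] args) {
        return chooseMode(args) == Mode.CLUSTER ? args[0] : "test";
    }

    public static void main(String[] args) {
        System.out.println(chooseMode(args) + " " + topologyName(args));
    }
}
```

With the argument testfrfr, as in the storm jar command above, the sketch selects cluster mode and the name testfrfr; with no arguments it selects local mode and the name test.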

Terminating a topology

To terminate a topology, run:

storm kill {stormname}

Here {stormname} is the name that was specified when the topology was submitted to the Storm cluster.

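Storm does not terminate the topology immediately. Instead, it first stops all spouts so that they emit no new tuples, then waits Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS seconds before killing all worker processes. This gives the topology enough time to finish the tuples that were still incomplete when the storm kill command was run.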
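
The shutdown sequence can be pictured as a simple timeline. The sketch below is a plain-Java model of that timeline, not Storm code: KillTimelineSketch and its methods are hypothetical names, and the 30-second timeout is an assumed value chosen only for illustration.

```java
// Plain-Java model of the shutdown timeline; it does not call the Storm API.
final class KillTimelineSketch {

    // Assumed timeout, standing in for Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS.
    static final int ASSUMED_TIMEOUT_SECS = 30;

    // Spouts stop emitting at killIssuedAtSecs; the worker processes are
    // killed timeoutSecs seconds later.
    static long workersKilledAtSecs(long killIssuedAtSecs, int timeoutSecs) {
        return killIssuedAtSecs + timeoutSecs;
    }

    // True if a tuple that finishes processing at tupleDoneAtSecs is done
    // no later than the moment the workers are killed.
    static boolean completesBeforeShutdown(long tupleDoneAtSecs, long killIssuedAtSecs, int timeoutSecs) {
        return tupleDoneAtSecs <= workersKilledAtSecs(killIssuedAtSecs, timeoutSecs);
    }

    public static void main(String[] args) {
        long killIssuedAt = 100; // storm kill is run at t = 100 s
        System.out.println(workersKilledAtSecs(killIssuedAt, ASSUMED_TIMEOUT_SECS));          // 130
        System.out.println(completesBeforeShutdown(120, killIssuedAt, ASSUMED_TIMEOUT_SECS)); // true
        System.out.println(completesBeforeShutdown(140, killIssuedAt, ASSUMED_TIMEOUT_SECS)); // false
    }
}
```

Under these assumptions, a tuple that is still being processed when storm kill runs has until the end of the timeout window to finish.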