1. 程式人生 > >SpringBoot整合Kafka和Storm

SpringBoot整合Kafka和Storm

name cat engine override interrupt length dep 發的 介紹

前言

本篇文章主要介紹的是SpringBoot整合kafka和storm以及在這過程遇到的一些問題和解決方案。

kafka和storm的相關知識

如果你對kafkastorm熟悉的話,這一段可以直接跳過!如果不熟,也可以看看我之前寫的博客。一些相關博客如下。

kafka 和 storm的環境安裝

地址:http://www.panchengming.com/2018/01/26/pancm70/

kafka的相關使用

地址:http://www.panchengming.com/2018/01/28/pancm71/
http://www.panchengming.com/2018/02/08/pancm72/

storm的相關使用

地址:http://www.panchengming.com/2018/03/16/pancm75/

SpringBoot整合kafka和storm

為什麽使用SpringBoot整合kafka和storm

一般而言,使用kafka整合storm可以應付大多數需求。但是在擴展性上來說,可能就不太好。目前主流的微服務框架SpringCloud是基於SpringBoot的,所以使用SpringBoot對kafka和storm進行整合,可以進行統一配置,擴展性會更好。

使用SpringBoot整合kafka和storm做什麽

一般來說,kafka和storm的整合,使用kafka進行數據的傳輸,然後使用storm實時的處理kafka中的數據。

在這裏我們加入SpringBoot之後,也是做這些,只不過是由SpringBoot對kafka和storm進行統一的管理。

如果還是不好理解的話,可以通過下面這個簡單的業務場景了解下:

在數據庫中有一批大量的用戶數據,其中這些用戶數據中有很多是不需要的,也就是臟數據,我們需要對這些用戶數據進行清洗,然後重新存入數據庫中,但是要求實時、延時低,並且便於管理。

所以這裏我們就可以使用SpringBoot+kafka+storm來進行相應的開發。

開發準備

在進行代碼開發前,我們要明確開發什麽。
在上述的業務場景中,需要大量的數據,但是我們這裏只是簡單的進行開發,也就是寫個簡單的demo出來,能夠簡單的實現這些功能,所以我們只需滿足如下條件就可以了:

  1. 提供一個將用戶數據寫入kafka的接口;
  2. 使用storm的spout獲取kafka的數據並發送給bolt;
  3. 在bolt移除年齡小於10歲的用戶的數據,並寫入mysql;

那麽根據上述要求我們進行SpringBoot、kafka和storm的整合。
首先需要相應jar包,所以maven的依賴如下:


    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <springboot.version>1.5.9.RELEASE</springboot.version>
        <mybatis-spring-boot>1.2.0</mybatis-spring-boot>
        <mysql-connector>5.1.44</mysql-connector>
        <slf4j.version>1.7.25</slf4j.version>
        <logback.version>1.2.3</logback.version>
        <kafka.version>1.0.0</kafka.version>
        <storm.version>1.2.1</storm.version>
        <fastjson.version>1.2.41</fastjson.version>
        <druid>1.1.8</druid>
    </properties>


    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>${springboot.version}</version>
        </dependency>

        <!-- Spring Boot Mybatis 依賴 -->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>${mybatis-spring-boot}</version>
        </dependency>

        <!-- MySQL 連接驅動依賴 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql-connector}</version>
        </dependency>


        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>


        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>${logback.version}</version>
        </dependency>

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>${logback.version}</version>
        </dependency>


        <!-- kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>${kafka.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
            <scope>provided</scope>
        </dependency>


        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>${kafka.version}</version>
        </dependency>


        <!--storm相關jar -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
            <!--排除相關依賴 -->
            <exclusions>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-slf4j-impl</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-1.2-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-web</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <artifactId>ring-cors</artifactId>
                    <groupId>ring-cors</groupId>
                </exclusion>
            </exclusions>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>${storm.version}</version>
        </dependency>


        <!--fastjson 相關jar -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>

        <!-- Druid 數據連接池依賴 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>${druid}</version>
        </dependency>
    </dependencies>

成功添加了相關依賴之後,這裏我們再來添加相應的配置。
application.properties中添加如下配置:

    
    # log
    logging.config=classpath:logback.xml
    
    ## mysql
    spring.datasource.url=jdbc:mysql://localhost:3306/springBoot2?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true
    spring.datasource.username=root
    spring.datasource.password=123456
    spring.datasource.driverClassName=com.mysql.jdbc.Driver
    
    
    ## kafka 
    kafka.servers = 192.169.0.23\:9092,192.169.0.24\:9092,192.169.0.25\:9092  
    kafka.topicName = USER_TOPIC
    kafka.autoCommit = false
    kafka.maxPollRecords = 100
    kafka.groupId = groupA
    kafka.commitRule = earliest

註:上述的配置只是一部分,完整的配置可以在我的github中找到。

數據庫腳本:

-- springBoot2庫的腳本

CREATE TABLE `t_user` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `name` varchar(10) DEFAULT NULL COMMENT '姓名',
  `age` int(2) DEFAULT NULL COMMENT '年齡',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=15 DEFAULT CHARSET=utf8

註:因為這裏我們只是簡單的模擬一下業務場景,所以只是建立一張簡單的表。

代碼編寫

說明:這裏我只對幾個關鍵的類進行說明,完整的項目工程鏈接可以在博客底部找到。

在使用SpringBoot整合kafka和storm之前,我們可以先對kfaka和storm的相關代碼編寫,然後在進行整合。

首先是數據源的獲取,也就是使用storm中的spout從kafka中拉取數據。

在之前的storm入門中,講過storm的運行流程,其中spout是storm獲取數據的一個組件,其中我們主要實現nextTuple方法,編寫從kafka中獲取數據的代碼就可以在storm啟動後進行數據的獲取。

spout類的主要代碼如下:

@Override
public void nextTuple() {
    for (;;) {
        try {
            msgList = consumer.poll(100);
            if (null != msgList && !msgList.isEmpty()) {
                String msg = "";
                List<User> list=new ArrayList<User>();
                for (ConsumerRecord<String, String> record : msgList) {
                    // 原始數據
                    msg = record.value();
                    if (null == msg || "".equals(msg.trim())) {
                        continue;
                    }
                    try{
                        list.add(JSON.parseObject(msg, User.class));
                    }catch(Exception e){
                        logger.error("數據格式不符!數據:{}",msg);
                        continue;
                    }
                 } 
                logger.info("Spout發射的數據:"+list);
                //發送到bolt中
                this.collector.emit(new Values(JSON.toJSONString(list)));
                 consumer.commitAsync();
            }else{
                TimeUnit.SECONDS.sleep(3);
                logger.info("未拉取到數據...");
            }
        } catch (Exception e) {
            logger.error("消息隊列處理異常!", e);
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e1) {
                logger.error("暫停失敗!",e1);
            }
        }
    }
}

註:如果spout在發送數據的時候發送失敗,是會重發的!

上述spout類中主要是將從kafka獲取的數據傳輸傳輸到bolt中,然後再由bolt類處理該數據,處理成功之後,寫入數據庫,然後給與sqout響應,避免重發。

bolt類主要處理業務邏輯的方法是execute,我們主要實現的方法也是寫在這裏。需要註意的是這裏只用了一個bolt,因此也不用定義Field進行再次的轉發。
代碼的實現類如下:

@Override
    public void execute(Tuple tuple) {
        String msg=tuple.getStringByField(Constants.FIELD);
        try{
            List<User> listUser =JSON.parseArray(msg,User.class);
            //移除age小於10的數據
            if(listUser!=null&&listUser.size()>0){
                Iterator<User> iterator = listUser.iterator();
                 while (iterator.hasNext()) {
                     User user = iterator.next();
                     if (user.getAge()<10) {
                         logger.warn("Bolt移除的數據:{}",user);
                         iterator.remove();
                     }
                 }
                if(listUser!=null&&listUser.size()>0){
                    userService.insertBatch(listUser);
                }
            }
        }catch(Exception e){
            logger.error("Bolt的數據處理失敗!數據:{}",msg,e);
        }
    }

編寫完了spout和bolt之後,我們再來編寫storm的主類。

storm的主類主要是對Topology(拓步)進行提交,提交Topology的時候,需要對spout和bolt進行相應的設置。Topology的運行的模式有兩種:

  1. 一種是本地模式,利用本地storm的jar模擬環境進行運行。

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("TopologyApp", conf,builder.createTopology());
  2. 另一種是遠程模式,也就是在storm集群進行運行。

    StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

這裏為了方便,兩種方法都編寫了,通過主方法的args參數來進行控制。
Topology相關的配置說明在代碼中的註釋寫的很詳細了,這裏我就不再多說了。
代碼如下:

  public  void runStorm(String[] args) {
    // 定義一個拓撲
    TopologyBuilder builder = new TopologyBuilder();
    // 設置1個Executeor(線程),默認一個
    builder.setSpout(Constants.KAFKA_SPOUT, new KafkaInsertDataSpout(), 1);
    // shuffleGrouping:表示是隨機分組
    // 設置1個Executeor(線程),和兩個task
    builder.setBolt(Constants.INSERT_BOLT, new InsertBolt(), 1).setNumTasks(1).shuffleGrouping(Constants.KAFKA_SPOUT);
    Config conf = new Config();
    //設置一個應答者
    conf.setNumAckers(1);
    //設置一個work
    conf.setNumWorkers(1);
    try {
        // 有參數時,表示向集群提交作業,並把第一個參數當做topology名稱
        // 沒有參數時,本地提交
        if (args != null && args.length > 0) { 
            logger.info("運行遠程模式");
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            // 啟動本地模式
            logger.info("運行本地模式");
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("TopologyApp", conf, builder.createTopology());
        }
    } catch (Exception e) {
        logger.error("storm啟動失敗!程序退出!",e);
        System.exit(1);
    }
    logger.info("storm啟動成功...");
    }

好了,編寫完了kafka和storm相關的代碼之後,我們再來進行和SpringBoot的整合!

在進行和SpringBoot整合前,我們先要解決下一下幾個問題。

1 在SpringBoot程序中如何提交storm的Topolgy?

storm是通過提交Topolgy來確定如何啟動的,一般使用過運行main方法來啟動,但是SpringBoot啟動方式一般也是通過main方法啟動的。所以應該怎麽樣解決呢?

  • 解決思路:將storm的Topology寫在SpringBoot啟動的主類中,隨著SpringBoot啟動而啟動。
  • 實驗結果:可以一起啟動(按理來說也是可以的)。但是隨之而來的是下一個問題,bolt和spout類無法使用spring註解。

2 如何讓bolt和spout類使用spring註解?

  • 解決思路:在了解到spout和bolt類是由nimbus端實例化,然後通過序列化傳輸到supervisor,再反向序列化,因此無法使用註解,所以這裏可以換個思路,既然不能使用註解,那麽就動態獲取Spring的bean就好了。
  • 實驗結果:使用動態獲取bean的方法之後,可以成功啟動storm了。

3.有時啟動正常,有時無法啟動,動態的bean也無法獲取?

  • 解決思路:在解決了1、2的問題之後,有時出現問題3,找了很久才找到,是因為之前加入了SpringBoot的熱部署,去掉之後就沒出現了...。

上面的三個問題是我在整合的時候遇到的,其中解決辦法在目前看來是可行的,或許其中的問題可能是因為其他的原因導致的,不過目前就這樣整合之後,就沒出現過其他的問題了。若上述問題和解決辦法有不妥之後,歡迎批評指正!

解決了上面的問題之後,我們回到代碼這塊。
其中,程序的入口,也就是主類的代碼在進行整合後如下:

@SpringBootApplication
public class Application{

    public static void main(String[] args) {
        // 啟動嵌入式的 Tomcat 並初始化 Spring 環境及其各 Spring 組件
        ConfigurableApplicationContext context = SpringApplication.run(Application.class, args);
        GetSpringBean springBean=new GetSpringBean();
        springBean.setApplicationContext(context);
        TopologyApp app = context.getBean(TopologyApp.class);
        app.runStorm(args);
    }
    
}

動態獲取bean的代碼如下:

public class GetSpringBean implements ApplicationContextAware{

    private static ApplicationContext context;

    public static Object getBean(String name) {
        return context.getBean(name);
    }

    public static <T> T getBean(Class<T> c) {

        return context.getBean(c);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext)
            throws BeansException {
        if(applicationContext!=null){
            context = applicationContext;
        }
    }

}

主要的代碼的介紹就到這裏了,至於其它的,基本就和以前的一樣了。

測試結果

成功啟動程序之後,我們先調用接口新增幾條數據到kafka

新增請求:

POST http://localhost:8087/api/user

{"name":"張三","age":20}
{"name":"李四","age":10}
{"name":"王五","age":5}

新增成功之後,我們可以使用xshell工具在kafka集群中查看數據。
輸入:**kafka-console-consumer.sh --zookeeper master:2181 --topic USER_TOPIC --from-beginning**

然後可以看到以下輸出結果。

技術分享圖片

上述也表示了數據成功的寫入了kafka。
因為是實時的從kafka那數據,我們也可以從控制臺查看打印的語句。

控制臺輸出:

 INFO  com.pancm.storm.spout.KafkaInsertDataSpout - Spout發射的數據:[{"age":5,"name":"王五"}, {"age":10,"name":"李四"}, {"age":20,"name":"張三"}]
 WARN  com.pancm.storm.bolt.InsertBolt - Bolt移除的數據:{"age":5,"name":"王五"}
 INFO  com.alibaba.druid.pool.DruidDataSource - {dataSource-1} inited
 DEBUG com.pancm.dao.UserDao.insertBatch - ==>  Preparing: insert into t_user (name,age) values (?,?) , (?,?) 
 DEBUG com.pancm.dao.UserDao.insertBatch - ==> Parameters: 李四(String), 10(Integer), 張三(String), 20(Integer)
 DEBUG com.pancm.dao.UserDao.insertBatch - <==    Updates: 2
 INFO  com.pancm.service.impl.UserServiceImpl - 批量新增2條數據成功!

可以在控制臺成功的看到處理的過程和結果。
然後我們也可以通過接口進行數據庫所有的數據查詢。

查詢請求:

GET http://localhost:8087/api/user

返回結果:

[{"id":1,"name":"李四","age":10},{"id":2,"name":"張三","age":20}]

上述代碼中測試返回的結果顯然是符合我們預期的。

結語

關於SpringBoot整合kafka和storm暫時就告一段落了。本篇文章只是簡單的介紹這些 相關的使用,在實際的應用可能會更復雜。如果有有更好的想法和建議,歡迎留言進行討論!
SpringBoot整合kafka和storm的工程我放在github上了,如果感覺不錯的話請給個star吧。
Gihub地址:https://github.com/xuwujing/springBoot-study
對了,也有kafka整合storm的工程,也在我的github上。
地址:https://github.com/xuwujing/kafka-study

到此,本文結束,謝謝閱讀。

版權聲明:
作者:虛無境
博客園出處:http://www.cnblogs.com/xuwujing
CSDN出處:http://blog.csdn.net/qazwsxpcm    
個人博客出處:http://www.panchengming.com
原創不易,轉載請標明出處,謝謝!

SpringBoot整合Kafka和Storm