1. 程式人生 > >SpringCloud系列十一:SpringCloudStream(SpringCloudStream 簡介、創建消息生產者、創建消息消費者、自定義消息通道、分組與持久化、設置 RoutingKey)

SpringCloud系列十一:SpringCloudStream(SpringCloudStream 簡介、創建消息生產者、創建消息消費者、自定義消息通道、分組與持久化、設置 RoutingKey)

javax sun 就是 eas nts discovery junit4 IE 程序包

1、概念:SpringCloudStream

2、具體內容

2.1、SpringCloudStream 簡介

SpringCloudStream 就是使用了基於消息系統的微服務處理架構。對於消息系統而言一共分為兩類基於應用標準的 JMS、基於協議標準的 AMQP,在整個 SpringCloud 之中支持有 RabbitMQ、Kafka 組件的消息系統。利用 SpringCloudStream 可以實現更加方便的消息系統的整合處理,但是推薦還是基於 RabbitMQ 實現會更好一些。

為什麽 SpringCloud 中要提供有一個類似於消息驅動的 SpringCloudStream 呢?

如果通過 Java 歷史上的分布式的開發架構大家不難發現,對於消息系統,實際上最初的 SUN 公司是非常看中的,所以在 EJB 的時代裏面專門提供有消息驅動 Bean(Message Driven Bean、MDB)利用消息驅動 Bean 可以進行消息的處理操作。利用消息驅動 bean 的模式可以簡化用戶的操作復雜度,直接傳遞一些各類的數據即可實現業務的處理操作

於是在 SpringBoot 的之中為了方便開發者去整合消息組件,也提供有一系列的處理支持,但是如果按照這些方式來在 SpringCloud 之中進行消息處理,有些人會認為比較麻煩,所以在 SpringCloud 裏面將消息整合的處理操作進行了進一步的抽象操作, 實現了更加簡化的消息處理。

總結:SpringCloudStream 就是實現了 MDB 功能,同時可以更加簡化方便的整合消息組件

SpringCloudStream的工作原理:

技術分享圖片

說明:最底層是消息服務,中間層是綁定層,綁定層和底層的消息服務進行綁定,頂層是消息生產者和消息消費者,頂層可以向綁定層生產消息和和獲取消息消費

2.2、創建消息生產者

本次基於 RabbitMQ 實現消息的生產者的微服務操作,在整個的生產者項目之中,首先創建了一個新的 Maven 模塊: microcloud-stream-provider-8401。

1、 【microcloud-stream-provider-8401】修改 pom.xml 配置文件,在這個配置文件之中要追加如下的依賴程序包:

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</
artifactId> </dependency>

2、 【microcloud-stream-provider-8401】修改 application.yml 配置文件,追加如下的綁定處理配置:

server:
  port: 8401
eureka: 
  client: # 客戶端進行Eureka註冊的配置
    service-url:
      defaultZone: http://edmin:[email protected]:7001/eureka,http://edmin:[email protected]:7002/eureka,http://edmin:[email protected]:7003/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 設置心跳的時間間隔(默認是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(默認是90秒)
    instance-id: send-8401.com  # 在信息列表時顯示主機名稱
    prefer-ip-address: true     # 訪問的路徑變為IP地址
spring:
  cloud:
    stream:
      binders: # 在此處配置要綁定的rabbitmq的服務信息;
        defaultRabbit: # 表示定義的名稱,用於於binding整合
          type: rabbit # 消息組件類型
          environment: # 設置rabbitmq的相關的環境配置
            spring:
              rabbitmq:
              addresses: rabbitmq-server
              username: studyjava
              password: hello
              virtual-host: /
      bindings: # 服務的整合處理
        output: # 這個名字是一個通道的名稱,在分析具體源代碼的時候會進行說明
          destination: studyExchange # 表示要使用的Exchange名稱定義
          content-type: application/json # 設置消息類型,本次為對象json,如果是文本則設置“text/plain”
          binder: defaultRabbit # 設置要綁定的消息服務的具體設置
  application:
    name: microcloud-stream-provider

3、 修改 hosts 配置文件,追加主機映射:

127.0.0.1 send-8401.com

4、 【microcloud-stream-provider-8401】定義一個消息的發送接口:

package cn.study.microcloud.service;

import cn.study.vo.Company;

public interface IMessageProvider {

    /**
    * 實現消息的發送,本次發送的消息是一個對象(自動變為json)
    * @param company VO對象,該對象不為null*/
      public void send(Company company) ;
}

5、 【microcloud-stream-provider-8401】定義消息發送的實現子類:

package cn.study.microcloud.service.impl;

import javax.annotation.Resource;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import cn.study.microcloud.service.IMessageProvider;
import cn.study.vo.Company;

@EnableBinding(Source.class) // 可以理解為是一個消息的發送管道的定義
public class MessageProviderImpl implements IMessageProvider {
    @Resource
    private MessageChannel output; // 消息的發送管道

    @Override
    public void send(Company company) {
        this.output.send(MessageBuilder.withPayload(company).build()); // 創建並發送消息
    }
}

6、 【microcloud-stream-provider-8401】分析一下 Source 類的源代碼:

public interface Source {

    String OUTPUT = "output"; // 之前所設置的消息發送的管道

    @Output(Source.OUTPUT)
    MessageChannel output();

}

7、 【microcloud-stream-provider-8401】定義程序主類:

package cn.study.microcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
@SpringBootApplication
@EnableEurekaClient
@EnableCircuitBreaker
@EnableDiscoveryClient
public class StreamProvider_8401_StartSpringCloudApplication {
    public static void main(String[] args) {
        SpringApplication.run(StreamProvider_8401_StartSpringCloudApplication.class, args);
    }
}

8、 【microcloud-stream-provider-8401】編寫測試類:

· 保證你的 pom.xml 文件之中存在有測試的依賴程序包:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

· 編寫具體的測試程序類:

package cn.study.microcloud.test;

import javax.annotation.Resource;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;

import cn.study.microcloud.StreamProvider_8401_StartSpringCloudApplication;
import cn.study.microcloud.service.IMessageProvider;
import cn.study.vo.Company;

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = StreamProvider_8401_StartSpringCloudApplication.class)
@WebAppConfiguration
public class TestMessageProvider {
    @Resource
    private IMessageProvider messageProvider;

    @Test
    public void testSend() {
        Company company = new Company();
        company.setTitle("studyjava");
        company.setNote("更多資源請登錄:www.study.cn");
        this.messageProvider.send(company); // 消息發送
    }
}

9、 啟動 RabbitMQ 以及相應的微服務進行消息的發送處理,如果可以檢測到 RabbitMQ 上的活動信息就表示該微服務創建成功。

技術分享圖片

2.3、創建消息消費者

在之前已經成功的實現了消息的發送處理,但是這個消息由於只是一個臨時消息並且只是發送到了 RabbitMQ 之中,那麽現在 如果要想進行該消息的接收就必須通過 RabbitMQ 獲取消息內容。

1、 【microcloud-stream-consumer-8402】通過“microcloud-stream-provider-8401”模塊復制本模塊;

2、 【microcloud-stream-consumer-8402】一定要保證 pom.xml 文件之中擁有如下的依賴包:

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

3、 修改 hosts 主機映射,追加新的主機名稱:

127.0.0.1 receive-8402.com

4、 【microcloud-stream-consumer-8402】修改 application.yml 配置文件:

server:
  port: 8402
eureka: 
  client: # 客戶端進行Eureka註冊的配置
    service-url:
      defaultZone: http://edmin:[email protected]:7001/eureka,http://edmin:[email protected]:7002/eureka,http://edmin:[email protected]:7003/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 設置心跳的時間間隔(默認是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(默認是90秒)
    instance-id: receive-8402.com  # 在信息列表時顯示主機名稱
    prefer-ip-address: true     # 訪問的路徑變為IP地址
spring:
  cloud:
    stream:
      binders: # 在此處配置要綁定的rabbitmq的服務信息;
        defaultRabbit: # 表示定義的名稱,用於於binding整合
          type: rabbit # 消息組件類型
          environment: # 設置rabbitmq的相關的環境配置
            spring:
              rabbitmq:
              addresses: rabbitmq-server
              username: studyjava
              password: hello
              virtual-host: /
      bindings: # 服務的整合處理
        input: # 這個名字是一個通道的名稱,在分析具體源代碼的時候會進行說明
          destination: studyExchange # 表示要使用的Exchange名稱定義
          content-type: application/json # 設置消息類型,本次為對象json,如果是文本則設置“text/plain”
          binder: defaultRabbit # 設置要綁定的消息服務的具體設置
  application:
    name: microcloud-stream-consumer

5、 【microcloud-stream-consumer-8402】定義一個消息的監聽程序類:

package cn.study.microcloud.service;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import cn.study.vo.Company;

@Component
@EnableBinding(Sink.class)
public class MessageListener {
    @StreamListener(Sink.INPUT)
    public void input(Message<Company> message) {
        System.err.println("【*** 消息接收 ***】" + message.getPayload());
    }
}

6、 【microcloud-stream-consumer-8402】觀察 Sink 源代碼:

public interface Sink {

    String INPUT = "input";

    @Input(Sink.INPUT)
    SubscribableChannel input();

}

7、 首先啟動消息的消費端,而後再啟動消息的生產者發送消息。

技術分享圖片

那麽此時實現了了一個基於 RabbitMQ 定義的 SpringCloudStream 基本操作功能。

2.4、自定義消息通道

現在已經實現了一個基礎的 SpringCloudStream 處理操作,但是在本次操作之中一直使用的都是系統中提供好的 Source (output)、Sink(input),如果說現在用戶有需要也可以定義自己的通道名稱。

1、 【micocloud-api】由於現在有兩個模塊都需要使用到自定義消息通道的配置,所以應該將這個配置定義為一個公共的程序處理 類,修改 pom.xml 配置文件,引入相應的開發包:

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

2、 【micocloud-api】使用一個自定義的通道:

package cn.study.channel;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface DefaultProcess {
    public static final String OUTPUT = "study_output"; // 輸出通道名稱
    public static final String INPUT = "study_input"; // 輸入通道名稱

    @Input(DefaultProcess.INPUT)
    public SubscribableChannel input();

    @Output(DefaultProcess.OUTPUT)
    public MessageChannel output();
}

3、 【microcloud-stream-provider-8401】修改 application.yml 配置文件:

spring:
  cloud:
    stream:
      binders: # 在此處配置要綁定的rabbitmq的服務信息;
        defaultRabbit: # 表示定義的名稱,用於於binding整合
          type: rabbit # 消息組件類型
          environment: # 設置rabbitmq的相關的環境配置
            spring:
              rabbitmq:
              addresses: rabbitmq-server
              username: studyjava
              password: hello
              virtual-host: /
      bindings: # 服務的整合處理
        study_output: # 這個名字是一個通道的名稱,在分析具體源代碼的時候會進行說明
          destination: studyExchange # 表示要使用的Exchange名稱定義
          content-type: application/json # 設置消息類型,本次為對象json,如果是文本則設置“text/plain”
          binder: defaultRabbit # 設置要綁定的消息服務的具體設置

4、 【microcloud-stream-consumer-8402】修改 application.yml 配置文件:

spring:
  cloud:
    stream:
      binders: # 在此處配置要綁定的rabbitmq的服務信息;
        defaultRabbit: # 表示定義的名稱,用於於binding整合
          type: rabbit # 消息組件類型
          environment: # 設置rabbitmq的相關的環境配置
            spring:
              rabbitmq:
              addresses: rabbitmq-server
              username: studyjava
              password: hello
              virtual-host: /
      bindings: # 服務的整合處理
        study_input: # 這個名字是一個通道的名稱,在分析具體源代碼的時候會進行說明
          destination: studyExchange # 表示要使用的Exchange名稱定義
          content-type: application/json # 設置消息類型,本次為對象json,如果是文本則設置“text/plain”
          binder: defaultRabbit # 設置要綁定的消息服務的具體設置

5、 【microcloud-stream-provider-8401】修改消息的發送子類:

package cn.study.microcloud.service.impl;

import javax.annotation.Resource;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;

import cn.study.channel.DefaultProcess;
import cn.study.microcloud.service.IMessageProvider;
import cn.study.vo.Company;

@EnableBinding(DefaultProcess.class) // 可以理解為是一個消息的發送管道的定義
public class MessageProviderImpl implements IMessageProvider {
    @Resource
    private MessageChannel output; // 消息的發送管道

    @Override
    public void send(Company company) {
        this.output.send(MessageBuilder.withPayload(company).build()); // 創建並發送消息
    }
}

6、 【microcloud-stream-consumer-8402】修改 MessageListener 程序類:

package cn.study.microcloud.service;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import cn.study.channel.DefaultProcess;
import cn.study.vo.Company;

@Component
@EnableBinding(DefaultProcess.class)
public class MessageListener {
    @StreamListener(Sink.INPUT)
    public void input(Message<Company> message) {
        System.err.println("【*** 消息接收 ***】" + message.getPayload());
    }
}

7、 隨後就可以使用自定義的新的通道名稱進行 Stream 處理操作了。

2.5、分組與持久化

在上面的程序裏面成功的實現了消息的發送以及接收,但是需要註意一個問題,所發送的消息在默認情況下它都屬於一種臨時消息,也就是說如果現在沒有消費者進行消費處理,那麽該消息是不會被保留的。

如果要想實現持久化的消息處理,重點在於消息的消費端配置,同時也需要考慮到一個分組的情況(有分組就表示該消息可以進行持久化)。

1、 【microcloud-stream-consumer-8402】修改 application.yml 配置文件,追加分組配置:

spring:
  cloud:
    stream:
      binders: # 在此處配置要綁定的rabbitmq的服務信息;
        defaultRabbit: # 表示定義的名稱,用於於binding整合
          type: rabbit # 消息組件類型
          environment: # 設置rabbitmq的相關的環境配置
            spring:
              rabbitmq:
              addresses: rabbitmq-server
              username: studyjava
              password: hello
              virtual-host: /
      bindings: # 服務的整合處理
        study_input: # 這個名字是一個通道的名稱,在分析具體源代碼的時候會進行說明
          destination: studyExchange # 表示要使用的Exchange名稱定義
          content-type: application/json # 設置消息類型,本次為對象json,如果是文本則設置“text/plain”
          binder: defaultRabbit # 設置要綁定的消息服務的具體設置
          group: study-group # 進行操作的分組,實際上就表示持久化

在 SpringCloudStream 之中如果要設置持久化隊列,則名稱為“destination.group”。此時關閉掉消費端的微服務之後該隊列信息依然會被保留在 RabbitMQ 之中。而後在關閉消費端的情況下去運行消息生產者,發送完消息後再運行消息的消費端仍然可以接收到之前的消息。

2.6、設置 RoutingKey

默認情況下之前的程序都是屬於廣播消息,也就是說所有的消費者都可以接收發送消息內容,在 RabbitMQ 裏面支持有直連消息,而直連消息主要是通過 RoutingKey 來實現,利用直連消息可以實現準確的消息消費端的接收處理。

1、 【microcloud-stream-consumer-8402】修改 application.yml 配置文件:

server:
  port: 8402
eureka: 
  client: # 客戶端進行Eureka註冊的配置
    service-url:
      defaultZone: http://edmin:[email protected]:7001/eureka,http://edmin:[email protected]:7002/eureka,http://edmin:[email protected]:7003/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 設置心跳的時間間隔(默認是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(默認是90秒)
    instance-id: receive-8402.com  # 在信息列表時顯示主機名稱
    prefer-ip-address: true     # 訪問的路徑變為IP地址
spring:
  cloud:
    stream:
      rabbit: # 進行rabbit的相關綁定配置
        bindings:
          study_input:
            consumer: # 進行消費端配置
              bindingRoutingKey: study-key # 設置一個RoutingKey信息
      binders: # 在此處配置要綁定的rabbitmq的服務信息;
        defaultRabbit: # 表示定義的名稱,用於於binding整合
          type: rabbit # 消息組件類型
          environment: # 設置rabbitmq的相關的環境配置
            spring:
              rabbitmq:
              addresses: rabbitmq-server
              username: studyjava
              password: hello
              virtual-host: /
      bindings: # 服務的整合處理
        study_input: # 這個名字是一個通道的名稱,在分析具體源代碼的時候會進行說明
          destination: RKExchange # 表示要使用的Exchange名稱定義
          content-type: application/json # 設置消息類型,本次為對象json,如果是文本則設置“text/plain”
          binder: defaultRabbit # 設置要綁定的消息服務的具體設置
          group: study-group # 進行操作的分組,實際上就表示持久化
          
  application:
    name: microcloud-stream-consumer

2、 【microcloud-stream-provider-8401】定義 RoutingKey 的表達式配置:

server:
  port: 8401
eureka: 
  client: # 客戶端進行Eureka註冊的配置
    service-url:
      defaultZone: http://edmin:[email protected]:7001/eureka,http://edmin:[email protected]:7002/eureka,http://edmin:[email protected]:7003/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 設置心跳的時間間隔(默認是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(默認是90秒)
    instance-id: send-8401.com  # 在信息列表時顯示主機名稱
    prefer-ip-address: true     # 訪問的路徑變為IP地址
spring:
  cloud:
    stream:
      rabbit: # 進行rabbit的相關綁定配置
        bindings:
          study_output:
            producer: # 進行消費端配置
              routing-key-expression: ‘‘‘study-key‘‘‘ #定義 RoutingKey 的表達式配置
      binders: # 在此處配置要綁定的rabbitmq的服務信息;
        defaultRabbit: # 表示定義的名稱,用於於binding整合
          type: rabbit # 消息組件類型
          environment: # 設置rabbitmq的相關的環境配置
            spring:
              rabbitmq:
              addresses: rabbitmq-server
              username: studyjava
              password: hello
              virtual-host: /
      bindings: # 服務的整合處理
        study_output: # 這個名字是一個通道的名稱,在分析具體源代碼的時候會進行說明
          destination: RKExchange # 表示要使用的Exchange名稱定義
          content-type: application/json # 設置消息類型,本次為對象json,如果是文本則設置“text/plain”
          binder: defaultRabbit # 設置要綁定的消息服務的具體設置
  application:
    name: microcloud-stream-provider

3、 首先運行消費端程序,隨後在運行生產端,只有 RoutingKey 匹配了之後才可以正常進行消息的接收處理。

技術分享圖片

SpringCloud系列十一:SpringCloudStream(SpringCloudStream 簡介、創建消息生產者、創建消息消費者、自定義消息通道、分組與持久化、設置 RoutingKey)