1. 程式人生 > >springcloud系列—Bus—第7章-1: Spring Cloud bus 訊息匯流排

springcloud系列—Bus—第7章-1: Spring Cloud bus 訊息匯流排

參考:https://www.jianshu.com/p/730d86030a41

目錄

RabbitMQ實現訊息匯流排

spring boot 整合 RabbitMQ

RabbitMQ實現訊息匯流排

原理分析

指定重新整理範圍

架構優化

kafka實現訊息匯流排

Kafka簡介

快速入門

整合Spring Cloud Bus

Kafka配置


      在微服務架構的系統中,我們通常會使用輕量級的訊息代理來構建一個共用的訊息主題讓系統中所有微服務例項都連線上來,由於該主題中產生的訊息會被所有例項監聽和消費,所以我們稱它為訊息匯流排。在總線上的各個例項都可以方便地廣播- - 些需要讓其他連線在該主題上的例項都知道的訊息,例如配置資訊的變更或者其他一些管理操作等。

      由於訊息匯流排在微服務架構系統中被廣泛使用,所以它同配置中心一樣,幾乎是微服務架構中的必備元件。Spring Cloud作為微服務架構綜合性的解決方案,對此自然也有自己的實現,這就是本章我們將要具體介紹的Spring Cloud Bus。通過使用Spring Cloud Bus,可以非常容易地搭建起訊息匯流排,同時實現了一些訊息匯流排中的常用功能,比如,配合Spring Cloud Config實現微服務應用配置資訊的動態更新等。

RabbitMQ實現訊息匯流排

spring boot 整合 RabbitMQ

  • 新建一個Spring Boot工程,命名為:“rabbitmq-hello”。
  • pom.xml中引入如下依賴內容,其中spring-boot-starter-amqp用於支援RabbitMQ。
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.3.7.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
  • application.properties中配置關於RabbitMQ的連線和使用者資訊,
spring.application.name=rabbitmq-hello

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=spring
spring.rabbitmq.password=123456
  • 建立訊息生產者Sender。通過注入AmqpTemplate介面的例項來實現訊息的傳送,AmqpTemplate介面定義了一套針對AMQP協議的基礎操作。在Spring Boot中會根據配置來注入其具體實現。在該生產者,我們會產生一個字串,併發送到名為hello的佇列中。
@Component
public class Sender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "hello " + new Date();
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("hello", context);
    }

}
  • 建立訊息消費者Receiver。通過@RabbitListener註解定義該類對hello佇列的監聽,並用@RabbitHandler註解來指定對訊息的處理方法。所以,該消費者實現了對hello佇列的消費,消費操作為輸出訊息的字串內容。
@Component
@RabbitListener(queues = "hello")
public class Receiver {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver : " + hello);
    }

}
  • 建立RabbitMQ的配置類RabbitConfig,用來配置佇列、交換器、路由等高階資訊。這裡我們以入門為主,先以最小化的配置來定義,以完成一個基本的生產和消費過程。
@Configuration
public class RabbitConfig {

    @Bean
    public Queue helloQueue() {
        return new Queue("hello");
    }

}
  • 建立應用主類:
@SpringBootApplication
public class HelloApplication {

    public static void main(String[] args) {
        SpringApplication.run(HelloApplication.class, args);
    }

}
  • 建立單元測試類,用來呼叫訊息生產:
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = HelloApplication.class)
public class HelloApplicationTests {

    @Autowired
    private Sender sender;

    @Test
    public void hello() throws Exception {
        sender.send();
    }

}

完成程式編寫之後,下面開始嘗試執行。首先確保RabbitMQ Server已經開始,然後進行下面的操作:

  • 啟動應用主類,從控制檯中,我們看到如下內容,程式建立了一個訪問127.0.0.1:5672springcloud的連線。
o.s.a.r.c.CachingConnectionFactory       : Created new connection: [email protected] [delegate=amqp://[email protected]:5672/]

同時,我們通過RabbitMQ的控制面板,可以看到Connection和Channels中包含當前連線的條目。

  • 執行單元測試類,我們可以看到控制檯中輸出下面的內容,訊息被髮送到了RabbitMQ Server的hello佇列中。
Sender : hello Sun Sep 25 11:06:11 CST 2016
  • 切換到應用主類的控制檯,我們可以看到類似如下輸出,消費者對hello佇列的監聽程式執行了,並輸出了接受到的訊息資訊。
Receiver : hello Sun Sep 25 11:06:11 CST 2016

通過上面的示例,我們在Spring Boot應用中引入spring-boot-starter-amqp模組,進行簡單配置就完成了對RabbitMQ的訊息生產和消費的開發內容。

RabbitMQ實現訊息匯流排

  • 準備工作:這裡我們不做新的應用,但需要用到上一章中,我們已經實現的關於Spring Cloud Config的幾個工程
    • config-repo:定義在Git倉庫中的一個目錄,其中儲存了應用名為didispace的多環境配置檔案,配置檔案中有一個from引數。
    • config-server-eureka:配置了Git倉庫,並註冊到了Eureka的服務端。
    • config-client-eureka:通過Eureka發現Config Server的客戶端,應用名為didispace,用來訪問配置伺服器以獲取配置資訊。該應用中提供了一個/from介面,它會獲取config-repo/didispace-dev.properties中的from屬性返回。
  • 擴充套件config-client-eureka應用
    • 修改pom.xml增加spring-cloud-starter-bus-amqp模組(注意spring-boot-starter-actuator模組也是必須的)。
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
  • 在配置檔案中增加關於RabbitMQ的連線和使用者資訊
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=springcloud
spring.rabbitmq.password=123456
  • 啟動config-server-eureka,再啟動兩個config-client-eureka(分別在不同的埠上,比如7002、7003),我們可以在config-client-eureka中的控制檯中看到如下內容,在啟動時候,客戶端程式多了一個/bus/refresh請求。
o.s.b.a.e.mvc.EndpointHandlerMapping     : Mapped "{[/bus/refresh],methods=[POST]}" onto public void org.springframework.cloud.bus.endpoint.RefreshBusEndpoint.refresh(java.lang.String)
  • 先訪問兩個config-client-eureka的/from請求,會返回當前config-repo/didispace-dev.properties中的from屬性。
  • 接著,我們修改config-repo/didispace-dev.properties中的from屬性值,併發送POST請求到其中的一個/bus/refresh
  • 最後,我們再分別訪問啟動的兩個config-client-eureka的/from請求,此時這兩個請求都會返回最新的config-repo/didispace-dev.properties中的from屬性。

到這裡,我們已經能夠通過Spring Cloud Bus來實時更新總線上的屬性配置了。

原理分析

我們通過使用Spring Cloud Bus與Spring Cloud Config的整合,並以RabbitMQ作為訊息代理,實現了應用配置的動態更新。

整個方案的架構如上圖所示,其中包含了Git倉庫、Config Server、以及微服務“Service A”的三個例項,這三個例項中都引入了Spring Cloud Bus,所以他們都連線到了RabbitMQ的訊息總線上。

當我們將系統啟動起來之後,“Service A”的三個例項會請求Config Server以獲取配置資訊,Config Server根據應用配置的規則從Git倉庫中獲取配置資訊並返回。

此時,若我們需要修改“Service A”的屬性。首先,通過Git管理工具去倉庫中修改對應的屬性值,但是這個修改並不會觸發“Service A”例項的屬性更新。我們向“Service A”的例項3傳送POST請求,訪問/bus/refresh介面。此時,“Service A”的例項3就會將重新整理請求傳送到訊息匯流排中,該訊息事件會被“Service A”的例項1和例項2從匯流排中獲取到,並重新從Config Server中獲取他們的配置資訊,從而實現配置資訊的動態更新。

而從Git倉庫中配置的修改到發起/bus/refresh的POST請求這一步可以通過Git倉庫的Web Hook來自動觸發。由於所有連線到訊息總線上的應用都會接受到更新請求,所以在Web Hook中就不需要維護所有節點內容來進行更新,從而解決了通過Web Hook來逐個進行重新整理的問題。

指定重新整理範圍

上面的例子中,我們通過向服務例項請求Spring Cloud Bus的/bus/refresh介面,從而觸發總線上其他服務例項的/refresh。但是有些特殊場景下(比如:灰度釋出),我們希望可以重新整理微服務中某個具體例項的配置。

Spring Cloud Bus對這種場景也有很好的支援:/bus/refresh介面還提供了destination引數,用來定位具體要重新整理的應用程式。比如,我們可以請求/bus/refresh?destination=customers:9000,此時總線上的各應用例項會根據destination屬性的值來判斷是否為自己的例項名,若符合才進行配置重新整理,若不符合就忽略該訊息。

destination引數除了可以定位具體的例項之外,還可以用來定位具體的服務。定位服務的原理是通過使用Spring的PathMatecher(路徑匹配)來實現,比如:/bus/refresh?destination=customers:**,該請求會觸發customers服務的所有例項進行重新整理。

架構優化

既然Spring Cloud Bus的/bus/refresh介面提供了針對服務和例項進行配置更新的引數,那麼我們的架構也相應的可以做出一些調整。在之前的架構中,服務的配置更新需要通過向具體服務中的某個例項傳送請求,再觸發對整個服務叢集的配置更新。雖然能實現功能,但是這樣的結果是,我們指定的應用例項就會不同於叢集中的其他應用例項,這樣會增加叢集內部的複雜度,不利於將來的運維工作,比如:我們需要對服務例項進行遷移,那麼我們不得不修改Web Hook中的配置等。所以我們要儘可能的讓服務叢集中的各個節點是對等的。

因此,我們將之前的架構做了一些調整,如下圖所示:

我們主要做了這些改動:

  1. 在Config Server中也引入Spring Cloud Bus,將配置服務端也加入到訊息匯流排中來。
  2. /bus/refresh請求不在傳送到具體服務例項上,而是傳送給Config Server,並通過destination引數來指定需要更新配置的服務或例項。

通過上面的改動,我們的服務例項就不需要再承擔觸發配置更新的職責。同時,對於Git的觸發等配置都只需要針對Config Server即可,從而簡化了叢集上的一些維護工作。

kafka實現訊息匯流排

Spring Cloud Bus除了支援RabbitMQ的自動化配置之外,還支援現在被廣泛應用的Kafka。在本文中,我們將搭建一個Kafka的本地環境,並通過它來嘗試使用Spring Cloud Bus對Kafka的支援,實現訊息匯流排的功能。

Kafka簡介

Kafka是一個由LinkedIn開發的分散式訊息系統,它於2011年初開源,現在由著名的Apache基金會維護與開發。Kafka使用Scala實現,被用作LinkedIn的活動流和運營資料處理的管道,現在也被諸多網際網路企業廣泛地用作為資料流管道和訊息系統。

Kafka是基於訊息釋出/訂閱模式實現的訊息系統,其主要設計目標如下:

  • 訊息持久化:以時間複雜度為O(1)的方式提供訊息持久化能力,即使對TB級以上資料也能保證常數時間複雜度的訪問效能。
  • 高吞吐:在廉價的商用機器上也能支援單機每秒100K條以上的吞吐量
  • 分散式:支援訊息分割槽以及分散式消費,並保證分割槽內的訊息順序
  • 跨平臺:支援不同技術平臺的客戶端(如:Java、PHP、Python等)
  • 實時性:支援實時資料處理和離線資料處理
  • 伸縮性:支援水平擴充套件

Kafka中涉及的一些基本概念:

  • Broker:Kafka叢集包含一個或多個伺服器,這些伺服器被稱為Broker。
  • Topic:邏輯上同Rabbit的Queue佇列相似,每條釋出到Kafka叢集的訊息都必須有一個Topic。(物理上不同Topic的訊息分開儲存,邏輯上一個Topic的訊息雖然保存於一個或多個Broker上,但使用者只需指定訊息的Topic即可生產或消費資料而不必關心資料存於何處)
  • Partition:Partition是物理概念上的分割槽,為了提供系統吞吐率,在物理上每個Topic會分成一個或多個Partition,每個Partition對應一個資料夾(儲存對應分割槽的訊息內容和索引檔案)。
  • Producer:訊息生產者,負責生產訊息併發送到Kafka Broker。
  • Consumer:訊息消費者,向Kafka Broker讀取訊息並處理的客戶端。
  • Consumer Group:每個Consumer屬於一個特定的組(可為每個Consumer指定屬於一個組,若不指定則屬於預設組),組可以用來實現一條訊息被組內多個成員消費等功能。

快速入門

在對Kafka有了一些基本瞭解之後,下面我們來嘗試構建一個Kafka服務端,並體驗一下基於Kafka的訊息生產與消費。

環境安裝

首先,我們需要從官網上下載安裝介質。下載地址為:http://kafka.apache.org/downloads.html。本例中採用的版本為:Kafka-0.10.0.1

在解壓Kafka的安裝包之後,可以看到其目錄結構如下:

kafka
  +-bin
    +-windows
  +-config
  +-libs
  +-logs
  +-site-docs

由於Kafka的設計中依賴了ZooKeeper,所以我們可以在binconfig目錄中除了看到Kafka相關的內容之外,還有ZooKeeper相關的內容。其中bin目錄存放了Kafka和ZooKeeper的命令列工具,bin根目錄下是適用於Linux/Unix的shell,而bin/windows下的則是適用於windows下的bat。我們可以根據實際的系統來設定環境變數,以方便後續的使用和操作。而在config目錄中,則是用來存放了關於Kafka與ZooKeeper的配置資訊。

啟動測試

下面我們來嘗試啟動ZooKeeper和Kafka來進行訊息的生產和消費。示例中所有的命令均已配置了Kafka的環境變數為例。

  • 啟動ZooKeeper,執行命令:zookeeper-server-start config/zookeeper.properties,該命令需要指定zookeeper的配置檔案位置才能正確啟動,kafka的壓縮包中包含了其預設配置,開發與測試環境不需要修改。
[2016-09-28 08:05:34,849] INFO Reading configuration from: config\zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2016-09-28 08:05:34,850] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2016-09-28 08:05:34,851] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2016-09-28 08:05:34,851] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2016-09-28 08:05:34,852] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2016-09-28 08:05:34,868] INFO Reading configuration from: config\zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2016-09-28 08:05:34,869] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
...
[2016-09-28 08:05:34,940] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

從控制檯資訊中,我們可以看到ZooKeeper從指定的config/zookeeper.properties配置檔案中讀取資訊並繫結2181埠啟動服務。有時候啟動失敗,可檢視一下埠是否被佔用,可以殺掉佔用程序或通過修改config/zookeeper.properties配置檔案中的clientPort內容以繫結其他埠號來啟動ZooKeeper。

  • 啟動Kafka,執行命令:kafka-server-start config/server.properties,該命令也需要指定Kafka配置檔案的正確位置,如上命令中指向瞭解壓目錄包含的預設配置。若在測試時,使用外部集中環境的ZooKeeper的話,我們可以在該配置檔案中通過zookeeper.connect引數來設定ZooKeeper的地址和埠,它預設會連線本地2181埠的ZooKeeper;如果需要設定多個ZooKeeper節點,可以為這個引數配置多個ZooKeeper地址,並用逗號分割。比如:zookeeper.connect=127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002

  • 建立Topic,執行命令:kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test,通過該命令,建立一個名為“test”的Topic,該Topic包含一個分割槽一個Replica。在建立完成後,可以使用kafka-topics --list --zookeeper localhost:2181命令來檢視當前的Topic。

另外,如果我們不使用kafka-topics命令來手工建立,直接進行下面的內容進行訊息建立時也會自動建立Topics來使用。

  • 建立訊息生產者,執行命令:kafka-console-producer --broker-list localhost:9092 --topic testkafka-console-producer命令可以啟動Kafka基於命令列的訊息生產客戶端,啟動後可以直接在控制檯中輸入訊息來發送,控制檯中的每一行資料都會被視為一條訊息來發送。我們可以嘗試輸入幾行訊息,由於此時並沒有消費者,所以這些輸入的訊息都會被阻塞在名為test的Topics中,直到有消費者將其消費掉位置。

  • 建立訊息消費者,執行命令:kafka-console-consumer --zookeeper localhost:2181 --topic test --from-beginningkafka-console-consumer命令啟動的是Kafka基於命令列的訊息消費客戶端,在啟動之後,我們馬上可以在控制檯中看到輸出了之前我們在訊息生產客戶端中傳送的訊息。我們可以再次開啟之前的訊息生產客戶端來發送訊息,並觀察消費者這邊對訊息的輸出來體驗Kafka對訊息的基礎處理。

整合Spring Cloud Bus

在上一篇使用Rabbit實現訊息匯流排的案例中,我們已經通過引入spring-cloud-starter-bus-amqp模組,完成了使用RabbitMQ來實現的訊息匯流排。若我們要使用Kafka來實現訊息匯流排時,只需要把spring-cloud-starter-bus-amqp替換成spring-cloud-starter-bus-kafka模組,在pom.xml的dependenies節點中進行修改,具體如下:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>

如果我們在啟動Kafka時均採用了預設配置,那麼我們不需要再做任何其他配置就能在本地實現從RabbitMQ到Kafka的切換。我們可以嘗試把剛剛搭建的ZooKeeper、Kafka啟動起來,並將修改為spring-cloud-starter-bus-kafka模組的config-server和config-client啟動起來。

在config-server啟動時,我們可以在控制檯中看到如下輸出:

2016-09-28 22:11:29.627  INFO 15144 --- [           main] o.s.c.s.b.k.KafkaMessageChannelBinder    : Using kafka topic for outbound: springCloudBus
2016-09-28 22:11:29.642  INFO 15144 --- [-localhost:2181] org.I0Itec.zkclient.ZkEventThread        : Starting ZkClient event thread.
...
016-09-28 22:11:30.290  INFO 15144 --- [           main] o.s.i.kafka.support.ProducerFactoryBean  : Using producer properties => {bootstrap.servers=localhost:9092, linger.ms=0, acks=1, compression.type=none, batch.size=16384}
2016-09-28 22:11:30.298  INFO 15144 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values:
...
2016-09-28 22:11:30.322  INFO 15144 --- [           main] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : Adding {message-handler:outbound.springCloudBus} as a subscriber to the 'springCloudBusOutput' channel
2016-09-28 22:11:30.322  INFO 15144 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'config-server:7001.springCloudBusOutput' has 1 subscriber(s).
2016-09-28 22:11:30.322  INFO 15144 --- [           main] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : started outbound.springCloudBus
...
2016-09-28 22:11:31.465  INFO 15144 --- [           main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframe[email protected]4178cb34
2016-09-28 22:11:31.467  INFO 15144 --- [           main] o.s.c.s.b.k.KafkaMessageChannelBinder$7  : Adding {message-handler:inbound.springCloudBus.anonymous.8b9e6c7b-6a50-48c5-b981-8282a0d5a30b} as a subscriber to the 'bridge.springCloudBus' channel
2016-09-28 22:11:31.467  INFO 15144 --- [           main] o.s.c.s.b.k.KafkaMessageChannelBinder$7  : started inbound.springCloudBus.anonymous.8b9e6c7b-6a50-48c5-b981-8282a0d5a30b

從控制檯的輸出內容,我們可以看到config-server連線到了Kafka中,並使用了名為springCloudBus的Topic。

此時,我們可以使用kafka-topics --list --zookeeper localhost:2181命令來檢視當前Kafka中的Topic,若已成功啟動了config-server並配置正確,我們就可以在Kafka中看到已經多了一個名為springCloudBus的Topic。

我們再啟動配置了spring-cloud-starter-bus-kafka模組的config-client,可以看到控制檯中輸出如下內容:

2016-09-28 22:43:55.067  INFO 6136 --- [           main] o.s.c.s.b.k.KafkaMessageChannelBinder    : Using kafka topic for outbound: springCloudBus
2016-09-28 22:43:55.078  INFO 6136 --- [-localhost:2181] org.I0Itec.zkclient.ZkEventThread        : Starting ZkClient event thread.
...
2016-09-28 22:50:38.584  INFO 828 --- [           main] o.s.i.kafka.support.ProducerFactoryBean  : Using producer properties => {bootstrap.servers=localhost:9092, linger.ms=0, acks=1, compression.type=none, batch.size=16384}
2016-09-28 22:50:38.592  INFO 828 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
...
2016-09-28 22:50:38.615  INFO 828 --- [           main] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : Adding {message-handler:outbound.springCloudBus} as a subscriber to the 'springCloudBusOutput' channel
2016-09-28 22:50:38.616  INFO 828 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'didispace:7002.springCloudBusOutput' has 1 subscriber(s).
2016-09-28 22:50:38.616  INFO 828 --- [           main] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : started outbound.springCloudBus
...
2016-09-28 22:50:39.162  INFO 828 --- [           main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframe[email protected]60cf855e
2016-09-28 22:50:39.162  INFO 828 --- [           main] o.s.c.s.b.k.KafkaMessageChannelBinder$7  : Adding {message-handler:inbound.springCloudBus.anonymous.f8fc9c0c-ccd3-46dd-9537-07198f4ee216} as a subscriber to the 'bridge.springCloudBus' channel
2016-09-28 22:50:39.163  INFO 828 --- [           main] o.s.c.s.b.k.KafkaMessageChannelBinder$7  : started inbound.springCloudBus.anonymous.f8fc9c0c-ccd3-46dd-9537-07198f4ee216

可以看到,config-client啟動時輸出了類似的內容,他們都訂閱了名為springCloudBus的Topic。

在啟動了config-server和config-client之後,為了更明顯地觀察訊息匯流排重新整理配置的效果,我們可以在本地啟動多個不同埠的config-client。此時,我們的config-server以及多個config-client都已經連線到了由Kafka實現的訊息總線上。我們可以先訪問各個config-client上的/from請求,檢視他獲取到的配置內容。然後,修改Git中對應的引數內容,再訪問各個config-client上的/from請求,可以看到配置內容並沒有改變。最後,我們向config-server傳送POST請求:/bus/refresh,此時我們再去訪問各個config-client上的/from請求,就能獲得到最新的配置資訊,各客戶端上的配置都已經載入為最新的Git配置內容。

從config-client的控制檯中,我們可以看到如下內容:

2016-09-29 08:20:34.361  INFO 21256 --- [ kafka-binder-1] o.s.cloud.bus.event.RefreshListener      : Received remote refresh request. Keys refreshed [from]

RefreshListener監聽類記錄了收到遠端重新整理請求,並重新整理了from屬性的日誌。

Kafka配置

在上面的例子中,由於Kafka、ZooKeeper均運行於本地,所以我們沒有在測試程式中通過配置資訊來指定Kafka和ZooKeeper的配置資訊,就完成了本地訊息匯流排的試驗。但是我們實際應用中,Kafka和ZooKeeper一般都會獨立部署,所以在應用中都需要來為Kafka和ZooKeeper配置一些連線資訊等。Kafka的整合與RabbitMQ不同,在Spring Boot 1.3.7中並沒有直接提供的Starter模組,而是採用了Spring Cloud Stream的Kafka模組,所以對於Kafka的配置均採用了spring.cloud.stream.kafka的字首,比如:

屬性名 說明 預設值
spring.cloud.stream.kafka.binder.brokers Kafka的服務端列表 localhost
spring.cloud.stream.kafka.binder.defaultBrokerPort Kafka服務端的預設埠,當brokers屬性中沒有配置埠資訊時,就會使用這個預設埠 9092
spring.cloud.stream.kafka.binder.zkNodes Kafka服務端連線的ZooKeeper節點列表 localhost
spring.cloud.stream.kafka.binder.defaultZkPort ZooKeeper節點的預設埠,當zkNodes屬性中沒有配置埠資訊時,就會使用這個預設埠 2181

更多配置引數請參考官方文件