1. 程式人生 > >spring cloud 中訊息匯流排(bus)使用

spring cloud 中訊息匯流排(bus)使用

訊息系統

說到訊息系統大家耳熟能詳的幾個一般來說都有各自適用的場景,我們這裡簡單說一下幾個常見的訊息系統。

這裡寫圖片描述
ActiveMQ是比較老牌的訊息系統,當然了不一定是大家第一個熟知的訊息系統,因為現在電商、網際網路規模越來越大,不斷進入程式設計師眼簾的大多是Kafka和RocketMQ。ActiveMQ出現的要比他們早,而且涵蓋的功能也特別全,路由、備份、查詢、事務、叢集等等。他的美中不足是不能支撐超大規模、超高併發的網際網路應用,ActiveMQ的併發承受能力在百萬級別,大概500次/s的訊息頻率。


Kafka是新一代的訊息系統,相對於ActiveMQ來說增加了分片功能,類似於資料庫分庫分表,一臺Broker僅負責一部分資料收發,從而使得他的伸縮性特別好,通過增加Broker就可以不斷增加處理能力。一般來說,Kafka被用來處理日誌流,作為流計算的接入點。在電商的訂單、庫存等系統裡邊一般不用,主要顧慮的是Kafka非同步刷盤機制可能導致資料丟失。當然,對於資料丟失這一點不同的工程師也有不同的看法,認為Kafka的Master-Slave的多寫機制,完全能夠避免資料丟失。

這裡寫圖片描述
RocketMQ是阿里開源的一款訊息系統,開發的初衷就是要支撐阿里龐大的電商系統。RocketMQ和Kafka有很多相似之處,由於RocketMQ開發中很大程度上參考了Kafka的實現。RocketMQ同樣提供了優秀的分片機制,RocketMQ的分片比Kafka的分片有所增強,區分了絕對有序和非絕對有序兩種選項。另外RocketMQ採用的是同步刷盤,一般認為不會造成資料丟失。

這裡寫圖片描述
RabbitMQ類似於ActiveMQ也是一個相對小型的訊息系統,他的優勢在於靈活的路由機制,可以進行自由配置。

這裡寫圖片描述
Redis的pub/sub功能,由於Redis是記憶體級的系統,所以速度和單機的併發能力是上述四個訊息系統不能比擬的,但是也是由於記憶體儲存的緣故,在訊息的保障上就更弱一些。據說新浪部落格系統選擇了Redis的pub/sub作為訊息系統,不能不說藝高人膽大。

什麼時候用cloud bus

spring cloud bus在整個後端服務中起到聯通的作用,聯通後端的多臺伺服器。我們為什麼需要他做聯通呢?

後端伺服器一般都做了叢集化,很多臺伺服器,而且在大促活動期經常發生服務的擴容、縮容、上線、下線。這樣,後端伺服器的數量、IP就會變來變去,如果我們想進行一些線上的管理和維護工作,就需要維護伺服器的IP。

比如我們需要更新配置、比如我們需要同時失效所有伺服器上的某個快取,都需要向所有的相關伺服器傳送命令,也就是呼叫一個介面。

你可能會說,我們一般會採用zookeeper的方式,統一儲存伺服器的ip地址,需要的時候,向對應伺服器傳送命令。這是一個方案,但是他的解耦性、靈活性、實時性相比訊息匯流排都差那麼一點。

總的來說,就是在我們需要把一個操作散發到所有後端相關伺服器的時候,就可以選擇使用cloud bus了。

使用cloud bus之後,我們的服務端架構會變成這樣:

這裡寫圖片描述

cloud bus能做什麼

當前spring cloud bus提供了兩個可用的介面:1./bus/env用於設定某一個配置項2./bus/refresh用於重新整理所有繫結到重新整理點的配置項。

這兩個介面是使用spring boot actuator方式釋出出來的(可以參見:深入SpringBoot:自定義Endpoint一文),接收到訊息後會使用spring的stream框架(可以參考:張開濤的解Spring事件驅動模型一文)把訊息傳播到所有註冊的相關伺服器。

/bus/env的引數格式:

name=&value=&destination=

/bus/refresh的引數格式:

destination=

當然了,上述的destination引數都可以不提供。

spring cloud config 配合spring cloud bus實現配置資訊更新

spring cloud config 配置更新有兩種方式:1.配置git倉庫的web hook,當git倉庫有更新時自動呼叫bus提供的重新整理介面,重新整理快取;2.手工呼叫bus提供的重新整理介面。

不論一方案還是二方案區別僅在於是不同的人觸發了重新整理介面。實際上,線上伺服器一般很少採用自動重新整理的機制,都會在修改後,確認無誤後再執行重新整理。

關鍵的修改點是把所有的後端伺服器連線到同一個訊息系統上,然後監聽配置更新訊息。

安裝RabbitMQ

安裝方法很簡單,直接在官網下載對應的安裝檔案就可以了。
這裡寫圖片描述

因為RabbitMQ是Erlang語言寫的,所以如果你的機器上沒有安裝Erlang,那麼需要先安裝Erlang。

增加bus包的引用

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>

增加RabbitMQ配置

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5671
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

增加@RefreshScope註解

@SpringBootApplication
@RestController
@RefreshScope
public class ConfigClientApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConfigClientApplication.class, args);
    }

    @Value("${app-name}")
    private String app_name;

    @RequestMapping("hi")
    public String hi(){
        return "hello "+ app_name;
    }
}

spring cloud擴充套件訊息匯流排方法