1. 程式人生 > >使用 ActiveMQ 實現JMS 非同步呼叫

使用 ActiveMQ 實現JMS 非同步呼叫

目錄

簡介

服務之間的同步呼叫,可以使用 HTTP 或 RPC 來完成,但並非所有的呼叫都需要同步,有些場景下,當客戶端呼叫服務端時,並不需要等待服務端做出響應,此時就應該使用非同步呼叫。非同步呼叫的常用方式是基於 MQ (Message Queue) 來實現的。下文會以 ActiveMQ 為例進行講解。

ActiveMQ 是 Java 世界中最為流行的開源訊息中介軟體,它不僅功能強大,而且效能穩定。它可全面支援 JMS(Java 訊息服務)技術規範,為 Java 應用程式提供標準的 JMS API。

此外 ActiveMQ 具備與 Spring 框架整合的能力,它一直都是 Spring 應用程式的訊息中介軟體標配。同樣, Spring Boot 也提供了 ActiveMQ 的開箱即用的外掛,只需要幾項配置,就能接入 ActiveMQ,並輕鬆使用 JMS API 編寫非同步訊息通訊程式。

Active MQ 官網地址如下

http://activemq.apache.org

啟動 ActiveMQ 伺服器

先使用 docker 安裝 ActiveMQ ,目前 ActiveMQ 官方並未提供相應的 Docker 映象,我們選擇使用第三方映象 webcenter/activemq

docker pull webcenter/activemq:5.14.3

接下來執行 ActiveMQ

docker run  -d -p 8161:8161 -p 61616:61616 -e ACTIVEMQ_ADMIN_LOGIN=admin -e ACTIVEMQ_ADMIN_PASSWORD=admin --name activemq webcenter/activemq:5.14.3

在啟動 ActiveMQ 容器時,容器對宿主機暴露了兩個埠號:

  • 8161: 表示 ActiveMQ 控制檯埠號,可在瀏覽器中通過控制檯來執行 ActiveMQ 的相關操作
  • 61616: 表示 ActiveMQ 所監聽的 TCP 埠號,應用程式可通過該埠號與 ActiveMQ 建立 TCP 連線,並完成後續的非同步訊息通訊

此外,在啟動 ActiveMQ 容器時,還提供了兩個環境變數

  • ACTIVEMQ_ADMIN_LOGIN: 用於設定控制檯管理員的使用者名稱,預設為 admin
  • ACTIVEMQ_ADMIN_PASSWORD: 用於設定控制檯管理員的密碼,預設為 admin

檢視控制檯

webcenter/activemq 映象擁有一個基於 Web 的控制檯,可通過瀏覽器訪問。容器啟動完畢後,可以開啟瀏覽器,並在位址列中輸入 http://localhost:8161

點選 Manage ActiveMQ borker 連結,瀏覽器將彈出一個對話方塊,此時輸入使用者名稱和密碼,認證通過後會進入管理介面

在管理介面中,包括 8 個功能選單

  1. Home: 基本資訊
  2. Queues: 管理的佇列
  3. Topics: 檢視所管理的主題
  4. Subscribers: 檢視相關主題的訂閱者
  5. Connections: 檢視客戶端的連線資訊
  6. Network: 檢視網路相關資訊
  7. Scheduled: 檢視 ActiveMQ 內部執行的定時任務
  8. Send: 通過表單方式查看向佇列或主題傳送具體訊息

ActiveMQ 的訊息通道

ActiveMQ 管理了兩類訊息通道,一類是佇列(Queue),另一類叫做主題(Topic)。

Queue

Queue 用於解決訊息的 點對點 通訊問題,也就是說,訊息從生產者(Producer) 發出後,首先進入 ActiveMQ 某個指定的 Queue 中,然後再將訊息傳送給其中一個消費者(Consumer)。

Topic

Topic 用於解決訊息的釋出與訂閱(Publish-subscribe) 通訊問題,也就是說,訊息從 Producer 發出後,首先將其釋出到 ActiveMQ 某個指定的 Topic 上,然後將此訊息分發給每個訂閱者(Subscriber) 。

比較

在具體場合下,靈活使用以上兩種通訊模式來實現 Producer 與 Consumer/Subscriber 間的非同步呼叫,從而解決呼叫方的耦合問題。可見,Queue 能解決呼叫緩衝問題,Topic 能解決訊息廣播問題, Queue 與 Topic 都能解決掉呼叫耦合問題,這些技術都為一個好的軟體架構提供了有效的支撐。

開發生產者和消費者

下面就以 Queue 為例,將 ActiveMQ 與 Spring Boot 進行整合,將 Producer 作為客戶端, Consumer 作為服務端,通過 Queue 實現客戶端與服務端的非同步呼叫

開發服務端(消費者)

首先建立一個名為 acitvemq-hello-server 的 spring boot 專案,如果在 eclipse 中安裝了 Spring Tools ,可以在新建時選擇 New Spring Starter Project 選項。或者新建 Maven 工程。對應的 maven 依賴如下

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.19.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
    </dependencies>

在 Spring Boot 框架中已經內建了對 ActiveMQ 的支援,我們只需要依賴 spring-boot-starter-mq 就能啟動 ActiveMQ,此時還需要在 application.properties 檔案中新增 ActiveMQ 配置項

spring.activemq.broker-url=tcp://10.104.10.1:61616
spring.activemq.user=admin
spring.activemq.password=admin

接下來建立 HelloServer 的類,封裝服務端相關程式碼

@Component
public class HelloServer {
    @JmsListener(destination="hello-queue")
    public void receive(String message) {
        System.out.println(message);
    }
}

使用 @Component 註解,說明它可被 Spring IoC 容器所管理。此時只需要使用 @JmsListener 註解,並將其繫結到 receive() 方法上,就能從 ActiveMQ 中接收響應的訊息。

@JmsListener 註解中需要新增一個 destination 屬性來指定 Queue/Topic 的名稱,該名稱具有唯一性。訊息將以一個 String 型別引數的形式傳入方法體中,也可以接收其他型別的訊息,這取決於客戶端傳送的訊息是哪種型別。Spring JMS 將訊息放入 ActiveMQ 時會進行序列化,當訊息從 ActiveMQ 取出時將進行反序列化,應用程式無需關注這些底層細節,只需要將精力放在業務邏輯上。

最後,編寫一個 Spring Boot 應用程式啟動類來啟動服務端(使用 spring tools 工具會自動生成)

@SpringBootApplication
public class ActivemqHelloServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ActivemqHelloServerApplication.class, args);
    }
}

當服務端啟動完畢後,將一直監聽 ActiveMQ 的 hello-queue 佇列中即將到來的訊息,訊息由客戶端來發送。

開發客戶端(生產者)

建立一個名為 active-mq-client 的 Maven 專案, pom.xml 檔案內容與服務端相似。application.properties 檔案與服務端相同。

接下來建立一個名為 HelloClient 的類,將其作為客戶端。

@Component
public class HelloClient {
    
    @Autowired
    private JmsTemplate jmsTemplate;
    
    public void send(String message) {
        jmsTemplate.convertAndSend("hello-queue", message);
    }
}

這裡使用了 @Autowired 註解, JmsTemplate 物件注入進來,還編寫了一個 send() 方法,在該方法中呼叫 JmsTemplate 物件的 convertAndSend 來轉換併發送訊息。

最後使用 Spring Boot 應用程式啟動類來啟動客戶端

@SpringBootApplication
public class ActivemqHelloClientApplication {

    @Autowired
    private HelloClient helloClient;
    
    @PostConstruct
    public void init() {
        helloClient.send("hello world");
    }
    
    public static void main(String[] args) {
        SpringApplication.run(ActivemqHelloClientApplication.class, args);
    }

}

需要注意的是, init() 方法帶有 @PostConstruct 註解,表示 Spring IoC 容器例項化 ActivemqHelloClientApplication 類後將呼叫該方法。

執行 main() 方法可以啟動客戶端應用程式,並可以在服務端應用程式控制臺中看到 client 傳送的訊息,也可以在 ActiveMQ 控制檯中檢視佇列的當前狀態

Queue 表格中列明的含義如下

  • Name 表示佇列名稱,可在應用程式中自動建立,也可在 ActiveMQ 控制檯中手動建立
  • Number Of Pending Messages 表示阻塞在佇列中未經消費的訊息條數
  • Number Of Consumers 表示正在與 ActiveMQ 建立連線的消費者數量
  • Messages Enqueued 表示進入佇列的訊息數量
  • Messages Dequeued 表示離開佇列的訊息數量

此外,還有下面幾種操作

  • Browser 用於檢視當前佇列中訊息的相關細節
  • Active Consumers 用於檢視當前活動消費者的相關資訊
  • Active Producers 用於檢視當前活動生產者的相關資訊
  • Send To 用於向當前佇列中傳送具體訊息
  • Purge 用於清空佇列中的訊息
  • Delete 用於刪除當前佇列

參考

  • 《架構探險—輕量級微服務架構》