1. 程式人生 > >RabbitMQ 安裝配置和 Spring 整合

RabbitMQ 安裝配置和 Spring 整合

本文從安裝和配置 RabbitMQ 開始,準備好環境後,直接在 Spring 中整合,並且針對 Spring 中的常見用法提供了示例和講解。

安裝

一般開發環境可能用的都是 Windows,生產環境 Linux 用的比較多,這裡針對 Windows 和 Ubuntu 的安裝說明簡單提煉。其他環境可以直接參考官方文件:https://www.rabbitmq.com/download.html

Windows 安裝

Windows 上安裝很容易,先安裝 Erlang/OTP 環境(注意和 RabbitMQ 版本匹配),再安裝 RabbitMQ 即可。

下載地址:

Ubuntu 安裝

1 為了使用儲存庫方式安裝最新版本,需要將 RabbitMQ 簽名祕鑰新增到 apt-key 中,從下面兩種方式選擇一種方式執行:

sudo apt-key adv --keyserver "hkps.pool.sks-keyservers.net" --recv-keys "0x6B73A36E6026DFCA"

或者

wget -O - "https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc" | sudo apt-key add -

第二種方式無需金鑰伺服器即可下載和匯入金鑰。
我使用的第一種。

2 然後在 packagecloud 有段指令碼(自動根據伺服器版本選擇對應的安裝源),當前(2018-11-30)的內容如下:

curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.deb.sh |
sudo bash

3 執行該指令碼後,繼續然後執行下面的命令:

sudo apt-get update

更新後,可以通過下面命令檢視當前的 rabbitmq-server 的可用版本:

apt-cache madison rabbitmq-server

我這裡的結果(2018-12-01)顯示如下:

rabbitmq-server |    3.7.9-1 | https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu bionic/main amd64 Packages
rabbitmq-server |    3.7.8-1 | https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu bionic/main amd64 Packages
rabbitmq-server |    3.7.7-1 | https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu bionic/main amd64 Packages
rabbitmq-server |    3.7.6-1 | https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu bionic/main amd64 Packages
rabbitmq-server |    3.7.5-1 | https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu bionic/main amd64 Packages
rabbitmq-server |   3.6.16-2 | https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu bionic/main amd64 Packages
rabbitmq-server |   3.6.16-1 | https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu bionic/main amd64 Packages
rabbitmq-server |   3.6.15-1 | https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu bionic/main amd64 Packages
rabbitmq-server |   3.6.10-1 | http://archive.ubuntu.com/ubuntu bionic/main amd64 Packages

4 執行下面的命令安裝 rabbitmq-server

sudo apt-get install rabbitmq-server

此時安裝的應該是最新的版本。

可以通過 sudo apt-get install rabbitmq-server=3.7.9-1 安裝指定版本。

配置

接下來主要是在 Ubuntu 環境(Windows 環境類似)進行配置。由於沒有桌面環境,因此先通過命令建立可以外網訪問 rabbitmq 的使用者,然後啟用 management 在通過網頁進行管理。

新增使用者 root,密碼 root。(根據自己需要設定

sudo rabbitmqctl add_user root root

給 root 新增管理許可權。

sudo rabbitmqctl set_user_tags root administrator

給 root 新增預設虛擬主機的所有許可權。

sudo rabbitmqctl set_permissions -p / root ".*" ".*" ".*"

Windows 中的操作過程

D:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.9\sbin>rabbitmqctl.bat add_user root root
Adding user "root" ...

D:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.9\sbin>rabbitmqctl.bat set_user_tags root administrator
Setting tags for user "root" to [administrator] ...

D:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.9\sbin>rabbitmqctl.bat set_permissions -p / root ".*" ".*" ".*"
Setting permissions for user "root" in vhost "/" ...

啟用 rabbitmq_management

sudo rabbitmq-plugins enable rabbitmq_management

啟用 rabbitmq_management 後不需要重啟服務

此後可以直接訪問 rabbitmq 的 http://RabbitMQ服務IP:15672 通過 WEB 進行管理。

備忘錄(暫時不用關注這裡,測試叢集時可用)
單機啟動多個帶有 rabbitmq_management 節點時的配置

RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbitl RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port , 156721}]" rabbitmq-server -detached

參考 RabbitMQ實戰指南 7.1.5 單機多節點配置

準備好 RabbitMQ 環境後,下面直接和 Spring 整合。

初學者建議先通過官方示例瞭解 RabbitMQ 的基本概念和用法:https://www.rabbitmq.com/getstarted.html

Spring 整合

下面先是 Spring 整合的配置,然後是專案中具體的用法。

下面示例所有連結都可以直接開啟展示完整內容。
完整示例地址:https://github.com/abel533/spring-rabbitmq-demo

配置

1 新增相關依賴

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.7.11.RELEASE</version>
</dependency>
<!-- spring-rabbit 依賴 spring-amqp,下面這個依賴可以不顯示引入 -->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-amqp</artifactId>
    <version>1.7.11.RELEASE</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.8.11.1</version>
</dependency>
<dependency>
    <groupId>org.codehaus.jackson</groupId>
    <artifactId>jackson-mapper-asl</artifactId>
    <version>1.9.13</version>
</dependency>

2 配置檔案

spring-rabbit 配置單獨放在一個檔案中,需要的時候可以直接在 Spring 中 <import>

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns="http://www.springframework.org/schema/beans"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
       http://www.springframework.org/schema/task
       http://www.springframework.org/schema/task/spring-task.xsd">

    <!--啟用註解監聽訊息-->
    <rabbit:annotation-driven/>

    <!--連線工廠配置-->
    <rabbit:connection-factory id="rabbitConnectionFactory"
                               thread-factory="amqpThreadFactory"
                               virtual-host="${rabbitmq.virtual-host:/}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               channel-cache-size="${rabbitmq.channel-cache-size:30}"
                               addresses="${rabbitmq.addresses}"/>

    <bean id="amqpThreadFactory" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
        <constructor-arg value="rabbitmq-"/>
    </bean>

    <!--訊息模板-->
    <rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory"
                     message-converter="amqpMessageConverter"/>

    <!--訊息轉換,生產者和消費者都需要 -->
    <bean id="amqpMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>

    <!--amqp管理-->
    <rabbit:admin id="amqpAdmin" connection-factory="rabbitConnectionFactory"/>

    <!--訊息監聽容器,配合註解監聽訊息-->
    <bean id="rabbitListenerContainerFactory"
          class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
        <property name="connectionFactory" ref="rabbitConnectionFactory"/>
        <!--併發消費者數量-->
        <property name="concurrentConsumers" value="${rabbitmq.concurrentConsumers:3}"/>
        <!--最大數量-->
        <property name="maxConcurrentConsumers" value="${rabbitmq.maxConcurrentConsumers:10}"/>
        <!--訊息轉換-->
        <property name="messageConverter" ref="amqpMessageConverter"/>
        <!--任務執行緒池-->
        <property name="taskExecutor">
            <task:executor id="amqpTaskExecutor" pool-size="${rabbitmq.task-executor.pool-size:100}"/>
        </property>
        <!--手動確認-->
        <property name="acknowledgeMode" value="${rabbitmq.acknowledgeMode:MANUAL}"/>
    </bean>

</beans>

3 Spring 配置檔案中需要提供的配置

# rabbitmq 訊息配置
rabbitmq.addresses=localhost:5672
rabbitmq.virtual-host=/
rabbitmq.username=root
rabbitmq.password=root
rabbitmq.channel-cache-size=50
rabbitmq.concurrentConsumers=3
rabbitmq.maxConcurrentConsumers=10
# 確認方式 MANUAL 手動,AUTO 自動,NONE 自動確認
rabbitmq.acknowledgeMode=MANUAL
# 執行緒池數量 = 併發數 * 監聽數
rabbitmq.task-executor.pool-size=100

下面是和 Spring 整合後的用法。

測試中,增加了 spring.xml 配置檔案,內容如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns="http://www.springframework.org/schema/beans"
       xmlns:task="http://www.springframework.org/schema/task"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/task
       http://www.springframework.org/schema/task/spring-task.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd">

    <!--載入屬性配置檔案-->
    <context:property-placeholder location="classpath:META-INF/spring/application.properties"/>

    <!--掃描包-->
    <context:component-scan base-package="rabbitmq"/>

    <!--Producter 中的任務排程使用-->
    <task:scheduler id="taskScheduler"/>
    <task:annotation-driven scheduler="taskScheduler"/>

    <!--引入 spring-rabbitmq 配置-->
    <import resource="classpath*:META-INF/spring/spring-rabbitmq.xml"/>

</beans>

生產者

示例程式碼 如下:

@Component
public class Producter {
    public static final Logger logger = LoggerFactory.getLogger(Producter.class);

    @Autowired
    private AmqpTemplate template;

    @Autowired
    private AmqpAdmin admin;

    @PostConstruct
    protected void init() {
        //定義交換機
        Exchange exchange = ExchangeBuilder.topicExchange("logger").durable(true).build();
        admin.declareExchange(exchange);
        //還可以定義佇列和繫結
    }

    final Random random = new Random();
    final String[] keys = new String[]{"logger.error", "logger.warn", "logger.info"};
    AtomicInteger count = new AtomicInteger();

    @Scheduled(fixedDelay = 1000)
    protected void product() {
        String key = keys[random.nextInt(3)];
        int i = count.getAndIncrement();
        String message = key + " > " + i + " " + new Date();
        User obj = new User(message, i);
        template.convertAndSend("logger", key, obj);
        logger.info("[Send] " + obj);
    }

}
  1. 在程式碼中直接注入 AmqpTemplate,用於傳送或接收訊息。
  2. 根據需要注入 AmqpAdmin,可以用於建立交換機、佇列和繫結。

上面程式碼中,在 init 初始化中定義了一個交換機。通過 product 定時任務,每隔 1000 毫秒執行一次,呼叫 template.convertAndSend("logger", key, obj); 傳送訊息,傳送的物件會根據前面 spring-rabbit 配置檔案中的訊息轉換器轉換為 JSON 資料進行傳送。

生產者的邏輯可以根據業務需要進行定製。

消費者

消費者有多種用法,這裡使用最方便的註解用法。

Consumer 程式碼中,有 3 個例子,這裡拿第一個進行講解:

/**
 * 接收物件的例子
 *
 * 該方法還可以直接注入 org.springframework.amqp.core.Message 物件
 *
 * @param data
 * @param deliveryTag
 * @param channel
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "logger.all", durable = "true"),
        exchange = @Exchange(value = "logger", 
                             durable = "true", 
                             ignoreDeclarationExceptions = "true", 
                             type = ExchangeTypes.TOPIC),
        key = "logger.#"
))
public void all(User data, @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag, Channel channel) {
    try {
        //測試用,隨機確認和拒絕(並返回佇列)
        if(Math.random() > 0.5d){
            logger.info("[reject] deliveryTag:" + deliveryTag + ", message: " + data);
            channel.basicReject(deliveryTag, true);
        } else {
            logger.info("[ack   ] deliveryTag:" + deliveryTag + ", message: " + data);
            channel.basicAck(deliveryTag, false);
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

註解

消費者監聽的主要註解就是 @RabbitListener,上面例子是一個比較複雜的用法,下面從簡單開始說起。
最簡單的情況下,註解用法如下:

@RabbitListener(queues = "myQueue")
public void processOrder(String data) {
    ...
}

這種情況下,要求 myQueue 佇列已經存在,這樣就能直接監聽該佇列。除此之外這裡接收的引數要求是字串型別,和消費者傳送的訊息型別需要一致。

再稍微簡單點的情況下,用法如下:

@RabbitListener(bindings = @QueueBinding(
      value = @Queue,
      exchange = @Exchange(value = "auto.exch"),
      key = "invoiceRoutingKey")
)
public void processInvoice(String data) {
  ...
}

實際上這裡已經有些複雜了,這個例子的特點就是,不需要事先存在交換機、佇列和繫結。Spring 在啟動的時候會根據這裡的註解去建立這三者(RabbitMQ 規則是如果佇列、交換機已經存在,在引數相同的情況下會直接複用,不會建立新的,如果引數不同會報錯)。這裡的佇列只用了 @Queue,因此會建立一個匿名獨佔自動刪除的佇列。交換機的名字指定了 auto.exch,佇列和交換機通過 invoiceRoutingKey 進行繫結。

現在再來看本例的用法:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "logger.all", durable = "true"),
        exchange = @Exchange(value = "logger", 
                             durable = "true", 
                             ignoreDeclarationExceptions = "true", 
                             type = ExchangeTypes.TOPIC),
        key = "logger.#"
))

這裡建立了一個指定名稱的佇列,並且配置了持久化。還建立了一個支援持久化的交換機,型別為 TOPIC,並且忽略交換機的宣告異常(如果已經存在並且屬性不同時,忽略此異常)。通過 logger.# 進行匹配,在主題交換機中,有兩個特殊的字元 *#,分別匹配一個逗號隔開的單詞和任意(可0)單詞。因此這裡能匹配 logger.infologger.xxx.debug 等路由。

除了上面這些常見用法外,還有一個特殊的情況,可以根據接收型別自動匹配的用法,如下:

@RabbitListener(id="multi", queues = "someQueue")
public class MultiListenerBean {

    @RabbitHandler
    @SendTo("my.reply.queue")
    public String bar(Bar bar) {
        ...
    }

    @RabbitHandler
    public String baz(Baz baz) {
        ...
    }

    @RabbitHandler
    public String qux(@Header("amqp_receivedRoutingKey") String rk, @Payload Qux qux) {
        ...
    }

}

在類上使用了 @RabbitListener 註解,在方法上使用了 @RabbitHandler 註解。在監聽 someQueue 佇列時,會根據訊息的實際型別,呼叫匹配的方法(Bar, BazQux)。

特別注意:只有上面這種用法下才會根據型別進行匹配,直接在方法上使用 @RabbitListener 註解時不會自動匹配。

下面來看看這個引數需要注意的地方。

引數

在我們配置的 JSON 轉換中,除了轉換的 JSON 串之外,在訊息中還記錄了型別的資訊。如下圖所示:
在這裡插入圖片描述
可以看到在訊息屬性頭中,通過 __TypeId__ 記錄了訊息物件的實際型別,因此在 Spring 中的序列化和反序列化中能夠根據這裡的型別進行轉換,當接收型別和這裡指定的型別不一致時會報錯(只有前面 @RabbitHandler 用法中會去匹配正確的方法,無法匹配時報錯)。

Spring AMQP 中支援以下幾類引數:

  1. 訊息物件(payload),如果引數型別不能明確匹配時,需要通過 @Payload 指定訊息體。
  2. com.rabbitmq.client.Channel,訊息通道,可以呼叫 AMQP 的基本方法,常用於 ack 和 reject。
  3. @Header 註解的引數,從訊息頭提取指定的資訊。
  4. org.springframework.amqp.core.Message 訊息的原始物件。
  5. org.springframework.messaging.Message<T> 訊息介面,通過泛型指定訊息體型別,可以在 1 的基礎上額外獲取訊息頭資訊。

ack 和 reject

在本例中,由於要手動 ACK 或 REJECT,所以在訊息體之外還注入了 @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTagChannel

在業務邏輯執行完成後或者發生異常時,根據具體的情況來選擇執行。

如果業務順利執行完成,我們可以直接通過 channel.basicAck(deliveryTag, false); 確認消費,此後訊息佇列會刪除這條已消費的訊息。

如果業務中出現了異常,需要具體分析,如果只是網路或可以重試的問題,我們可以通過 channel.basicReject(deliveryTag, true); 將訊息返還給訊息佇列。如果出現的是問題是業務邏輯或者就算重複執行仍然有問題的情況,可能就需要通過 channel.basicReject(deliveryTag, false);刪除該訊息(存在死信佇列的情況會接收該訊息,可以進行後續處理)。

總結

學會使用 RabbitMQ 是一件很容易的事情,但是用好用對是很不容易的事。不同常見和業務都需要考慮使用什麼型別的交換機,使用什麼樣的佇列,每個佇列分配多少個併發,這些都很重要。

想要真正用好訊息佇列,還需要學習很多知識,你可以通過下面的參考資料瞭解更多。

參考資料

在我學 RabbitMQ 的過程中,下面這些資料是特別有用的,都是官方提供的專案文件,必要的時候可以多看幾遍。

  1. https://www.rabbitmq.com
  2. https://www.rabbitmq.com/man/rabbitmqctl.8.html
  3. https://docs.spring.io/spring-amqp/docs/1.7.11.RELEASE/reference/html/index.html

除此之外,我還參考了下面兩本書:

  • RabbitMQ實戰:高效部署分散式訊息佇列
  • RabbitMQ實戰指南

第一本書更多的像文件,第二本書有更多作者的心得和技巧。