1. 程式人生 > >RabbitMQ實戰:運行和管理RabbitMQ

RabbitMQ實戰:運行和管理RabbitMQ

RabbitMQ

本系列是「RabbitMQ實戰:高效部署分布式消息隊列」書籍的總結筆記。

上一篇介紹了AMQP消息通信,包括隊列、交換器和綁定,通過虛擬主機還可以隔離數據和權限,消息持久化和發送方確認模式確保了消息不丟失。

本篇主要介紹如何運行和管理RabbitMQ,在介紹之前,會有個DEMO演示消息發送和接收,一方面對AMQP的元素有更直觀的認識,一方面為後面介紹監控做數據來源。

通過介紹,你會了解到:

  • 消息發送和接收簡單實現
  • 服務器管理-啟動和停止節點
  • 權限配置
  • 使用統計

消息發送和接收簡單實現

該Demo主要用於收集日誌,消息發送者是各個應用子系統,消息接收者是日誌收集服務,使用RabbitMQ可以很容易實現。

基於Spring Boot框架實現,主要類的作用如下:

  • LogRabbitConfig:創建隊列、交換器、綁定等初始化操作;
  • Sender:消息發送者;
  • AllReceiver:所有級別日誌接收者,接收所有級別的日誌;
  • ErrorReceiver:錯誤級別日誌接受者,只接收錯誤級別的日誌;
  • LogSenderTest:測試用例類;

消息模型如下:
技術分享圖片

配置

首先,配置spring boot和rabbitmq依賴:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.1.RELEASE</version>
    <relativePath/>
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!--rabbitmq依賴-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>

然後在application.properties文件中配置rabbitmq地址:

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/
LogRabbitConfig實現

使用Spring的@Configuration定義配置類,可替換xml配置文件,被註解的類內部包含有一個或多個被@Bean註解的方法,用於構建bean定義,初始化Spring容器。

@Configuration
public class LogRabbitConfig {
    final static String QUEUE_LOG_ERROR = "log.error";
    final static String QUEUE_LOG_ALL = "log.all";

    //創建log.error隊列
    @Bean
    public Queue logError() {
        return new Queue(QUEUE_LOG_ERROR);
    }
    //創建log.all隊列
    @Bean
    public Queue logAll() {
        return new Queue(QUEUE_LOG_ALL);
    }
    //創建exchange,命名為log
    @Bean
    TopicExchange exchange() {
        return new TopicExchange("log");
    }
    //綁定log.error隊列到exchange,routingkey為log.error
    @Bean
    Binding bindingExchangeError(Queue logError, TopicExchange exchange) {
        return BindingBuilder.bind(logError).to(exchange).with("log.error");
    }
    //綁定log.all隊列到exchange,routingkey為log.#
    @Bean
    Binding bindingExchangeAll(Queue logAll, TopicExchange exchange) {
        return BindingBuilder.bind(logAll).to(exchange).with("log.#");
    }
}
Sender實現

各個子系統向rabbitmq服務器發送消息:

@Component
public class Sender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        //向mq服務端發送消息,exchange為log,routingkey為log.error
        String context = "error log";
        this.rabbitTemplate.convertAndSend("log", "log.error", context);

        //向mq服務端發送消息,exchange為log,routingkey為log.info
        context = "info log";
        System.out.println("send msg : " + context);
        this.rabbitTemplate.convertAndSend("log", "log.info", context);

        //向mq服務端發送消息,exchange為log,routingkey為log.warn
        context = "warn log";
        System.out.println("send msg : " + context);
        this.rabbitTemplate.convertAndSend("log", "log.warn", context);
    }

}
AllReceiver和ErrorReceiver實現

從rabbitmq服務器接收消息。

AllReceiver從服務器的log.all隊列獲取消息,因為它綁定的routingkey為"log.#",所以,會收到所有級別的日誌:

@Component
@RabbitListener(queues = "log.all")
public class AllReceiver {
    @RabbitHandler
    public void process(String context) {
        System.out.println("receive log : " + context);
    }
}

ErrorReceiver從服務器的log.error隊列獲取消息,因為它綁定的routingkey為"log.error",所以,只會收到error級別的日誌:

@Component
@RabbitListener(queues = "log.error")
public class ErrorReceiver {
    @RabbitHandler
    public void process(String context) {
        System.out.println("receive error : " + context);
    }
}
LogSenderTest測試用例

測試用例很簡單,就是調用Sender發送消息,觀察消息的接收情況。

@RunWith(SpringRunner.class)
@SpringBootTest
@SpringBootApplication
public class LogSenderTest {
    @Autowired
    Sender sender;

    @Test
    public void sendLog() {
        sender.send();
    }
}

運行日誌如下:
技術分享圖片

可以看到,error收到了2次,說明exchange同時分發給了log.all和log.error隊列,其他級別的日誌分發給了log.all隊列。

服務器管理-啟動和停止節點

RabbitMQ是用Erlang編寫的,Erlang天生就能讓應用程序無需知道對方是否在同一臺機器上即可相互通信,這讓集群和可靠的消息路由變得簡單。

理解節點和Erlang應用程序

和Java有JVM虛擬機類似,Erlang也有虛擬機,虛擬機的每個實例稱之為「節點」,不同的是,多個Erlang應用程序可以運行在同一個節點之上,如果應用程序崩潰了,Erlang節點會自動嘗試自動重啟應用程序。

節點的操作:

  • 後臺啟動節點:./rabbitmq-server -detached
  • 停止節點:./rabbitmqctl stop
  • 僅停止rabbit應用程序:./rabbitmqctl stop_app
配置文件

配置文件的格式本質上是原始的Erlang數據結構,是一個包含了嵌套哈希表的數組,如下:

[
    [mnesia , [{dump_log_write_threshold , 1000}]],
    [rabbit , [{vm_memory_high_wateremark , 0.4}]]
]

上面配置了2個應用,每個應用會有自己的哈希表來配置選項:

  • mnesia:是rabbitmq用來存儲交換器和隊列元數據的;
  • rabbit:是rabbitmq特定的配置選項;

每個應用如果有多個選項,用逗號隔開。

權限配置

RabbitMQ權限系統中,單個用戶可以跨越多個vhost進行授權,而且可以對讀、寫、配置分別授權。

首先創建一個用戶dongqingqing,密碼為123456:

./rabbitmqctl add_user dongqingqing 123456

授予dongqingqing用戶權限,可以讀取所有隊列和交換器,只可寫log.*格式的隊列和交換器,無法創建或刪除隊列和交換器

./rabbitmqctl set_permissions  dongqingqing ".*" "log.*" "" 

set_permissions 後面的參數分別為用戶名、讀權限、寫權限、配置權限。

其他詳細用法可查看文檔。

使用統計

查看數據統計

可通過rabbitmqctl命令查看數據統計信息,比如隊列和消息數目、交換器和綁定等。

查看所有隊列,包含上面demo定義的log.all和log.error:
技術分享圖片

查看所有交換器,包含上面demo定義的log
技術分享圖片

另外,rabbitmq提供了管理界面插件,更方便的查看各種統計,可以通過下面的命令開啟:

sudo ./rabbitmq-plugins enable rabbitmq_management

技術分享圖片

查看日誌

可以在文件系統中查看日誌,啟動rabbitmq後,會顯示日誌的路徑:
技術分享圖片

另外,可以通過AMQP獲取實時日誌信息,有一個amq.rabbitmq.log的topic交換器,監聽對應的隊列即可。

下一篇將介紹消息通信模式和最佳實踐,感謝大家持續關註。

歡迎掃描下方二維碼,關註我的個人微信公眾號 ~

技術分享圖片

RabbitMQ實戰:運行和管理RabbitMQ