SpringAMQP 使用者管理元件 RabbitAdmin 以及宣告式配置
RabbitAdmin 類可以很好的操作 rabbitMQ,在 Spring 中直接進行注入即可。
!!! 注意,autoStartup 必須設定為 true,否則 Spring 容器不會載入 RabbitAdmin 類。 複製程式碼
RabbitAdmin 類 的底層實現就是從 Spring 容器中獲取 exchange、Bingding、routingkey 以及queue 的 @bean 宣告,然後使用 rabbitTemplate 的 execute 方法進行執行對應的宣告、修改、刪除等一系列 rabbitMQ 基礎功能操作。例如新增交換機、刪除一個繫結、清空一個佇列裡的訊息等等
程式碼示例:
程式碼地址: github.com/hmilyos/rab… rabbitmq-api 專案下
RabbitAdmin 類的使用如下
rabbitMQ 的基本配置 複製程式碼
public interface RabbitMQCommon { final static String RABBITMQ_HOST = "www.mycentosserver.com"; final static int RABBITMQ_PORT = 5672; final static String RABBITMQ_DEFAULT_VIRTUAL_HOST = "/"; public final static String RABBITMQ_USERNAME = "guest"; public final static String RABBITMQ_PASSWORD = "guest"; } 複製程式碼
@Configuration @ComponentScan({"com.hmily.rabbitmqapi.spring.*"}) public class RabbitMQConfig { private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class); @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses(RabbitMQCommon.RABBITMQ_HOST + ":" + RabbitMQCommon.RABBITMQ_PORT); connectionFactory.setUsername(RabbitMQCommon.RABBITMQ_USERNAME); connectionFactory.setPassword(RabbitMQCommon.RABBITMQ_PASSWORD); connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST); return connectionFactory; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){ RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); returnrabbitAdmin; } 複製程式碼
@RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqApiApplicationTests { private static final Logger log = LoggerFactory.getLogger(RabbitmqApiApplicationTests.class); @Test public void contextLoads() { } @Autowired private RabbitAdmin rabbitAdmin; @Test public void testAdmin() throws Exception { rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false)); rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false)); rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false)); rabbitAdmin.declareQueue(new Queue("test.direct.queue", false)); rabbitAdmin.declareQueue(new Queue("test.topic.queue", false)); rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false)); rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE, "test.direct", "direct", new HashMap<>())); rabbitAdmin.declareBinding( BindingBuilder .bind(new Queue("test.topic.queue", false))//直接建立佇列 .to(new TopicExchange("test.topic", false, false))//直接建立交換機 建立關聯關係 .with("user.#"));//指定路由Key rabbitAdmin.declareBinding( BindingBuilder .bind(new Queue("test.fanout.queue", false)) .to(new FanoutExchange("test.fanout", false, false))); //清空佇列資料 rabbitAdmin.purgeQueue("test.topic.queue", false); } } 複製程式碼
執行單元測試,然後去 rabbitMQ 的管控臺頁面驗證這些交換機、佇列以及繫結關係是否正確。
原始碼分析:
RabbitAdmin 類 的底層實現就是從 Spring 容器中獲取 exchange、Bingding、routingkey 以及queue 的 @bean 宣告,然後使用 rabbitTemplate 的 execute 方法進行執行對應的宣告、修改、刪除等一系列 rabbitMQ 基礎功能操作。例如新增交換機、刪除一個繫結、清空一個佇列裡的訊息等等


afterPropertiesSet 方法就是在 我們的 bean 載入後進行一些設定




SpringAMQP 的宣告式
回顧一下消費者配置
1. 設定交換機型別 2. 將佇列繫結到交換機 交換機型別: FanoutExchange 型別: 將訊息分發到所有的繫結佇列,無 routingkey 的概念 HeadersExchange 型別:通過新增屬性 key-value 匹配 DirectExchange :按照 routingkey 分發到指定佇列 TopicExchange : 多關鍵字匹配 複製程式碼
在 Rabbit 基礎 API 裡面宣告一個 Exchange、queue、binding 的程式碼如下所示

SpringAMQP 宣告即在 rabbit 基礎 API 裡面宣告一個 exchange、Bingding、queue。使用SpringAMQP 去宣告,就需要使用 @bean 的宣告方式。
-
針對消費者配置
-
- 設定交換機型別
-
- 將佇列繫結到交換機
-
-
FanoutExchange: 將訊息分發到所有的繫結佇列,無routingkey的概念
-
HeadersExchange :通過新增屬性key-value匹配
-
DirectExchange:按照routingkey分發到指定佇列
-
TopicExchange:多關鍵字匹配
具體的程式碼實現如下
在 RabbitMQConfig 裡面新增下面的程式碼 複製程式碼
/** *宣告 TopicExchange 型別的交換機 topic001 * @return */ @Bean public TopicExchange exchange001() { //是否持久化,是否自動刪除 return new TopicExchange("topic001", true, false); } //宣告 queue001 佇列 @Bean public Queue queue001() { return new Queue("queue001", true); //佇列持久 } //將上面的交換機和佇列繫結 @Bean public Binding binding001() { return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*"); } @Bean public TopicExchange exchange002() { return new TopicExchange("topic002", true, false); } @Bean public Queue queue002() { return new Queue("queue002", true); } @Bean public Binding binding002() { return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*"); } @Bean public Queue queue003() { return new Queue("queue003", true); //佇列持久 } @Bean public Binding binding003() { return BindingBuilder.bind(queue003()).to(exchange001()).with("mq.*"); } 複製程式碼
然後然後在單元測試裡面寫個方法測試一下,也可以利用剛才 RabbitAdmin 的單元測試,只要能跑起來就行,例如剛才的 testAdmin() ,執行單元測試,然後上管控太驗證剛才宣告的 exchange、Bingding、queue 是否都成功




自此,RabbitAdmin以及宣告式配置的簡單使用demo演示完畢