Java SpringBoot整合RabbitMq實戰和總結
目錄
- 交換器、佇列、繫結的宣告
- 註解將訊息和訊息頭注入消費者方法
- 消費訊息、死信佇列和RetryTemplate
- RPC模式的訊息(不常用)
- 關於RabbitMq客戶端的執行緒模型
在公司裡一直在用RabbitMQ,由於api已經封裝的很簡單,關於RabbitMQ本身還有封裝的實現沒有了解,最近在看RabbitMQ實戰這本書,結合網上的一些例子和spring文件,實現了RabbitMQ和spring的整合,對著自己平時的疑惑做了一些總結。
關於RabbitMQ基礎不在詳細講解(本文不適合RabbitMq零基礎), ofollow,noindex" target="_blank">RabbitMQ實戰 的1,2,4三章講的非常不錯。因為書中講的都是Python和Php的例子,所以自己結合SpringBoot文件和 朱小廝的部落格 做了一些總結,寫了一些Springboot的 例子 。
交換器、佇列、繫結的宣告
SpringAMQP專案對RabbitMQ做了很好的封裝,可以很方便的手動宣告佇列,交換器,繫結。如下:
/** * 佇列 * @return */ @Bean @Qualifier(RabbitMQConstant.PROGRAMMATICALLY_QUEUE) Queue queue() { return new Queue(RabbitMQConstant.PROGRAMMATICALLY_QUEUE, false, false, true); } /** * 交換器 * @return */ @Bean @Qualifier(RabbitMQConstant.PROGRAMMATICALLY_EXCHANGE) TopicExchange exchange() { return new TopicExchange(RabbitMQConstant.PROGRAMMATICALLY_EXCHANGE, false, true); } /** * 宣告繫結關係 * @return */ @Bean Binding binding(@Qualifier(RabbitMQConstant.PROGRAMMATICALLY_EXCHANGE) TopicExchange exchange, @Qualifier(RabbitMQConstant.PROGRAMMATICALLY_QUEUE) Queue queue) { return BindingBuilder.bind(queue).to(exchange).with(RabbitMQConstant.PROGRAMMATICALLY_KEY); } /** * 宣告簡單的消費者,接收到的都是原始的{@link Message} * * @param connectionFactory * * @return */ @Bean SimpleMessageListenerContainer simpleContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setMessageListener(message -> log.info("simple receiver,message:{}", message)); container.setQueueNames(RabbitMQConstant.PROGRAMMATICALLY_QUEUE); return container; }
消費者和生產者都可以宣告,交換器這種一般經常建立,可以手動建立。需要注意對於沒有路由到佇列的訊息會被丟棄。
如果是Spring的話還需要宣告連線:
@Bean ConnectionFactory connectionFactory(@Value("${spring.rabbitmq.port}") int port, @Value("${spring.rabbitmq.host}") String host, @Value("${spring.rabbitmq.username}") String userName, @Value("${spring.rabbitmq.password}") String password, @Value("${spring.rabbitmq.publisher-confirms}") boolean isConfirm, @Value("${spring.rabbitmq.virtual-host}") String vhost) { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setVirtualHost(vhost); connectionFactory.setPort(port); connectionFactory.setUsername(userName); connectionFactory.setPassword(password); connectionFactory.setPublisherConfirms(isConfirm); }
在配置類使用 @EnableRabbit
的情況下,也可以基於註解進行宣告,在Bean的方法上加上 @RabbitListener
,如下:
/** * 可以直接通過註解宣告交換器、繫結、佇列。但是如果宣告的和rabbitMq中已經存在的不一致的話 * 會報錯便於測試,我這裡都是不使用持久化,沒有消費者之後自動刪除 * {@link RabbitListener}是可以重複的。並且宣告佇列繫結的key也可以有多個. * * @param headers * @param msg */ @RabbitListener( bindings = @QueueBinding( exchange = @Exchange(value = RabbitMQConstant.DEFAULT_EXCHANGE, type = ExchangeTypes.TOPIC, durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT), value = @Queue(value = RabbitMQConstant.DEFAULT_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT), key = DKEY ), //手動指明消費者的監聽容器,預設Spring為自動生成一個SimpleMessageListenerContainer containerFactory = "container", //指定消費者的執行緒數量,一個執行緒會開啟一個Channel,一個佇列上的訊息只會被消費一次(不考慮訊息重新入佇列的情況),下面的表示至少開啟5個執行緒,最多10個。執行緒的數目需要根據你的任務來決定,如果是計算密集型,執行緒的數目就應該少一些 concurrency = "5-10" ) public void process(@Headers Map<String, Object> headers, @Payload ExampleEvent msg) { log.info("basic consumer receive message:{headers = [" + headers + "], msg = [" + msg + "]}"); } /** * {@link Queue#ignoreDeclarationExceptions}宣告佇列會忽略錯誤不宣告佇列,這個消費者仍然是可用的 * * @param headers * @param msg */ @RabbitListener(queuesToDeclare = @Queue(value = RabbitMQConstant.DEFAULT_QUEUE, ignoreDeclarationExceptions = RabbitMQConstant.true_CONSTANT)) public void process2(@Headers Map<String, Object> headers, @Payload ExampleEvent msg) { log.info("basic2 consumer receive message:{headers = [" + headers + "], msg = [" + msg + "]}"); }
關於訊息序列化
這個比較簡單,預設採用了Java序列化,我們一般使用的Json格式,所以配置了Jackson,根據自己的情況來,直接貼程式碼:
@Bean MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); }
同一個佇列多消費型別
如果是同一個佇列多個消費型別那麼就需要針對每種型別提供一個消費方法,否則找不到匹配的方法會報錯,如下:
@Component @Slf4j @RabbitListener( bindings = @QueueBinding( exchange = @Exchange(value = RabbitMQConstant.MULTIPART_HANDLE_EXCHANGE, type = ExchangeTypes.TOPIC, durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT), value = @Queue(value = RabbitMQConstant.MULTIPART_HANDLE_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT), key = RabbitMQConstant.MULTIPART_HANDLE_KEY ) ) @Profile(SpringConstant.MULTIPART_PROFILE) public class MultipartConsumer { /** * RabbitHandler用於有多個方法時但是引數型別不能一樣,否則會報錯 * * @param msg */ @RabbitHandler public void process(ExampleEvent msg) { log.info("param:{msg = [" + msg + "]} info:"); } @RabbitHandler public void processMessage2(ExampleEvent2 msg) { log.info("param:{msg2 = [" + msg + "]} info:"); } /** * 下面的多個消費者,消費的型別不一樣沒事,不會被呼叫,但是如果缺了相應訊息的處理Handler則會報錯 * * @param msg */ @RabbitHandler public void processMessage3(ExampleEvent3 msg) { log.info("param:{msg3 = [" + msg + "]} info:"); } }
註解將訊息和訊息頭注入消費者方法
在上面也看到了 @Payload
等註解用於注入訊息。這些註解有:
- @Header 注入訊息頭的單個屬性
- @Payload 注入訊息體到一個JavaBean中
- @Headers 注入所有訊息頭到一個Map中
這裡有一點主要注意,如果是 com.rabbitmq.client.Channel
, org.springframework.amqp.core.Message
和 org.springframework.messaging.Message
這些型別,可以不加註解,直接可以注入。
如果不是這些型別,那麼不加註解的引數將會被當做訊息體。不能多於一個訊息體。如下方法ExampleEvent就是預設的訊息體:
public void process2(@Headers Map<String, Object> headers,ExampleEvent msg);
關於消費者確認
RabbitMq消費者可以選擇手動和自動確認兩種模式,如果是自動,訊息已到達佇列,RabbitMq對無腦的將訊息拋給消費者,一旦傳送成功,他會認為消費者已經成功接收,在RabbitMq內部就把訊息給刪除了。另外一種就是手動模式,手動模式需要消費者對每條訊息進行確認(也可以批量確認),RabbitMq傳送完訊息之後,會進入到一個待確認(unacked)的佇列,如下圖紅框部分:
如果消費者傳送了ack,RabbitMq將會把這條訊息從待確認中刪除。如果是nack並且指明不要重新入佇列,那麼該訊息也會刪除。但是如果是nack且指明瞭重新入佇列那麼這條訊息將會入佇列,然後重新發送給消費者,被重新投遞的訊息訊息頭amqp_redelivered屬性會被設定成true,客戶端可以依靠這點來判斷訊息是否被確認,可以好好利用這一點,如果每次都重新回佇列會導致同一訊息不停的被髮送和拒絕。消費者在確認訊息之前和RabbitMq失去了連線那麼訊息也會被重新投遞。所以手動確認模式很大程度上提高可靠性。自動模式的訊息可以提高吞吐量。
spring手動確認訊息需要將 SimpleRabbitListenerContainerFactory
設定為手動模式:
simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
手動確認的消費者程式碼如下:
@SneakyThrows @RabbitListener(bindings = @QueueBinding( exchange = @Exchange(value = RabbitMQConstant.CONFIRM_EXCHANGE, type = ExchangeTypes.TOPIC, durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT), value = @Queue(value = RabbitMQConstant.CONFIRM_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT), key = RabbitMQConstant.CONFIRM_KEY), containerFactory = "containerWithConfirm") public void process(ExampleEvent msg, Channel channel, @Header(name = "amqp_deliveryTag") long deliveryTag, @Header("amqp_redelivered") boolean redelivered, @Headers Map<String, String> head) { try { log.info("ConsumerWithConfirm receive message:{},header:{}", msg, head); channel.basicAck(deliveryTag, false); } catch (Exception e) { log.error("consume confirm error!", e); //這一步千萬不要忘記,不會會導致訊息未確認,訊息到達連線的qos之後便不能再接收新訊息 //一般重試肯定的有次數,這裡簡單的根據是否已經重發過來來決定重發。第二個引數表示是否重新分發 channel.basicReject(deliveryTag, !redelivered); //這個方法我知道的是比上面多一個批量確認的引數 // channel.basicNack(deliveryTag, false,!redelivered); } }
關於spring的AcknowledgeMode需要說明,他一共有三種模式:NONE,MANUAL,AUTO,預設是AUTO模式。這比RabbitMq原生多了一種。這一點很容易混淆,這裡的NONE對應其實就是RabbitMq的自動確認,MANUAL是手動。而AUTO其實也是手動模式,只不過是Spring的一層封裝,他根據你方法執行的結果自動幫你傳送ack和nack。如果方法未丟擲異常,則傳送ack。如果方法丟擲異常,並且不是 AmqpRejectAndDontRequeueException
則傳送nack,並且重新入佇列。如果丟擲異常時 AmqpRejectAndDontRequeueException
則傳送nack不會重新入佇列。我有一個例子專門測試NONE,見 CunsumerWithNoneTest
。
還有一點需要注意的是消費者有一個引數prefetch,它表示的是一個
關於傳送者確認模式
考慮這樣一個場景:你傳送了一個訊息給RabbitMq,RabbitMq接收了但是存入磁碟之前伺服器就掛了,訊息也就丟了。為了保證訊息的投遞有兩種解決方案,最保險的就是事務(和DB的事務沒有太大的可比性), 但是因為事務會極大的降低效能,會導致生產者和RabbitMq之間產生同步(等待確認),這也違背了我們使用RabbitMq的初衷。所以一般很少採用,這就引入第二種方案:傳送者確認模式。
傳送者確認模式是指傳送方傳送的訊息都帶有一個id,RabbitMq會將訊息持久化到磁碟之後通知生產者訊息已經成功投遞,如果因為RabbitMq內部的錯誤會發送ack。注意這裡的傳送者和RabbitMq之間是非同步的,所以相較於事務機制效能大大提高。其實很多操作都是不能保證絕對的百分之一百的成功,哪怕採用了事務也是如此,可靠性和效能很多時候需要做一些取捨,想很多網際網路公司吹噓的5個9,6個9也是一樣的道理。如果不是重要的訊息效能計數器,完全可以不採用傳送者確認模式。
這裡有一點我當時糾結了很久,我一直以為傳送者確認模式的回撥是客戶端的ack觸發的,這裡是大大的誤解!傳送者確認模式和消費者沒有一點關係,消費者確認也和傳送者沒有一點關係,兩者都是在和RabbitMq打交道,傳送者不會管消費者有沒有收到,只要訊息到了RabbitMq並且已經持久化便會通知生產者,這個ack是RabbitMq本身發出的,和消費者無關
傳送者確認模式需要將Channel設定成Confirm模式,這樣才會收到通知。Spring中需要將連線設定成Confirm模式:
connectionFactory.setPublisherConfirms(isConfirm);
然後在RabbitTemplate中設定確認的回撥,correlationData是訊息的id,如下(只是簡單列印下):
// 設定RabbitTemplate每次傳送訊息都會回撥這個方法 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("confirm callback id:{},ack:{},cause:{}", correlationData, ack, cause));
傳送時需要給出唯一的標識( CorrelationData
):
rabbitTemplateWithConfirm.convertAndSend(RabbitMQConstant.DEFAULT_EXCHANGE, RabbitMQConstant.DEFAULT_KEY, new ExampleEvent(i, "confirm message id:" + i), new CorrelationData(Integer.toString(i)));
還有一個引數需要說下:mandatory。這個引數為true表示如果傳送訊息到了RabbitMq,沒有對應該訊息的佇列。那麼會將訊息返回給生產者,此時仍然會發送ack確認訊息。
設定RabbitTemplate的回撥如下:
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("return callback message:{},code:{},text:{}", message, replyCode, replyText));
另外如果是RabbitMq內部的錯誤,不會呼叫該方法。所以如果訊息特別重要,對於未確認的訊息,生產者應該在記憶體用儲存著,在確認時候根據返回的id刪除該訊息。如果是nack可以將該訊息記錄專門的日誌或者轉發到相應處理的邏輯進行後續補償。RabbitTemplate也可以配置RetryTemplate,傳送失敗時直接進行重試,具體還是要結合業務。
最後關於傳送者確認需要提的是spring,因為spring預設的Bean是單例的,所以針對不同的確認方案(其實有不同的確認方案是比較合理的,很多訊息不需要確認,有些需要確認)需要配置不同的bean.
消費訊息、死信佇列和RetryTemplate
上面也提到了如果消費者丟擲異常時預設的處理邏輯。另外我們還可以給消費者配置RetryTemplate,如果是採用SpringBoot的話,可以在application.yml配置中配置如下:
spring: rabbitmq: listener: retry: #重試次數 max-attempts: 3 #開啟重試機制 enabled: true
如上,如果消費者失敗的話會進行重試,預設是3次。 注意這裡的重試機制RabbitMq是為感知的 !到達3次之後會丟擲異常呼叫 MessageRecoverer
。預設的實現為RejectAndDontRequeueRecoverer,也就是列印異常,傳送nack,不會重新入佇列。
我想既然配置了重試機制訊息肯定是很重要的,訊息肯定不能丟,僅僅是日誌可能會因為日誌滾動丟失而且資訊不明顯,所以我們要講訊息儲存下來。可以有如下這些方案:
- 使用RepublishMessageRecoverer這個MessageRecoverer會發送傳送訊息到指定佇列
- 給佇列繫結死信佇列,因為預設的RepublishMessageRecoverer會發送nack並且requeue為false。這樣丟擲一場是這種方式和上面的結果一樣都是轉發到了另外一個佇列。詳見DeadLetterConsumer
- 註冊自己實現的MessageRecoverer
- 給MessageListenerContainer設定RecoveryCallback
- 對於方法手動捕獲異常,進行處理
我比較推薦前兩種。這裡說下死信佇列,死信佇列其實就是普通的佇列,只不過一個佇列宣告的時候指定的屬性,會將死信轉發到該交換器中。宣告死信佇列方法如下:
@RabbitListener( bindings = @QueueBinding( exchange = @Exchange(value = RabbitMQConstant.DEFAULT_EXCHANGE, type = ExchangeTypes.TOPIC, durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT), value = @Queue(value = RabbitMQConstant.DEFAULT_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT, arguments = { @Argument(name = RabbitMQConstant.DEAD_LETTER_EXCHANGE, value = RabbitMQConstant.DEAD_EXCHANGE), @Argument(name = RabbitMQConstant.DEAD_LETTER_KEY, value = RabbitMQConstant.DEAD_KEY) }), key = RabbitMQConstant.DEFAULT_KEY ))
其實也就只是在宣告的時候多加了兩個引數x-dead-letter-exchange和x-dead-letter-routing-key。這裡一開始踩了一個坑,因為 @QueueBinding
註解中也有arguments屬性,我一開始將引數宣告到 @QueueBinding
中,導致一直沒繫結成功。如果繫結成功可以在控制檯看到queue的Featrues有DLX(死信佇列交換器)和DLK(死信佇列繫結)。如下:
- 訊息被拒絕(basic.reject/basic.nack)並且requeue=false
- 訊息TTL過期
- 佇列達到最大長度
我們用到的就是第一種。
RPC模式的訊息(不常用)
本來生產者和消費者是沒有耦合的,但是可以通過一些屬性產生耦合。在早期版本中,如果一個生產者想要收到消費者的回覆,實現方案是生產者在訊息頭中加入reply-to屬性也就是佇列(一般是私有,排他,用完即銷燬)的名字,然後在這個佇列上進行監聽,消費者將回復發送到這個佇列中。RabbitMq3.3之後有了改進,就是不用沒有都去建立一個臨時佇列,這樣很耗費效能,可以採用drect-to模式,省去了每次建立佇列的效能損耗,但是還是要建立一次佇列。現在Spring預設的就是這個模式。RabbitTemplate中有一系列的 sendAndReceiveXX
方法。預設等待5秒,超時返回null。用
法和不帶返回的差不多。
消費者的方法通過返回值直接返回訊息(下面的方法是有返回值的):
public String receive(@Headers Map<String, Object> headers, @Payload ExampleEvent msg) { log.info("reply to consumer param:{headers = [" + headers + "], msg = [" + msg + "]} info:"); return REPLY; }
這裡的提一下最後一個註解 @SendTo
,用在消費方法上,指明返回值的目的地,預設不用的話就是返回給傳送者,可以通過這個註解改變這種行為。如下程式碼:
@RabbitListener( bindings = @QueueBinding( exchange = @Exchange(value = RabbitMQConstant.REPLY_EXCHANGE, type = ExchangeTypes.TOPIC, durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT), value = @Queue(value = RabbitMQConstant.REPLY_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT), key = RabbitMQConstant.REPLY_KEY ) ) @SendTo("queue.reply.s") public ExampleEvent log(ExampleEvent event) { log.info("log receive message:O{}", event); return new ExampleEvent(1, "log result"); }
上面的程式碼就是會將訊息直接傳送到預設交換器,並且以queue.reply.s作為路由鍵。@SendTo的格式為exchange/routingKey用法如下:
- foo/bar: 指定的交換器和key
- foo/: 指定的交換器,key為空
- bar或者/bar: 到空交換器
- /或者空:空的交換器和空的key
這裡還需要提一下,因為預設所有的佇列都會繫結到空交換器,並且以佇列名字作為Routekey, 所以SendTo裡面可以直接填寫佇列名字機會發送到相應的佇列.如日誌佇列。因為RPC模式不常用,專業的東西做專業的事,就像我們一般不用Redis來做訊息佇列一樣(雖然他也可以實現),一般公司都有特定的技術棧,肯定有更合適的RPC通訊框架。當然如果要跨語言的整合這個方案也是一種不錯的方案,可以繼續考慮採用非同步傳送 AsyncRabbitTemplate
來降低延遲等優化方案!
關於消費模型
RabbitMQ底層的消費模型有兩種Push和Pull。我在網上查閱資料的時候發現有很多教程採用了pull這種模式。RabbitMq實戰和
RabbitMQ之Consumer消費模式(Push & Pull) 都指出這種模式效能低,會影響訊息的吞吐量,增加不必要的IO,所以除非有特殊的業務需求,不要採用這種方案。Spring的封裝就是採用了push的方案。
關於RabbitMq客戶端的執行緒模型
這裡講的是消費者的,生產者沒什麼好講的。先看訊息流轉圖:
圖中橢圓表示執行緒,矩形是佇列。訊息到達AMQP的連線執行緒,然後分發到client執行緒池,隨後分發到監聽器。注意除了監聽器的執行緒,其他都是在 com.rabbitmq.client.impl.AMQConnection
中建立的執行緒,我們對執行緒池做一些修改。連線執行緒名字不能修改就是AMQP Connection打頭。心跳執行緒可以設定setConnectionThreadFactory來設定名字。如下:
connectionFactory.setConnectionThreadFactory(new ThreadFactory() { public final AtomicInteger id = new AtomicInteger(); @Override public Thread newThread(Runnable r) { return new Thread(r, MessageFormat.format("amqp-heart-{0}", id.getAndIncrement())); } });
client執行緒池見: com.rabbitmq.client.impl.ConsumerWorkService
構造方法。Executors.newFixedThreadPool(DEFAULT_NUM_THREADS, threadFactory)。
final ExecutorService executorService = Executors.newFixedThreadPool(5, new ThreadFactory() { public final AtomicInteger id = new AtomicInteger(); @Override public Thread newThread(Runnable r) { return new Thread(r, MessageFormat.format("amqp-client-{0}", id.getAndIncrement())); } });
listener的執行緒設定如下:
simpleRabbitListenerContainerFactory.setTaskExecutor(new SimpleAsyncTaskExecutor"amqp-consumer-"));
注意:SimpleAsyncTaskExecutor每次執行一個任務都會新建一個執行緒,對於生命週期很短的任務不要使用這個執行緒池(如client執行緒池的任務), 這裡的消費者執行緒生命週期直到SimpleMessageListenerContainer停止所以沒有適合這個場景
修改過之後的執行緒如下:

訊息投遞過程如下:
- 在AMQConnection中開啟連線執行緒,該執行緒用於處理和RabbitMq的通訊:
public void startMainLoop() { MainLoop loop = new MainLoop(); final String name = "AMQP Connection " + getHostAddress() + ":" + getPort(); mainLoopThread = Environment.newThread(threadFactory, loop, name); mainLoopThread.start(); }
- AMQConnection.heartbeatExecutor是心跳執行緒。
- AMQConnection.consumerWorkServiceExecutor則是用來處理事件的執行緒池,AMQConnection執行緒收到訊息投遞到這裡。
分發邏輯詳見com.rabbitmq.client.impl.ChannelN#processAsync->com.rabbitmq.client.impl.ConsumerDispatcher#handleDelivery->投遞到執行緒池. - 執行緒池中繼續將訊息投遞到org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#queue中
- consumer執行緒進行最終訊息
上面的是預設的消費者監聽器。SpringAMQP 2.0引入了一個新的監聽器實現 DirectMessageListenerContainer
。這個實現最大的變化在於消費者的處理邏輯不是在自己的執行緒池中執行而是直接在client執行緒池中處理,這樣最明顯的是省去了執行緒的上下文切換的開銷,而且設計上也變得更為直觀。所以如果採用這個監聽器需要覆蓋預設的執行緒池加大Connection的執行緒池。採用這個監聽器只需要設定 @RabbitListener
的containerFactory屬性。宣告方法如下:
@Bean DirectRabbitListenerContainerFactory directRabbitListenerContainerFactory(ConnectionFactory connectionFactory) { final DirectRabbitListenerContainerFactory directRabbitListenerContainerFactory = new DirectRabbitListenerContainerFactory(); directRabbitListenerContainerFactory.setConsumersPerQueue(Runtime.getRuntime().availableProcessors()); directRabbitListenerContainerFactory.setConnectionFactory(connectionFactory); directRabbitListenerContainerFactory.setMessageConverter(new Jackson2JsonMessageConverter()); directRabbitListenerContainerFactory.setConsumersPerQueue(10); return directRabbitListenerContainerFactory; }
這時的訊息流轉圖如下:
還有一些關於監聽器的例子和Springboot配置我放在了原始碼裡,這裡不再講述。