1. 程式人生 > >手把手一起入門 RabbitMQ 的六大使用模式(Java 客戶端)

手把手一起入門 RabbitMQ 的六大使用模式(Java 客戶端)

# 原文地址:[手把手一起入門 RabbitMQ 的六大使用模式(Java 客戶端)](https://blog.csdn.net/Howinfun/article/details/107181334) # 為什麼使用 MQ? 在這裡我就不多說了,無非就是削峰、解耦和非同步。這裡沒有很多關於 MQ 的理論和概念,只想手把手帶你一起學習 RabbitMQ 的六大使用模式! # 一、普通佇列 我們傳送訊息和接收訊息時,只需要直接指定佇列的名字即可。這是最簡單的一種使用場景。 **生產者**:使用 channel 傳送訊息時,直接指定 queueName。 ```java public class Send { private static final String queueName = "hyf.hello.queue"; public static void main(String[] args) throws Exception{ ConnectionFactory factory = ConnectionFactoryUtils.getFactory(); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()){ // 是否持久化(預設儲存在記憶體,可以持久化到磁碟) boolean durable = false; // 是否獨有(此 Connection 獨有,通過其他 Connection 建立的 channel 無法訪問此佇列) boolean exclusive = false; // 是否自動刪除佇列(佇列沒有消費者時,刪除) boolean autoDelete = false; channel.queueDeclare(queueName, durable, exclusive, autoDelete, null); String message = "Hello world3!"; // 第一個引數是交換器名字,第二個引數是 routingKey(不使用交換器時,為佇列名稱),第三個引數是訊息屬性(AMQP.BasicProperties),第四個引數是訊息 channel.basicPublish("", queueName, null, message.getBytes()); System.out.println("釋出成功"); } } } ``` > 注意:使用 try-with-resources ,在程式結束時,我們不用顯式呼叫 close() 方法來關閉資源。 **消費者**:也是用 channel 指定 queueName,然後繫結一個交付回撥。 ```java public class Receive { private static final String queueName = "hyf.hello.queue"; public static void main(String[] args) throws Exception{ ConnectionFactory factory = ConnectionFactoryUtils.getFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(queueName, false, false, false, null); // 回撥(接收 RabbitMQ 伺服器傳送過來的訊息) DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(message); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {}); } } ``` > 注意:這裡我們可以不用 try-with-resource,因為消費者需要一直執行著。 關於普通佇列,大家可以理解為下圖: ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200707143700898.png) # 二、工作模式(work queues) 普通佇列中,都是一個消費者去消費佇列,而在 work 模式中,是多個消費者同時去消費同一個佇列。 生產者和消費者我們還是可以用回上面的程式碼。 ## 1、迴圈輪詢 預設情況下,RabbitMQ 將按順序將每個訊息傳送給下一個使用者。平均而言,每個消費者都會收到相同數量的訊息。這種分發訊息的方式稱為迴圈。 這樣會導致一個問題,即使其中一個消費者消費速度很快,已經消費完 RabbitMQ 訊息,並且佇列中還有未消費訊息(已經分派給其他消費者),那麼他也將在白白等待,RabbitMQ 而不會說將分派的訊息回收重新分派給空閒的消費者。 ## 2、自動提交訊息 ack 預設情況下,消費者會不定時自動提交 ack,不管訊息是否消費成功,而當 RabbitMQ 接收到消費者的 ack 訊息後,會將訊息新增刪除標識來標識訊息已被消費成功。但是這個自動 ack 機制會導致訊息丟失和訊息重複消費問題。 - 客戶端還沒消費某條訊息,就自動提交了 ack,如果此時客戶端宕機了,那麼會導致這條訊息消費失敗;而 RabbitMQ 在接收到 ack 時,也將這條訊息標記為已消費,那麼也無法重新消費了。 - 客戶端已經消費某條訊息,但是還沒自動提交 ack 就宕機了,此時就會導致訊息重複消費,因為 RabbitMQ 沒收到 ack 訊息,那麼這條訊息沒有被設定為刪除標識,所以消費者還可以消費此條訊息。 ## 3、手動 ack 解決空閒消費者、訊息丟失、訊息重複消費 ### 消費者: **a. 限制每次讀取訊息數量:** 我們利用 basicQos() 方法來設定 prefetchCount(預期計數) 為1,即 限制客戶端每次都只讀取一個訊息,只有當這個訊息消費完了,才能繼續讀取下一個訊息。 **b. 手動 ack:** 接著我們需要關閉自動提交 ack,並且在消費完訊息後,手動提交 ack。只有當 RabbitMQ 收到 ack 訊息後,才會認定這個訊息已經消費完了,繼續給消費者推送下一條新訊息。 最後看看程式碼: ```java public class Receive1 { private static final String queueName = "hyf.work.queue"; public static void main(String[] args) throws Exception{ ConnectionFactory factory = ConnectionFactoryUtils.getFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(queueName, false, false, false, null); // 每次只讀取一條訊息 channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); ThreadUtil.sleep(2, TimeUnit.SECONDS); System.out.println(message); // 是否批量提交 boolean multiple = false; // 手動 ack channel.basicAck(delivery.getEnvelope().getDeliveryTag(),multiple); }; // 取消自動 ack boolean autoAck = false; channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> {}); } } ``` 總結:只有當我們使用了手動ack 和 prefetchCount = 1 ,工作模式才算成功啟動。 ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200707143843628.png) ## 4、擴充套件點:如何保證訊息不丟失 當傳送者傳送訊息到 RabbitMQ 後,RabbitMQ 會將訊息快取在記憶體中,而如果此時 RabbitMQ 宕機了,預設情況下,記憶體中的 queue 和 message 都會全部丟失。 而如果我們需要保證訊息不丟失,那麼需要告訴 RabbitMQ 如何做;此時我們需要做的是:將 queue 和 message 都設定為持久化。 **queue 持久化:** ```java private static final String queueName = "hyf.work.queue"; boolean durable = true; channel.queueDeclare(queueName, durable, false, false, null); ``` > 注意:如果一開始 queue 已經定義為不持久化,那麼我們不能重定義為持久化;當 RabbitMQ 檢測到 queue 被重定義了,那麼會返回一個錯誤來提示我們。 **message 持久化:** ```java private static final String queueName = "hyf.work.queue"; channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); ``` # 三、釋出訂閱模式(Publish/Subscribe) 上面的 work queue,每一個訊息只能被一個消費者消費。而有些場景,我們需要一個訊息可以被多個消費者消費;例如:使用者下了訂單,簡訊通知模組需要給使用者傳送一個簡訊通知,庫存模組需要根據使用者下單資訊減去商品的庫存等等,此時我們需要使用釋出訂閱模式。 ## 1、交換器 exchange 要做釋出訂閱模式,我們首先需要使用到交換器,生產者不再直接利用 channel 往 queue 傳送訊息,而是將訊息傳送到交換器,讓交換器來決定傳送到哪些 queue 中。 RabbitMQ 提供了幾個型別的交換器:`direct`、`topic`、`headers` 和 `fanout`。 使用釋出訂閱模式,我們只需要使用 `fanout` 型別的交換器,`fanout` 型別的交換器,會將訊息傳送到所有繫結到此交換器的 queue。 ## 2、生產者傳送訊息: 利用 channel 宣告交換器: ```java // 宣告交換器名字和型別 channel.exchangeDeclare(exchangeName,"fanout"); ``` 接著我們就可以直接指定交換器進行訊息釋出: ```java // 第二個引數是 queueName/routingKey channel.basicPublish(exchangeName , "", null, message.getBytes()) ``` 完整程式碼: ```java public class Send { private static final String exchangeName = "hyf.ps.exchange"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = ConnectionFactoryUtils.getFactory(); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()){ // 宣告 fanout 型別的交換器 channel.exchangeDeclare(exchangeName,"fanout"); for (int i = 0; i <= 10; i++){ String message = "訊息"+i; // 直接指定交換器進行訊息釋出 channel.basicPublish(exchangeName,"", null, message.getBytes()); } } } } ``` >
我們可以發現,我們不再需要指定 queueName,而是直接指定 exchangeName,將訊息傳送到交換器,由交換器決定釋出到哪些 queue。 ## 3、消費者:queue 與 exchange 建立繫結關係 建立繫結前,我們還是需要先宣告 `fanout` 型別的交換器,並且命名要和生產者宣告時的名字一致: ```java channel.exchangeDeclare(exchangeName, "fanout"); ``` 接著,將 queue 和 `fanout` 型別的交換器建立繫結訊息,交換器會將訊息傳送到和它有繫結關係的 queue。 ```java channel.queueBind(queueName, exchangeName, ""); ``` 此時,佇列已經和交換器成功建立繫結關係,交換器接收到訊息時,會發送到與交換器繫結的所有佇列中。 最後,我們再呼叫 channel.basicConsume() 進行佇列監聽和 繫結回撥,藉此來接收和消費訊息: ```java DeliverCallback deliverCallback = (consumerTag, delivery) ->
{ String message = new String(delivery.getBody(), "UTF-8"); System.out.println(message); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); ``` 完整程式碼: ```java public class Receive1 { private static final String exchangeName = "hyf.ps.exchange"; private static final String queueName = "hyf.ps.queue1"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = ConnectionFactoryUtils.getFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(exchangeName,"fanout"); channel.queueDeclare(queueName,false, false, false, null); channel.queueBind(queueName, exchangeName,""); DeliverCallback callback = (s, delivery) ->
{ String message = new String(delivery.getBody(), "UTF-8"); System.out.println(message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }; channel.basicQos(1); boolean autoAck = false; channel.basicConsume(queueName, autoAck, callback, consumerTag -> {}); } } ``` 關於釋出訂閱模式,我們可以理解為下圖: ![\[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-e3jgCeoJ-1594103658020)(E5914A83FBA74E08865214975E25093B)\]](https://img-blog.csdnimg.cn/20200707144046402.png) ## 4、釋出丁訂閱模式中使用工作模式 釋出訂閱模式中,我們還是可以繼續使用上面的工作模式(多個消費者訂閱同一個佇列)。因為在分散式系統中,一個服務往往有多個例項,例如庫存模組可以有多個例項,我們利用手動 ack 和 prefetchCount = 1,還是可以讓 fanout 型別交換器的其中一個 queue 進入工作模式。 # 四、路由模式(routing) 上面的釋出訂閱模式,只要是與 `fanout` 型別交換器繫結的 queue,都會接收到交換器釋出的訊息。而我們現在的場景需要更加靈活訊息分配機制。例如:error 佇列只會接收到 error 型別的資訊,info 佇列只會接收都 info 型別的資訊等等。 那麼我們需要是使用靈活的路由模式,而這種模式還是需要由交換器來完成,但是此時需要使用 `direct` 型別的交換器來替代 `fanout` 型別的交換器。 ## bindingKey 和 routingKey 做到路由模式,不但要使用 `direct` 型別的交換器,還需要利用 `bindingKey` 和 `routingKey` 來完成。bindingKey 是消費者端的概念,而 routingKey 是生產者端的概念。 ### 1、bingdingKey 釋出訂閱模式的消費者程式碼中,我們可以發現:將 queue 與交換器建立繫結關係的 queueBind() 方法中,第三個引數是空的,其實這就是配置 bindingKey 的地方。當然了,即使第三個引數不為空,fanout 型別的交換器還是會直接忽略掉的。 ```java channel.queueBind(queueName, exchangeName, ""); ``` 例如現在我們的消費者要監聽 error 型別的資訊,我們需要宣告 direct 型別的交換器,並且給 queue 繫結值為 error 的 bindingKey 。 ```java public class ErrorReceive { private static final String exchangeName = "hyf.routing.exchange"; private static final String queueName = "hyf.routing.error.queue"; private static final String bindingKey = "error"; public static void main(String[] args) throws Exception{ ConnectionFactory factory = ConnectionFactoryUtils.getFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 宣告 exchange 和 queue channel.exchangeDeclare(exchangeName, "direct"); channel.queueDeclare(queueName, false, false, false, null); // 進行繫結 channel.queueBind(queueName, exchangeName, bindingKey); DeliverCallback callback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(),"utf-8"); System.out.println("ErrorReceive 接收到" + delivery.getEnvelope().getRoutingKey() + "訊息:"+message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }; channel.basicQos(1); channel.basicConsume(queueName, false, callback, consumerTag -> {}); } } ``` 例如現在我們的消費者2要監聽 info 型別的資訊,這也是非常簡單,同樣是上面的程式碼,只需要修改 queueName 和 bindingKey 即可。 ```java // ... 省略 private static final String queueName = "hyf.info.queue"; private static final String bindingKey = "info"; // ... 省略 ``` ### 2、queue 繫結多個 bindingKey 上面的 `hyf.error.queue` 佇列,只綁定了值為 error 的 bindingKey,如果現在我們不但需要接收 error 型別的資訊,還需要 info 型別的資訊,那麼我們可以為 `hyf.error.queue` 再繫結多一個值為 info 的 bindingKey。 ```java private static final String bindingKey = "error"; private static final String bindingKey2 = "info"; // 進行繫結 channel.queueBind(queueName, exchangeName, bindingKey); channel.queueBind(queueName, exchangeName, bindingKey2); ``` 此時,`hyf.error.queue` 佇列同時綁定了 error 和 info 這兩個 bindingKey,那麼它就能同時接收到 error 型別和 info 型別的資訊。 ### 3、routingKey 在釋出訂閱模式中。我們可以看到釋出訊息的 basicPublish() 方法的第二引數是空的,而第二個引數其實就是 routingKey。 ```java channel.basicPublish( exchangeName, "", null, message.getBytes()); ``` > 我們可以發現,在普通佇列和工作模式中,我們都是指定 queueName 去傳送訊息,而 queueName 在 basicPublish 也是第二個位置。所以,在我們不使用交換器時,routingKey 指定的就是 queueName。而當我們使用交換器時,那麼 routingKey 就有更豐富的含義了,它不再只是簡單直接的 queueName,而是各種各樣的路由含義。 要使得上面綁定了 bindingKey 為 error 和 info 的 `hyf.error.queue` 佇列接收到訊息,那麼需要訊息傳送者指定 routingKey 為 error 或 info ,然後使用 `direct` 型別的交換器釋出訊息。 ```java private static final String exchangeName = "hyf.log.exchange"; private static final String routingKey = "error"; private static final String routingKey2 = "info"; channel.basicPublish(exchangeName, routingKey, null, message.getBytes()); channel.basicPublish(exchangeName, routingKey2, null, message.getBytes()); ``` 當執行上面程式碼,`hyf.error.queue` 佇列能收到兩條訊息,而 `hyf.info.queue` 只能收到 routingKey 為 info 的訊息。 即當 queue 繫結的 bindingKey 和傳送訊息時的 routingKey 完全一致,那麼 queue 就能接收到交換器傳送的訊息,我們可以理解為下圖: ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200707144417828.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0hvd2luZnVu,size_16,color_FFFFFF,t_70) # 五、主題模式(topic) 上面的路由模式雖然能讓我們根據業務更加靈活的去接收指定(多種)型別的訊息;但是我們可以發現,如果現在我們想讓消費者接收所有型別的資訊,例如 error、info、debug、fail 等訊息全部都要接收,那麼就要呼叫多次 queueBind() 方法給 queue 繫結多個 bindingKey,這就顯得有點麻煩了。 此時我們可以使用主題模式,即使用 `topic` 型別的交換器,然後利用 `*` 和 `#` 這兩個符號來搞定上面的需求。 ## 1、* 和 # 的使用 "*" 表示匹配一個字元,"#" 表示匹配0個或多個字元 ## 2、場景 我們現在有多個 routingKey 的訊息,例如使用者登陸資訊 `user.login.info`,訂單資訊 `order.detail.info`,使用者的註冊資訊 `user.register.info`,庫存資訊`stock.detail.info` 等等。 ### 3、消費者 假設消費者1想讀取到所有關於使用者的資訊,例如登陸資訊和註冊時心,那麼我們可以使用 `topic` 型別的交換器,並且將 bindingKey 設定為 `user.#`。 ```java public class UserReceive { private static final String exchangeName = "hyf.topic.exchange"; private static final String bindingKey = "user.#"; private static final String queueName = "hyf.topic.user.queue"; @SneakyThrows public static void main(String[] args){ ConnectionFactory factory = ConnectionFactoryUtils.getFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(exchangeName, "topic"); channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, exchangeName, bindingKey); DeliverCallback callBack = (consumerTag, delivery) -> { String msg = new String(delivery.getBody(), "utf-8"); System.out.println("接收到一條user訊息:"+msg); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; channel.basicQos(1); channel.basicConsume(queueName, false, callBack, consumerTag -> {}); } } ``` 假設消費者2 要接收所有上面關於資訊的訊息,那麼他的 bindingKey 可以設定為 `*.*.info`。 ```java public class InfoReceive { private static final String exchangeName = "hyf.topic.exchange"; private static final String bindingKey = "*.*.info"; private static final String queueName = "hyf.topic.info.queue"; @SneakyThrows public static void main(String[] args){ ConnectionFactory factory = ConnectionFactoryUtils.getFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(exchangeName, "topic"); channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, exchangeName, bindingKey); DeliverCallback callback = (consumerTag, delivery) -> { String msg = new String(delivery.getBody(), "utf-8"); System.out.println("接收到一條info訊息:"+msg); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; channel.basicQos(1); channel.basicConsume(queueName, false, callback, consumerTag -> {}); } } ``` ### 4、生產者 生產者也需要使用 `topic` 型別的交換器傳送訊息。 ```java public class Send { private static final String exchangeName = "hyf.topic.exchange"; private static final String routingkeyByLogin = "user.login.info"; private static final String routingkeyByRegister = "user.register.info"; private static final String routingkeyByOrder = "order.detail.info"; private static final String routingkeyByStock = "stock.detail.info"; public static void main(String[] args) throws Exception{ ConnectionFactory factory = ConnectionFactoryUtils.getFactory(); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()){ channel.exchangeDeclare(exchangeName, "topic"); String msg1 = "使用者張三登陸了"; String msg2 = "新使用者李四註冊了"; String msg3 = "張三買了一臺iphone12"; String msg4 = "iphone12庫存減一"; channel.basicPublish(exchangeName, routingkeyByLogin, null, msg1.getBytes()); channel.basicPublish(exchangeName, routingkeyByRegister, null, msg2.getBytes()); channel.basicPublish(exchangeName, routingkeyByOrder, null, msg3.getBytes()); channel.basicPublish(exchangeName, routingkeyByStock, null, msg4.getBytes()); } } } ``` 經過上面的程式碼釋出訊息,消費者1就能讀取到訊息 msg1、msg2;而消費者2可以讀取到所有的訊息。 關於主題模式,大家可以理解為下圖: ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200707144553734.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0hvd2luZnVu,size_16,color_FFFFFF,t_70) # 六、RPC 模式 正常用 MQ 都是用來做非同步化,但是有些場景卻需要同步。即當我們使用 channel 傳送訊息後,我們需要同步等待消費者對訊息消費後的結果。 **RPC 模式主要是利用 replyQueue 和 correlationId 來完成。** ## 1、客戶端 客戶端往 requestQueue 傳送訊息時需要設定 replyQueue,之後我們需要給 replyQueue 繫結一個 DeliverCallback。 為了保證客戶端是同步阻塞等待結果,所以我們在 DeliverCallback 的 handle 方法裡面,將結果放進阻塞佇列(例如 ArrayBlockingQueue);在程式碼的最後呼叫阻塞佇列的 take() 方法在獲取結果。 ```java public class Client { private static final String replyQueueName = "hyf.rpc.reply.queue"; private static final String requestQueueName = "hyf.rpc.request.queue"; public static void main(String[] args) throws Exception{ ConnectionFactory factory = ConnectionFactoryUtils.getFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(replyQueueName, false, false, false, null); // 阻塞佇列 final BlockingQueue responseQueue = new ArrayBlockingQueue<>(1); final String corrId = UUID.randomUUID().toString(); AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .replyTo(replyQueueName) .correlationId(corrId) .build(); String msg = "客戶端訊息"; channel.basicPublish("", requestQueueName, properties, msg.getBytes()); String ctag = channel.basicConsume(replyQueueName, true, (consumeTag,delivery) -> { if (delivery.getProperties().getCorrelationId().equals(corrId)) { responseQueue.offer(new String(delivery.getBody(), "UTF-8")); } }, consumeTag -> {}); String result = responseQueue.take(); System.out.println(result); // 取消訂閱 channel.basicCancel(ctag); } } ``` > 通過上面程式碼,我們應該可以留意到 correlationId 的意義是什麼。利用 correlationId ,我們可以判斷當前從 replyQueue 獲取的響應訊息是否是我們發出的訊息消費後的結果,如果不是我們可以直接忽略掉,保證只會獲取 correlationId 一致的結果。 ## 2、服務端 服務端在 DeilverCallback 的 handle() 方法裡讀取 requestQueue 裡面的訊息消費後,在手動 ack(關閉了自動 ack)前,需要先拿到訊息的 replyQueue,然後往 replyQueue 裡面傳送訊息消費後的結果,當然了,還要記得設定回訊息的 correlatinId,最後記得手動 ack。 ```java public class Server { private static final String requestQueueName = "hyf.rpc.request.queue"; public static void main(String[] args) throws Exception { ConnectionFactory factory = ConnectionFactoryUtils.getFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(requestQueueName, false, false, false, null); DeliverCallback callback = (consumerTag, delivery) -> { String msg = new String(delivery.getBody(), "utf-8"); // 處理訊息 String reponse = handleMsg(msg); // 將訊息的 correlationId 傳回去 AMQP.BasicProperties replyProps = new AMQP.BasicProperties .Builder() .correlationId(delivery.getProperties().getCorrelationId()) .build(); channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, reponse.getBytes()); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; channel.basicQos(1); channel.basicConsume(requestQueueName, false, callback, consumeTag -> {}); } private static String handleMsg(String msg){ return msg + "已經被處理了"; } } ``` 關於 RPC 模式,大家可以理解為下圖: ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200707144645986.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0hvd2luZnVu,size_16,color_FFFFFF,t_70) # 七、總結 到此,關於 RabbitMQ 的六大使用模式已經介紹完畢。當然了,這些都是入門級別的 demo,如果大家還是有啥不明白的,可以到我的 github 上去看看,完整的程式碼都放在:[MQ Demo](https://github.com/Howinfun/study-in-work-and-life/tree/master/src/main/java/com/hyf/testDemo/mq)。後續,我將會繼續深入學習 RabbitMQ 的 Java Client,學習如何優化客戶端的使用