使用 RabbitMQ 實現 RPC
作者 | 喬宇
杏仁後端工程師,關注服務端技術。
背景知識
RabbitMQ
RabbitMQ 是基於 AMQP 協議實現的一個訊息佇列(Message Queue),Message Queue 是一個典型的生產者/消費者模式。生產者釋出訊息,消費者消費訊息,生產者和消費者之間是解耦的,互相不知道對方的存在。
RPC
Remote Procedure Call:遠端過程呼叫,一次遠端過程呼叫的流程即客戶端傳送一個請求到服務端,服務端根據請求資訊進行處理後返回響應資訊,客戶端收到響應資訊後結束。
如何使用 RabbitMQ 實現 RPC?
使用 RabbitMQ 實現 RPC,相應的角色是由生產者來作為客戶端,消費者作為服務端。
但 RPC 呼叫一般是同步的,客戶端和伺服器也是緊密耦合的。即客戶端通過 IP/域名和埠連結到伺服器,向伺服器傳送請求後等待伺服器返回響應資訊。
但 MQ 的生產者和消費者是完全解耦的,那麼如何用 MQ 實現 RPC 呢?很明顯就是把 MQ 當作中介軟體實現一次雙向的訊息傳遞:
客戶端和服務端即是生產者也是消費者。客戶端釋出請求,消費響應;服務端消費請求,釋出響應。
具體實現
MQ部分的定義
請求資訊的佇列
我們需要一個佇列來存放請求資訊,客戶端向這個佇列釋出請求資訊,服務端消費該佇列處理請求。該佇列不需要複雜的路由規則,直接使用 RabbitMQ 預設的 direct exchange 來路由訊息即可。
響應資訊的佇列
存放響應資訊的佇列不應只有一個。如果存在多個客戶端,不能保證響應資訊被髮布請求的那個客戶端消費到。所以應為每一個客戶端建立一個響應佇列,這個佇列應該由客戶端來建立且只能由這個客戶端使用並在使用完畢後刪除,這裡可以使用 RabbitMQ 提供的排他佇列(Exclusive Queue):
channel.queueDeclare(queue:"", durable:false, exclusive:true, autoDelete:false, new HashMap<>())
並且要保證佇列名唯一,宣告佇列時名稱設為空 RabbitMQ 會生成一個唯一的佇列名。
exclusive
設為 true
表示宣告一個排他佇列,排他佇列的特點是隻能被當前的連線使用,並且在連線關閉後被刪除。
一個簡單的 demo(使用 pull 機制)
我們使用一個簡單的 demo 來了解客戶端和服務端的處理流程。
釋出請求
-
編寫程式碼前的一個小問題
我們在宣告佇列時為每一個客戶端聲明瞭獨有的響應佇列,那伺服器在釋出響應時如何知道釋出到哪個佇列呢?其實就是客戶端需要告訴服務端將響應釋出到哪個佇列,RabbitMQ 提供了這個支援,訊息體的 Properties
中有一個屬性 reply_to
就是用來標記回撥佇列的名稱,伺服器需要將響應釋出到 reply_to
指定的回撥佇列中。
解決了這個問題之後我們就可以編寫客戶端釋出請求的程式碼了:
// 定義響應回撥佇列 String replyQueueName = channel.queueDeclare("", false, true, false, new HashMap<>()).getQueue(); // 設定回撥佇列到 Properties AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .replyTo(replyQueueName) .build(); String request = "request"; // 釋出請求 channel.basicPublish("", "rpc_queue", properties, request.getBytes());
Direct reply-to:
RabbitMQ 提供了一種更便捷的機制來實現 RPC,不需要客戶端每次都定義回撥佇列,客戶端釋出請求時將 replyTo
設為 amq.rabbitmq.reply-to
,消費響應時也指定消費 amq.rabbitmq.reply-to
,RabbitMQ 會為客戶端建立一個內部佇列
消費請求
接下來是服務端處理請求的部分,接收到請求後經過處理將響應資訊釋出到 reply_to
指定的回撥佇列:
// 服務端 Consumer 的定義 public class RpcServer extends DefaultConsumer { public RpcServer(Channel channel) { super(channel); } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); String response = (msg + " Received"); // 獲取回撥佇列名 String replyTo = properties.getReplyTo(); // 釋出響應訊息到回撥佇列 this.getChannel().basicPublish("", replyTo, new AMQP.BasicProperties(), response.getBytes()); } } ... // 啟動服務端 Consumer channel.basicConsume("rpc_queue", true, new RpcServer(channel));
接收響應
客戶端如何接收伺服器的響應呢?有兩種方式:1.輪詢的去 pull 回撥佇列中的訊息,2.非同步的消費回撥佇列中的訊息。我們在這裡簡單實現第一種方案。
GetResponse getResponse = null; while (getResponse == null) { getResponse = channel.basicGet(replyQueueName, true); } String response = new String(getResponse.getBody());
一個簡單的基於 RabbitMQ 的 RPC 模型已經實現了,但這個 demo 並不實用,因為客戶端每次傳送完請求都要同步的輪詢等待響應訊息,只能每次處理一個請求。RabbitMQ 的 pull 模式效率也比較低。
實現一個完備可用的 RPC 模式需要做的工作還有很多,要處理的關鍵點也比較複雜,有句話叫不要重複造輪子,spring 已經實現了一個完備可用的 RPC 模式的庫,接下來我們來了解一下。
Spring Rabbit 中的實現
和上面 demo 的 pull 模式一次只能處理一個請求相對應的:如何非同步的接收響應並處理多個請求呢?關鍵點就在於我們需要記錄請求和響應並將它們關聯起來,RabbitMQ 也提供了支援,Properties 中的另一個屬性 correlation_id
用來標識一個訊息的唯一 id。
參考 spring-rabbit
中的 convertSendAndReceive
方法的實現,為每一次請求生成一個唯一的 correlation_id
:
private final AtomicInteger messageTagProvider = new AtomicInteger(); ... String messageTag = String.valueOf(this.messageTagProvider.incrementAndGet()); ... message.getMessageProperties().setCorrelationId(messageTag);
並使用一個 ConcurrentHashMap
來維護 correlation_id
和響應資訊的對映:
private final Map<String, PendingReply> replyHolder = new ConcurrentHashMap<String, PendingReply>(); ... final PendingReply pendingReply = new PendingReply(); this.replyHolder.put(correlationId, pendingReply);
PendingReply
中有一個 BlockingQueue
存放響應資訊,在傳送完請求資訊後呼叫 BlockingQueue
的 pull
方法並設定超時時間來獲取響應:
private final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(1);public Message get ( long timeout , TimeUnit unit ) throws InterruptedException { Object reply = this . queue . poll ( timeout , unit ); return reply == null ? null : processReply ( reply
);
} 在獲取響應後不論結果如何,都會將 PendingReply
從 replyHolder
中移除,防止 replyHolder
中積壓超時的響應訊息:
try { reply = exchangeMessages(exchange, routingKey, message, correlationData, channel, pendingReply,messageTag); } finally { this.replyHolder.remove(messageTag); ... }
響應資訊是何時如何被放到這個 BlockingQueue
中的呢?看一下 RabbitTemplate
接收訊息的地方:
public void onMessage(Message message) { String messageTag; if (this.correlationKey == null) { // using standard correlationId property messageTag = message.getMessageProperties().getCorrelationId(); } else { messageTag = (String) message.getMessageProperties() .getHeaders().get(this.correlationKey); } // 存在 correlation_id 才認為是RPC的響應資訊,不存在時不處理 if (messageTag == null) { logger.error("No correlation header in reply"); return; } // 從 replyHolder 中取出 correlation_id 對應的 PendingReply PendingReply pendingReply = this.replyHolder.get(messageTag); if (pendingReply == null) { if (logger.isWarnEnabled()) { logger.warn("Reply received after timeout for " + messageTag); } throw new AmqpRejectAndDontRequeueException("Reply received after timeout"); } else { restoreProperties(message, pendingReply); // 將響應資訊 add 到 BlockingQueue 中 pendingReply.reply(message); } }
以上的 spring 程式碼隱去了很多額外部分的處理和細節,只關注關鍵的部分。
至此一個完整可用的由 RabbitMQ 作為中介軟體實現的 RPC 模式就完成了。
總結
服務端
服務端的實現比較簡單,和一般的 Consumer
的區別只在於需要將請求回覆到 replyTo
指定的 queue 中並帶上訊息標識 correlation_id
即可
服務端的一點小優化:
超時的處理是由客戶端來實現的,那服務端有沒有可以優化的地方呢?
答案是有的:如果我們的服務端處理比較耗時,如何判斷客戶端是否還在等待響應呢?
我們可以使用 passive
引數去檢查 replyTo
的 queue 是否存在,因為客戶端宣告的是內部佇列,客戶端如果斷掉連結了這個 queue 就不存在了,這時服務端就無需處理這個訊息了。
客戶端
客戶端承擔了更多的工作量,包括:
-
宣告
replyTo
佇列(使用amq.rabbitmq.reply-to
會簡單很多) -
維護請求和響應訊息(使用唯一的
correlation_id
來關聯) -
消費服務端的返回
-
處理超時等異常情況(使用BlockingQueue來阻塞獲取)
好在 spring 已經實現了一套完備可靠的程式碼,我們在清楚了流程和關鍵點之後,可以直接使用 spring 提供的 RabbitTemplate
,無需自己實現。
使用 MQ 實現 RPC 的意義
通過 MQ 實現 RPC 看起來比客戶端和伺服器直接通訊要複雜一些,那我們為什麼要這樣做呢?或者說這樣做有什麼好處:
-
將客戶端和伺服器解耦:客戶端只是釋出一個請求到 MQ 並消費這個請求的響應。並不關心具體由誰來處理這個請求,MQ 另一端的請求的消費者可以隨意替換成任何可以處理請求的伺服器,並不影響到客戶端。
-
減輕伺服器的壓力:傳統的 RPC 模式中如果客戶端和請求過多,伺服器的壓力會過大。由 MQ 作為中介軟體的話,過多的請求而是被 MQ 消化掉,伺服器可以控制消費請求的頻次,並不會影響到伺服器。
-
伺服器的橫向擴充套件更加容易:如果伺服器的處理能力不能滿足請求的頻次,只需要增加伺服器來消費 MQ 的訊息即可,MQ會幫我們實現訊息消費的負載均衡。
-
可以看出 RabbitMQ 對於 RPC 模式的支援也是比較友好地,
amq.rabbitmq.reply-to
,reply_to
,correlation_id
這些特性都說明了這一點,再加上 spring-rabbit 的實現,可以讓我們很簡單的使用訊息佇列模式的 RPC 呼叫。
全文完
以下文章您可能也會感興趣:
-
OpenResty 不完全指南
-
從 ThreadLocal 的實現看雜湊演算法
我們正在招聘 Java 工程師,歡迎有興趣的同學投遞簡歷到 [email protected] 。
杏仁技術站
長按左側二維碼關注我們,這裡有一群熱血青年期待著與您相會。