1. 程式人生 > >C# 操作rabbitmq(五)

C# 操作rabbitmq(五)

此篇介紹rabbitmq的RPC

一、雖然我們可以使用work queue給worker傳送訊息,但是如果我們希望在遠端伺服器上執行一個方法並且想要得到結果呢?rabbitmq的RPC功能可以實現。

二、 Callback queue,回撥佇列,在rabbitmq上構建RPC是很容易的,客戶端傳送請求訊息,服務端響應回覆訊息,為了接收響應訊息,我們需要在傳送請求的時候附加一個callback 佇列地址

var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;

var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: ""
, routingKey: "rpc_queue", basicProperties: props, body: messageBytes); // ... then code to read a response message from the callback_queue ...

Message properties 訊息屬性:

AMQP 0-9-1協議預定義了一組帶有訊息的14個屬性。大多數屬性很少使用,但以下情況除外:

Persistent: : 將訊息標記為永續性(值為2)或瞬態(任何其他值)
DeliveryMode: 熟悉協議的人可以選擇使用此屬性而不是Persistent。他們控制著同樣的事情。
ContentType:用於描述編碼的mime型別。例如,對於經常使用的JSON編碼,將此屬性設定為:application / json是一種很好的做法。
ReplyTo: 通常用於命名回撥佇列。
CorrelationId: 用於將RPC響應與請求相關聯

三、 Correlation Id 關聯Id

在上面介紹的方法中,我們建議為每個RPC請求建立一個回撥佇列。這是非常低效的,但幸運的是有更好的方法 - 讓我們為每個客戶端建立一個回撥佇列。

這引發了一個新問題,在該佇列中收到響應後,不清楚響應屬於哪個請求。那是在使用CorrelationId屬性的時候 。我們將為每個請求將其設定為唯一值。稍後,當我們在回撥佇列中收到一條訊息時,我們將檢視此屬性,並根據該屬性,我們將能夠將響應與請求進行匹配。如果我們看到未知的 CorrelationId值,我們可以安全地丟棄該訊息 - 它不屬於我們的請求。

您可能會問,為什麼我們應該忽略回撥佇列中的未知訊息,而不是失敗並出現錯誤?這是由於伺服器端可能存在競爭條件。雖然不太可能,但RPC伺服器可能會在向我們傳送答案之後,但在傳送請求的確認訊息之前死亡。如果發生這種情況,重新啟動的RPC伺服器將再次處理請求。這就是為什麼在客戶端上我們必須優雅地處理重複的響應,理想情況下RPC應該是冪等(一次請求和多次請求資源的狀態是一樣的)的。

四、例項

這裡寫圖片描述

//Fibonacci
private static int fib(int n)
{
    if (n == 0 || n == 1) return n;
    return fib(n - 1) + fib(n - 2);
}

//RPCServer.cs
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class RPCServer
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "rpc_queue", durable: false,
              exclusive: false, autoDelete: false, arguments: null);
            channel.BasicQos(0, 1, false);
            var consumer = new EventingBasicConsumer(channel);
            channel.BasicConsume(queue: "rpc_queue",
              autoAck: false, consumer: consumer);
            Console.WriteLine(" [x] Awaiting RPC requests");

            consumer.Received += (model, ea) =>
            {
                string response = null;

                var body = ea.Body;
                var props = ea.BasicProperties;
                var replyProps = channel.CreateBasicProperties();
                replyProps.CorrelationId = props.CorrelationId;

                try
                {
                    var message = Encoding.UTF8.GetString(body);
                    int n = int.Parse(message);
                    Console.WriteLine(" [.] fib({0})", message);
                    response = fib(n).ToString();
                }
                catch (Exception e)
                {
                    Console.WriteLine(" [.] " + e.Message);
                    response = "";
                }
                finally
                {
                    var responseBytes = Encoding.UTF8.GetBytes(response);
                    channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
                      basicProperties: replyProps, body: responseBytes);
                    channel.BasicAck(deliveryTag: ea.DeliveryTag,
                      multiple: false);
                }
            };

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }

    /// 

    /// Assumes only valid positive integer input.
    /// Don't expect this one to work for big numbers, and it's
    /// probably the slowest recursive implementation possible.
    /// 

    private static int fib(int n)
    {
        if (n == 0 || n == 1)
        {
            return n;
        }

        return fib(n - 1) + fib(n - 2);
    }
}

//RPCClient.cs
using System;
using System.Collections.Concurrent;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public class RpcClient
{
    private readonly IConnection connection;
    private readonly IModel channel;
    private readonly string replyQueueName;
    private readonly EventingBasicConsumer consumer;
    private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
    private readonly IBasicProperties props;

public RpcClient()
{
        var factory = new ConnectionFactory() { HostName = "localhost" };

        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        replyQueueName = channel.QueueDeclare().QueueName;
        consumer = new EventingBasicConsumer(channel);

        props = channel.CreateBasicProperties();
        var correlationId = Guid.NewGuid().ToString();
        props.CorrelationId = correlationId;
        props.ReplyTo = replyQueueName;

        consumer.Received += (model, ea) =>
        {
            var body = ea.Body;
            var response = Encoding.UTF8.GetString(body);
            if (ea.BasicProperties.CorrelationId == correlationId)
            {
                respQueue.Add(response);
            }
        };
    }

    public string Call(string message)
    {
        var messageBytes = Encoding.UTF8.GetBytes(message);
        channel.BasicPublish(
            exchange: "",
            routingKey: "rpc_queue",
            basicProperties: props,
            body: messageBytes);

        channel.BasicConsume(
            consumer: consumer,
            queue: replyQueueName,
            autoAck: true);

        return respQueue.Take(); ;
    }

    public void Close()
    {
        connection.Close();
    }
}

public class Rpc
{
    public static void Main()
    {
        var rpcClient = new RpcClient();

        Console.WriteLine(" [x] Requesting fib(30)");
        var response = rpcClient.Call("30");

        Console.WriteLine(" [.] Got '{0}'", response);
        rpcClient.Close();
    }
}