我的RabbitMQ學習之旅3 (發布/訂閱)
在前面的教程中,我們創建了一個工作隊列。工作隊列背後的假設是,每個任務只被傳遞給一個工作人員。在這一部分,我們將做一些完全不同的事情 - 我們會向多個消費者傳遞信息。這種模式被稱為“發布/訂閱”。
本質上,發布的日誌消息將被廣播給所有的接收者
生產者 是發送消息的用戶的應用程序。
隊列 是存儲消息的緩沖器。
消費者 是接收消息的用戶的應用程序。
RabbitMQ中消息傳遞模型的核心思想是生產者永遠不會將任何消息直接發送到隊列中。實際上,生產者通常甚至不知道一個消息是否會被傳送到任何隊列中。
相反,制作人只能發送消息給交易所。交換是一件非常簡單的事情。
有幾種可用的交換類型: direct, topic, headers , fanout
channel.ExchangeDeclare("logs", "fanout");
fanout 交換非常簡單。正如你可能從名字中猜到的那樣,它只是將所有收到的消息廣播到它所知道的所有隊列中。這正是我們記錄器所需要的。
列出交易所
要列出服務器上的交換,命令
rabbitmqctl list_exchanges
在這個列表中將會有一些amq。* 交換和默認(未命名)交換。這些是默認創建的。
默認交換
在本教程的前面部分,我們對交換一無所知,但仍能夠將消息發送到隊列。這是可能的,因為我們使用了一個默認的交換,我們用空字符串(“”)來標識。
channel.BasicPublish(exchange:"",//默認交換 routingKey:"hello", basicProperties:null, body:"發送內容");
第一個參數是交易所的名稱。空字符串表示默認或無名交換:消息被路由到具有由 routingKey 指定的名稱的隊列(如果存在)
我們可以發布到我們的命名交換
channel.BasicPublish(exchange:"logs", routingKey:"", basicProperties:null, body:"發送內容");
臨時隊列
正如你以前可能記得我們使用的是具有指定名稱的隊列(請記住hello和task_queue?)。能夠列出隊列對我們至關重要 - 我們需要指出工人隊列。當你想分享生產者和消費者之間的隊列時,給隊列一個名字是很重要的。
但是我們的記錄器並不是這樣。我們希望了解所有日誌消息,而不僅僅是其中的一部分。我們也只對目前流動的消息感興趣,而不是舊消息。要解決這個問題,我們需要兩件事。
首先,每當我們連接到 Rabbit ,我們需要一個新的,空的隊列。要做到這一點,我們可以創建一個隨機名稱的隊列,或者,甚至更好 - 讓服務器為我們選擇一個隨機隊列名稱。
其次,一旦我們斷開消費者,隊列應該被自動刪除。
在.NET客戶端中,當我們不向queueDeclare() 提供參數時,我們 使用生成的名稱創建一個非持久的獨占的自動刪除隊列:
var queueName = channel.QueueDeclare().QueueName;
此時queueName包含一個隨機隊列名稱。例如,它可能看起來像amq.gen-JzTY20BRgKO-HjmUJj0wLg。
綁定
我們已經創建了一個 fanout 交換和一個隊列。現在我們需要告訴交換機將消息發送到我們的隊列。交換和隊列之間的關系被稱為綁定。
channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");
從現在起,日誌交換將把消息附加到我們的隊列中。
列出綁定
您可以使用列出現有的綁定命令
rabbitmqctl list_bindings
把它放在一起
發出日誌消息的生產者程序與前面的教程沒有什麽不同。最重要的變化是,我們現在要發布消息到我們的日誌交換,而不是無名的。發送時我們需要提供一個路由密鑰,但是對於扇出交換,它的值將被忽略。
1.建立 發布者(生產者)
public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "logs", type: "fanout"); var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "info: Hello World!"); }
如你所見,建立連接後,我們宣布交換。這一步是必要的,因為發布到一個不存在的交易所是被禁止的。
如果沒有隊列綁定到交換機上,消息將會丟失,但對我們來說沒關系; 如果沒有消費者正在聽,我們可以放心地丟棄消息。
1.建立 訂閱者(消費者)
public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) {
// 設置交易所的名稱 以及類型 channel.ExchangeDeclare(exchange: "logs", type: "fanout");
//讓系統生成隊列名稱 var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, exchange: "logs", routingKey: ""); Console.WriteLine(" [*] Waiting for logs."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] {0}", message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } }
如果您想將日誌保存到文件中,只需打開一個控制臺並輸入:
cd ReceiveLogs
dotnet run > logs_from_rabbit.log
如果你想看到屏幕上的日誌,產生一個新的終端,並運行:
cd ReceiveLogs
dotnet run
要發射日誌類型:
cd EmitLog
dotnet run
使用rabbitmqctl list_bindings,你可以驗證代碼實際上是否創建了綁定和隊列。有兩個接受程序運行,你應該看到類似於:
sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
# => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
# => ...done.
結果的解釋很簡單:交換日誌中的數據轉到兩個帶有服務器分配名稱的隊列中。這正是我們的意圖。
我的RabbitMQ學習之旅3 (發布/訂閱)