AY C# RabbitMQ 2019 微筆記3
傳送訊息,生產者 接收訊息 消費者 RabbitMQ是Erlang語言開發
ofollow,noindex">上篇部落格
實際場景Exchange用的多
1對多釋出訂閱(下篇講,這篇讓你更瞭解佇列)
==============開始DEMO
2個控制檯
釋出者2
using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace MQ.Product2 { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: true, arguments: null); var properties = channel.CreateBasicProperties(); properties.Persistent = true; channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body); //channel.QueueDeclare(queue: "hello", //durable: true, //exclusive: false, //autoDelete: true, //arguments: null); //string message = "Hello World!"; //var body = Encoding.UTF8.GetBytes(message); //channel.BasicPublish(exchange: "", //routingKey: "hello", //basicProperties: null, //body: body); //Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); } } }
這裡基於上一個DEMO改的,這裡我們設定了一個properties了。
執行專案。
然後消費者修改程式碼(基於DEMO1的消費者 程式碼)
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace AyTestMQ2 { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { //channel.QueueDeclare(queue: "task_queue", //durable: true, //exclusive: false, //autoDelete: true, //arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); }; channel.BasicConsume(queue: "task_queue", autoAck: true, consumer: consumer); //var consumer = new EventingBasicConsumer(channel); //consumer.Received += (model, ea) => //{ //var body = ea.Body; //var message = Encoding.UTF8.GetString(body); //Console.WriteLine(" [x] Received {0}", message); //}; //channel.BasicConsume(queue: "hello", //autoAck: true, //consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } Console.ReadKey(); } } }
主要接收訊息,處理,模擬耗時工作。
發的訊息一個 點號 停頓1秒
生產端訊息改下
private static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : "Hello.World.AY.2019");
}
消費端改改
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
var _3 = message.Split('.');
//int dots = message.Split('.').Length - 1;
foreach (var item in _3)
{
Console.WriteLine(item);
Thread.Sleep(1000);
}
執行生產端,然後消費端效果如下
測試2,
開啟生產者,然後開啟消費者,如上所示,不要關閉,關掉生產者在開啟,消費者那段又收到訊息了。
同樣的,如果有2個消費者, rabbitmq會發給下一個消費者,這種分發訊息叫做 round-robin(迴圈排程)
一個訊息只給一個消費者處理。
場景:其實我們可以做 使用者的請求,每個請求放入訊息佇列,然後讓訊息佇列給空閒的 消費者去消費處理。1個消費者不夠處理,可以執行多個來吃完任務。
任務會耗時間的。
您可能想知道如果其中一個消費者開始執行長任務並且僅在部分完成時死亡會發生什麼。
上面的程式碼,一旦RabbitMQ向客戶傳送訊息,它立即將其標記為刪除。
在這種情況下,如果當前的消費者掛了,我們將丟失它剛剛處理的訊息。
我還將丟失分發給這個消費者的 還未處理的所有訊息。
但是我不想丟失任何的訊息(1個訊息一個任務),如果消費者處理掛了,我當然更想把訊息給其他的消費者處理。
為了確保訊息永不丟失,RabbitMQ 提供了一個 ack機制, 手動應答,處理完了,告訴兔子,我處理完了,等兔子空閒時候就刪除該訊息了。
定義 消費者死了,就是 channel關閉,connection關閉,tcp斷開了,沒網路了。
當消費者還沒傳送 ack,兔子那邊就會認為 訊息沒有被處理,又會恢復回去了。如果同一時間,還有其他消費者線上,兔子會把這燙手山芋給其他的消費者。
恩,所以啊,你的程式沒死,他的訊息一直存在兔子那的,除非你手動應答。如果你掛了沒應答,會看有沒有其他的消費者處理。
接下來模擬這個場景
生產者生產個訊息,然後修改消費者程式碼
var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { //channel.QueueDeclare(queue: "task_queue", //durable: true, //exclusive: false, //autoDelete: true, //arguments: null); var consumer = new EventingBasicConsumer(channel); channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); throw new NotImplementedException(); Console.WriteLine(" [x] Received {0}", message); var _3 = message.Split('.'); foreach (var item in _3) { Console.WriteLine(item); Thread.Sleep(2000); } Console.WriteLine(" [x] Done"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); };
設定了autoack:false了
然後received裡面設定了 ea.DeliveryTag
測試1
這裡處理訊息,停留了2秒1個欄位,我們再BasicAck應答之前關閉程式,看訊息會不會被刪除了。
由於丟擲異常Unacked 為1了。
把程式關了
訊息還是刪除了。。
我懷疑服務端設定了 自動刪除導致的。我改為false測試,這樣生產了1個不會自動刪除的訊息。
測試2
執行修改後的生產者
消費者程式碼不改,讓丟擲異常
然後關閉程式,過一會,訊息恢復正常了。這次就對了。也就是生產者自動刪除我覺得大部分都是關閉的。
測試3
正確處理,看訊息會不會刪除,移除拋棄異常的程式碼
ready終於是0了
然後關閉客戶端,斷開連線(執行完using,釋放連線),佇列被處理了,沒刪除哦
那如果想要刪除呢,暫時先這樣吧,因為用的最多的還是Exchange的Topic
注意:
忘記寫 BasicAck這行程式碼, 這是一個簡單的錯誤,但後果是嚴重的。 當您的客戶端退出時,訊息將被重新傳遞(這可能看起來像隨機重新傳遞),但RabbitMQ將會佔用越來越多的記憶體,因為它無法釋放任何未經處理的訊息。
假如忘了unack
測試4
註釋掉程式碼,然後生產個訊息,然後執行消費者
再執行消費者,當然 連線不要釋放,不然任務客戶端死了,又恢復回去了
這裡我們開啟命令列
rabbitmqctl list_queues name messages_ready messages_unacknowledged
貌似超時了 這裡就列出名字了。算了,遇到再看。
=============================================================
永續性,如果兔子掛了,訊息還是會丟丟失了。
hannel.QueueDeclare(queue: "hello",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
設定持久化,就會不丟失了。 但是兔子不允許你重新定義一個已經存在的佇列,然後更改屬性
你可以換個名字重新定義一個。
對了,如果伺服器重啟,我們在上篇部落格說到 訊息恢復了,但是不可再被消費了,但是如果生產訊息時候,加上下面程式碼就好了,終於解決了 durable=true也無效的問題了。
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
將訊息標記為永續性並不能完全保證訊息不會丟失。 雖然它告訴RabbitMQ將訊息儲存到磁碟,但是當RabbitMQ接s收訊息並且尚未儲存訊息時,仍然有一個短時間視窗。 此外,RabbitMQ不會為每條訊息執行fsync(2) - 它可能只是儲存到快取而不是真正寫入磁碟。 永續性保證不強,但對於我們簡單的任務佇列來說已經足夠了。 如果您需要更強的保證,那麼您可以使用 釋出者確認 (publisher confirms)。
公平排程 Fair Dispatch
2個消費者,一個很忙,一個幾乎不做事,兔子不知道誰忙誰不忙的,還是均勻的發訊息的。
發生這種情況是因為RabbitMQ只是在訊息進入佇列時排程訊息。
它不會檢視消費者未確認訊息的數量。
它只是盲目地向第n個消費者傳送每個第n個訊息
為了改變這種行為,我們可以使用BasicQos方法,shezhi PrefetchCount=1
這會告訴兔子,不要同一時間給超過一個訊息以上給一個消費者,因為它很忙,可能還沒處理完,你又來了。
換句話說, 在處理並確認前一個訊息之前,不要向該工作程式傳送新訊息。 相反,它會將它傳送給下一個不忙的 消費者。
channel.BasicQos(0, 1, false);
這裡注意佇列的 size
如果所有的 消費者都很忙,並且你的queue填滿了。你就要考慮是否新增更多的消費者,或者換個思路去解決問題。
消費者修改後的程式碼如下:
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace AyTestMQ2 { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); var _3 = message.Split('.'); foreach (var item in _3) { Console.WriteLine(item); } Console.WriteLine(" [x] Done"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } Console.ReadKey(); } } }
====================www.ayjs.net 楊洋 wpfui.com ayui ay aaronyang=======請不要轉載謝謝了。=========
關於IModel內的方法和IBasicProperties你想了解的,可以檢視 RabbitMQ .NET client API reference online
特別推薦以下指南
particularly recommend the following guides: Publisher Confirms and Consumer Acknowledgements , Production Checklist and Monitoring .
====================www.ayjs.net 楊洋 wpfui.com ayui ay aaronyang=======請不要轉載謝謝了。=========
推薦您閱讀更多有關於“”的文章