【c#】佇列(Queue)和MSMQ(訊息佇列)的基礎使用
首先我們知道佇列是先進先出的機制,所以在處理併發是個不錯的選擇。然後就寫兩個佇列的簡單應用。
Queue
名稱空間
名稱空間:System.Collections,不在這裡做過多的理論解釋,這個東西非常的好理解。
可以看下官方文件:https://docs.microsoft.com/zh-cn/dotnet/api/system.collections.queue?view=netframework-4.7.2
示例程式碼
我這裡就是為了方便記憶做了一個基本的例子,首先建立了QueueTest類:
包含了獲取佇列的數量,入隊和出隊的實現
1 public classQueueTest 2 { 3 public static Queue<string> q = new Queue<string>(); 4 5 #region 獲取佇列數量 6 public int GetCount() 7 { 8 9 return q.Count; 10 } 11 #endregion 12 13 #region 佇列新增資料 14 publicvoid IntoData(string qStr) 15 { 16 string threadId = System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(); 17 q.Enqueue(qStr); 18 Console.WriteLine($"佇列新增資料: {qStr};當前執行緒id:{threadId}"); 19 } 20 #endregion 21 22 #region佇列輸出資料 23 24 public string OutData() 25 { 26 string threadId = System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(); 27 string str = q.Dequeue(); 28 Console.WriteLine($"佇列輸出資料: {str};當前執行緒id:{threadId}"); 29 return str; 30 } 31 #endregion 32 33 }
為了模擬併發情況下也不會出現重複讀取和插入混亂的問題所以寫了TaskTest類裡面開闢了兩個非同步執行緒進行插入和讀取:
1 class TaskTest 2 { 3 4 #region 佇列的操作模擬 5 public static void QueueMian() 6 { 7 QueueA(); 8 QueueB(); 9 } 10 private static async void QueueA() 11 { 12 QueueTest queue = new QueueTest(); 13 var task = Task.Run(() => 14 { 15 for (int i = 0; i < 20; i++) 16 { 17 queue.IntoData("QueueA" + i); 18 } 19 }); 20 await task; 21 Console.WriteLine("QueueAA插入完成,進行輸出:"); 22 23 while (queue.GetCount() > 0) 24 { 25 queue.OutData(); 26 } 27 } 28 29 private static async void QueueB() 30 { 31 QueueTest queue = new QueueTest(); 32 var task = Task.Run(() => 33 { 34 for (int i = 0; i < 20; i++) 35 { 36 queue.IntoData("QueueB" + i); 37 } 38 }); 39 await task; 40 Console.WriteLine("QueueB插入完成,進行輸出:"); 41 42 while (queue.GetCount() > 0) 43 { 44 queue.OutData(); 45 } 46 } 47 #endregion 48 49 }
效果展示
然後在main函式直接呼叫即可:
通過上面的截圖可以看出插入執行緒是無先後的。
這張去除也是執行緒無先後,但是資料是根據插入的資料順序取的,也就是說多執行緒取隨便取,但是取的資料是根據插入的順序取值。
MSMQ
msmq是微軟提供的訊息佇列,本來在windows系統中就存在,但是預設沒有開啟。需要開啟。
開啟安裝
開啟控制面板=>程式和功能=> 啟動或關閉windows功能 => Microsoft Message Queue(MSMQ)伺服器=>Microsoft Message Queue(MSMQ)伺服器核心
一般選擇:MSMQ Active Directory域服務繼承和MSMQ HTTP支援即可。
點選確定等待安裝成功。
名稱空間
需要引用System.Messaging.DLL
名稱空間:System.Messaging
官方資料文件:https://docs.microsoft.com/zh-cn/dotnet/api/system.messaging.messagequeue?view=netframework-4.7.2
示例程式碼
與上面queue同樣的示例方式,建立一個MSMQ類,實現建立訊息佇列,查詢資料,入列,出列功能:
1 /// <summary> 2 /// MSMQ訊息佇列 3 /// </summary> 4 class MSMQ 5 { 6 static string path = ".\\Private$\\myQueue"; 7 static MessageQueue queue; 8 public static void Createqueue(string queuePath) 9 { 10 try 11 { 12 if (MessageQueue.Exists(queuePath)) 13 { 14 Console.WriteLine("訊息佇列已經存在"); 15 //獲取這個訊息佇列 16 queue = new MessageQueue(queuePath); 17 } 18 else 19 { 20 //不存在,就建立一個新的,並獲取這個訊息佇列物件 21 queue = MessageQueue.Create(queuePath); 22 path = queuePath; 23 } 24 } 25 catch (Exception e) 26 { 27 Console.WriteLine(e.Message); 28 } 29 30 } 31 32 33 #region 獲取訊息佇列的數量 34 public static int GetMessageCount() 35 { 36 try 37 { 38 if (queue != null) 39 { 40 int count = queue.GetAllMessages().Length; 41 Console.WriteLine($"訊息佇列數量:{count}"); 42 return count; 43 } 44 else 45 { 46 return 0; 47 } 48 } 49 catch (MessageQueueException e) 50 { 51 52 Console.WriteLine(e.Message); 53 return 0; 54 } 55 56 57 } 58 #endregion 59 60 #region 傳送訊息到佇列 61 public static void SendMessage(string qStr) 62 { 63 try 64 { 65 //連線到本地佇列 66 67 MessageQueue myQueue = new MessageQueue(path); 68 69 //MessageQueue myQueue = new MessageQueue("FormatName:Direct=TCP:192.168.12.79//Private$//myQueue1"); 70 71 //MessageQueue rmQ = new MessageQueue("FormatName:Direct=TCP:121.0.0.1//private$//queue");--遠端格式 72 73 Message myMessage = new Message(); 74 75 myMessage.Body = qStr; 76 77 myMessage.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) }); 78 79 //發生訊息到佇列中 80 81 myQueue.Send(myMessage); 82 83 string threadId = System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(); 84 Console.WriteLine($"訊息傳送成功: {qStr};當前執行緒id:{threadId}"); 85 } 86 catch (MessageQueueException e) 87 { 88 Console.WriteLine(e.Message); 89 } 90 } 91 #endregion 92 93 #region 連線訊息佇列讀取訊息 94 public static void ReceiveMessage() 95 { 96 MessageQueue myQueue = new MessageQueue(path); 97 98 99 myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) }); 100 101 try 102 103 { 104 105 //從佇列中接收訊息 106 107 Message myMessage = myQueue.Receive(new TimeSpan(10));// myQueue.Peek();--接收後不訊息從佇列中移除 108 myQueue.Close(); 109 110 string context = myMessage.Body.ToString(); 111 string threadId = System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(); 112 Console.WriteLine($"--------------------------訊息內容: {context};當前執行緒id:{threadId}"); 113 114 } 115 116 catch (System.Messaging.MessageQueueException e) 117 118 { 119 120 Console.WriteLine(e.Message); 121 122 } 123 124 catch (InvalidCastException e) 125 126 { 127 128 Console.WriteLine(e.Message); 129 130 } 131 132 } 133 #endregion 134 }
這裡說明一下path這個欄位,這是訊息佇列的檔案位置和佇列名稱,我這裡寫的“.”(點)就是代表的位置MachineName欄位,,代表本機的意思
然後TaskTest類修改成這個樣子:
1 class TaskTest 2 { 3 4 #region 訊息佇列的操作模擬 5 public static void MSMQMian() 6 { 7 MSMQ.Createqueue(".\\Private$\\myQueue"); 8 MSMQA(); 9 MSMQB(); 10 Console.WriteLine("MSMQ結束"); 11 } 12 private static async void MSMQA() 13 { 14 var task = Task.Run(() => 15 { 16 for (int i = 0; i < 20; i++) 17 { 18 MSMQ.SendMessage("MSMQA" + i); 19 } 20 }); 21 await task; 22 Console.WriteLine("MSMQA傳送完成,進行讀取:"); 23 24 while (MSMQ.GetMessageCount() > 0) 25 { 26 MSMQ.ReceiveMessage(); 27 } 28 } 29 30 private static async void MSMQB() 31 { 32 var task = Task.Run(() => 33 { 34 for (int i = 0; i < 20; i++) 35 { 36 MSMQ.SendMessage("MSMQB" + i); 37 } 38 }); 39 await task; 40 Console.WriteLine("MSMQB傳送完成,進行讀取:"); 41 42 while (MSMQ.GetMessageCount() > 0) 43 { 44 MSMQ.ReceiveMessage(); 45 } 46 } 47 #endregion
效果展示
本機檢視訊息佇列
建立成功的訊息佇列我們可以在電腦上檢視:我的電腦=>管理 =>計算機管理 =>服務與應用程式 =>訊息佇列 =>專用佇列就看到我剛才建立的訊息佇列