1. 程式人生 > >【c#】佇列(Queue)和MSMQ(訊息佇列)的基礎使用

【c#】佇列(Queue)和MSMQ(訊息佇列)的基礎使用

    首先我們知道佇列是先進先出的機制,所以在處理併發是個不錯的選擇。然後就寫兩個佇列的簡單應用。

Queue

名稱空間

    名稱空間:System.Collections,不在這裡做過多的理論解釋,這個東西非常的好理解。

    可以看下官方文件:https://docs.microsoft.com/zh-cn/dotnet/api/system.collections.queue?view=netframework-4.7.2

示例程式碼

我這裡就是為了方便記憶做了一個基本的例子,首先建立了QueueTest類:

包含了獲取佇列的數量,入隊和出隊的實現

 1  public class
QueueTest 2 { 3 public static Queue<string> q = new Queue<string>(); 4 5 #region 獲取佇列數量 6 public int GetCount() 7 { 8 9 return q.Count; 10 } 11 #endregion 12 13 #region 佇列新增資料 14 public
void IntoData(string qStr) 15 { 16 string threadId = System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(); 17 q.Enqueue(qStr); 18 Console.WriteLine($"佇列新增資料: {qStr};當前執行緒id:{threadId}"); 19 } 20 #endregion 21 22 #region
佇列輸出資料 23 24 public string OutData() 25 { 26 string threadId = System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(); 27 string str = q.Dequeue(); 28 Console.WriteLine($"佇列輸出資料: {str};當前執行緒id:{threadId}"); 29 return str; 30 } 31 #endregion 32 33 }

為了模擬併發情況下也不會出現重複讀取和插入混亂的問題所以寫了TaskTest類裡面開闢了兩個非同步執行緒進行插入和讀取:

 1     class TaskTest
 2     {
 3 
 4         #region 佇列的操作模擬
 5         public static void QueueMian()
 6         {
 7             QueueA();
 8             QueueB();
 9         }
10         private static async void QueueA()
11         {
12             QueueTest queue = new QueueTest();
13             var task = Task.Run(() =>
14             {
15                 for (int i = 0; i < 20; i++)
16                 {
17                     queue.IntoData("QueueA" + i);
18                 }
19             });
20             await task;
21             Console.WriteLine("QueueAA插入完成,進行輸出:");
22 
23             while (queue.GetCount() > 0)
24             {
25                 queue.OutData();
26             }
27         }
28 
29         private static async void QueueB()
30         {
31             QueueTest queue = new QueueTest();
32             var task = Task.Run(() =>
33             {
34                 for (int i = 0; i < 20; i++)
35                 {
36                     queue.IntoData("QueueB" + i);
37                 }
38             });
39             await task;
40             Console.WriteLine("QueueB插入完成,進行輸出:");
41 
42             while (queue.GetCount() > 0)
43             {
44                 queue.OutData();
45             }
46         }
47         #endregion
48 
49     }

效果展示

然後在main函式直接呼叫即可:

通過上面的截圖可以看出插入執行緒是無先後的。

這張去除也是執行緒無先後,但是資料是根據插入的資料順序取的,也就是說多執行緒取隨便取,但是取的資料是根據插入的順序取值。

MSMQ

msmq是微軟提供的訊息佇列,本來在windows系統中就存在,但是預設沒有開啟。需要開啟。

開啟安裝

開啟控制面板=>程式和功能=> 啟動或關閉windows功能 => Microsoft Message Queue(MSMQ)伺服器=>Microsoft Message Queue(MSMQ)伺服器核心

一般選擇:MSMQ Active Directory域服務繼承和MSMQ HTTP支援即可。

點選確定等待安裝成功。

名稱空間

需要引用System.Messaging.DLL

名稱空間:System.Messaging

官方資料文件:https://docs.microsoft.com/zh-cn/dotnet/api/system.messaging.messagequeue?view=netframework-4.7.2

示例程式碼

與上面queue同樣的示例方式,建立一個MSMQ類,實現建立訊息佇列,查詢資料,入列,出列功能:

  1  /// <summary>
  2     /// MSMQ訊息佇列
  3     /// </summary>
  4     class MSMQ
  5     {
  6         static string path = ".\\Private$\\myQueue";
  7         static MessageQueue queue;
  8         public static void Createqueue(string queuePath)
  9         {
 10             try
 11             {
 12                 if (MessageQueue.Exists(queuePath))
 13                 {
 14                     Console.WriteLine("訊息佇列已經存在");
 15                     //獲取這個訊息佇列
 16                     queue = new MessageQueue(queuePath);
 17                 }
 18                 else
 19                 {
 20                     //不存在,就建立一個新的,並獲取這個訊息佇列物件
 21                     queue = MessageQueue.Create(queuePath);
 22                     path = queuePath;
 23                 }
 24             }
 25             catch (Exception e)
 26             {
 27                 Console.WriteLine(e.Message);
 28             }
 29 
 30         }
 31 
 32 
 33         #region 獲取訊息佇列的數量
 34         public static int GetMessageCount()
 35         {
 36             try
 37             {
 38                 if (queue != null)
 39                 {
 40                     int count = queue.GetAllMessages().Length;
 41                     Console.WriteLine($"訊息佇列數量:{count}");
 42                     return count;
 43                 }
 44                 else
 45                 {
 46                     return 0;
 47                 }
 48             }
 49             catch (MessageQueueException e)
 50             {
 51 
 52                 Console.WriteLine(e.Message);
 53                 return 0;
 54             }
 55 
 56 
 57         }
 58         #endregion
 59 
 60         #region 傳送訊息到佇列
 61         public static void SendMessage(string qStr)
 62         {
 63             try
 64             {
 65                 //連線到本地佇列
 66 
 67                 MessageQueue myQueue = new MessageQueue(path);
 68 
 69                 //MessageQueue myQueue = new MessageQueue("FormatName:Direct=TCP:192.168.12.79//Private$//myQueue1");
 70 
 71                 //MessageQueue rmQ = new MessageQueue("FormatName:Direct=TCP:121.0.0.1//private$//queue");--遠端格式
 72 
 73                 Message myMessage = new Message();
 74 
 75                 myMessage.Body = qStr;
 76 
 77                 myMessage.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
 78 
 79                 //發生訊息到佇列中
 80 
 81                 myQueue.Send(myMessage);
 82 
 83                 string threadId = System.Threading.Thread.CurrentThread.ManagedThreadId.ToString();
 84                 Console.WriteLine($"訊息傳送成功: {qStr};當前執行緒id:{threadId}");
 85             }
 86             catch (MessageQueueException e)
 87             {
 88                 Console.WriteLine(e.Message);
 89             }
 90         }
 91         #endregion
 92 
 93         #region 連線訊息佇列讀取訊息
 94         public static void ReceiveMessage()
 95         {
 96             MessageQueue myQueue = new MessageQueue(path);
 97 
 98 
 99             myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
100 
101             try
102 
103             {
104 
105                 //從佇列中接收訊息
106 
107                 Message myMessage = myQueue.Receive(new TimeSpan(10));// myQueue.Peek();--接收後不訊息從佇列中移除
108                 myQueue.Close();
109 
110                 string context = myMessage.Body.ToString();
111                 string threadId = System.Threading.Thread.CurrentThread.ManagedThreadId.ToString();
112                 Console.WriteLine($"--------------------------訊息內容: {context};當前執行緒id:{threadId}");
113 
114             }
115 
116             catch (System.Messaging.MessageQueueException e)
117 
118             {
119 
120                 Console.WriteLine(e.Message);
121 
122             }
123 
124             catch (InvalidCastException e)
125 
126             {
127 
128                 Console.WriteLine(e.Message);
129 
130             }
131 
132         }
133         #endregion
134     }

這裡說明一下path這個欄位,這是訊息佇列的檔案位置和佇列名稱,我這裡寫的“.”(點)就是代表的位置MachineName欄位,,代表本機的意思

然後TaskTest類修改成這個樣子:

 1 class TaskTest
 2     {
 3 
 4         #region 訊息佇列的操作模擬
 5         public static void MSMQMian()
 6         {
 7             MSMQ.Createqueue(".\\Private$\\myQueue");
 8             MSMQA();
 9             MSMQB();
10             Console.WriteLine("MSMQ結束");
11         }
12         private static async void MSMQA()
13         {
14             var task = Task.Run(() =>
15             {
16                 for (int i = 0; i < 20; i++)
17                 {
18                     MSMQ.SendMessage("MSMQA" + i);
19                 }
20             });
21             await task;
22             Console.WriteLine("MSMQA傳送完成,進行讀取:");
23 
24             while (MSMQ.GetMessageCount() > 0)
25             {
26                 MSMQ.ReceiveMessage();
27             }
28         }
29 
30         private static async void MSMQB()
31         {
32             var task = Task.Run(() =>
33             {
34                 for (int i = 0; i < 20; i++)
35                 {
36                     MSMQ.SendMessage("MSMQB" + i);
37                 }
38             });
39             await task;
40             Console.WriteLine("MSMQB傳送完成,進行讀取:");
41 
42             while (MSMQ.GetMessageCount() > 0)
43             {
44                 MSMQ.ReceiveMessage();
45             }
46         }
47         #endregion

 效果展示

本機檢視訊息佇列

建立成功的訊息佇列我們可以在電腦上檢視:我的電腦=>管理 =>計算機管理 =>服務與應用程式 =>訊息佇列 =>專用佇列就看到我剛才建立的訊息佇列