1. 程式人生 > >Azure Storage 系列(六)使用Azure Queue Storage

Azure Storage 系列(六)使用Azure Queue Storage

一,引言

  在之前介紹到 Azure Storage 第一篇文章中就有介紹到 Azure Storage 是 Azure 上提供的一項儲存服務,Azure 儲存包括 物件、檔案、磁碟、佇列和表儲存。這裡的提到的佇列(Queue)就是今天要分享的內容。

慣例,先來一些微軟的官方解釋

  1,什麼是 Azure Queue Storage?

  答:Azure 佇列儲存是一項實現基於雲的佇列的 Azure 服務。 每個佇列都保留一個訊息列表。 應用程式元件使用 REST API 或 Azure 提供的客戶端庫訪問佇列。 通常情況下,將有一個或多個“傳送方”元件以及一個或多個“接收方”元件。 傳送方元件將訊息新增到佇列。 接收方元件檢索佇列前面的訊息以進行處理。 下圖顯示多個將訊息新增到 Azure 佇列的傳送者應用程式以及一個檢索訊息的收件方應用程式。

  佇列中的訊息是最大為 64 KB 的位元組陣列。 任何 Azure 元件都不會解釋訊息內容。如果要建立結構化訊息,可以使用 XML 或 JSON 格式化訊息內容。 程式碼負責生成並解釋自定義格式。

--------------------我是分割線--------------------

Azure Blob Storage 儲存系列:

1,Azure Storage 系列(一)入門簡介

2,Azure Storage 系列(二) .NET Core Web 專案中操作 Blob 儲存

3,Azure Storage 系列(三)Blob 引數設定說明

4,Azure Storage 系列(四)在.Net 上使用Table Storage

5,Azure Storage 系列(五)通過Azure.Cosmos.Table 類庫在.Net 上使用 Table Storage  

6,Azure Storage 系列(六)使用Azure Queue Storage

二,正文

1,Azure Portal 上建立 Queue 

選擇 cnbateblogaccount 左側選單的 “Queue service=》Queues” ,點選 “+ Queue”

Queue name:“blogmessage”

點選 “OK”

2,新增對 Storage Queue 的相應方法

2.1,安裝 “Azure.Storage.Queues” 的Nuget

使用程式包管理控制檯進行安裝

Install-Package Azure.Storage.Queues -Version 12.4.2

2.2,建立IQueueService 介面,和 QueueService 實現類,Queue控制器方法等

 1 public interface IQueueService
 2     {
 3         /// <summary>
 4         /// 插入Message
 5         /// </summary>
 6         /// <param name="msg">msg</param>
 7         /// <returns></returns>
 8         Task AddMessage(string msg);
 9 
10         /// <summary>
11         /// 獲取訊息
12         /// </summary>
13         /// <returns></returns>
14         IAsyncEnumerable<string> GetMessages();
15 
16         /// <summary>
17         /// 更新訊息
18         /// </summary>
19         /// <returns></returns>
20         Task UpdateMessage();
21 
22         /// <summary>
23         /// 處理訊息
24         /// </summary>
25         /// <returns></returns>
26         Task ProcessingMessage();
27 
28 
29     }
IQueueService.cs
 1 public class QueueService : IQueueService
 2     {
 3         private readonly QueueClient _queueClient;
 4 
 5         public QueueService(QueueClient queueClient)
 6         {
 7             _queueClient = queueClient;
 8         }
 9 
10 
11 
12         /// <summary>
13         /// 新增訊息
14         /// </summary>
15         /// <param name="msg">訊息</param>
16         /// <returns></returns>
17         public async Task AddMessage(string msg)
18         {
19             // Create the queue
20             _queueClient.CreateIfNotExists();
21 
22             if (_queueClient.Exists())
23             {
24  
25                 // Send a message to the queue
26                  await _queueClient.SendMessageAsync(msg.EncryptBase64());
27             }
28         }
29 
30         public async IAsyncEnumerable<string> GetMessages()
31         {
32             if (_queueClient.Exists())
33             {
34                 // Peek at the next message
35                 PeekedMessage[] peekedMessage = await _queueClient.PeekMessagesAsync();
36                 for (int i = 0; i < peekedMessage.Length; i++)
37                 {
38                     //Display the message
39                     yield return string.Format($"Peeked message: '{peekedMessage[i].MessageText.DecodeBase64()}'") ;
40                 }
41             }
42         }
43 
44         /// <summary>
45         /// 處理訊息
46         /// </summary>
47         /// <returns></returns>
48         public async Task ProcessingMessage()
49         {
50             // 執行 getmessage(), 隊頭的訊息會變得不可見。
51             QueueMessage[] retrievedMessage = await _queueClient.ReceiveMessagesAsync();
52             try
53             {
54                 //處理訊息
55 
56 
57                 // 如果在30s內你沒有刪除這條訊息,它會重新出現在隊尾。
58                 // 所以正確處理一條訊息的過程是,處理完成後,刪除這條訊息
59                 await _queueClient.DeleteMessageAsync(retrievedMessage[0].MessageId, retrievedMessage[0].PopReceipt);
60             }
61             catch //(訊息處理異常)
62             { }
63         }
64 
65         /// <summary>
66         /// 更新已排隊的訊息
67         /// </summary>
68         /// <returns></returns>
69         public async Task UpdateMessage()
70         {
71             if (_queueClient.Exists())
72             {
73                 // Get the message from the queue
74                 QueueMessage[] message = await _queueClient.ReceiveMessagesAsync();
75 
76                 // Update the message contents
77                 await _queueClient.UpdateMessageAsync(message[0].MessageId,
78                         message[0].PopReceipt,
79                         "Updated contents".EncryptBase64(),
80                         TimeSpan.FromSeconds(60.0)  // Make it invisible for another 60 seconds
81                     );
82             }
83         }
84     }
QueueService.cs
 1 [Route("Queue")]
 2     public class QueueExplorerController : Controller
 3     {
 4 
 5         private readonly IQueueService _queueService;
 6 
 7         public QueueExplorerController(IQueueService queueSerivce)
 8         {
 9             this._queueService = queueSerivce;
10         }
11 
12         [HttpPost("AddQueue")]
13         public async Task<ActionResult> AddQueue()
14         {
15             string msg = $"我是新增進去的第一個訊息";
16             await _queueService.AddMessage(msg);
17             return Ok();
18         }
19 
20         [HttpGet("QueryQueue")]
21         public  ActionResult QueryQueue()
22         {
23             return Ok( _queueService.GetMessages());
24             
25         }
26 
27         [HttpPut("UpdateQueue")]
28         public async Task<ActionResult> UpdateQueue()
29         {
30             await _queueService.UpdateMessage();
31             return Ok();
32         }
33 
34         [HttpGet("ProcessingMessage")]
35         public async Task<ActionResult> ProcessingQueue()
36         {
37             await _queueService.ProcessingMessage();
38             return Ok();
39         }
40     }
QueueExplorerController.cs

重點:將新訊息新增到佇列的後面。可見性超時指定訊息應該對Dequeue和Peek操作不可見的時間。訊息內容必須是UTF-8編碼的字串,最大長度為64KB。

訊息的格式必須可以包含在具有UTF-8編碼。要在訊息中包含標記,訊息的內容必須為XML換碼或Base64編碼。在將訊息新增到佇列之前,將刪除訊息中所有未轉義或編碼的XML標記。

我們這裡使用Base64編碼

public static class StringExtensions
    {
        public static string EncryptBase64(this string s)
        {
            byte[] b = Encoding.UTF8.GetBytes(s);
            return Convert.ToBase64String(b);
        }

        public static string DecodeBase64(this string s)
        {
            byte[] b = Convert.FromBase64String(s);
            return Encoding.UTF8.GetString(b);
        }
    }

2.3,新增對 QueueService,以及QueueClient 的以來注入

services.AddSingleton(x => new QueueClient("DefaultEndpointsProtocol=https;AccountName=cnbateblogaccount;AccountKey=e2T2gYREFdxkYIJocvC4Wut7khxMWJCbQBp8tPM2EJt37QaUUlflTPAlkoJzIlY29aGYt8WW0xx1bckO4hLKJA==;EndpointSuffix=core.windows.net", "blogmessage"));
services.AddSingleton<IQueueService, QueueService>();

3,Postman 對相應介面進行測試

3.1,新增佇列訊息

我們新增一條 “我是新增進去的第一個訊息” 的Queue

 postman 中輸入 “localhost:9001/Queue/AddQueue”,點選 “Send”

接下來,我們可以在 VS 的 “Cloud Explorer” 檢視到對應的 “cnbateblogaccount” 的 Strorage Account,以及 “blogmessage” 的 Storage Queue

右鍵彈出選擇頁面,點選 “開啟”

 我們可以看懂新增進去的 Queue 的資訊 ,Queue 的過去時間因為我們進行設定,這裡預設是7天

3.2 查詢Queue

postman 中輸入 “localhost:9001/Queue/QueryQueue”,點選 “Send”,可以看到剛剛新增進去的Queue被查詢出來了

3.3,更新Queue

postman 中輸入 “localhost:9001/Queue/UpdateQueue”,點選 “Send”

注意:由於我們在更新 Queue 的時候,設定了 Queue 的不可見時間為60秒,所以在更新操作完成後去檢視 Queue 會找不到更新的那條Queue 的資訊,稍等一下再去檢視  就可以展示出更新的 Queue 的資訊

 更新的Queue的文字內容已經發生改變了

3.4 處理Queue

postman中輸入 “localhost:9001/Queue/ProcessingMessage”,點選 “Send”

注意:因為這裡只是做演示,所以就假象進行訊息處理,處理完成後,刪除這條訊息。

可以看到已經沒有了 Queue 的資訊了

Ok,今天的分享就先到此結束

三,結尾

github:https://github.com/yunqian44/Azure.Storage.git

作者:Allen 

版權:轉載請在文章明顯位置註明作者及出處。如發現錯誤,歡迎批評指正。

作者:Allen 版權:轉載請在文章明顯位置註明作者及出處。如發現錯誤,歡迎批評指正。