1. 程式人生 > >.NET Core中使用IHostedService結合隊列執行定時任務

.NET Core中使用IHostedService結合隊列執行定時任務

組隊 屬性獲取 響應時間 取數據 reg ddt sage 過多 ret

最近遇到了這樣的場景:每隔一段時間,需要在後臺使用隊列對一批數據進行業務處理。

Quartz.NET是一種選擇,在 .NET Core中,可以使用IHostedService執行後臺定時任務。在本篇中,首先嘗試把隊列還原到最簡單、原始的狀態,然後給出以上場景問題的具體解決方案。

假設一個隊列有8個元素。現在abcd依次進入隊列。

0 1 2 3 4 5 6 7
a b c d
head tail

ab依次出隊列。

0 1 2 3 4 5 6 7
c d
head tail

可以想象,隨著不斷地入列出列,head和tail的位置不斷往後,當tail在7號位的時候,雖然隊列裏還有空間,但此時數據就無法入隊列了。

如何才可以繼續入隊列呢

首先想到的是數據搬移。當數據無法進入隊列,首先讓隊列項出列,進入到另外一個新隊列,這個新隊列就可以再次接收數據入隊列了。但是,搬移整個隊列中的數據的時間復雜度為O(n),而原先出隊列的時間復雜度是O(1),這種方式不夠理想。

還有一種思路是使用循環隊列。當tail指向最後一個位置,此時有新的數據進入隊列,tail就來到頭部指向0號位置,這樣這個隊列就可以循環使用了。

0 1 2 3 4 5 6 7
h i j
head tail

現在a入棧。

0 1 2 3 4 5 6 7
h i j a
tail head

隊列有很多種實現

比如在生產消費模型中可以用阻塞隊列。當生產隊列為空的時候,為了不讓消費者取數據,生產隊列的Dequeue行為會被阻塞;而當生產隊列滿的時候,為了不讓更多的數據進來,生產隊列的Enqueue行為被阻塞。

線程安全的隊列叫並發隊列,如C#中的ConcurrentQueue

線程池內部也使用了隊列機制。因為CPU的資源是有限的,過多的線程會導致CPU頻繁地在線程之間切換。線程池內通過維護一定數量的線程來減輕CPU的負擔。當線程池沒有多余的線程可供使用,請求過來,一種方式是拒絕請求,另外一種方式是讓請求隊列阻塞,等到線程池內有線程可供使用,請求隊列出列執行。用鏈表實現的隊列是無界隊列(unbounded queue)

,這種做法可能會導致過多的請求在排隊,等待響應時間過長。用數組實現的隊列是有界隊列(bounded queue),當線程池已滿,請求過來就會被拒絕。對有界隊列來說數組的大小設置很講究。

來模擬一個數組隊列。

public class ArrayQueue
{
    private string[] items;
    private int n = 0; //數組長度
    private int head = 0;
    private int tail = 0;

    public ArrayQueue(int capacity)
    {
        n = capacity;
        items = new string[capacity];
    }

    public bool Enqueue(string item)
    {
        if(tail==n){
            return false;
        }
        items[tail] = item;
        ++tail;
        return true;
    }

    public string Dequeue()
    {
        if(head==null){
            return null;
        }
        string ret = items[head];
        ++head;
        return ret;
    }
}

以上就是一個最簡單的、用數組實現的隊列。

再次回到要解決的場景問題。解決思路大致是:實現IHostedService接口,在其中執行定時任務,每次把隊列項放到隊列中,並定義出隊列的方法,在其中執行業務邏輯。

關於隊列,通過以下的步驟使其在後臺運行。

  • 隊列項(MessageQueueItem):具備唯一標識、委托、添加到隊列中的時間等屬性
  • 隊列(MessageQueue):維護著Dictionary<string, MessageQueueItem>靜態字典集合
  • MessageQueueUtility類:決定著如何運行,比如隊列執行的間隔時間、垃圾回收
  • MessageQueueThreadUtility類:維護隊列線程,提供隊列在後臺運行的方法
  • Startup.cs中的Configure中調用MessageQueueThreadUtility中的方法使隊列在後臺運行

隊列項(MessageQueueItem)

public class MessageQueueItem
{
    public MessageQueueItem(string key, Action action, string description=null)
    {
        Key = key;
        Action = action;
        Description = description;
        AddTime = DateTime.Now;
    }

    public string Key{get;set;}
    public Actioin Action{get;set;}
    public DateTime AddTime{get;set;}
    public string Description{get;set;}
}

隊列(MessageQueue),維護著針對隊列項的一個靜態字典集合。

public class MessageQueue
{
    public static Dictionary<string, MessageQueueItem> MessageQueueDictionary = new Dictionary<string, MessageQueueItem>(StringComparer.OrdinalIgnoreCase);

    public static object MessageQueueSyncLock = new object();
    public static object OperateLock = new object();

    public static void OperateQueue()
    {
        lock(OperateLock)
        {
            var mq = new MessageQueue();
            var key = mq.GetCurrentKey();
            while(!string.IsNullOrEmpty(key))
            {
                var mqItem = mq.GetItem(key);
                mqItem.Action();
                mq.Remove(key);
                key = mq.GetCurrentKey();
            }
        }
    }

    public string GetCurrentKey()
    {
        lock(MessageQueueSyncLock)
        {
            return MessageQueueDictionary.Keys.FirstOrDefault();
        }
    }

    public MessageQueueItem GetItem(string key)
    {
        lock(MessageQueueSyncLock)
        {
            if(MessageQueueDictionary.ContainsKey(key))
            {
                return MessageQueueDictionary[key];
            }
            return null;
        }
    }

    public void Remove(string key)
    {
        lock(MessageQueueSyncLock)
        {
            if(MessageQueueDictionary.ContainsKey(key))
            {
                MessageQueueDictionary.Remove(key);
            }
        }
    }

    public MessageQueueItem Add(string key, Action actioin)
    {
        lock(MessageQueueSyncLock)
        {
            var mqItem = new MessageQueueItem(key, action);
            MessageQueueDictionary[key] = mqItem;
            return mqItem;
        }
    }

    public int GetCount()
    {
        lock(MessageQueueSyncLock)
        {
            return MessageQueueDictionary.Count;
        }
    }
}

MessageQueueUtility類, 決定著隊列運行的節奏。

public class MessageQueueUtility
{
    private readonly int _sleepMilliSeconds;
    public MessageQueueUtility(int sleepMilliSeconds=1000)
    {
        _sleepMilliSeconds = sleepMilliSeoncds;
    }

    ~MessageQueueUtility()
    {
        MessageQueue.OperateQueue();
    }

    public void Run
    {
        do
        {
            MessageQueue.OperateQueue();
            Thread.Sleep(_sleepMilliSeconds);
        } while(true)
    }
}

MessageQueueThreadUtility類,管理隊列的線程,並讓其在後臺運行。

public static class MessageQueueThreadUtility
{
    public static Dictionary<string, Thread> AsyncThreadCollection = new Dictioanry<string, Thread>();
    public static void Register(string threadUniqueName)
    {
        {
            MessageQueueUtility messageQueueUtility = new MessageQueueUtility();
            Thread messageQueueThread = new Thread(messageQueueUtility.Run){
                Name = threadUniqueName
            };
            AsyncThreadCollection.Add(messageQueueThread.Name, messageQueueThread);
        }

        AsyncThreadCollection.Values.ToList().ForEach(z => {
            z.IsBackground = true;
            z.Start();
        });
    }
}

Startup.cs中註冊。

public class Startup
{
    public IServiceProvider ConfigureServices(IServiceCollection services)
    {
        ...
    }

    public void Configure(IApplicationBuilder app, IHostingEnvironment env...)
    {
        RegisterMessageQueueThreads();
    }

    private void RegisterMessageQueueThreads()
    {
        MessageQueueThreadUtility.Register("");
    }
}

最後在IHostedService的實現類中把隊列項丟給隊列。

public class MyBackgroundSerivce : IHostedService, IDisposable
{
    private Timer _timer;
    public IServiceProvider Services{get;}
    
    public MyBackgroundService(IServiceProvider services)
    {
        Serivces = services;
    }

    public void Dispose()
    {
        _timer?.Dispose();
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        _timer = new Timer(DoWork, null, TimeSpan.Zero, TimeSpan.FromSeconds(10));
        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        _timer?.Change(Timeout.Infinite,0);
        return Task.CompletedTask;
    }

    private void DoWork(object state)
    {
        using(var scope = Services.CreateScope())
        {
            using(var db = scope.ServiceProvider.GetRequiredService<MyDbContext>())
            {
                ...
                var mq = new MessageQueue();
                mq.Add("somekey", DealQueueItem);
            }
        }
    }

    private void DealQueueItem()
    {
        var mq = new MessageQueue();
        var key = mq.GetCurrentKey();
        var item = mq.GetItem(key);
        if(item!=null)
        {
            using(var scope = Services.CreateScope())
            {
                using(var db = scope.ServiceProvider.GetRequiredService<MyDbContext>())
                {
                    //執行業務邏輯
                }
            }
        }
    }
}

當需要使用上下文的時候,首先通過IServiceProviderCreateScope方法得到ISerivceScope,再通過它的ServiceProvider屬性獲取依賴倒置容器中的上下文服務。

以上,用IHostedService結合隊列解決了開篇提到的場景問題,如果您有很好的想法,我們一起交流吧。文中的隊列部分來自"盛派網絡"的Senparc.Weixin SDK源碼。

.NET Core中使用IHostedService結合隊列執行定時任務