1. 程式人生 > >.net core實踐系列之簡訊服務-Sikiro.SMS.Job服務的實現

.net core實踐系列之簡訊服務-Sikiro.SMS.Job服務的實現

前言

本篇會繼續講解Sikiro.SMS.Job服務的實現,在我寫第一篇的時候,我就發現我當時設計的架構裡Sikiro.SMS.Job這個可以選擇不需要,而使用MQ代替。但是為了說明排程任務使用實現也堅持寫了下。後面會一篇針對架構、實現優化的講解。

原始碼地址:https://github.com/SkyChenSky/Sikiro.SMS

Quartz的簡介

Quartz.NET是一款功能齊全的開源作業排程框架,小至的應用程式,大到企業系統都可以適用。Quartz是作者James House用JAVA語言編寫的,而Quartz.NET是從Quartz移植過來的C#版本。

Quartz.Net的作用

  • Quartz.Net是多執行緒的,允許多個JOB同時執行。
  • Quartz.Net可以進行持久化,結合管理後臺可以進行視覺化的監控
  • Quartz.Net提供API進行遠端操控,結合管理後臺可以進行運維管理

在一般企業,可以利用Quartz.Net框架做各種的定時任務,例如,資料遷移、跑報表等等。

Cron表示式

欄位名是否必填值範圍特殊字元
Seconds YES 0-59 , - * /
Minutes YES 0-59 , - * /
Hours YES 0-23 , - * /
Day of month YES
1-31 , - * ? / L W
Month YES 1-12 or JAN-DEC , - * /
Day of week YES 1-7 or SUN-SAT , - * ? / L #
Year NO empty, 1970-2099 , - * /

缺點

Quartz.Net的缺點很明顯,沒有自帶的管理後臺,而同款的Hangfir排程任務框架則會有更加良好的易用性。但是在Github上有不少人開源了Quartz.Net的管理後臺,對此作為了彌補。

其他

其他Quartz.Net的資訊可以看我之前記錄的一篇文章《Quartz.NET的使用(附原始碼)

Quartz.Net DEMO:https://github.com/SkyChenSky/QuartzDotNetDemo.git

業務流程

從MongoDB持久化的資料,查詢出狀態為待處理並且定時時間小於當前時間的資料。通過Mongo驅動提供的FindOneAndUpdate對文件進行原子性操作(更新中間狀態並查詢出剛更新的文件)。如果有資料則傳送到MQ,由Sikiro.SMS.Bus進行訂閱傳送,因為本次有資料,我認為可能還會有其他需要傳送的資料,因此立刻呼叫JOB自身方法,進行下一條需要處理的資料進行傳送。如果此次JOB的執行並沒有資料,那麼認為接下來一段時間沒有需要處理的資料,這次排程結束。

TimeSendSms示例

public class TimeSendSms : BaseJob
    {
        private readonly SmsService _smsService;
        private readonly IBus _bus;

        public TimeSendSms(SmsService smsService, IBus bus)
        {
            _smsService = smsService;
            _bus = bus;
        }

        protected override void ExecuteBusiness()
        {
            _smsService.GetToBeSend();

            if (_smsService.Sms != null)
                _bus.Publish(_smsService.Sms.MapTo<SmsModel, SmsQueueModel>());

            _smsService.ContinueDo(ExecuteBusiness);
        }

        protected override void OnException()
        {
            _smsService.RollBack();
        }
    }

模板模式

Job的輪詢處理流程基本相似,查詢出需要執行資料-遍歷業務處理-如果有異常則特殊處理,因此針對類似流程相同,但是實現有差異的程式,我們可以使用模板模式。

 public abstract class BaseJob : IJob
    {
        private void OnException(Action action)
        {
            try
            {
                action();
            }
            catch (Exception e)
            {
                e.WriteToFile();
                OnException();
            }
        }

        public Task Execute(IJobExecutionContext context)
        {
            OnException(ExecuteBusiness);

            return null;
        }

        protected virtual void OnException()
        {

        }

        protected abstract void ExecuteBusiness();
    }

Mongo的原子性

原子性

原子是物理概念,指的是指化學反應不可再分的基本微粒。而計算機領域的原子性強調的物件是操作(指令、事務)。我們所說的指令組是原子操作,意思要麼一起成功,要麼一起失敗。不允許2個指令裡,一個成功一個失敗的情況存在。

MongoDB 原子操作

MongoDB的原子操作就是要麼這個文件完整的儲存到Mongodb,要麼沒有儲存到Mongodb,不會出現查詢到的文件沒有儲存完整的情況。

MongoDB的文件的儲存,修改,刪除等操作都是原子性,除此之外還提供了FindOneAndDelete、FindOneAndUpdate、FindOneAndReplace等原子操作。

以FindOneAndUpdate為例,對某文件FindOneAndUpdate,可以文件B進行Update操作完成後返回出文檔B的結果,根據引數返回結果是更新前還是更新後(一般我們需要更新後)。

而這FindOneAndUpdate的操作對於我們更新到中間狀態的非常實用:

  • 避免進行Update後無法良好的查詢到剛Update的文件
  • 避免應用叢集部署時批量更新後,無法良好分配任務
  • 批量更新多個文件需要isolated標識隔離,全域性鎖在大併發情況下效能並不樂觀

雖然以上可以通過更新時標識版本號進行解決,這無疑增加實現難度。

MongoDB鎖機制

Mongodb併發操作又讀寫鎖來進行控制。

簡單來說

當進行讀操作的時候會加讀鎖,這個時候其他讀操作可以也獲得讀鎖,但是不能加寫鎖,也就是說不能進行寫操作。

當進行寫操作的時候會加寫鎖,這個時候其他操作無法加任何鎖,也就是說不能進行其他的讀操作和寫操作。

多個JOB的併發性

綜上所述,落實到我們應用場景,在部署多個排程任務服務,或者JOB多個執行緒去跑時,我們可以使用FindOneAndUpdate,每個排程任務每次只處理一個文件,Update操作的時候會進行寫鎖阻塞其他程序(程序)的寫操作。那麼就可以保證每個排程任務都可以只處理唯一一個有效的文件,避免重複處理。

下面是我的Sikiro.Nosql.Mongo的FindOneAndUpdate封裝示例,因為Update欄位的不友好,所以我封裝了一下Lambda表示式,ReturnDocument = ReturnDocument.After標識響應資料是更新前還是更新後的文件。

public T GetAndUpdate<T>(string database, string collection, Expression<Func<T, bool>> predicate, Expression<Func<T, T>> updateExpression)
        {
            var db = _mongoClient.GetDatabase(database);
            var col = db.GetCollection<T>(collection);

            var updateDefinitionList = MongoExpression<T>.GetUpdateDefinition(updateExpression);

            var updateDefinitionBuilder = new UpdateDefinitionBuilder<T>().Combine(updateDefinitionList);

            return col.FindOneAndUpdate(predicate, updateDefinitionBuilder, new FindOneAndUpdateOptions<T, T>
            {
                ReturnDocument = ReturnDocument.After
            });

SQL Server的UpdateSelect

SQL Server的操作也具有上述FindOneAndUpdate的功能,我們公司成他為UpdateSelect,下面是示例程式碼:

UPDATE TOP ( 100 )        SYS_USER WITH ( UPDLOCK, READPAST )SET     USER_STATUS = 1OUTPUT  INSERTED.[USER_NAME] ,        INSERTED.SYS_USERID ,        INSERTED.EMAILFROM    SYS_USERWHERE   CREATE_DATETIME < '2018-09-13'        AND USER_STATUS = 2;

結尾

本篇介紹了排程任務結合MongoDB原子操作的使用,使得排程任務服務可以具有良好的伸縮性。如果有任何建議與問題可以在下方評論反饋給我。