1. 程式人生 > >程式設計師過關斬將--自定義執行緒池來實現文件轉碼

程式設計師過關斬將--自定義執行緒池來實現文件轉碼

背景

我司在很久之前,一位很久之前的同事寫過一個文件轉圖片的服務,具體業務如下:

  1. 使用者在客戶端上傳文件,可以是ppt,word,pdf 等格式,使用者上傳完成可以在客戶端預覽上傳的文件,預覽的時候採用的是圖片形式(不要和我說用別的方式預覽,現在已經來不及了)
  2. 當用戶把文件上傳到雲端之後(阿里雲),把文件相關的資訊記錄在資料庫,然後等待轉碼完成
  3. 伺服器有一個轉碼服務(其實就是一個windows service)不停的在輪訓待轉碼的資料,如果有待轉碼的資料,則從資料庫取出來,然後根據文件的網路地址下載到本地進行轉碼(轉成多張圖片)
  4. 當文件轉碼完畢,把轉碼出來的圖片上傳到雲端,並把雲端圖片的資訊記錄到資料庫
  5. 客戶端有預覽需求的時候,根據資料庫來判斷有沒有轉碼成功,如果成功,則獲取資料來顯示。

文件預覽的整體過程如以上所說,老的轉碼服務現在什麼問題呢?

  1. 由於一個文件同時只能被一個執行緒進行轉碼操作,所以老的服務採用了把待轉碼資料劃分管道的思想,一共有六個管道,對映到資料庫大體就是 Id=》管道ID 這個樣子。
  2. 一個控制檯程式,根據配置檔案資訊,讀取某一個管道待轉碼的文件,然後單執行緒進行轉碼操作
  3. 一共有六個管道,所以伺服器上起了六個cmd的黑視窗……
  4. 有的時候個別文件由於格式問題或者其他問題 轉碼過程中會卡住,具體的表現為:停止了轉碼操作。
  5. 如果程式卡住了,需要運維人員重新啟動轉碼cmd視窗(這種維護比較蛋疼)

後來機緣巧合,這個程式的維護落到的菜菜頭上,維護了一週左右,大約重啟了10多次,終於忍受不了了,重新搞一個吧。仔細分析過後,刨除實際文件轉碼的核心操作之外,整個轉碼流程其實還有很多注意點

  1. 需要保證轉碼服務不被卡住,如果和以前一樣就沒有必要重新設計了
  2. 儘量避免開多個程序的方式,其實在這個業務場景下,多個程序和多個執行緒作用是一致的。
  3. 每個文件只能被轉碼一次,如果一個文件被轉碼多次,不僅浪費了伺服器資源,而且還有可能會有資料不一致的情況發生
  4. 轉碼失敗的文件需要有一定次數的重試,因為一次失敗不代表第二次失敗,所以一定要給失敗的文件再次被操作的機會
  5. 因為程式不停的把文件轉碼成本地圖片,所以需要保證這些檔案在轉碼完成在伺服器上刪除,不然的話,時間長了會生成很多無用的檔案

說了這麼多,其實需要注意的點還是很多的。以整個的轉碼流程來說,本質上是一個任務池的生產和消費問題,任務池中的任務就是待轉碼的文件,生產者不停的把待轉碼文件丟進任務池,消費者不停的把任務池中文件轉碼完成。

執行緒池

這很顯然和執行緒池很類似,菜菜之前就寫過一個執行緒池的文章,有興趣的同學可以去翻翻歷史。今天我們就以這個執行緒池來解決這個轉碼問題。執行緒池的本質是初始化一定數目的執行緒,不停的執行任務。

 //執行緒池定義 
    public class LXThreadPool:IDisposable
    {
        bool PoolEnable = true; //執行緒池是否可用 
        List<Thread> ThreadContainer = null; //執行緒的容器
        ConcurrentQueue<ActionData> JobContainer = null; //任務的容器
        int _maxJobNumber; //執行緒池最大job容量

        ConcurrentDictionary<string, DateTime> JobIdList = new ConcurrentDictionary<string, DateTime>(); //job的副本,用於排除某個job 是否在執行中


        public LXThreadPool(int threadNumber,int maxJobNumber=1000)
        {
            if(threadNumber<=0 || maxJobNumber <= 0)
            {
                throw new Exception("執行緒池初始化失敗");
            }
            _maxJobNumber = maxJobNumber;
            ThreadContainer = new List<Thread>(threadNumber);
            JobContainer = new ConcurrentQueue<ActionData>();
            for (int i = 0; i < threadNumber; i++)
            {
                var t = new Thread(RunJob);
                t.Name = $"轉碼執行緒{i}";
                ThreadContainer.Add(t);
                t.Start();
            }
            //清除超時任務的執行緒
            var tTimeOutJob = new Thread(CheckTimeOutJob);
            tTimeOutJob.Name = $"清理超時任務執行緒";
            tTimeOutJob.Start();
        }

        //往執行緒池新增一個執行緒,返回執行緒池的新執行緒數
        public int AddThread(int number=1)
        {
            if(!PoolEnable || ThreadContainer==null || !ThreadContainer.Any() || JobContainer==null|| !JobContainer.Any())
            {
                return 0;
            }
            while (number <= 0)
            {
                var t = new Thread(RunJob);
                ThreadContainer.Add(t);
                t.Start();
                number -= number;
            }
            return ThreadContainer?.Count ?? 0;
        }

        //向執行緒池新增一個任務,返回0:新增任務失敗   1:成功
        public int AddTask(Action<object> job, object obj,string actionId, Action<Exception> errorCallBack = null)
        {
            if (JobContainer != null)
            {
                if(JobContainer.Count>= _maxJobNumber)
                {
                    return 0;
                }
                //首先排除10分鐘還沒轉完的
                var timeoOutJobList = JobIdList.Where(s => s.Value.AddMinutes(10) < DateTime.Now);
                if(timeoOutJobList!=null&& timeoOutJobList.Any())
                {
                    foreach (var timeoutJob in timeoOutJobList)
                    {
                        JobIdList.TryRemove(timeoutJob.Key,out DateTime v);
                    }
                }

                if (!JobIdList.Any(s => s.Key == actionId))
                {
                    if(JobIdList.TryAdd(actionId, DateTime.Now))
                    {
                        JobContainer.Enqueue(new ActionData { Job = job, Data = obj, ActionId = actionId, ErrorCallBack = errorCallBack });
                        return 1;
                    }
                    else
                    {
                        return 101;
                    }
                }
                else
                {
                    return 100;
                }            
            }
            return 0;
        }  

        private void RunJob()
        {
            while (JobContainer != null  && PoolEnable)
            {

                //任務列表取任務
                ActionData job = null;
                JobContainer?.TryDequeue(out job);
                if (job == null)
                {
                    //如果沒有任務則休眠
                    Thread.Sleep(20);
                    continue;
                }
                try
                {
                    //執行任務
                    job.Job.Invoke(job.Data);
                }
                catch (Exception error)
                {
                    //異常回調
                    if (job != null&& job.ErrorCallBack!=null)
                    {
                        job?.ErrorCallBack(error);
                    }

                }
                finally
                {
                    if (!JobIdList.TryRemove(job.ActionId,out DateTime v))
                    {

                    }
                }
            }
        }

        //終止執行緒池
        public void Dispose()
        {
            PoolEnable = false;
            JobContainer = null;
            if (ThreadContainer != null)
            {
                foreach (var t in ThreadContainer)
                {
                    //強制執行緒退出並不好,會有異常
                    t.Join();
                }
                ThreadContainer = null;
            }
        }

        //清理超時的任務
        private void CheckTimeOutJob()
        {
            //首先排除10分鐘還沒轉完的
            var timeoOutJobList = JobIdList.Where(s => s.Value.AddMinutes(10) < DateTime.Now);
            if (timeoOutJobList != null && timeoOutJobList.Any())
            {
                foreach (var timeoutJob in timeoOutJobList)
                {
                    JobIdList.TryRemove(timeoutJob.Key, out DateTime v);
                }
            }
            System.Threading.Thread.Sleep(60000);
        }
    }
    public class ActionData
    {
        //任務的id,用於排重
        public string ActionId { get; set; }
        //執行任務的引數
        public object Data { get; set; }
        //執行的任務
        public Action<object> Job { get; set; }
        //發生異常時候的回撥方法
        public Action<Exception> ErrorCallBack { get; set; }
    }

以上就是一個執行緒池的具體實現,和具體的業務無關,完全可以用於任何適用於執行緒池的場景,其中有一個注意點,我新加了任務的標示,主要用於排除重複的任務被投放多次(只排除正在執行中的任務)。當然程式碼不是最優的,有需要的同學可以自己去優化

使用執行緒池

接下來,我們利用以上的執行緒池來完成我們的文件轉碼任務,首先我們啟動的時候初始化一個執行緒池,並啟動一個獨立執行緒來不停的往執行緒池來輸送任務,順便起了一個監控執行緒去監視傳送任務的執行緒

       string lastResId = null;
        string lastErrorResId = null;

        Dictionary<string, int> ResErrNumber = new Dictionary<string, int>(); //轉碼失敗的資源重試次數
        int MaxErrNumber = 5;//最多轉碼錯誤的資源10次
        Thread tPutJoj = null;
        LXThreadPool pool = new LXThreadPool(4,100);
        public void OnStart()
        {
            //初始化一個執行緒傳送轉碼任務
            tPutJoj = new Thread(PutJob);
            tPutJoj.IsBackground = true;
            tPutJoj.Start();

            //初始化 監控執行緒
            var tMonitor = new Thread(MonitorPutJob);
            tMonitor.IsBackground = true;
            tMonitor.Start();
        }
       //監視發放job的執行緒
        private void MonitorPutJob()
        {
            while (true)
            {
                if(tPutJoj == null|| !tPutJoj.IsAlive)
                {
                    Log.Error($"傳送轉碼任務執行緒停止==========");
                    tPutJoj = new Thread(PutJob);
                    tPutJoj.Start();
                    Log.Error($"傳送轉碼任務執行緒重新初始化並啟動==========");
                }
                System.Threading.Thread.Sleep(5000);
            }

        }

        private void PutJob()
        {           
            while (true)
            {
                try
                {
                    //先搜尋等待轉碼的
                    var fileList = DocResourceRegisterProxy.GetFileList(new int[] { (int)FileToImgStateEnum.Wait }, 30, lastResId);
                    Log.Error($"拉取待轉碼記錄===總數:lastResId:{lastResId},結果:{fileList?.Count() ?? 0}");
                    if (fileList == null || !fileList.Any())
                    {
                        lastResId = null;
                        Log.Error($"待轉碼數量為0,開始拉取轉碼失敗記錄,重新轉碼==========");
                        //如果無待轉,則把出錯的 嘗試
                        fileList = DocResourceRegisterProxy.GetFileList(new int[] { (int)FileToImgStateEnum.Error, (int)FileToImgStateEnum.TimeOut, (int)FileToImgStateEnum.Fail }, 1, lastErrorResId);
                        if (fileList == null || !fileList.Any())
                        {
                            lastErrorResId = null;
                        }
                        else
                        {
                            // Log.Error($"開始轉碼失敗記錄:{JsonConvert.SerializeObject(fileList)}");
                            List<DocResourceRegister> errFilter = new List<DocResourceRegister>();
                            foreach (var errRes in fileList)
                            {
                                if (ResErrNumber.TryGetValue(errRes.res_id, out int number))
                                {
                                    if (number > MaxErrNumber)
                                    {
                                        Log.Error($"資源:{errRes.res_id} 轉了{MaxErrNumber}次不成功,放棄===========");
                                        continue;
                                    }
                                    else
                                    {
                                        errFilter.Add(errRes);
                                        ResErrNumber[errRes.res_id] = number + 1;
                                    }
                                }
                                else
                                {
                                    ResErrNumber.Add(errRes.res_id, 1);
                                    errFilter.Add(errRes);
                                }
                            }
                            fileList = errFilter;
                            if (fileList.Any())
                            {
                                lastErrorResId = fileList.Select(s => s.res_id).Max();
                            }
                        }
                    }
                    else
                    {
                        lastResId = fileList.Select(s => s.res_id).Max();
                    }

                    if (fileList != null && fileList.Any())
                    {
                        foreach (var file in fileList)
                        {
                            //如果 任務投放執行緒池失敗,則等待一面繼續投放
                            int poolRet = 0;
                            while (poolRet <= 0)
                            {
                                poolRet = pool.AddTask(s => {
                                    AliFileService.ConvertToImg(file.res_id + $".{file.res_ext}", FileToImgFac.Instance(file.res_ext));
                                }, file, file.res_id);
                                if (poolRet <= 0 || poolRet > 1)
                                {
                                    Log.Error($"發放轉碼任務失敗==========執行緒池返回結果:{poolRet}");
                                    System.Threading.Thread.Sleep(1000);
                                }
                            }
                        }
                    }
                    //每一秒去資料庫取一次資料
                    System.Threading.Thread.Sleep(3000);
                }
                catch
                {
                    continue;
                }

            }
        }

以上就是發放任務,執行緒池執行任務的所有程式碼,由於具體的轉碼程式碼涉及到隱私,這裡不在提供,如果有需要可以私下找菜菜索要,雖然我深知還有更優的方式,但是我覺得執行緒池這樣的思想可能會對部分人有幫助,其中任務超時的核心程式碼如下(採用了polly外掛):

 var policy= Policy.Timeout(TimeSpan.FromSeconds(this.TimeOut), onTimeout: (context, timespan, task) =>
                {
                    ret.State=Enum.FileToImgStateEnum.TimeOut;                   
                });
                policy.Execute(s=>{
                    .....
                });

把你的更優方案寫在留言區吧,2020年大家越來越好