1. 程式人生 > >流程控制:分布式並行任務流程控制

流程控制:分布式並行任務流程控制

time aid ges stat rdquo net 不同的 分布式部署 red

  • 背景:

目前工作中遇到一個比較急,又有點費事的工作任務:

1)目前系統中已經已經包含了一些比較完善的部分模塊,但是模塊之間沒有一個控制流程來管理,就造成程序沒有辦法自動化;

2)已經完成模塊中有幾個是采用分布式部署,但各個服務器之間又是采用的並行執行不同的任務(目的最大化利用服務器,節省處理總耗時):這寫對流程化控制帶來了一些控制繁瑣問題。

3)目前並不需要考慮太多穩定行的問題,但是流程控制程序必須考慮到高可用性(就是需要部署為HA)。

  • 目前已經擁有的功能模塊:

1)采集及更新工參、KPI、路測、掃頻等到數據庫:這些數據在ftp上,因此,每次更新需要從ftp上自動采集數據(當然是任務觸發時最新的數據)之後更新到數據庫中;

2)采集mr數據及解析mr,這裏分為了三個功能模塊:

2.1)采集mr功能:目前已經包含,但是功能還是不太完善,每次采集的觸發時間應該是當任務觸發時開啟采集(這樣避免了未領取到任務時盲目的采集,造成服務器性能下降其他分析慢,IO存儲不足問題),采集距離“采集任務”觸發時間最近且完整的數據;

2.2)mr大壓縮包解壓任務:目前系統中包含的mr數據文件格式比較復雜,可能包含大壓縮包套文件,或者達壓縮吧套小壓縮包的問題,最終需要加壓為最小壓縮包(也就是在解壓一次就是.xml文件);

2.3)mr解析:mr解析是一個分布式部署(部署在多臺服務器,按照公式:enb%服務器數量,得到的值來分配enb到具體哪臺服務器),但是每臺服務器與每臺服務器之間是沒有聯系的:每臺服務器只負責並行的處理分配到自己節點上的任務。

3)mr柵格化:由於上邊mr解析後分別存儲到自己節點服務器上的數據,因此這裏的mr柵格化數據也是分布式的部署在每臺節點服務器上。

  • 解決方案思路:

為此,我寫了一個代碼邏輯框架:

技術分享

文件Program.cs是代碼核心業務控制:

1)它支持部署HA:可以部署多臺服務器,和一臺服務器一樣工作。

2)核心業務控制思路:

 1         static void Main(string[] args)
 2         {
 3             while (true)
 4             {
 5                 // TaskLock在zookeeper或sqlserver數據中只存在唯一的一條記錄。
6 TaskLock taskLock = TaskLockBO.Get(); 7 8 if (taskLock.Lock == LockStatus.Locked) 9 { 10 // 獲取正在執行的任務。。。。 11 List<Task> taskItems = TaskBO.GetToDoTaskByTaskGroup(taskLock.DoingTaskGroup); 12 13 if (taskItems == null || taskItems.Count == 0) 14 { 15 // 修改taskLock.Lock=UnLock、taskLock.DoingTaskGroup=Guid.Empty 16 } 17 else 18 { 19 // 按照任務的優先級執行task 20 // 開始調度每臺計算服務器上的任務。。。 21 // 1)如果第一個待執行(todo)的任務是“工參導入或更新”,修改其任務狀態為doing。 22 // “工參導入或更新”服務只可能部署在一臺服務器上或者就是這裏實現,當獲取到歸屬自己的任務狀態是doing時,就開始從ftp上采集工參數據,並解析導入,完成修改任務狀態為done,失敗修改任務狀態為fail 23 // 2)如果第一個待執行(todo)的任務是“采集MR”,修改其任務狀態為doing。 24 // “采集MR”服務也是只可能部署在一臺服務器上(但不可能在這裏執行),當獲取到歸屬自己的任務狀態是doing時,就開始監控ftp,並采集ftp數據到本地,完成修改任務狀態為done,失敗修改任務狀態為fail 25 // 3)如果第一個待執行(todo)的任務是“解壓超大壓縮包”,修改其任務狀態為doing. 26 // “解壓超大壓縮包”服務也是只可能部署在一臺服務器上(但不可能在這裏執行),當獲取到歸屬自己的任務狀態是doing時,就開始循環遍歷采集的mr,若找到超大壓縮包就進行超大壓縮包解壓,完成修改任務狀態為done,失敗修改任務狀態為fail 27 // 4)如果第一個待執行(todo)的任務是“解析MR”,修改任務狀態為predoing,循環遍歷ftp目錄的數據按照分發規則把mr問價分發到不同的計算節點服務器指定的位置,並創建“解析mr子任務”給每臺解析mr服務器,並修改該任務狀態為doing 28 // 註意:這裏是分布式處理的,因此給所有子節點分配任務後統一修改所有“解析MR”任務狀態為doing(每個compute包含一個“mr解析”任務). 29 // “解析MR”服務部署在多個解析處理服務器上,當獲取到歸屬自己的節點的“解析mr”任務狀態是doing時,就開始獲取自己的節點下的“解析mr子任務”逐個處理,處理完成後修改歸屬自己的節點“mr解析”任務狀態為done,失敗修改任務狀態為fail 30 // 註意:這裏是分布式處理的,因此需要考慮到等待所有節點“mr解析”都完成後,才可以進行下一步。 31 // 5)如果第一個待執行(todo)的任務是“mr柵格化”,修改任務狀態為doing. 32 // 註意:這裏是分布式處理的,因此給所有子節點分配任務後統一修改所有“MR柵格化”任務狀態為doing(每個compute包含一個“MR柵格化”任務). 33 // “MR柵格化”服務部署在多個處理服務器上,當獲取到歸屬自己的節點的“mr柵格化”任務狀態是doing時,開始逐個處理自己節點上的“mr柵格化”任務,處理完成後修改歸屬自己的節點“mr柵格化”任務狀態為done,失敗修改任務狀態為fail 34 // 註意:這裏是分布式處理的,因此需要考慮到等待所有節點“mr柵格化”都完成後,才可以進行下一步。 35 } 36 } 37 else if (taskLock.Lock == LockStatus.UnLock) 38 { 39 // 嘗試獲取新的任務。 40 // 修改taskLock.lock=PreLock 41 42 43 // 添加任務成功,則修改taskLock.Lock=Locked、taskLock.DoingTaskGroup賦值;添加任務失敗,則修改taskLock.Lock=UnLock 44 } 45 46 // 5 分鐘輪詢一次。 47 Thread.Sleep(5 * 60 * 1000); 48 } 49 }

3)任務狀態&類型&定義包含:

    enum TaskType
    {
        /// <summary>
        /// 導入或更新工參、KPI等數據
        /// </summary>
        ImportSiteCellKpi = 0,
        /// <summary>
        /// 采集MR數據
        /// </summary>
        GatherMR = 1,
        /// <summary>
        /// 嘗試解壓包含多層壓縮包的MR數據
        /// </summary>
        DoUnZipMR = 2,        
        /// <summary>
        /// 解析入庫MR數據
        /// </summary>
        DoParserMR = 3,
        /// <summary>
        /// MR柵格化
        /// </summary>
        DoMRRaster = 4,
    }
    enum TaskStatus
    {
        Todo = 0,        PreDoing = 1,        Doing = 2,        Done = 3,        Fail = 4
    }
    class Task
    {
        /// <summary>
        /// 如果為同一批次批量處理流程,則TaskGroup為同一個Guid值。
        /// </summary>
        public Guid TaskGroup { get; set; }
        public int TaskId { get; set; }
        public TaskType TaskType { get; set; }
        public TaskStatus TaskStatus { get; set; }
        /// <summary>
        /// 任務優先級
        /// </summary>
        public int Priority { get; set; }
        public string ComputeIP { get; set; }
        public DateTime CreateTime { get; set; }
        public DateTime DoingTime { get; set; }
        public DateTime DoneTime { get; set; }
        public DateTime FailTime { get; set; }
    }

4)任務流程控制鎖(全局鎖):

    enum LockStatus
    {
        PreLock = 0,
        Locked = 1,
        UnLock = 2
    }
    /// <summary>
    /// 所有任務流程是否被Lock掉
    /// 1)當一批次任務未完成之前就不允許有任何新的一批任務開始執行,
    ///     必須等到一批次任務流程執行完成後才可以執行,否則將會導致數據執行的速度過慢,或者導致數據混亂情況。
    /// 2)整個系統中,要確保正在執行的任務流程只有唯一一個,否則系統將會造成性能底下,或者出現數據錯亂情況。
    /// </summary>
    class TaskLock
    {
        public LockStatus Lock { get; set; }
        /// <summary>
        /// 當開始執行新的一批次任務流程是,把該批次任務流程組編號寫入此處,同時修改TaskLock.isLock為true.
        /// </summary>
        public Guid DoingTaskGroup { get; set; }
    }

代碼下載:

鏈接:http://pan.baidu.com/s/1pKIYwl1 密碼:mbl7

流程控制:分布式並行任務流程控制