1. 程式人生 > >多執行緒並行設計模式之Master-Worker模式

多執行緒並行設計模式之Master-Worker模式

Master-Worker模式的核心思想是在於Master程序和Worker程序各自分擔各自的任務,協同完成資訊處理的模式 
使用一個圖例來簡單描述一下: 
Master-Worker工作原理 
如圖所示Master-Worker的工作機制就是Master接收到了一個任務,對任務進行拆分,並且分配給各個Worker,讓各個Worker之間各自執行各自的子任務,最後Worker們的返回結果交給Master進行彙總並且最後返回給任務的發起方.

Master-Worker是一種並行模式,Master是主要程序,Master中有維護著一個Worker程序佇列.Master把一個大的而且複雜的業務拆分成若干小的業務,只要是互不影響的都可以分而治之相互獨立.可以通過多執行緒或多程序甚至多機聯合計算,把拆分後的小業務交給更多的CPU或機器處理,通過併發/並行的方式提高整體業務的運算速度,壓榨系統性能來提高效率.

這樣做的好處就是,在某些業務場景下,尤其是業務比較複雜而且資料量較大的情況下,例如財務賬單結算和生成.一個賬單需要有很多關聯計算而且條件和引數眾多的時候,如果把所有的業務都放在一個任務中,效率是比較低的,資料量大的情況下往往耗時巨大,少則幾十分鐘,誇張的有整整一宿.這個在圓通速遞的羅漢財務系統中體現尤其嚴重..不信可以問問許雙芳.

具體的實現細節是Master任務切分 –> 交給任務佇列 –> Worker處理任務 
Master可以建立Worker執行緒池,分發任務給Worker.也可以只負責任務的接收和拆分,單不負責Worker的管理,通過其他第三方工具來負責Worker的監控和排程,這樣對於解耦方面比較有利.

OK 有了思想之後 剩下的就是通過程式碼來簡單實現一下

先建立Master和Worker 
Master:

package com.unsc.Master_Worker;

import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Created by DELL on 2017/12/25.
 * Master-Worker中的 Master 類
 * @author
犯罪嫌疑人盧某 */
public class cMaster { /** * 建立一個ConcurrentLinkedQueue 用來盛放任務 * ConcurrentLinkedQueue是一個執行緒安全的無界執行緒安全佇列 * 對元素的排序遵循先進先出的原則 獲取元素時返回頭部元素 新增元素則為尾部 */ private Queue<Object> workersQueue = new ConcurrentLinkedQueue<>(); /** * 建立HashMap<K,V> 來存放Worker物件 */ private Map<String, Thread> workers = new HashMap<>(); /** * 建立ConcurrentHashMap來存放Worker計算後的結果集 */ private Map<String, Object> resultMap = new ConcurrentHashMap<>(); /** * Master類的有參構造 * @param worker Worker物件 * @param workersCount Worker的數量 用來建立對應數量的執行緒 */ public cMaster(cWorker worker , int workersCount ) { worker.setIntoWorkersQueue(this.workersQueue); worker.setResultMap(this.resultMap); /* * 建立對應數量的執行緒來模擬Worker * 至於顯式new Thread() 阿里的程式碼規約什麼的對我這個無業遊民毫無約束 隨它去 */ for (int i = 0 ; i < workersCount ; i ++) { workers.put(Integer.toString(i) , new Thread(worker , Integer.toString(i))); } } /** * 判斷是否所有的子任務都已完成 * @return 是否完成 */ public boolean isComplete() { for(Map.Entry<String, Thread> entry : workers.entrySet()) { if (entry.getValue().getState() != Thread.State.TERMINATED) { return false; } } return true; } /** * 向任務佇列中提交子任務 */ public void missionSubmit(Object mission) { workersQueue.add(mission); } /** * 返回子任務的結果集 * @return 子任務執行完畢的結果集 */ public Map<String, Object> getResultMap() { return resultMap; } /** * 啟動所有的Workers執行緒 開始平行計算 */ public void startAllWorkes() { for (Map.Entry<String,Thread> threadEntry : workers.entrySet()) { threadEntry.getValue().start(); } } }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79

再是Worker:

package com.unsc.Master_Worker;

import java.util.Map;
import java.util.Queue;


/**
 * Created by DELL on 2017/12/25.
 * Master-Worker 中的 Worker類
 * @author 犯罪嫌疑人盧某
 */
public class cWorker implements Runnable{
    /**
     * Worker中的任務佇列
     */
    private Queue<Object> workQueue;
    /**
     * Worker中任務的結果集Map
     */
    private Map<String, Object> resultMap;
    /**
     * 實現Runnable介面重寫的Run方法
     */
    @Override
    public void run() {
        /*
         * 設定輪詢 獲取子任務並且處理
         */
        while (true) {
            Object mission = workQueue.poll();
            if (mission == null) {
                break;
            }
            /*
             * 模擬子任務的處理 並且把處理結果加入結果集
             */
            Object result = executeMission(mission);
            resultMap.put(Integer.toString(mission.hashCode()) , result);
        }
    }
    /**
     * 具體執行任務的業務邏輯 這裡是基本的Worker 具體的邏輯交給子類實現
     * @param mission 任務
     * @return 執行結果
     */
    public Object executeMission(Object mission) {
        return mission;
    }
    /**
     * 設定Worker的任務佇列
     * @param workersQueue 任務佇列
     */
    void setIntoWorkersQueue(Queue<Object> workersQueue) {
        this.workQueue = workersQueue;
    }
    /**
     * 設定Worker的結果集
     * @param resultMap 結果集
     */
    void setResultMap(Map<String, Object> resultMap) {
        this.resultMap = resultMap;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64

再建立一個Worker的具體的實現類 這裡通過一個TrueWorker類重寫Worker中的executeMission方法執行具體的操作. 
TrueWorker:

package com.unsc.Master_Worker;

import java.util.Map;
import java.util.Set;

/**
 * Created by DELL on 2017/12/25.
 * Master-Worker模式的測試
 * @author 犯罪嫌疑人盧某
 */
public class MasterWorkerTest {
    public static void main(String[] args) {
        /*
         * 設定6個worker和100個子任務 for迴圈裡面的魔法值無視吧
         */
        cMaster master = new cMaster(new TrueWorker(), 4);
        for (int i = 1 ; i < 101 ; i++) {
            master.missionSubmit(i + 0.1);
        }
        /*
         * Master讓所有的Worker開始工作 並且在運作完畢後獲得結果集
         */
        //打一個時間戳 對比測試消耗的時間
        long startTime = System.currentTimeMillis();
        master.startAllWorkes();
        Map<String, Object> resultMap = master.getResultMap();
        long endTime = System.currentTimeMillis();
        double finalScore = 0;
        while (true) {
            Set<String> keySet = resultMap.keySet();
            String key = null;
            for (String s : keySet) {
                key = s;
                break;
            }
            //計算結果變數
            Double score = null;
            if (key != null) {
                //計算結果並且從結果集中刪除
                score = (Double) resultMap.get(key);
                resultMap.remove(key);
            }
            if (score != null) {
                finalScore += score;
            }
            if (master.isComplete() && resultMap.size() == 0) {
                System.out.println("任務執行完畢..");
                break;
            }
        }
        System.out.println("結果的值是 --->>> " + finalScore);
        System.out.println("總計耗時 : " + (endTime - startTime) + " ms");
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

因為int型別計算太快了 我才用雙精度浮點來運算 排除掉建立執行緒和其他的操作單純就關注任務提交和獲得結果集的耗時 得到的執行結果如圖: 
MW方式

同樣的再用傳統的方法 再來檢測一下效率: 
程式碼如下 :

package com.unsc.Master_Worker;

/**
 * Created by DELL on 2017/12/25.
 * 直接單執行緒在main中計算 1 到 100的平方和
 * @author 犯罪嫌疑人盧某
 */
public class OldMethod {
    public static void main(String[] args) {
        /*
         * 開始的時間戳
         */
        long startTime = System.currentTimeMillis();
        double finalScore = 0;
        for(int i = 0 ; i < 101 ; i++) {
            double num = i + 0.1;
            finalScore += num * num;
        }
        System.out.println("結果是 : ---->>>> " + finalScore);
        long endTime = System.currentTimeMillis();
        System.out.println("耗時總計 : " + (endTime - startTime) + " ms");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

結果如圖: 
傳統方式

這裡存在一些效能差別 雖然很小 只是1ms的差距 但是在高併發大資料量的情況下 會有比較明顯的效能差距 只是單機上進行測試不是很明顯罷了..