1. 程式人生 > >多執行緒設計模式:Master-Worker模式

多執行緒設計模式:Master-Worker模式

Master-Worker是常用的平行計算模式。它的核心思想是系統由兩類程序協作工作:Master程序和Worker程序。Master負責接收和分配任務,Worker負責處理子任務。當各個Worker子程序處理完成後,會將結果返回給Master,由Master作歸納總結。其好處就是能將一個大任務分解成若干個小任務,並行執行,從而提高系統的吞吐量。處理過程如下圖所示:

Master程序為主要程序,它維護一個Worker程序佇列、子任務佇列和子結果集。Worker程序佇列中的Worker程序不停從任務佇列中提取要處理的子任務,並將結果寫入結果集。    

根據上面的思想,我們來模擬一下這種經典設計模式的實現。

分析過程:

  1. 既然Worker是具體的執行任務,那麼Worker一定要實現Runnable介面
  2. Matser作為接受和分配任務,得先有個容器來裝載使用者發出的請求,在不考慮阻塞的情況下我們選擇ConcurrentLinkedQueue作為裝載容器
  3. Worker物件需要能從Master接收任務,它也得有Master ConcurrentLinkedQueue容器的引用
  4. Master還得有個容器需要能夠裝載所有的Worker,可以使用HashMap<String,Thread>
  5. Worker處理完後需要將資料返回給Master,那麼Master需要有個容器能夠裝載所有worker併發處理任務的結果集。此容器需要能夠支援高併發,所以最好採用ConcurrentHashMap<String,Object>
  6. 同理由於Worker處理完成後將資料填充進Master的ConcurrentHashMap,那麼它也得有一份ConcurrentHashMap的引用 

程式碼實現:

Task任務物件

public class Task {
    private int id;
    private String name;
    private int price;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getPrice() {
        return price;
    }

    public void setPrice(int price) {
        this.price = price;
    }
}

Master物件:

public class Master {
    //任務集合
    private ConcurrentLinkedQueue<Task> taskQueue = new ConcurrentLinkedQueue<>();

    //所有的處理結果
    private ConcurrentHashMap<String,Object> resultMap = new ConcurrentHashMap<>();

    //所有的Worker集合
    private HashMap<String,Thread> workerMap = Maps.newHashMap();

    //構造方法,初始化Worker
    public Master(Worker worker,int workerCount){
        //每一個worker物件都需要有Master的引用,taskQueue用於任務的提取,resultMap用於任務的提交
        worker.setTaskQueue(this.taskQueue);
        worker.setResultMap(this.resultMap);
        for(int i = 0 ;i < workerCount; i++){
            //key表示worker的名字,value表示執行緒執行物件
            workerMap.put("worker"+i,new Thread(worker));
        }
    }

    //用於提交任務
    public void submit(Task task){
        this.taskQueue.add(task);
    }

    //執行方法,啟動應用程式讓所有的Worker工作
    public void execute(){
        for(Map.Entry<String,Thread> me : workerMap.entrySet()){
            me.getValue().start();
        }
    }

    //判斷所有的執行緒是否都完成任務
    public boolean isComplete() {
        for(Map.Entry<String,Thread> me : workerMap.entrySet()){
           if(me.getValue().getState() != Thread.State.TERMINATED){
               return false;
           }
        }
        return true;
    }

    //總結歸納 
    public int getResult(){
        int ret = 0;
        for (Map.Entry<String, Object> entry : resultMap.entrySet()) {
            ret+=(Integer) entry.getValue();
        }
        return ret;
    }
}

Worker物件:

public class Worker implements Runnable{
    private ConcurrentLinkedQueue<Task> taskQueue;
    private ConcurrentHashMap<String, Object> resultMap;

    public void setTaskQueue(ConcurrentLinkedQueue<Task> taskQueue) {
        this.taskQueue = taskQueue;
    }

    public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
        this.resultMap = resultMap;
    }

    @Override
    public void run() {
        while(true){
            Task executeTask = this.taskQueue.poll();
            if(executeTask == null) break;
            //真正的任務處理
            Object result = handle(executeTask);
            this.resultMap.put(executeTask.getName(),result);
        }
    }

    //核心處理邏輯,可以抽離出來由具體子類實現
    private Object handle(Task executeTask) {
        Object result = null;
        try {
            //表示處理任務的耗時....
            Thread.sleep(500);
            result = executeTask.getPrice();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return result;
    }
}

客戶端呼叫:

public class Main {

    public static void main(String[] args) {
        //實際開發中多少個執行緒最好寫成Runtime.getRuntime().availableProcessors()
        Master master = new Master(new Worker(), 10);
        Random random = new Random();
        for(int i = 0 ;i <= 100 ;i++){
            Task task = new Task();
            task.setId(i);
            task.setName("任務"+i);
            task.setPrice(random.nextInt(1000));
            master.submit(task);
        }
        master.execute();
        long start = System.currentTimeMillis();
        while(true){
            if(master.isComplete()){
                long end = System.currentTimeMillis() - start;
                int ret = master.getResult();
                System.out.println("計算結果:"+ret+",執行耗時:"+end);
                break;
            }
        }
    }
}

在Worker物件中的核心處理業務邏輯handle()方法最好抽象成公共方法,具體實現由子類覆寫。