1. 程式人生 > >併發程式設計之Master-Worker模式

併發程式設計之Master-Worker模式

我們知道,單個執行緒計算是序列的,只有等上一個任務結束之後,才能執行下一個任務,所以執行效率是比較低的。

那麼,如果用多執行緒執行任務,就可以在單位時間內執行更多的任務,而Master-Worker就是多執行緒平行計算的一種實現方式。

它的思想是,啟動兩個程序協同工作:Master和Worker程序。

Master負責任務的接收和分配,Worker負責具體的子任務執行。每個Worker執行完任務之後把結果返回給Master,最後由Master彙總結果。(其實也是一種分而治之的思想,和forkjoin計算框架有相似之處,參看:並行任務計算框架forkjoin)

Master-Worker工作示意圖如下:

下面用Master-Worker實現計算1-100的平方和,思路如下:

  1. 定義一個Task類用於儲存每個任務的資料。
  2. Master生產固定個數的Worker,把所有worker存放在workers變數(map)中,Master需要儲存所有任務的佇列workqueue(ConcurrentLinkedQueue)和所有子任務返回的結果集resultMap(ConcurrentHashMap)。
  3. 每個Worker執行自己的子任務,然後把結果存放在resultMap中。
  4. Master彙總resultMap中的資料,然後返回給Client客戶端。
  5. 為了擴充套件Worker的功能,用一個MyWorker繼承Worker重寫任務處理的具體方法。

Task類:

package com.thread.masterworker;
public class Task {
    private int id;
    private String name;
    private int num;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }
}

Master實現:

package com.thread.masterworker;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Master {
    //所有任務的佇列
    private ConcurrentLinkedQueue<Task> workerQueue = new ConcurrentLinkedQueue<Task>();

    //所有worker
    private HashMap<String,Thread> workers = new HashMap<String,Thread>();

    //共享變數,worker返回的結果
    private ConcurrentHashMap<String,Object> resultMap = new ConcurrentHashMap<String,Object>();

    //構造方法,初始化所有worker
    public Master(Worker worker,int workerCount){
        worker.setWorkerQueue(this.workerQueue);
        worker.setResultMap(this.resultMap);

        for (int i = 0; i < workerCount; i++) {
            Thread t = new Thread(worker);
            this.workers.put("worker-"+i,t);
        }
    }

    //任務的提交
    public void submit(Task task){
        this.workerQueue.add(task);
    }

    //執行任務
    public int execute(){
        for (Map.Entry<String, Thread> entry : workers.entrySet()) {
            entry.getValue().start();
        }

        //一直迴圈,直到結果返回
        while (true){
            if(isComplete()){
                return getResult();
            }
        }

    }

    //判斷是否所有執行緒都已經執行完畢
    public boolean isComplete(){
        for (Map.Entry<String, Thread> entry : workers.entrySet()) {
            //只要有任意一個執行緒沒有結束,就返回false
            if(entry.getValue().getState() != Thread.State.TERMINATED){
                return false;
            }
        }
        return true;
    }

    //處理結果集返回最終結果
    public int getResult(){
        int res = 0;
        for (Map.Entry<String,Object> entry : resultMap.entrySet()) {
            res += (Integer) entry.getValue();
        }
        return res;
    }

}

父類Worker:

package com.thread.masterworker;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Worker implements Runnable {

    private ConcurrentLinkedQueue<Task> workerQueue;

    private ConcurrentHashMap<String,Object> resultMap;

    public void setWorkerQueue(ConcurrentLinkedQueue<Task> workerQueue) {
        this.workerQueue = workerQueue;
    }

    public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
        this.resultMap = resultMap;
    }

    @Override
    public void run() {
        while(true){
            //從任務佇列中取出一個任務
            Task task = workerQueue.poll();
            if(task == null) break;
            //處理具體的任務
            Object res = doTask(task);
            //把每次處理的結果放到結果集裡面,此處直接把num值作為結果
            resultMap.put(String.valueOf(task.getId()),res);
        }

    }

    public Object doTask(Task task) {
        return null;
    }
}

子類MyWorker繼承父類Worker,重寫doTask方法實現具體的邏輯:

package com.thread.masterworker;

public class MyWorker extends Worker {
    @Override
    public Object doTask(Task task) {
        //暫停0.5秒,模擬任務處理
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //計算數字的平方
        int num = task.getNum();
        return num * num;
    }
}

客戶端Client:

package com.thread.masterworker;

import java.util.Random;

public class Client {
    public static void main(String[] args) {

        Master master = new Master(new MyWorker(), 10);

        //提交n個任務到任務佇列裡
        for (int i = 0; i < 100; i++) {
            Task task = new Task();
            task.setId(i);
            task.setName("任務"+i);
            task.setNum(i+1);
            master.submit(task);
        }

        //執行任務
        long start = System.currentTimeMillis();
        int res = master.execute();
        long time = System.currentTimeMillis() - start;
        System.out.println("結果:"+res+",耗時:"+time);
    }
}

以上,我們用10個執行緒去執行子任務,最終由Master做計算求和(1-100的平方和)。每個執行緒暫停500ms,計算數字的平方值。

總共100個任務,分10個執行緒平行計算,相當於每個執行緒均分10個任務,一個任務的時間大概為500ms,故10個任務為5000ms,再加上計算平方值的時間,故稍大於5000ms。結果如下,

結果:338350,耗時:5084