1. 程式人生 > >java多執行緒Master-worker模式

java多執行緒Master-worker模式

Master-Worker模式是常用的平行計算模式。它的核心思想是系統由兩類程序協作工作:Master程序和Worker程序。Master負責接收和分配任務,Worker負責處理子任務。當各個Worker子程序處理完成後,會將結果返回給Master,由Master做歸納和總結。其好處是能將一個大任務分解成若干個小任務,並行執行,從而提高系統的吞吐量。

主要模組分為:

Master:中控排程作用,起到分配任務,彙總結果集。包含ConcurrentLinkedQueue容器承裝所有的任務,HashMap<String,Thread>容器承裝所有的worker物件,ConcurrentHashMap<String,Object>併發容器裝載每個worker併發處理任務的結果集。

worker:實現Runnable介面或繼承Thread實現多執行緒。每個worker物件需要有master的ConcurrentLinkedQueue的引用用於獲取任務,每個worker物件需要有ConcurrentHashMap<String,Object>的引用用於承裝返回結果

Main:呼叫Master進行測試

Task:封裝任務

示例程式碼:

Master類:

public class Master {

    //1.承裝任務的容器
    private ConcurrentLinkedQueue<Task> workQueue=new ConcurrentLinkedQueue<>();
    //2.使用HashMap承裝所有的worker物件
    private HashMap<String, Thread> workers=new HashMap<>();
    //3.使用一個容器承裝每一個worker並行執行任務的結果集
    private ConcurrentHashMap<String, Object> resultMap=new ConcurrentHashMap<>();
    //4.構造方法(執行任務的worker物件,和建立worker的執行緒數)
    public Master(Worker worker,int workerCount) {
        //每一個worker物件都需要有Master的引用workerQueue用於任務的領取,resultMap用於任務的提交
        worker.setWorkerQueue(this.workQueue);
        worker.setResultMap(this.resultMap);
        for (int i = 0; i < workerCount; i++) {
            //key代表每個worker的名稱,value表示每個worker
            workers.put("子節點"+i, new Thread(worker));
        }
    }
    //5.提交方法
    public void submit(Task task) {
        this.workQueue.add(task);
    }
    //6.需要一個執行方法(啟動應用程式 讓所有worker工作)
    public void execute() {
        for (Map.Entry<String, Thread> mt : workers.entrySet()) {
            mt.getValue().start();
        }
    }
    //7.判斷執行緒是否執行完畢
    public boolean isComplete() {
        for (Map.Entry<String, Thread> mt : workers.entrySet()) {
            if(mt.getValue().getState()!=Thread.State.TERMINATED) {
                return false;
            }
        }
        return true;
    }
    //8.返回結果集資料
    public int getResult() {
        int ret=0;
        for (Map.Entry<String, Object> mt : resultMap.entrySet()) {
            ret+=(Integer)mt.getValue();
        }
        return ret;
    }
}

 Worker類:

public class Worker implements Runnable{

    private ConcurrentLinkedQueue<Task> workQueue;
    private ConcurrentHashMap<String, Object> resultMap;

    public void setWorkerQueue(ConcurrentLinkedQueue<Task> workQueue) {
        this.workQueue=workQueue;
    }
    public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
        this.resultMap=resultMap;
    }
    @Override
    public void run() {
        while (true) {
            Task input=workQueue.poll();
            if(input==null) break;
            //真正的去處理業務
            Object output=handle(input);
            this.resultMap.put(Integer.toString(input.getId()), output);
        }
    }
    private Object handle(Task input) {
        Object output=null;
        try {
            //表示處理Task任務的耗時,可能是資料的加工,也可能是操作資料庫。。。。
            Thread.sleep(500);
            output=input.getPrice();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }        
        return output;            
    }

}

Task類:

public class Task {
    private int id;
    private String name;
    private int price;
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getPrice() {
        return price;
    }
    public void setPrice(int price) {
        this.price = price;
    }
    @Override
    public String toString() {
        return "Task [id=" + id + ", name=" + name + ", price=" + price + "]";
    }    
}

Main類:

public class Main {

    public static void main(String[] args) {
        Worker worker=new Worker();
        Master master=new Master(worker, 20);
        Random r=new Random();
        for (int i = 1; i <= 100; i++) {
            Task task=new Task();
            task.setId(i);
            task.setName("任務"+i);
            task.setPrice(r.nextInt(1000));
            master.submit(task);
        }
        master.execute();
        
        long start=System.currentTimeMillis();
        while(true) {
            if(master.isComplete()) {
                long end=System.currentTimeMillis()-start;
                int ret=master.getResult();
                System.out.println("最終結果:"+ret+"執行耗時:"+end);
                break;
            }
        }
    }
}

執行結果:

最終結果:50622執行耗時:2501