1. 程式人生 > >Java Thread系列(九)Master-Worker模式

Java Thread系列(九)Master-Worker模式

線程 支持 需要 列表 deque 開啟 exc oid src

Java Thread系列(九)Master-Worker模式

Master-Worker模式是常用的並行設計模式.

一、Master-Worker 模式核心思想

Master-Worker 系統由兩個角色組成,Master 和 Worker,Master 負責接收和分配任務,Worker 負責處理子任務。任務處理過程中,Master 還負責監督任務進展和 Worker 的健康狀態;Master 將接收 Client 提交的任務,並將任務的進展匯總反饋給 Client。各角色關系如下圖:

技術分享圖片

二、Master-Worker 實現

(1) Master

import java.util.HashMap;
import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; public class Master { //1. 應該有一個容器存放任務列表,這個容器需要支持高並發操作 private ConcurrentLinkedDeque<Task> taskQueue = new ConcurrentLinkedDeque<Task>(); //2. 應該有一個容器存放worker
private HashMap<String, Thread> workers = new HashMap<String, Thread>(); //3. 應該有一個容器存放結果集,這個容器需要支持高並發操作 private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>(); //4. 構造函數 public Master (Worker worker, int threadCount) { //將任務列表和結果集傳遞給worker
worker.setTaskQueue(taskQueue); worker.setResultMap(resultMap); //初始化worder列表 for (int i = 0; i < threadCount; i++) { workers.put("worker-" + i, new Thread(worker)); } } public Master (Worker worker) { this(worker, Runtime.getRuntime().availableProcessors()); } //5. 提交任務 public void submit (Task task) { taskQueue.add(task); } //6. 執行方法 開啟所有的線程 public void execute () { for(Map.Entry<String, Thread> me : workers.entrySet()) { me.getValue().start(); } } //7. 判斷是否執行完畢 public boolean isComplete () { for(Map.Entry<String, Thread> me : workers.entrySet()) { if (me.getValue().getState() != Thread.State.TERMINATED) return false; } return true; } //8. 處理結果集 public int getResult () { int ret = 0; for(Map.Entry<String, Object> me : resultMap.entrySet()) { ret += (int)me.getValue(); } return ret; } }

(2) Worker

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

public class Worker implements Runnable {

    private ConcurrentLinkedDeque<Task> taskQueue;
    private ConcurrentHashMap<String, Object> resultMap;

    @Override
    public void run() {
        while (true) {
            Task task = taskQueue.poll();
            if (task == null) break;

            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //返回結果集
            resultMap.put(Integer.toString(task.getId()), handle(task));
        }
    }

    private Object handle(Task task) {
        return task.getCount();
    }

    public void setTaskQueue(ConcurrentLinkedDeque<Task> taskQueue) {
        this.taskQueue = taskQueue;
    }

    public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
        this.resultMap = resultMap;
    }
}

(3) Task

public class Task {
    private int id;
    private String name;
    private int count;

    public Task() {}    
    public Task(int id, String name, int count) {
        this.id = id;
        this.name = name;
        this.count = count;
    }
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getCount() {
        return count;
    }
    public void setCount(int count) {
        this.count = count;
    }
    @Override
    public String toString() {
        return "Task{" + "id=" + id + ", name=‘" + name + ‘\‘‘ +
                ", count=" + count + ‘}‘;
    }
}

(4) 測試

Master master = new Master(new Worker(), 1);

for (int i = 1; i <= 100; i++) {
    master.submit(new Task(i, "task-" + i ,i));
}

master.execute();

long t1 = System.currentTimeMillis();
while (true) {
    if (master.isComplete()) {
        long t = System.currentTimeMillis() - t1;
        System.out.printf("執行結果:%s;執行時間:%s", master.getResult(), t);
        break;
    }
}

每天用心記錄一點點。內容也許不重要,但習慣很重要!

Java Thread系列(九)Master-Worker模式