Java Thread系列(九)Master-Worker模式
阿新 • • 發佈:2018-05-06
線程 支持 需要 列表 deque 開啟 exc oid src
Java Thread系列(九)Master-Worker模式
Master-Worker模式是常用的並行設計模式.
一、Master-Worker 模式核心思想
Master-Worker 系統由兩個角色組成,Master 和 Worker,Master 負責接收和分配任務,Worker 負責處理子任務。任務處理過程中,Master 還負責監督任務進展和 Worker 的健康狀態;Master 將接收 Client 提交的任務,並將任務的進展匯總反饋給 Client。各角色關系如下圖:
二、Master-Worker 實現
(1) Master
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
public class Master {
//1. 應該有一個容器存放任務列表,這個容器需要支持高並發操作
private ConcurrentLinkedDeque<Task> taskQueue = new ConcurrentLinkedDeque<Task>();
//2. 應該有一個容器存放worker
private HashMap<String, Thread> workers = new HashMap<String, Thread>();
//3. 應該有一個容器存放結果集,這個容器需要支持高並發操作
private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();
//4. 構造函數
public Master (Worker worker, int threadCount) {
//將任務列表和結果集傳遞給worker
worker.setTaskQueue(taskQueue);
worker.setResultMap(resultMap);
//初始化worder列表
for (int i = 0; i < threadCount; i++) {
workers.put("worker-" + i, new Thread(worker));
}
}
public Master (Worker worker) {
this(worker, Runtime.getRuntime().availableProcessors());
}
//5. 提交任務
public void submit (Task task) {
taskQueue.add(task);
}
//6. 執行方法 開啟所有的線程
public void execute () {
for(Map.Entry<String, Thread> me : workers.entrySet()) {
me.getValue().start();
}
}
//7. 判斷是否執行完畢
public boolean isComplete () {
for(Map.Entry<String, Thread> me : workers.entrySet()) {
if (me.getValue().getState() != Thread.State.TERMINATED)
return false;
}
return true;
}
//8. 處理結果集
public int getResult () {
int ret = 0;
for(Map.Entry<String, Object> me : resultMap.entrySet()) {
ret += (int)me.getValue();
}
return ret;
}
}
(2) Worker
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
public class Worker implements Runnable {
private ConcurrentLinkedDeque<Task> taskQueue;
private ConcurrentHashMap<String, Object> resultMap;
@Override
public void run() {
while (true) {
Task task = taskQueue.poll();
if (task == null) break;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回結果集
resultMap.put(Integer.toString(task.getId()), handle(task));
}
}
private Object handle(Task task) {
return task.getCount();
}
public void setTaskQueue(ConcurrentLinkedDeque<Task> taskQueue) {
this.taskQueue = taskQueue;
}
public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
this.resultMap = resultMap;
}
}
(3) Task
public class Task {
private int id;
private String name;
private int count;
public Task() {}
public Task(int id, String name, int count) {
this.id = id;
this.name = name;
this.count = count;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getCount() {
return count;
}
public void setCount(int count) {
this.count = count;
}
@Override
public String toString() {
return "Task{" + "id=" + id + ", name=‘" + name + ‘\‘‘ +
", count=" + count + ‘}‘;
}
}
(4) 測試
Master master = new Master(new Worker(), 1);
for (int i = 1; i <= 100; i++) {
master.submit(new Task(i, "task-" + i ,i));
}
master.execute();
long t1 = System.currentTimeMillis();
while (true) {
if (master.isComplete()) {
long t = System.currentTimeMillis() - t1;
System.out.printf("執行結果:%s;執行時間:%s", master.getResult(), t);
break;
}
}
每天用心記錄一點點。內容也許不重要,但習慣很重要!
Java Thread系列(九)Master-Worker模式