1. 程式人生 > >多執行緒下的設計模式之Master-Worker模式

多執行緒下的設計模式之Master-Worker模式

該模式可以簡單理解為:首先client將任務交給Master,Master中使用一個併發集合類來承載所有任務,使用一個集合去承載所有的Worker物件,並且有一個併發集合類來承載每一個Worker併發處理任務的結果集;每一個Worker是一個工作執行緒,所以首先要實現Runnable介面,每一個Worker物件還有Master中承載任務集合的引用來獲取任務,每一個Worker物件有一個Master中承載結果集的引用,使得工作執行緒執行完畢後將結果放到結果集中,三者之間的的關係如下所示:

 程式碼實現如下:

Task任務類

Master類

package masterworker;

import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue;

public class Master {     //1.承裝任務的集合     private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<>();     //2. 使用HaskMap承裝worker物件     private HashMap<String,Thread> workers = new HashMap<>();     //3.結果集     private ConcurrentHashMap<String, Object> resultMap =  new ConcurrentHashMap<>();     //4.構造     public Master(Worker worker, int workerCount){         for (int i = 0; i < workerCount; i++) {             //每一個worker物件都需要有master引用             worker.setWorkerQueue(this.workQueue);             worker.setResultMap(this.resultMap);             //key表示worker名,value表示執行緒執行物件             workers.put("子節點"+Integer.toString(i), new Thread(worker));                      }     }          //5.提交方法     public void submit(Task task){         this.workQueue.add(task);     }          //6.執行方法   啟動應用程式,讓所有worker工作     public void execute(){         for(Map.Entry<String, Thread> me : workers.entrySet()){             me.getValue().start();         }     }               //7. 判斷是否執行完畢     public boolean isComplete(){         for(Map.Entry<String,Thread> me : workers.entrySet()){             if(me.getValue().getState() != Thread.State.TERMINATED){                 return false;             }         }         return true;     }          //8.返回結果集資料     public int getResult(){         int ret = 0;         for(Map.Entry<String, Object> me: resultMap.entrySet()){             ret += (Integer) me.getValue();         }         return ret;     }      }

Worker類

package masterworker;

import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue;

public class Worker implements Runnable{     private ConcurrentLinkedQueue<Task> workQueue;     private ConcurrentHashMap<String, Object> resultMap;               public void setWorkerQueue(ConcurrentLinkedQueue<Task> workQueue) {         this.workQueue = workQueue;              }

    public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {         this.resultMap = resultMap;              }     @Override     public void run() {              while(true){             Task input = this.workQueue.poll();             if(input == null){                 break;             }             //真正做業務處理             Object output = handle(input);             this.resultMap.put(Integer.toString(input.getId()), output);         }              }          private Object handle(Task input){         Object output = null;         try {             //表示處理task任務耗時,資料加工,操作資料庫等             Thread.sleep(500);             output = input.getPrice();         } catch (InterruptedException e) {             e.printStackTrace();         }         return output;     } }

測試類Main

public class Main {     public static void main(String[] args) {         Master master = new Master(new Worker(), 10);         Random r = new Random();                  for (int i = 0; i < 100; i++) {             Task t = new Task();             t.setId(i);             t.setName("任務" + i);             t.setPrice(r.nextInt(1000));

            master.submit(t);         }                  master.execute();         long start = System.currentTimeMillis();         while(true){             if(master.isComplete()){                 long end = System.currentTimeMillis() - start;                 int ret = master.getResult();                 System.out.println("最終結果: "+ret +",執行耗時:" +end);                 break;             }         }     }

}