1. 程式人生 > >Master-Worker設計模式介紹

Master-Worker設計模式介紹

stat 方式 tint 代碼實現 exe ide port client shm

Master-Worker模式是常用的並行設計模式。核心思想是,系統由兩個角色組成,Master和Worker,Master負責接收和分配任務,Worker負責處理子任務。任務處理過程中,Master還負責監督任務進展和Worker的健康狀態;Master將接收Client提交的任務,並將任務的進展匯總反饋給Client。各角色關系如下圖

技術分享圖片

Master-Worker模式滿足於可以將大任務劃分為小任務的場景,是一種分而治之的設計理念。通過多線程或者多進程多機器的模式,可以將小任務處理分發給更多的CPU處理,降低單個CPU的計算量,通過並發/並行提高任務的完成速度,提高系統的性能。

技術分享圖片

具體細節如上圖,Master對任務進行切分,並放入任務隊列;然後,觸發Worker處理任務。實際操作中,任務的分配有多種形式,如Master主動拉起Workder進程池或線程池,並將任務分配給Worker;或者由Worker主動領取任務,這樣的Worker一般是常駐進程;還有一種解耦的方式,即Master指做任務的接收、切分和結果統計,指定Worker的數量和性能指標,但不參與Worker的實際管理,而是交由第三方調度監控和調度Worker。

代碼實現Master-Worker模式:

Master代碼:

 1 package com.hjf.master_worker;
 2 
 3 import java.util.HashMap;
 4 import java.util.Map;
 5 import java.util.concurrent.ConcurrentHashMap;
 6 import java.util.concurrent.ConcurrentLinkedQueue;
 7 
 8 /**
 9  * Master
10  * @author huangjianfei
11  */
12 public class Master 13 { 14 //1:應該有一個承載任務的集合 15 private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>(); 16 17 //2:使用hashmap去承載所有的worker對象 ThreadName------Worker 18 private HashMap<String,Thread> workers = new HashMap<>();
19 20 //3:使用一個容器承載每一個worker並行執行任務的結果集 21 private ConcurrentHashMap<String,Object> resultMap = new ConcurrentHashMap<String, Object>(); 22 23 //4:構造方法 24 public Master(Worker worker,int workerCount){ 25 //在worker中添加兩個引用 workQueue用於任務的領取 resultMap用於任務的提交 26 worker.setWorkerQueue(this.workQueue); 27 worker.setResultMap(this.resultMap); 28 29 for (int i = 0; i < workerCount; i++) 30 { 31 workers.put("子節點 "+i, new Thread(worker)); 32 } 33 } 34 35 //5:提交方法 36 public void submit(Task task){ 37 workQueue.add(task); 38 } 39 40 //6:需要有一個執行的方法(啟動應用程序 讓所有的worker工作) 41 public void execute(){ 42 //遍歷workers 分別去執行每一個worker 43 for (Map.Entry<String,Thread> me: workers.entrySet()) 44 { 45 me.getValue().start(); 46 } 47 } 48 49 /** 50 * 判斷所有的worker是否執行完畢 51 */ 52 public boolean isCompleted() 53 { 54 //遍歷所有的worker 只要有一個沒有停止 那麽就代表沒有結束 55 for (Map.Entry<String,Thread> me: workers.entrySet()) 56 { 57 if(me.getValue().getState() != Thread.State.TERMINATED){ 58 return false; 59 } 60 } 61 return true; 62 } 63 64 /** 65 * 計算最終的結果集 66 * @return 67 */ 68 public int getResult(){ 69 int result = 0; 70 for (Map.Entry<String,Object> me : resultMap.entrySet()) 71 { 72 result += (Integer)me.getValue(); 73 } 74 return result; 75 } 76 }

Worker代碼實現:

 1 package com.hjf.master_worker;
 2 
 3 import java.util.concurrent.ConcurrentHashMap;
 4 import java.util.concurrent.ConcurrentLinkedQueue;
 5 
 6 /**
 7  * Worker
 8  * @author huangjianfei
 9  */
10 public class Worker implements Runnable
11 {
12     private ConcurrentLinkedQueue<Task> workQueue;
13     
14     private ConcurrentHashMap<String, Object> resultMap;
15     
16     public void setWorkerQueue(ConcurrentLinkedQueue<Task> workQueue)
17     {
18         this.workQueue = workQueue;            
19     }
20 
21     public void setResultMap(ConcurrentHashMap<String, Object> resultMap)
22     {
23         this.resultMap = resultMap;
24     }
25 
26     @Override
27     public void run()
28     {
29         //處理一個個任務
30         while(true){
31             //從隊列中取出一個元素
32             Task input = this.workQueue.poll();
33             if(null == input) break;
34             //真正的去做業務處理
35             Object outPut = handle(input);
36             //存放任務的結果 
37             this.resultMap.put(String.valueOf(input.getId()), outPut);
38         }  
39     }
40 
41     //單獨抽出來 給子類重寫,更加靈活
42     public Object handle(Task input){
43         return null;
44     }
45     
46     
47     /**
48      * 處理業務 應該抽象出來 子類去具體實現業務邏輯
49      * @param input
50      */
51 //    private Object handle(Task input)
52 //    {
53 //        Object outPut = null;
54 //        if(null == input) return null;
55 //        try
56 //        {
57 //            //表示處理task任務的耗時,可能是數據的加工,也可能是操作數據庫
58 //            Thread.sleep(5000);
59 //            //模擬真實的業務場景
60 //            outPut = input.getPrice();
61 //        } catch (InterruptedException e)
62 //        {
63 //            e.printStackTrace();
64 //        }
65 //        return outPut;
66 //    }
67     
68 }

Task代碼實現:

 1 package com.hjf.master_worker;
 2 /**
 3  * 任務
 4  * @author huangjianfei
 5  */
 6 public class Task
 7 {
 8     private int id;
 9     private String name;
10     private int price;
11     public int getId()
12     {
13         return id;
14     }
15     public void setId(int id)
16     {
17         this.id = id;
18     }
19     public String getName()
20     {
21         return name;
22     }
23     public void setName(String name)
24     {
25         this.name = name;
26     }
27     public int getPrice()
28     {
29         return price;
30     }
31     public void setPrice(int price)
32     {
33         this.price = price;
34     }
35     
36 }

Worker子類,在以後的開發中可以按照自己的需求去設計相關的Worker的子類:

 1 package com.hjf.master_worker;
 2 
 3 public class MyWorker1 extends Worker
 4 {
 5     @Override 
 6     public Object handle(Task input)
 7     {
 8         Object outPut = null;
 9         if(null == input) return null;
10         try
11         {
12             //表示處理task任務的耗時,可能是數據的加工,也可能是操作數據庫
13             Thread.sleep(5000);
14             //模擬真實的業務場景
15             outPut = input.getPrice();
16         } catch (InterruptedException e)
17         {
18             e.printStackTrace();
19         }
20         return outPut;
21     }
22 }

Main測試類代碼:

 1 package com.hjf.master_worker;
 2 
 3 import java.util.Random;
 4 /**
 5  * 主線程測試類
 6  * @author huangjianfei
 7  */
 8 public class Main
 9 {
10     public static void main(String[] args)
11     {
12         System.out.println("我的機器可用的Processor數量:"+Runtime.getRuntime().availableProcessors());
13         // 使用worker子類實現具體的業務,更加靈活
14         Master master = new Master(new MyWorker1(), Runtime.getRuntime().availableProcessors()); 
15         Random r = new Random();
16         //提交100個任務
17         for (int i = 0; i <= 100; i++)
18         {
19             Task t = new Task();
20             t.setId(i);
21             t.setName("任務 "+i);
22             t.setPrice(r.nextInt(1000)); 
23             master.submit(t);
24         }
25         
26         //執行所有的worker
27         master.execute();
28         
29         long start = System.currentTimeMillis();//記錄時間
30         
31         
32         while(true){
33             //全部的worker執行結束的時候去計算最後的結果
34             if(master.isCompleted()){
35                 long end = System.currentTimeMillis() - start;//計算耗時
36                 //計算結果集
37                 int result = master.getResult();
38                 System.out.println("執行最終結果: "+result + " 執行耗時 "+end);
39                 break;
40             }
41         }
42         
43     }
44     
45 }

Master-Worker設計模式介紹