1. 程式人生 > >深入理解[Master-Worker模式]原理與技術

深入理解[Master-Worker模式]原理與技術

() .exe 思想 thead 常用 ica ces 類的方法 and

Master-Worker模式是常用的並行模式之一。它的核心思想是,系統由兩類進程協作工作:Master進程和Worker進程。Master進程負責接收和分配任務,Worker進程負責處理子任務。當各個Worker進程將子任務處理完成後,將結果返回給Master進程,由Master進程做歸納和匯總,從而得到系統的最終結果,其處理過程如圖1所示。

技術分享圖片

Master-Worker模式的好處,它能夠將一個大任務分解成若幹個小任務,並且執行,從而提高系統的吞吐量。而對於系統請求者Client來說,任務一旦提交,Master進程會分配任務並立即返回,並不會等待系統全部處理完成後再返回,其處理過程是異步的。因此Client不會出現等待現象。

1.Master-Worker的模式結構

Master-Worker模式是一種使用多線程進行數據結構處理的結構。

Master進程為主要進程,它維護了一個Worker進程隊列、子任務隊列和子結果集。Worker進程隊列中的Worker進程,不停地從任務隊列中提取要處理的子任務,並將子任務的處理結果寫入結果集。

2.Master-Worker的代碼實現

基於以上的思路實現一個簡易的master-worker框架。其中Master部分的代碼如下:

public class Master {
    //任務隊列
    protected Queue<Object> workQuery = new ConcurrentLinkedQueue<Object>();
    //worker進程隊列
    protected Map<String, Thread> threadMap = new HashMap<>();
    //子任務處理結果集
    protected Map<String, Object> resultMap = new ConcurrentHashMap<>();

    //是否所有的子任務都結束了
    public boolean isComplete() {
        for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
            if (entry.getValue().getState()!=Thread.State.TERMINATED){
                return false;
            }
        }
        return true;
    }

    //Master 的構造,需要一個Worker 進程邏輯,和需要的Worker進程數量
    public Master(Worker worker,int countWorker){
        worker.setWorkQueue(workQuery);
        worker.setResultMap(resultMap);
        for (int i = 0; i < countWorker; i++) {
            threadMap.put(Integer.toString(i),new Thread(worker));
        }
    }

    //提交一個任務
    public void submit(Object job){
        workQuery.add(job);
    }

    //返回子任務結果集
    public Map<String,Object> getResultMap(){
        return resultMap;
    }

    //開始運行所有的worker進程,進行處理
    public void  execute(){
        for (Map.Entry<String,Thread> entry : threadMap.entrySet()){
            entry.getValue().start();
        }
    }

}

對應的Worker進程的代碼實現:

public class Worker implements Runnable {
    //任務隊列,用於取得子任務
    protected Queue<Object> workQueue;
    //子任務處理結果集
    protected Map<String, Object> resultMap;

    public void setWorkQueue(Queue<Object> workQueue) {
        this.workQueue = workQueue;
    }

    public void setResultMap(Map<String, Object> resultMap) {
        this.resultMap = resultMap;
    }

    //子任務處理的邏輯,在子類中實現具體邏輯
    public Object handle(Object input) {
        return input;
    }

    @Override
    public void run() {
        while (true) {
            //獲取子任務
            Object input = workQueue.poll();
            if (input == null) {
                break;
            }
            //處理子任務
            Object re = handle(input);
            //將處理結果寫入結果集
            resultMap.put(Integer.toString(input.hashCode()), re);
        }
    }
}

以上兩段代碼已經展示了Master-Worker框架的全貌。應用程序通過重載 Worker.handle() 方法實現應用層邏輯。

例如,要實現計算1+2+..+100的結果,代碼如下:

public class PlusWorker extends Worker {

    @Override
    public Object handle(Object input) {
        Integer i = (Integer) input;
        return i+1;
    }

    public static void main(String[] args) {
        Master master = new Master(new PlusWorker(), 5);
        for (int i = 0; i < 100; i++) {
            master.submit(i); //提交一百個子任務
        }
        master.execute(); //開始計算
        int re = 0;
        Map<String, Object> resultMap = master.getResultMap();
        while (resultMap.size() > 0 || !master.isComplete()) {
            Set<String> keys = resultMap.keySet();
            String key = null;
            for (String k : keys) {
                key = k;
                break;
            }
            Integer i = null;
            if (key != null) {
                i = (Integer) resultMap.get(key);   //從結果集中獲取結果
            }
            if (i != null) {
                re += i;        //最終結果
            }
            if (key != null) {
                resultMap.remove(key);      //移除已經被計算過的項
            }
        }
        System.out.println("result: " + re);
    }

}

運行結果:

result: 5050

在應用層代碼中,創建了5個Worker工作進程和Worker工作實例PlusWorker。在提交了100個子任務後,便開始子任務的計算。這些子任務中,由生成的5個Worker進程共同完成。Master並不等待所有的Worker執行完畢,就開始訪問子結果集進行最終結果的計算,直到子結果集中所有的數據都被處理,並且5個活躍的Worker進程全部終止,才給出最終計算結果。

Master-Worker模式是一種將串行任務並行化的方法,被分解的子任務在系統中可以被並行處理。同時,如果有需要,Master進程不需要等待所有子任務都完成計算,就可以根據已有的部分結果集計算最終結果。

3.Amino框架提供的Master-Worker模式

在Amino框架中為Master-Worker模式提供了較為完善的實現和便捷的操作接口。Amino實現了兩套Master-Worker實現:一種是靜態的Master-Worker實現,另一種是動態實現。

靜態實現不允許在任務開始時添加新的子任務,而動態的Master-Worker允許在任務執行過程中,由Master或Worker添加新的子任務。

在Amino框架中,MasterWorkerFactory.newStatic(new Pow3(),20)用於創建靜態的Master-Worker模式,

第二個參數為Worker線程數,第一個參數為執行的任務類,該類需實現Doable<Integer,Integer>接口,該接口泛型的第一個類型為任務方法的參數類型,第二個類型為方法返回類型。MasterWorkerFactory.newDynamic(new Pow3Dyn())用於創建動態的Master-Worker模式,其中參數為實現DynamicWorker接口的實例。

submit()方法用於提交應用層任務,execute()方法將執行所有任務。

Amino框架需要自行下載,下載地址:https://sourceforge.net/projects/amino-cbbs/files/cbbs/0.5.3/,找到cbbs-java-bin-0.5.3.tar.gz 下載即可。

下面用Amino框架演示1+2+..+100的完整示例。

public class Pow3 implements Doable<Integer,Integer> {
    @Override
    public Integer run(Integer input) {
        //業務邏輯
        return input;
    }
}
public class Pow3Dyn implements DynamicWorker<Integer,Integer> {
    @Override
    public Integer run(Integer integer, WorkQueue<Integer> workQueue) {
        //業務邏輯
        return integer;
    }
}
public class AminoDemo {

    /
     * Amino 框架提供開箱即用的Master-Worker模式
     * 其它用法參考API文檔
     */
    public static void main(String[] args) {
        new AminoDemo().testDynamic();
        new AminoDemo().testStatic();
    }

    /
     * 靜態模式,不允許在任務開始後添加新的任務
     */
    public void testStatic(){
        MasterWorker<Integer,Integer> mw = MasterWorkerFactory.newStatic(new Pow3(),20);//靜態模式,可指定線程數
        List<MasterWorker.ResultKey> keyList = new Vector<>();
        for (int i = 1; i <= 100; i++) {
            keyList.add(mw.submit(i)); //傳參並調度任務,key用於取得任務結果
        }
        mw.execute();//執行所有任務
        int re = 0;
        while (keyList.size()> 0){ //不等待全部執行完成,就開始求和
            MasterWorker.ResultKey k = keyList.get(0);
            Integer i = mw.result(k); //由Key取得一個任務結果
            if (i!=null){
                re+=i;
                keyList.remove(0); //累加完成後
            }
        }
        System.out.println("result:"+re);
        mw.shutdown();//關閉master-worker,釋放資源
    }

    /
     * 動態模式,可在開始執行任務後繼續添加任務
     */
    public void testDynamic(){
        MasterWorker<Integer,Integer> mw = MasterWorkerFactory.newDynamic(new Pow3Dyn());//動態模式,可指定線程數
        List<MasterWorker.ResultKey> keyList = new Vector<>();
        for (int i = 1; i < 50; i++) {
            keyList.add(mw.submit(i)); //傳參並調度任務,key用於取得任務結果
        }
        mw.execute();
        for (int i = 50; i <= 100; i++) {
            keyList.add(mw.submit(i)); //傳參並調度任務,key用於取得任務結果
        }
        int re = 0;
        while (keyList.size()> 0){
            MasterWorker.ResultKey k = keyList.get(0);
            Integer i = mw.result(k); //由Key取得一個任務結果
            if (i!=null){
                re+=i;
                keyList.remove(0); //累加完成後
            }
        }
        System.out.println("result:"+re);
        mw.shutdown();
    }

}

運行結果:

result:5050
result:5050

MasterWorker類的方法摘要,其它請自行下載API文檔。cbbs-java-apidocs-0.5.3.tar.gz

方法摘要
boolean execute() Begin processing of the work items submitted.
boolean execute(long timeout, java.util.concurrent.TimeUnit unit) Begin processing of the work items submitted.
void finished() Indicate to the master/worker that there is not more work coming.
java.util.Collection<T> getAllResults() Obtain all of the results from the processing work items.
boolean isCompleted() Poll an executing master/worker for completion.
boolean isStatic() Determine if a master/worker is static.
int numWorkers() Get the number of active workers.
T result(MasterWorker.ResultKey k) Obtain the results from the processing of a work item.
void shutdown() Shutdown the master/worker.
MasterWorker.ResultKey submit(S w) Submit a work item for processing.
MasterWorker.ResultKey submit(S w, long timeout, java.util.concurrent.TimeUnit unit) Submit a work item for processing and block until it is either submitted successfully or the specified timeout period has expired.
boolean waitForCompletion() Wait until all workers have completed.
boolean waitForCompletion(long timeout, java.util.concurrent.TimeUnit unit) Wait until all workers have completed or the specified timeout period expires.

深入理解[Master-Worker模式]原理與技術