1. 程式人生 > >Java Worker 設計模式

Java Worker 設計模式

Worker模式

想解決的問題

非同步執行一些任務,有返回或無返回結果

使用動機

有些時候想執行一些非同步任務,如非同步網路通訊、daemon任務,但又不想去管理這任務的生命周。這個時候可以使用Worker模式,它會幫您管理與執行任務,並能非常方便地獲取結果

結構

很多人可能為覺得這與executor很像,但executor是多執行緒的,它的作用更像是一個規劃中心。而Worker則只是個搬運工,它自己本身只有一個執行緒的。每個worker有自己的任務處理邏輯,為了實現這個目的,有兩種方式

1. 建立一個抽象的AbstractWorker,不同邏輯的worker對其進行不同的實現;
2. 對worker新增一個TaskProcessor不同的任務傳入不同的processor即可。


第二種方式worker的角色可以很方便地改變,而且可以隨時更換processor,可以理解成可”刷機”的worker    ^ ^。這裡我們使用第二種方式來介紹此模式的整體結構。

針對上圖,詳細介紹一下幾個角色:


  • ConfigurableWorker:顧名思義這個就是真正幹活的worker了。要實現自我生命週期管理,需要實現Runable,這樣其才能以單獨的執行緒執行,需要注意的是work最好以daemon執行緒的方式執行。worker裡面還包括幾個其它成員:taskQueue,一個阻塞性質的queue,一般BlockingArrayList就可以了,這樣任務是FIFO(先進先出)的,如果要考慮任務的優先順序,則可以考慮使用PriorityBlockingQueue;listeners,根據事件進行劃分的事件監聽者,以便於當一個任務完成的時候進行處理,需要注意的是,為了較高效地進行listener遍歷,這裡我推薦使用CopyOnWriteArrayList,免得每次都複製。其對應的方法有addlistener、addTask等配套方法,這個都不多說了,更詳細的可以看後面的示例程式碼。
  • WorkerTask:實際上這是一個抽象的工內容,其包括基本的id與,task的ID是Worker生成的,相當於遞wtte後的一個執回,當資料執行完了的時候需要使用這個id來取結果。而後面真正實現的實體task則包含任務處理時需要的資料。
  • Processor:為了實現可”刷機”的worker,我們將處理邏輯與worker分開來,processor的本職工作很簡單,只需要加工傳入的task資料即可,加工完成後觸發fireEvent(WorkerEvent.TASK_COMPLETE)事件,之後通過Future的get即可得到最終的資料。

另外再說一點,對於addTask,可以有一個overload的方法,即在輸入task的同時,傳入一個RejectPolice,這樣可以在size過大的時候做出拒絕操作,有效避免被撐死。

適用性/問題

這種設計能自動處理任務,並能根據任務的優先順序自動調節任務的執行順序,一個完全獨立的thread,你完全可以將其理解成一專門負責幹某種活的”機器人”。它可以用於處理一些定時、請求量固定均勻且對實時性要求不是太高的任務,如日誌記錄,資料分析等。當然,如果想提高任務處理的資料,可以生成多個worker,就相當於僱傭更多的人來為你幹活,非常直觀的。當然這樣一來,誰來維護這worker便成了一個問題,另外就目前這種設計下worker之間是沒有通訊與協同的,這些都是改進點。


那麼對於多個worker,有什麼組織方式呢?這裡我介紹三種,算是拋磚引玉:

流水線式worker(assembly-line worker)

就像生產車間上的流水線工人一樣,將任務切分成幾個小塊,每個worker負責自己的一部分,以提高整體的生產、產出效率,如下圖:




假設完成任務 t 需要的時間為:W(t)=n,那麼將任務分解成m份,流水線式的執行,每小份需要的時間便為 W(t/m)=n/m,那麼執行1000條任務的時間,單個為1000n,流水線長度為L,則用這種方式所用的時間為(1000-1)*(m-L+1)*n/m+n 其中L<m,由此可見,流水線的worker越多、任務越細分,工作的效率將越高。這種主方式的問題在於,如果一個worker出現問題,那麼整個流水線就將停止工作。而且任務的優先順序不能動態呼叫,必須事先告知。


多級反饋佇列(Multilevel Feedback Queue)

這是一個有Q1、Q2...Qn個多重流水線方式,從高到低分別程式碼不同的優先順序,高優先順序的worker要多於低優先順序的,一般是2的倍數,即Q4有16個worker、Q3有8個,後面類推。任務根據預先估計好的優先順序進入,如果任務在某步的執行過長,直接踢到下一級,讓出最快的資源。如下圖所示:


顯然這種方式的好處就在於可以動態地調整任務的優級,及時做出反應。當然,為了實現更好的高度,我們可以在低階裡增加一個閥值,使得放偶然放入低階的task可以有復活的機會^ ^。


MapReduce式

流水線雖然有一定的並行性,但總體來說仍然是序列的,因為只要有一個節點出了問題,那都是致命的錯誤。MapReduce是Google率先實現的一個分散式演算法,有非常好的並行執行效率。


如上圖所示,只要我們將Map與Reduce都改成Worker就行了,如MapWorker與ReduceWorker。這樣,可以看見,Map的過程是完全並行的,當然這樣就需要在Map與Reduce上的分配與資料組合上稍稍下一點功夫了。

樣例實現

這裡我們實現一個PageURLMiningWorker,對給定的URL,開啟頁面後,採取所有的URL,並反回結果進行彙總輸出。由於時間有限,這裡我只實現了單worker與MapReduce worker集兩種方式,有興趣的同學可以實現其它型別,如多級反饋佇列。注意!我這裡只是向大家展示這種設計模式,URL 抓取的效率不在本次考慮之列。

所有的程式碼可以在這裡獲取:https://github.com/sefler1987/javaworker

單Worker實現樣例

package com.alibaba.taobao.main;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;

import com.alibaba.taobao.worker.ConfigurableWorker;
import com.alibaba.taobao.worker.SimpleURLComparator;
import com.alibaba.taobao.worker.WorkerEvent;
import com.alibaba.taobao.worker.WorkerListener;
import com.alibaba.taobao.worker.WorkerTask;
import com.alibaba.taobao.worker.linear.PageURLMiningProcessor;
import com.alibaba.taobao.worker.linear.PageURLMiningTask;

/**
 * Linear version of page URL mining. It's slow but simple.
 * Average time cost for 1000 URLs is: 3800ms
 *
 * @author xuanyin.zy E-mail:[email protected]
 * @since Sep 16, 2012 5:35:40 PM
 */
public class LinearURLMiningMain implements WorkerListener {
    private static final String EMPTY_STRING = "";

    private static final int URL_SIZE_TO_MINE = 10000;

    private static ConcurrentHashMap<String, WorkerTask<?>> taskID2TaskMap = new ConcurrentHashMap<String, WorkerTask<?>>();

    private static ConcurrentSkipListSet<String> foundURLs = new ConcurrentSkipListSet<String>(new SimpleURLComparator());

    public static void main(String[] args) throws InterruptedException {
        long startTime = System.currentTimeMillis();

        ConfigurableWorker worker = new ConfigurableWorker("W001");
        worker.setTaskProcessor(new PageURLMiningProcessor());

        addTask2Worker(worker, new PageURLMiningTask("http://www.taobao.com"));
        addTask2Worker(worker, new PageURLMiningTask("http://www.xinhuanet.com"));
        addTask2Worker(worker, new PageURLMiningTask("http://www.zol.com.cn"));
        addTask2Worker(worker, new PageURLMiningTask("http://www.163.com"));

        LinearURLMiningMain mainListener = new LinearURLMiningMain();
        worker.addListener(mainListener);

        worker.start();

        String targetURL = EMPTY_STRING;
        while (foundURLs.size() < URL_SIZE_TO_MINE) {
            targetURL = foundURLs.pollFirst();

            if (targetURL == null) {
                TimeUnit.MILLISECONDS.sleep(50);
                continue;
            }

            PageURLMiningTask task = new PageURLMiningTask(targetURL);
            taskID2TaskMap.putIfAbsent(worker.addTask(task), task);

            TimeUnit.MILLISECONDS.sleep(100);
        }

        worker.stop();

        for (String string : foundURLs) {
            System.out.println(string);
        }

        System.out.println("Time Cost: " + (System.currentTimeMillis() - startTime) + "ms");
    }

    private static void addTask2Worker(ConfigurableWorker mapWorker_1, PageURLMiningTask task) {
        String taskID = mapWorker_1.addTask(task);
        taskID2TaskMap.put(taskID, task);
    }

    @Override
    public List<WorkerEvent> intrests() {
        return Arrays.asList(WorkerEvent.TASK_COMPLETE, WorkerEvent.TASK_FAILED);
    }

    @Override
    public void onEvent(WorkerEvent event, Object... args) {
        if (WorkerEvent.TASK_FAILED == event) {
            System.err.println("Error while extracting URLs");
            return;
        }

        if (WorkerEvent.TASK_COMPLETE != event)
            return;

        PageURLMiningTask task = (PageURLMiningTask) args[0];
        if (!taskID2TaskMap.containsKey(task.getTaskID()))
            return;

        foundURLs.addAll(task.getMinedURLs());

        System.out.println("Found URL size: " + foundURLs.size());

        taskID2TaskMap.remove(task.getTaskID());
    }
}



MapReduce實現樣例

package com.alibaba.taobao.main;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;

import com.alibaba.taobao.worker.ConfigurableWorker;
import com.alibaba.taobao.worker.SimpleURLComparator;
import com.alibaba.taobao.worker.WorkerEvent;
import com.alibaba.taobao.worker.WorkerListener;
import com.alibaba.taobao.worker.WorkerTask;
import com.alibaba.taobao.worker.mapreduce.Map2ReduceConnector;
import com.alibaba.taobao.worker.mapreduce.MapReducePageURLMiningTask;
import com.alibaba.taobao.worker.mapreduce.PageContentFetchProcessor;
import com.alibaba.taobao.worker.mapreduce.URLMatchingProcessor;

/**
 * MapReduce version of page URL mining. It's very powerful.
 *
 * @author xuanyin.zy E-mail:[email protected]
 * @since Sep 16, 2012 5:35:40 PM
 */
public class MapReduceURLMiningMain implements WorkerListener {
    private static final String EMPTY_STRING = "";

    private static final int URL_SIZE_TO_MINE = 10000;

    private static ConcurrentHashMap<String, WorkerTask<?>> taskID2TaskMap = new ConcurrentHashMap<String, WorkerTask<?>>();

    private static ConcurrentSkipListSet<String> foundURLs = new ConcurrentSkipListSet<String>(new SimpleURLComparator());

    public static void main(String[] args) throws InterruptedException {
        long startTime = System.currentTimeMillis();

        // four mapers
        List<ConfigurableWorker> mappers = new ArrayList<ConfigurableWorker>(4);

        ConfigurableWorker mapWorker_1 = new ConfigurableWorker("W_M1");
        ConfigurableWorker mapWorker_2 = new ConfigurableWorker("W_M2");
        ConfigurableWorker mapWorker_3 = new ConfigurableWorker("W_M3");
        ConfigurableWorker mapWorker_4 = new ConfigurableWorker("W_M4");
        mapWorker_1.setTaskProcessor(new PageContentFetchProcessor());
        mapWorker_2.setTaskProcessor(new PageContentFetchProcessor());
        mapWorker_3.setTaskProcessor(new PageContentFetchProcessor());
        mapWorker_4.setTaskProcessor(new PageContentFetchProcessor());

        mappers.add(mapWorker_1);
        mappers.add(mapWorker_2);
        mappers.add(mapWorker_3);
        mappers.add(mapWorker_4);

        // one reducer
        ConfigurableWorker reduceWorker_1 = new ConfigurableWorker("W_R1");
        reduceWorker_1.setTaskProcessor(new URLMatchingProcessor());

        // bind reducer to final result class
        MapReduceURLMiningMain main = new MapReduceURLMiningMain();
        reduceWorker_1.addListener(main);

        // initiate tasks
        addTask2Worker(mapWorker_1, new MapReducePageURLMiningTask("http://www.taobao.com"));
        addTask2Worker(mapWorker_2, new MapReducePageURLMiningTask("http://www.xinhuanet.com"));
        addTask2Worker(mapWorker_3, new MapReducePageURLMiningTask("http://www.zol.com.cn"));
        addTask2Worker(mapWorker_4, new MapReducePageURLMiningTask("http://www.sina.com.cn/"));

        // bind mapper to reduer
        Map2ReduceConnector connector = new Map2ReduceConnector(Arrays.asList(reduceWorker_1));
        mapWorker_1.addListener(connector);
        mapWorker_2.addListener(connector);
        mapWorker_3.addListener(connector);
        mapWorker_4.addListener(connector);

        // start all
        mapWorker_1.start();
        mapWorker_2.start();
        mapWorker_3.start();
        mapWorker_4.start();
        reduceWorker_1.start();

        String targetURL = EMPTY_STRING;
        int lastIndex = 0;
        while (foundURLs.size() < URL_SIZE_TO_MINE) {
            targetURL = foundURLs.pollFirst();

            if (targetURL == null) {
                TimeUnit.MILLISECONDS.sleep(50);
                continue;
            }

            lastIndex = ++lastIndex % mappers.size();
            MapReducePageURLMiningTask task = new MapReducePageURLMiningTask(targetURL);
            taskID2TaskMap.putIfAbsent(mappers.get(lastIndex).addTask(task), task);

            TimeUnit.MILLISECONDS.sleep(100);
        }

        // stop all
        mapWorker_1.stop();
        mapWorker_2.stop();
        mapWorker_3.stop();
        mapWorker_4.stop();
        reduceWorker_1.stop();

        for (String string : foundURLs) {
            System.out.println(string);
        }

        System.out.println("Time Cost: " + (System.currentTimeMillis() - startTime) + "ms");
    }

    private static void addTask2Worker(ConfigurableWorker mapWorker_1, MapReducePageURLMiningTask task) {
        String taskID = mapWorker_1.addTask(task);
        taskID2TaskMap.put(taskID, task);
    }

    @Override
    public List<WorkerEvent> intrests() {
        return Arrays.asList(WorkerEvent.TASK_COMPLETE, WorkerEvent.TASK_FAILED);
    }

    @Override
    public void onEvent(WorkerEvent event, Object... args) {
        if (WorkerEvent.TASK_FAILED == event) {
            System.err.println("Error while extracting URLs");
            return;
        }

        if (WorkerEvent.TASK_COMPLETE != event)
            return;

        MapReducePageURLMiningTask task = (MapReducePageURLMiningTask) args[0];
        if (!taskID2TaskMap.containsKey(task.getTaskID()))
            return;

        foundURLs.addAll(task.getMinedURLs());

        System.out.println("Found URL size: " + foundURLs.size());

        taskID2TaskMap.remove(task.getTaskID());
    }
}



結果對比

Y軸為抓取X軸URL個數所用的時間

總結

我們可以看到,worker模式組合是非常靈活的,它真的就像一個活生生的工人,任你調配。使用worker,我們可以更方便地實現更復雜的結構。