1. 程式人生 > >(架構設計)觀察者模式+redis佇列實現不同專案之間資料的互動

(架構設計)觀察者模式+redis佇列實現不同專案之間資料的互動

一,簡介
最近做一個專案,主要功能是根據一些關鍵詞去百度爬取一些資料,如pv,uv等等有價值的資料,或者對應的URL等百度排名。
我們小組主要負責的是前端的功能,此前端非WEB前端,我們主要將使用者匯入的資料進行封轉,然後轉換為protobuf的傳輸格式物件,
最後儲存到redis的佇列中。
而另一個小組(由技術總監那邊負責的)則是負責爬蟲的業務,他會定時掃描redis佇列的資料進行資料的抓取。

所以,會有兩個java工程來協同工作。其中對接的橋樑就是redis的佇列。
我們這邊的專案會引入技術總監那邊的一個jar,這個jar包的主要功能將資料提交到Redis佇列,
我們需要傳遞過去的引數有佇列名稱,和list集合資料。(list中的集合是protobuf的傳輸物件)

那邊的工程就會去掃描佇列裡面的資料,從佇列lpop一個數據,直到佇列裡面的資料讀完,然後開始呼叫回撥的方法。
我們這邊會去註冊handler來響應那邊的回撥。(這裡使用到了觀察者模式)

二,流程
keyword,url是需要爬取的資料。
1,kw獲取資料,對資料進行清洗的工作,然後儲存到資料庫,資料庫裡面的對應關係是taskid - > keyword,url [一個任務下面會有n個關鍵詞,url連線]
2,定時器定時掃描未開始執行的任務,然後從根據任務的id獲取對應下的所有keyword,url,封裝為protobuf物件,並新增到集合中,最後將list扔到佇列。
3,註冊佇列,只有被註冊過的佇列名,ce那邊才會去掃描該佇列下的資料。
4,註冊handler,處理ce那邊的回撥。
5,ce那邊處理redis的資料,知道資料沒有了,就會呼叫我們註冊的handler,將任務的id發給我們。
6,我們根據這個id再繼續獲取該任務id下面的所有keyword和url,然後根據他們的主鍵來獲取爬蟲抓取回來的資料。

三,程式碼的實現
兩個專案的對接的是redis資料庫,kw負責將前端的資料放到redis,ce取redis的資料,這個時候kw是生產者,ce是消費者。
【ce抓取到資料儲存到資料庫,然後kw去爬蟲資料庫裡面取資料,這個時候ce是生產者,kw是消費者】

如何實現上述的過程,其實是由技術總監提供的jar中的TaskDirector類來實現的。

public class TaskDirector {
    private static final String QUEUE_TASK_SPLIT = "_";
    private static final String QUEUE_TASK_LIST_END = "_task"
; private static final String QUEUE_TASK_LIST_SUBMITED_END = "_submited"; private static final String QUEUE_TASK_LIST_COMPLETED_END = "_completed"; private static QueueClient client; private static List<String> watchList = null; private static ScheduledExecutorService executor; private static ITaskCompleteHandler handler; private static InitialLock lock = new InitialLock(); public TaskDirector() { } public static void intialize(final String hosts) { lock.doInit(new Runnable() { public void run() { TaskDirector.client = QueueManager.getClient(hosts, ""); TaskDirector.executor = Executors.newScheduledThreadPool(20); TaskDirector.watchList = new ArrayList(30); } }); } public static void registerWatchQueue(final String queueName) { try { if(watchList.contains(queueName)) { return; } watchList.add(queueName); executor.scheduleAtFixedRate(new Runnable() { public void run() { try { TaskDirector.doWatchQueue(queueName); } catch (Exception var2) { ; } } }, 10L, 60L, TimeUnit.SECONDS); } catch (Exception var2) { ; } } public static void registerHandler(ITaskCompleteHandler h) { handler = h; } public static void submitTaskDirect(String queueName, CrawlingTask task) { if(task != null && StringUtils.isNotEmpty(task.getTaskId()) && task.getProtos() != null && task.getProtos().size() > 0) { try { Iterator var3 = task.getProtos().iterator(); while(var3.hasNext()) { ProtoEntity p = (ProtoEntity)var3.next(); byte[] data = p.toByteArray(); client.enqueue(queueName, data); } } catch (Exception var5) { ; } } } public static void submitTask(String queueName, CrawlingTask task) { boolean hasWatched = false; Iterator var4 = watchList.iterator(); while(var4.hasNext()) { String p = (String)var4.next(); if(p.equals(queueName)) { hasWatched = true; } } if(hasWatched) { if(task != null && StringUtils.isNotEmpty(task.getTaskId()) && task.getProtos() != null && task.getProtos().size() > 0) { try { var4 = task.getProtos().iterator(); while(var4.hasNext()) { ProtoEntity p1 = (ProtoEntity)var4.next(); byte[] data = p1.toByteArray(); client.enqueueX(queueName, getTaskQueueName(queueName, task.getTaskId()), data); } client.enqueueX(queueName, getTaskListName(queueName), task.toByteArray()); client.enqueueX(queueName, getTaskListSubmitedName(queueName), formatBytes(task.getTaskId())); } catch (Exception var6) { ; } } } } public static void cancelTask(String queueName, CrawlingTask task) { List tasksOrdered = taskList(queueName, false); for(int i = 0; i < tasksOrdered.size(); ++i) { if(((CrawlingTask)tasksOrdered.get(i)).equals(task.getTaskId())) { if(i == tasksOrdered.size() - 1) { client.clearQueue(queueName); } client.clearQueue(getTaskQueueName(queueName, task.getTaskId())); client.queueTrimX(queueName, getTaskListName(queueName), (long)(i + 1), (long)(i - (tasksOrdered.size() - 1))); if(handler != null) { handler.onTaskCancelled(queueName, task); } break; } } } public static List<CrawlingTask> upgradeTask(String queueName, CrawlingTask task) { List tasksOrdered = taskList(queueName, false); for(int i = 0; i < tasksOrdered.size() - 1; ++i) { if(((CrawlingTask)tasksOrdered.get(i)).getTaskId().equals(task.getTaskId())) { client.queueTrimX(queueName, getTaskListName(queueName), (long)(i + 1), (long)(i - (tasksOrdered.size() - 1))); client.queueInsertX(queueName, getTaskListName(queueName), ((CrawlingTask)tasksOrdered.get(i + 1)).toByteArray(), ((CrawlingTask)tasksOrdered.get(i)).toByteArray()); break; } } return taskList(queueName, false); } public static List<CrawlingTask> taskList(String queueName, boolean needTaskLen) { List taskList = client.queueAllX(queueName, getTaskListName(queueName)); ArrayList tasksOrdered = new ArrayList(); if(taskList != null) { Iterator var5 = taskList.iterator(); while(var5.hasNext()) { byte[] bytes = (byte[])var5.next(); CrawlingTask task = new CrawlingTask(); try { ProtoEntity.parseFrom(task, bytes); if(needTaskLen) { task.setTaskLen(taskLen(queueName, task.getTaskId())); } tasksOrdered.add(task); } catch (Exception var8) { var8.printStackTrace(); } } } return tasksOrdered; } public static Long queueLen(String queueName) { Long len = client.queueLen(queueName); List queue = client.queueAllX(queueName, getTaskListName(queueName)); CrawlingTask task; if(queue != null) { for(Iterator var4 = queue.iterator(); var4.hasNext(); len = Long.valueOf(len.longValue() + client.queueLenX(queueName, getTaskQueueName(queueName, task.getTaskId())).longValue())) { byte[] bytes = (byte[])var4.next(); task = new CrawlingTask(); try { ProtoEntity.parseFrom(task, bytes); } catch (Exception var7) { var7.printStackTrace(); } } } return len; } public static void killQueue(String queueName) { List queue = client.queueAllX(queueName, getTaskListName(queueName)); CrawlingTask task; if(queue != null) { for(Iterator var3 = queue.iterator(); var3.hasNext(); client.clearQueueX(queueName, getTaskQueueName(queueName, task.getTaskId()))) { byte[] bytes = (byte[])var3.next(); task = new CrawlingTask(); try { ProtoEntity.parseFrom(task, bytes); } catch (Exception var6) { var6.printStackTrace(); } } } client.clearQueueX(queueName, getTaskListName(queueName)); client.clearQueue(queueName); } private static void doWatchQueue(String queueName) { Long len = client.queueLen(queueName); if(len.longValue() == 0L) { byte[] b = client.queueHeadX(queueName, getTaskListName(queueName)); if(b != null) { CrawlingTask task = new CrawlingTask(); try { ProtoEntity.parseFrom(task, b); } catch (Exception var5) { var5.printStackTrace(); } Long lenX = client.queueLenX(queueName, getTaskQueueName(queueName, task.getTaskId())); if(lenX.longValue() == 0L) { client.dequeueNoBlockX(queueName, getTaskListName(queueName)); if(handler != null) { handler.onTaskCompleted(queueName, task); } client.enqueueX(queueName, getTaskListCompletedName(queueName), formatBytes(task.getTaskId())); } else { client.renameX(getTaskQueueName(queueName, task.getTaskId()), queueName); } } } } private static Long taskLen(String queueName, String taskId) { Long len = client.queueLenX(queueName, getTaskQueueName(queueName, taskId)); if(len.longValue() == 0L) { len = client.queueLen(queueName); } return len; } private static byte[] formatBytes(String taskId) { return String.format("%s(%s)", new Object[]{taskId, DateUtils.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss")}).getBytes(); } private static String getTaskListName(String queueName) { return queueName + "_task"; } private static String getTaskListSubmitedName(String queueName) { return queueName + "_submited"; } private static String getTaskListCompletedName(String queueName) { return queueName + "_completed"; } private static String getTaskQueueName(String queueName, String taskId) { return queueName + "_" + taskId; } }

通過分析這個類,我們就可以知道兩個專案之間是如何進行協作的。
(1)初始化資料

    public static void intialize(final String hosts) {
        lock.doInit(new Runnable() {
            public void run() {
                TaskDirector.client = QueueManager.getClient(hosts, "");
                TaskDirector.executor = Executors.newScheduledThreadPool(20);
                TaskDirector.watchList = new ArrayList(30);
            }
        });
    }

該方法主要是對redis的客戶端進行初始化,執行緒池的初始化,佇列數量集合的初始化。
其中使用InitialLock類來做初始化,
1,利用開啟一個執行緒來處理一些比較耗時操作,如何redis的初始化,執行緒池的初始化。【然而,我測試過,其實那裡的程式碼不會用太久的時間】
2,使用雙重校驗鎖機制來確定該程式碼只會被執行一次。

    public void doInit(Runnable initer) {
        if(!this.inited) {
            Object var2 = this.syncRoot;
            synchronized(this.syncRoot) {
                if(!this.inited) {
                    initer.run();
                    this.inited = true;
                }
            }
        }
    }

(2)註冊redis佇列

public static void registerWatchQueue(final String queueName) {
    try {
        if(watchList.contains(queueName)) {
            return;
        }

        watchList.add(queueName);
        executor.scheduleAtFixedRate(new Runnable() {
            public void run() {
                try {
                    TaskDirector.doWatchQueue(queueName);
                } catch (Exception var2) {
                    ;
                }

            }
        }, 10L, 60L, TimeUnit.SECONDS);
    } catch (Exception var2) {
        ;
    }

}

1,新增佇列名稱到list集合,如果新增的集合已經存在,則不儲存。
2,開啟執行緒池每隔一分鐘呼叫一次doWatchQueue(queueName)方法。

(3)監控佇列情況

 private static void doWatchQueue(String queueName) {
        Long len = client.queueLen(queueName);
        if(len.longValue() == 0L) {
            byte[] b = client.queueHeadX(queueName, getTaskListName(queueName));
            if(b != null) {
                CrawlingTask task = new CrawlingTask();

                try {
                    ProtoEntity.parseFrom(task, b);
                } catch (Exception var5) {
                    var5.printStackTrace();
                }

                Long lenX = client.queueLenX(queueName, getTaskQueueName(queueName, task.getTaskId()));
                if(lenX.longValue() == 0L) {
                    client.dequeueNoBlockX(queueName, getTaskListName(queueName));
                    if(handler != null) {
                        handler.onTaskCompleted(queueName, task);
                    }

                    client.enqueueX(queueName, getTaskListCompletedName(queueName), formatBytes(task.getTaskId()));
                } else {
                    client.renameX(getTaskQueueName(queueName, task.getTaskId()), queueName);
                }
            }
        }

    }

1,這個方法主要是判斷佇列的長度是否為0,如果是,則說明任務執行完畢,將佇列銷燬,在submit的佇列中記錄提交任務的記錄,
並在呼叫對調的函式(這個handler是kw那邊需要註冊處理的)
2,由於不同的任務提交就會生成一個佇列,佇列的名稱類似於queue_name_id,id是不同的。
而CE那邊是隻知道定義好的佇列名稱,後面的id它是不知道的,如queue_name,ce只知道這個名稱。所以我們就要將每一個佇列進行優先順序的排序,第一個取出來的佇列,
如果長度不為0,則將這個佇列的名稱改為queue_name,知道這個queue_name資料處理完畢,我們再處理剩下的佇列。

client.renameX(getTaskQueueName(queueName, task.getTaskId()), queueName);

(4)提交任務

public static void submitTask(String queueName, CrawlingTask task) {
    boolean hasWatched = false;
    Iterator var4 = watchList.iterator();

    while(var4.hasNext()) {
        String p = (String)var4.next();
        if(p.equals(queueName)) {
            hasWatched = true;
        }
    }

    if(hasWatched) {
        if(task != null && StringUtils.isNotEmpty(task.getTaskId()) && task.getProtos() != null && task.getProtos().size() > 0) {
            try {
                var4 = task.getProtos().iterator();

                while(var4.hasNext()) {
                    ProtoEntity p1 = (ProtoEntity)var4.next();
                    byte[] data = p1.toByteArray();
                    client.enqueueX(queueName, getTaskQueueName(queueName, task.getTaskId()), data);
                }

                client.enqueueX(queueName, getTaskListName(queueName), task.toByteArray());
                client.enqueueX(queueName, getTaskListSubmitedName(queueName), formatBytes(task.getTaskId()));
            } catch (Exception var6) {
                ;
            }
        }

    }
}

1,提交的任務佇列是否存在,且提交的資料的長度是否合法,如果存在且合法則進行下面的操作。
2,將提交的集合資料放到佇列,並記錄提交資訊

(5)註冊控制代碼

public static void registerHandler(ITaskCompleteHandler h) {
    handler = h;
}

1,kw註冊這個控制代碼,只要方法執行完,就回調這個控制代碼的方法,返回任務id。

上述五個步驟簡單的描述資料是如何通過CrawlDirector放到redis佇列。
接下來需要有一個類來專門處理資料放到redis的操作還有資料回撥的操作。

  1. TaskQuartz 定時器類,這個類會定時掃描任務 通過呼叫CrawDirector的submitTask方法將資料提交。
  2. TaskCallBack 回撥類,這個類會去註冊爬蟲佇列,並註冊Handler,實現資料的回撥。
  3. QueueClientHelper 佇列的幫助類,定義一些佇列名稱。