(架構設計)觀察者模式+redis佇列實現不同專案之間資料的互動
一,簡介
最近做一個專案,主要功能是根據一些關鍵詞去百度爬取一些資料,如pv,uv等等有價值的資料,或者對應的URL等百度排名。
我們小組主要負責的是前端的功能,此前端非WEB前端,我們主要將使用者匯入的資料進行封轉,然後轉換為protobuf的傳輸格式物件,
最後儲存到redis的佇列中。
而另一個小組(由技術總監那邊負責的)則是負責爬蟲的業務,他會定時掃描redis佇列的資料進行資料的抓取。
所以,會有兩個java工程來協同工作。其中對接的橋樑就是redis的佇列。
我們這邊的專案會引入技術總監那邊的一個jar,這個jar包的主要功能將資料提交到Redis佇列,
我們需要傳遞過去的引數有佇列名稱,和list集合資料。(list中的集合是protobuf的傳輸物件)
那邊的工程就會去掃描佇列裡面的資料,從佇列lpop一個數據,直到佇列裡面的資料讀完,然後開始呼叫回撥的方法。
我們這邊會去註冊handler來響應那邊的回撥。(這裡使用到了觀察者模式)
二,流程
keyword,url是需要爬取的資料。
1,kw獲取資料,對資料進行清洗的工作,然後儲存到資料庫,資料庫裡面的對應關係是taskid - > keyword,url [一個任務下面會有n個關鍵詞,url連線]
2,定時器定時掃描未開始執行的任務,然後從根據任務的id獲取對應下的所有keyword,url,封裝為protobuf物件,並新增到集合中,最後將list扔到佇列。
3,註冊佇列,只有被註冊過的佇列名,ce那邊才會去掃描該佇列下的資料。
4,註冊handler,處理ce那邊的回撥。
5,ce那邊處理redis的資料,知道資料沒有了,就會呼叫我們註冊的handler,將任務的id發給我們。
6,我們根據這個id再繼續獲取該任務id下面的所有keyword和url,然後根據他們的主鍵來獲取爬蟲抓取回來的資料。
三,程式碼的實現
兩個專案的對接的是redis資料庫,kw負責將前端的資料放到redis,ce取redis的資料,這個時候kw是生產者,ce是消費者。
【ce抓取到資料儲存到資料庫,然後kw去爬蟲資料庫裡面取資料,這個時候ce是生產者,kw是消費者】
如何實現上述的過程,其實是由技術總監提供的jar中的TaskDirector類來實現的。
public class TaskDirector {
private static final String QUEUE_TASK_SPLIT = "_";
private static final String QUEUE_TASK_LIST_END = "_task" ;
private static final String QUEUE_TASK_LIST_SUBMITED_END = "_submited";
private static final String QUEUE_TASK_LIST_COMPLETED_END = "_completed";
private static QueueClient client;
private static List<String> watchList = null;
private static ScheduledExecutorService executor;
private static ITaskCompleteHandler handler;
private static InitialLock lock = new InitialLock();
public TaskDirector() {
}
public static void intialize(final String hosts) {
lock.doInit(new Runnable() {
public void run() {
TaskDirector.client = QueueManager.getClient(hosts, "");
TaskDirector.executor = Executors.newScheduledThreadPool(20);
TaskDirector.watchList = new ArrayList(30);
}
});
}
public static void registerWatchQueue(final String queueName) {
try {
if(watchList.contains(queueName)) {
return;
}
watchList.add(queueName);
executor.scheduleAtFixedRate(new Runnable() {
public void run() {
try {
TaskDirector.doWatchQueue(queueName);
} catch (Exception var2) {
;
}
}
}, 10L, 60L, TimeUnit.SECONDS);
} catch (Exception var2) {
;
}
}
public static void registerHandler(ITaskCompleteHandler h) {
handler = h;
}
public static void submitTaskDirect(String queueName, CrawlingTask task) {
if(task != null && StringUtils.isNotEmpty(task.getTaskId()) && task.getProtos() != null && task.getProtos().size() > 0) {
try {
Iterator var3 = task.getProtos().iterator();
while(var3.hasNext()) {
ProtoEntity p = (ProtoEntity)var3.next();
byte[] data = p.toByteArray();
client.enqueue(queueName, data);
}
} catch (Exception var5) {
;
}
}
}
public static void submitTask(String queueName, CrawlingTask task) {
boolean hasWatched = false;
Iterator var4 = watchList.iterator();
while(var4.hasNext()) {
String p = (String)var4.next();
if(p.equals(queueName)) {
hasWatched = true;
}
}
if(hasWatched) {
if(task != null && StringUtils.isNotEmpty(task.getTaskId()) && task.getProtos() != null && task.getProtos().size() > 0) {
try {
var4 = task.getProtos().iterator();
while(var4.hasNext()) {
ProtoEntity p1 = (ProtoEntity)var4.next();
byte[] data = p1.toByteArray();
client.enqueueX(queueName, getTaskQueueName(queueName, task.getTaskId()), data);
}
client.enqueueX(queueName, getTaskListName(queueName), task.toByteArray());
client.enqueueX(queueName, getTaskListSubmitedName(queueName), formatBytes(task.getTaskId()));
} catch (Exception var6) {
;
}
}
}
}
public static void cancelTask(String queueName, CrawlingTask task) {
List tasksOrdered = taskList(queueName, false);
for(int i = 0; i < tasksOrdered.size(); ++i) {
if(((CrawlingTask)tasksOrdered.get(i)).equals(task.getTaskId())) {
if(i == tasksOrdered.size() - 1) {
client.clearQueue(queueName);
}
client.clearQueue(getTaskQueueName(queueName, task.getTaskId()));
client.queueTrimX(queueName, getTaskListName(queueName), (long)(i + 1), (long)(i - (tasksOrdered.size() - 1)));
if(handler != null) {
handler.onTaskCancelled(queueName, task);
}
break;
}
}
}
public static List<CrawlingTask> upgradeTask(String queueName, CrawlingTask task) {
List tasksOrdered = taskList(queueName, false);
for(int i = 0; i < tasksOrdered.size() - 1; ++i) {
if(((CrawlingTask)tasksOrdered.get(i)).getTaskId().equals(task.getTaskId())) {
client.queueTrimX(queueName, getTaskListName(queueName), (long)(i + 1), (long)(i - (tasksOrdered.size() - 1)));
client.queueInsertX(queueName, getTaskListName(queueName), ((CrawlingTask)tasksOrdered.get(i + 1)).toByteArray(), ((CrawlingTask)tasksOrdered.get(i)).toByteArray());
break;
}
}
return taskList(queueName, false);
}
public static List<CrawlingTask> taskList(String queueName, boolean needTaskLen) {
List taskList = client.queueAllX(queueName, getTaskListName(queueName));
ArrayList tasksOrdered = new ArrayList();
if(taskList != null) {
Iterator var5 = taskList.iterator();
while(var5.hasNext()) {
byte[] bytes = (byte[])var5.next();
CrawlingTask task = new CrawlingTask();
try {
ProtoEntity.parseFrom(task, bytes);
if(needTaskLen) {
task.setTaskLen(taskLen(queueName, task.getTaskId()));
}
tasksOrdered.add(task);
} catch (Exception var8) {
var8.printStackTrace();
}
}
}
return tasksOrdered;
}
public static Long queueLen(String queueName) {
Long len = client.queueLen(queueName);
List queue = client.queueAllX(queueName, getTaskListName(queueName));
CrawlingTask task;
if(queue != null) {
for(Iterator var4 = queue.iterator(); var4.hasNext(); len = Long.valueOf(len.longValue() + client.queueLenX(queueName, getTaskQueueName(queueName, task.getTaskId())).longValue())) {
byte[] bytes = (byte[])var4.next();
task = new CrawlingTask();
try {
ProtoEntity.parseFrom(task, bytes);
} catch (Exception var7) {
var7.printStackTrace();
}
}
}
return len;
}
public static void killQueue(String queueName) {
List queue = client.queueAllX(queueName, getTaskListName(queueName));
CrawlingTask task;
if(queue != null) {
for(Iterator var3 = queue.iterator(); var3.hasNext(); client.clearQueueX(queueName, getTaskQueueName(queueName, task.getTaskId()))) {
byte[] bytes = (byte[])var3.next();
task = new CrawlingTask();
try {
ProtoEntity.parseFrom(task, bytes);
} catch (Exception var6) {
var6.printStackTrace();
}
}
}
client.clearQueueX(queueName, getTaskListName(queueName));
client.clearQueue(queueName);
}
private static void doWatchQueue(String queueName) {
Long len = client.queueLen(queueName);
if(len.longValue() == 0L) {
byte[] b = client.queueHeadX(queueName, getTaskListName(queueName));
if(b != null) {
CrawlingTask task = new CrawlingTask();
try {
ProtoEntity.parseFrom(task, b);
} catch (Exception var5) {
var5.printStackTrace();
}
Long lenX = client.queueLenX(queueName, getTaskQueueName(queueName, task.getTaskId()));
if(lenX.longValue() == 0L) {
client.dequeueNoBlockX(queueName, getTaskListName(queueName));
if(handler != null) {
handler.onTaskCompleted(queueName, task);
}
client.enqueueX(queueName, getTaskListCompletedName(queueName), formatBytes(task.getTaskId()));
} else {
client.renameX(getTaskQueueName(queueName, task.getTaskId()), queueName);
}
}
}
}
private static Long taskLen(String queueName, String taskId) {
Long len = client.queueLenX(queueName, getTaskQueueName(queueName, taskId));
if(len.longValue() == 0L) {
len = client.queueLen(queueName);
}
return len;
}
private static byte[] formatBytes(String taskId) {
return String.format("%s(%s)", new Object[]{taskId, DateUtils.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss")}).getBytes();
}
private static String getTaskListName(String queueName) {
return queueName + "_task";
}
private static String getTaskListSubmitedName(String queueName) {
return queueName + "_submited";
}
private static String getTaskListCompletedName(String queueName) {
return queueName + "_completed";
}
private static String getTaskQueueName(String queueName, String taskId) {
return queueName + "_" + taskId;
}
}
通過分析這個類,我們就可以知道兩個專案之間是如何進行協作的。
(1)初始化資料
public static void intialize(final String hosts) {
lock.doInit(new Runnable() {
public void run() {
TaskDirector.client = QueueManager.getClient(hosts, "");
TaskDirector.executor = Executors.newScheduledThreadPool(20);
TaskDirector.watchList = new ArrayList(30);
}
});
}
該方法主要是對redis的客戶端進行初始化,執行緒池的初始化,佇列數量集合的初始化。
其中使用InitialLock類來做初始化,
1,利用開啟一個執行緒來處理一些比較耗時操作,如何redis的初始化,執行緒池的初始化。【然而,我測試過,其實那裡的程式碼不會用太久的時間】
2,使用雙重校驗鎖機制來確定該程式碼只會被執行一次。
public void doInit(Runnable initer) {
if(!this.inited) {
Object var2 = this.syncRoot;
synchronized(this.syncRoot) {
if(!this.inited) {
initer.run();
this.inited = true;
}
}
}
}
(2)註冊redis佇列
public static void registerWatchQueue(final String queueName) {
try {
if(watchList.contains(queueName)) {
return;
}
watchList.add(queueName);
executor.scheduleAtFixedRate(new Runnable() {
public void run() {
try {
TaskDirector.doWatchQueue(queueName);
} catch (Exception var2) {
;
}
}
}, 10L, 60L, TimeUnit.SECONDS);
} catch (Exception var2) {
;
}
}
1,新增佇列名稱到list集合,如果新增的集合已經存在,則不儲存。
2,開啟執行緒池每隔一分鐘呼叫一次doWatchQueue(queueName)方法。
(3)監控佇列情況
private static void doWatchQueue(String queueName) {
Long len = client.queueLen(queueName);
if(len.longValue() == 0L) {
byte[] b = client.queueHeadX(queueName, getTaskListName(queueName));
if(b != null) {
CrawlingTask task = new CrawlingTask();
try {
ProtoEntity.parseFrom(task, b);
} catch (Exception var5) {
var5.printStackTrace();
}
Long lenX = client.queueLenX(queueName, getTaskQueueName(queueName, task.getTaskId()));
if(lenX.longValue() == 0L) {
client.dequeueNoBlockX(queueName, getTaskListName(queueName));
if(handler != null) {
handler.onTaskCompleted(queueName, task);
}
client.enqueueX(queueName, getTaskListCompletedName(queueName), formatBytes(task.getTaskId()));
} else {
client.renameX(getTaskQueueName(queueName, task.getTaskId()), queueName);
}
}
}
}
1,這個方法主要是判斷佇列的長度是否為0,如果是,則說明任務執行完畢,將佇列銷燬,在submit的佇列中記錄提交任務的記錄,
並在呼叫對調的函式(這個handler是kw那邊需要註冊處理的)
2,由於不同的任務提交就會生成一個佇列,佇列的名稱類似於queue_name_id,id是不同的。
而CE那邊是隻知道定義好的佇列名稱,後面的id它是不知道的,如queue_name,ce只知道這個名稱。所以我們就要將每一個佇列進行優先順序的排序,第一個取出來的佇列,
如果長度不為0,則將這個佇列的名稱改為queue_name,知道這個queue_name資料處理完畢,我們再處理剩下的佇列。
client.renameX(getTaskQueueName(queueName, task.getTaskId()), queueName);
(4)提交任務
public static void submitTask(String queueName, CrawlingTask task) {
boolean hasWatched = false;
Iterator var4 = watchList.iterator();
while(var4.hasNext()) {
String p = (String)var4.next();
if(p.equals(queueName)) {
hasWatched = true;
}
}
if(hasWatched) {
if(task != null && StringUtils.isNotEmpty(task.getTaskId()) && task.getProtos() != null && task.getProtos().size() > 0) {
try {
var4 = task.getProtos().iterator();
while(var4.hasNext()) {
ProtoEntity p1 = (ProtoEntity)var4.next();
byte[] data = p1.toByteArray();
client.enqueueX(queueName, getTaskQueueName(queueName, task.getTaskId()), data);
}
client.enqueueX(queueName, getTaskListName(queueName), task.toByteArray());
client.enqueueX(queueName, getTaskListSubmitedName(queueName), formatBytes(task.getTaskId()));
} catch (Exception var6) {
;
}
}
}
}
1,提交的任務佇列是否存在,且提交的資料的長度是否合法,如果存在且合法則進行下面的操作。
2,將提交的集合資料放到佇列,並記錄提交資訊
(5)註冊控制代碼
public static void registerHandler(ITaskCompleteHandler h) {
handler = h;
}
1,kw註冊這個控制代碼,只要方法執行完,就回調這個控制代碼的方法,返回任務id。
上述五個步驟簡單的描述資料是如何通過CrawlDirector放到redis佇列。
接下來需要有一個類來專門處理資料放到redis的操作還有資料回撥的操作。
- TaskQuartz 定時器類,這個類會定時掃描任務 通過呼叫CrawDirector的submitTask方法將資料提交。
- TaskCallBack 回撥類,這個類會去註冊爬蟲佇列,並註冊Handler,實現資料的回撥。
- QueueClientHelper 佇列的幫助類,定義一些佇列名稱。