1. 程式人生 > >分散式豆瓣爬蟲(三): 控制節點-控制排程器

分散式豆瓣爬蟲(三): 控制節點-控制排程器

一、實現原理

控制排程器主要是產生並啟動 URL 管理程序、資料提取程序和資料儲存程序,同時維護4個佇列保持程序間的通訊,分別為 url_q、result_q、conn_q、store_q。4個佇列說明如下:

  • url_q:佇列是 URL 管理程序將 URL 傳遞給爬蟲節點的通道。
  • result_q:佇列是爬蟲節點將資料返回給資料提取程序的通道。
  • conn_q:佇列是資料提取程序將新的 URL 資料提交給 URL 管理程序的通道。
  • store_q:佇列是資料提取程序將獲取到的資料交給資料儲存程序的通道。

二、程式碼如下

  1 from multiprocessing.managers import
BaseManager 2 from multiprocessing import Queue, Process 3 from DataOutput import DataOutput 4 from UrlManager import UrlManager 5 import time 6 7 8 class NodeManager: 9 def start_manager(self, url_q, result_q): 10 """ 11 建立一個分散式管理器 12 :param url_q: url 佇列
13 :param result_q: 結果佇列 14 :return: BaseManager 15 """ 16 # 把建立的兩個佇列註冊在網路上,利用 register 方法,callable 引數關聯了 Queue 物件 17 # 將 Queue 物件在網路中暴露 18 BaseManager.register('get_task_queue', callable=lambda:url_q) 19 BaseManager.register('get_result_queue
', callable=lambda:result_q) 20 # 繫結埠 8001,設定驗證口令"douban",相當於物件的初始化並返回 21 return BaseManager(address=('', 8001), authkey='douban'.encode('utf-8')) 22 23 def url_manager_proc(self, url_q, conn_q, root_url): 24 """ 25 url 管理程序 26 :param url_q: url 佇列 27 :param conn_q: 解析得到的 url 佇列 28 :param root_url: 起始 url 29 :return: None 30 """ 31 url_manage = UrlManager() 32 url_manage.add_new_url(root_url) 33 while True: 34 while url_manage.has_new_url(): 35 print('old_urls={}'.format(url_manage.old_urls_size())) 36 new_url = url_manage.get_new_url() 37 url_q.put(new_url) 38 urls = conn_q.get() 39 url_manage.add_new_urls(urls) 40 else: 41 url_q.put('end') 42 print('控制節點發起結束通知') 43 url_manage.save_progress('old_urls.txt', url_manage.old_urls) 44 url_manage.save_progress('new_urls.txt', url_manage.new_urls) 45 return 46 47 def result_solve_proc(self, result_q, conn_q, store_q): 48 """ 49 資料提取程序 50 :param result_q: 未處理資料佇列 51 :param conn_q: 解析得到的 url 佇列 52 :param store_q: 解析後的資料佇列 53 :return: 54 """ 55 while True: 56 try: 57 if not result_q.empty(): 58 content = result_q.get() 59 if content['new_urls'] == 'end': 60 print('結果分析程序接收通知然後結束') 61 store_q.put('end') 62 return 63 64 conn_q.put(content['new_urls']) 65 store_q.put(content['data']) 66 else: 67 time.sleep(0.1) 68 except: 69 time.sleep(0.1) 70 71 def store_proc(self, store_q): 72 """ 73 資料儲存程序 74 :param store_q: 解析後的資料佇列 75 :return: 76 """ 77 output = DataOutput() 78 while True: 79 if not store_q.empty(): 80 data = store_q.get() 81 82 if data == 'end': 83 print('儲存程序接收結束通知然後結束') 84 return 85 86 for item in data: 87 output.output_csv(item) 88 else: 89 time.sleep(0.1) 90 91 92 if __name__ == '__main__': 93 # 初始化 4 個佇列 94 url_q = Queue() 95 result_q = Queue() 96 conn_q = Queue() 97 store_q = Queue() 98 # 建立分散式管理器 99 node = NodeManager() 100 manager = node.start_manager(url_q, result_q) 101 # 建立 url 管理程序、資料提取程序和資料儲存程序 102 url = 'https://movie.douban.com/top250?start=0' 103 url_manager_proc = Process(target=node.url_manager_proc, args=(url_q, conn_q, url,)) 104 result_solve_proc = Process(target=node.result_solve_proc, args=(result_q, conn_q, store_q,)) 105 store_proc = Process(target=node.store_proc, args=(store_q,)) 106 # 啟動 3 個程序和分散式管理器 107 url_manager_proc.start() 108 result_solve_proc.start() 109 store_proc.start() 110 manager.get_server().serve_forever()