1. 程式人生 > >python爬蟲自學第一天,全新的開始!

python爬蟲自學第一天,全新的開始!

一直在學習,跟著雪峰老師的講解,跟著《Python爬蟲開發與專案實戰》,跟著笨辦法學Python(不得不說確實是新手上路的一本好書),跟著CSDN裡的大神,跟著慕課網的老師,跟著一切能跟著資源......只是今天才開始記錄自己的學習歷程,之前的會慢慢補齊,今天就從“分散式程序”開始記錄吧

話不多少,先上今天的學習內容

# task_master.py

import random, time, queue  #註釋1
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support  #註釋2
# 程序個數 task_number = 10 # 自定義傳送任務的佇列: task_queue = queue.Queue(task_number) # 自定義接收結果的佇列: result_queue = queue.Queue(task_number) def get_task(): #註釋3,包括下面3 return task_queue def get_result(): return result_queue # 從BaseManager繼承的QueueManager: class QueueManager(BaseManager): pass def win_run(): # 把兩個Queue都註冊到網路上, callable引數關聯了Queue物件: QueueManager.register('get_task_queue', callable=get_task) #註釋4
QueueManager.register('get_result_queue', callable=get_result) #註釋4 # 繫結埠5000, 設定驗證碼'abc',windows下需要填寫IP,Linux下空預設為本地 manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc') #註釋5 # 啟動Queue: manager.start() try: # 獲得通過網路訪問的Queue物件: task = manager.get_task_queue() result = manager.get_result_queue() # 放幾個任務進去: for i in range(10): n = random.randint(0, 10000) print('Put task %d...' % n) task.put(n) #註釋6
# 從result佇列讀取結果: print('Try get results...') for i in range(10): r = result.get(timeout=10) print('Result: %s' % r) except: print ('Manager Error') finally: # 關閉: manager.shutdown() print('master exit.') if __name__ == "__main__": freeze_support() #註釋2 win_run()

熟悉的都應該知道這是雪峰老師python3中所講到的知識,只是裡面有幾處我私自改動的地方,也是掉進雪峰老師那個坑後自己慢慢爬出來的一個個腳印

註釋1:Py2和Py3的差異,3中import的queue是小寫而不是大寫、

註釋2:這裡是為了防止過多的程序和視窗導致程式出錯而設定的一個預防手段,只不過在涉及到程序池的時候可能會用到,向上面這樣的小程式是基本上不可能用到的,寫在這裡只是為了加深印象

註釋3:這裡的四行程式碼是為了函式win_run中

QueueManager.register('get_task_queue', callable=get_task) 

QueueManager.register('get_result_queue', callable=get_result)

這兩句話而提前設定的,雪峰老師的程式碼中,這部分所用到的是Linux環境下的引數,和Windows不同的是,Linux環境下有個叫“lambda”的引數,可以自動繫結所呼叫的介面,所以不需要自行定義,但是Windows由於沒有這個引數,所以只能先將需要呼叫的介面定義好

註釋4:將上面自定義的兩個函式作為物件關聯到佇列中,也就是這裡,Windows不能使用lambda引數

註釋5:這裡的authkey引數後面跟的字串一定要進行編碼!!!一定要進行編碼!!!否則就會報編碼錯誤

manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')

註釋6:這裡是自己犯的一個小錯誤,當時並沒有將put放到for迴圈當中,導致最後worker.py接收到的佇列中只有master.py中的最後一條資料,寫在這裡算是給自己提個醒

下面這一部分是worker.py,用於接受上面master在網路中所傳送的佇列並進行處理,最後將結果返回到master中

這一部分中的註釋已經寫的十分詳細了,不需要在做過多的說明:

import time, queue
from multiprocessing.managers import BaseManager

#建立類似的QueueManager
class QueueManager(BaseManager):
    pass

#由於QueueManager只是從網路上獲取Queue,所以註冊時只需要提供名字
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

#連結到伺服器,也就是執行task_queue的機器
server_addr = '127.0.0.1'
print ('Connect to server %s ...' % server_addr)
#埠和驗證碼注意和master.py中設定的要完全一致
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
#從網路連線
m.connect()
#獲取queue的物件
task = m.get_task_queue()
result = m.get_result_queue()
#從task佇列中去除任務,並將結果存入到result佇列中
for i in range(10):
    try:
        n = task.get(timeout=1)
        print ('run task %d * %d' % (n,n))
        r = '%d * %d = %d' % (n, n, n*n)
        time.sleep(1)
        result.put(r)
    except queue.Empty:
        print ('task queue is empty')
print ('worker exit.')

這裡補充一下python中queue的相關知識點:

Python queue模組有三種佇列及建構函式:
1、Python queue模組的FIFO佇列先進先出。     class queue.queue(maxsize)
2、LIFO類似於堆,即先進後出。                         class queue.Lifoqueue(maxsize)
3、還有一種是優先順序佇列級別越低越先出來。    class queue.Priorityqueue(maxsize)

此包中的常用方法(q =queue.queue()):
q.qsize() 返回佇列的大小
q.empty() 如果佇列為空,返回True,反之False
q.full() 如果佇列滿了,返回True,反之False
q.full 與 maxsize 大小對應
q.get([block[, timeout]]) 獲取佇列,timeout等待時間
q.get_nowait() 相當q.get(False)
非阻塞 q.put(item) 寫入佇列,timeout等待時間
q.put_nowait(item) 相當q.put(item, False)
q.task_done() 在完成一項工作之後,q.task_done() 函式向任務已經完成的佇列傳送一個訊號
q.join() 實際上意味著等到佇列為空,再執行別的操作