1. 程式人生 > >python爬蟲多執行緒之queue

python爬蟲多執行緒之queue

        首先先來介紹下queue這個包吧,這個包叫佇列,沒錯,就是那個和棧反過來的那個佇列,大家一聽佇列就隨口說出先進先出,而棧則是後進先出,為什麼要用用佇列來實現,其實我也不知道,反正用過之後很順手,具體哪裡也說不上來

        先來看下佇列的內建方法的,我們只需要記住兩個,一個是put 放 ,另一個是get 獲得,因為我們q = queue.Queue()建立了一個佇列後,這個佇列是空的,要先放東西進去才能從裡面拿東西出來

q = queue.Queue()    
q.qsize()           返回佇列的大小  
q.empty()         如果佇列為空,返回True,反之False  
q.full()          如果佇列滿了,返回True,反之False 
q.full              與 maxsize 大小對應  
q.get([block[, timeout]]) 獲取佇列,timeout等待時間  
q.get_nowait()         相當q.get(False) 
q.put(item)           寫入佇列,timeout等待時間  
q.put_nowait(item)    相當q.put(item, False) 
q.task_done()         在完成一項工作之後,q.task_done() 函式向任務已經完成的佇列傳送一個訊號 
q.join()             實際上意味著等到佇列為空,再執行別的操作

下面是我自己寫的一個簡單的程式碼,我會用文字來註釋,以便大家能看懂,真心簡單,熟悉讀完這篇文章後你如果還不會的話,那我也沒什麼說的了

#開啟多個執行緒,同時執行任務,有幾個執行緒就執行幾個任務

import threading
import time
import queue

class MyThread(threading.Thread):
    def __init__(self, func):
        threading.Thread.__init__(self)
        self.func = func
    def run(self):
        self.func()

def worker():
    while not q.empty():
        item = q.get()  # 或得任務
        print('Processing : ',item)
        time.sleep(1)

def main():
    threads = []
    for task in range(100):
        q.put(task)
    for i in range(threadNum):   #開啟三個執行緒
        thread = MyThread(worker)
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()

if __name__ == '__main__':
    q = queue.Queue()
    threadNum = 3
    main()

看程式碼先看主入口,根據裡面的函式呼叫一步步來

1.    首先我例項化了一個佇列  q = queue.Queue()    ,然後我設定執行緒數為3,接著呼叫main() 方法

2.    進入main函式中,我建立了一個空列表,用來放執行緒,第一個for 迴圈中我做的是將(0,100)之間資料放入隊列當中,當然,100是取不到的,放完佇列後又來了個for迴圈,這個for迴圈是用來建立執行緒的,前面說了threadNum = 3,那麼說明我這裡要迴圈三次,threadNum的值分別為0,1,2,當然,這三個數用不到,因為我沒在MyThread寫執行緒名,沒有把thread作為引數傳入類中,所以每個執行緒都是無名氏,我們看到thread = MyThread(worker) 這段程式碼,先來看MyThread這個類,worker這個引數先不看,MyThread中這個類繼承了threading.Thread,在init方法中初始化了父類,也就是threading.Thread,並且定義了一個屬性,屬性名叫func,因為threading這個類有自己的run方法,我們可以重寫父類的run方法,self.func()  表明但我們start後,執行緒會自動呼叫run方法,就會執行self.func() 這句

3.     看完MyThread這個類後我們就來看worker這個引數,有的人就會問,看了半天沒看到有這個引數啊,誰說引數一定是變數啊,也有可能是函式啊,現在就有人恍然大悟了,看到worker這個方法。這個方法裡面一個while迴圈,我使用的一個佇列的方法,這個方法是q.empty(),這個方法說明當佇列q為空時,他的值為True,前面寫了個not ,說明整句的意思是,當佇列q不為空時才會執行這個while迴圈。一開始定義一個變數item 來接收得到的資料,列印一遍並且休眠一秒,不休眠的話程式執行太快,看不到多執行緒的效果,以上  thread = MyThread(worker)  這句程式碼就解釋完了

4.     thread.start() 這句是啟動執行緒,threads.append(thread)是將執行緒加入到前面定義的threads空列表中,最後這個for迴圈是將每個執行緒都join()一下,join的意思是等執行緒結束後才會執行後面的語句,我們這裡的意思是前面三個執行緒跑完了才能輪到後面三個執行緒

5.    基本的程式碼都講解完畢了,現在來執行程式碼

就會發現,一次輸出三句,停頓一秒後又輸出三句,因為開了三個執行緒,有興趣的可以把我程式碼複製下來並且將執行緒數改下來看下效果,看到這估計有人就明白了,這個queue佇列相當於或類似一個全域性的列表,只負責存和取,沒錯,他的作用就是這樣,最起碼我用的只有這麼多

下面還有一個列子,這個列子是我想執行一個永不停止的執行緒,每次佇列被取完後,我都會將資料原樣放回去,具體的步驟我就不說了,上面有寫

import threading
import queue
import time

class Mythread(threading.Thread):
    def __init__(self,fun):
        threading.Thread.__init__(self)
        self.fun = fun
    def run(self):
        self.fun()

def worker():
    global data
    while True:
        if not q.empty():
            a = q.get()
            parse(a[0], a[1])
            time.sleep(1)
            data.append(a)
        else:
            for i in data:
                q.put(i)
            data = []


def parse(qd, zd):
    mystr = qd + zd
    print('=============',mystr)

def main():
    threads = []
    for i in LstAdd:
        q.put(i)
    for i in range(thendNum):
        thread = Mythread(worker)
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()

if __name__ == '__main__':
    LstAdd = [
        ('abcde1','adfa1'),
        ('abcde2','adfa2'),
        ('abcde3','adfa3'),
        ('abcde4','adfa4'),
        ('abcde5','adfa5'),
        ('abcde6','adfa6'),
        ('abcde7','adfa7'),
        ('abcde8','adfa8'),
        ('abcde9','adfa9'),
              ]
    data = []
    q = queue.Queue()
    thendNum = 3
    main()

我想將列表中每個元素即元組的兩個元素相加並輸出,執行程式結果如下:

他會一直輸出1-9,後面的沒有截圖下來,太多了,我將每次get()的資料都存起來,這樣防止資料丟失,每次我都會判斷佇列是否為空,為空的話就將存起來的資料原樣存回去,並且情況存這些資料的列表,保證每次都是同樣的資料同樣的順序