1. 程式人生 > >python多執行緒+生產者和消費者模型+queue使用

python多執行緒+生產者和消費者模型+queue使用

# 多執行緒簡介 多執行緒:在一個程序內部,要同時幹很多事情,就需要同時執行多個子任務,我們把程序內的這些子任務叫執行緒。 執行緒的記憶體空間是共享的,每個執行緒都共享同一個程序的資源 模組: 1、_thread模組 低階模組(在python3裡基本已棄用) 2、threading模組 高階模組 對_thread模組進行了封裝 ## threading模組使用 1.使用元組傳遞 `threading.Thread(target=方法名,arg=(引數1,引數2...))` 2.用字典傳遞 `threading.Thread(target=方法名,kwargs={“引數名”:引數1,“引數名”:引數2,....})` 3.混合使用元組和字典 `threading.Thread(target=方法名,args=(引數1,引數2,...),kwargs={“引數名”:引數1,“引數名”:引數2,....})` 4.檢視執行緒數: 使用threading.enumerate()函式便可以看到當前執行緒的數量。 5.檢視當前執行緒的名字: 使用threading.current_thread()可以看到當前執行緒的資訊。 6.join([time]):等待至執行緒終止。這阻塞呼叫執行緒直至執行緒的join()方法被呼叫終止、正常退出或者丟擲未處理的異常、或者是可選的超時發生。 7.isAlive():返回執行緒是否活動 8.getName(): 返回執行緒名 9.setNmae():設定執行緒名 10.後臺執行緒(守護執行緒) 後臺執行緒有一個特徵:如果所有的前臺執行緒都死亡了,那麼後臺執行緒也會自動死亡。 呼叫Thread物件的daemon屬性可將指定執行緒設定為後臺執行緒。在下面程式可以看到程式裡的執行緒被指定為後臺執行緒,當所有前臺程式都死亡了後,後臺執行緒隨之死亡。當在整個虛擬機器裡只剩下後臺執行緒時,程式就沒有繼續執行的必要了,所以程式也就退出了。 ``` import threading # 定義後臺執行緒的執行緒執行體與普通執行緒沒有任何區別 def action(max): for i in range(max): print(threading.current_thread().name + " " + str(i)) t = threading.Thread(target=action, args=(100,), name='後臺執行緒') # 將此執行緒設定成後臺執行緒 # 也可在建立Thread物件時通過daemon引數將其設為後臺執行緒 t.daemon = True # 啟動後臺執行緒 t.start() for i in range(10): print(threading.current_thread().name + " " + str(i)) # -----程式執行到此處,前臺執行緒(主執行緒)結束------ # 後臺執行緒也應該隨之結束 ``` 上面程式中的粗體字程式碼先將t執行緒設定成後臺執行緒,然後啟動該執行緒。本來該執行緒應該執行到i等於99時才會結束,但在執行程式時不難發現,該後臺執行緒無法執行到99,因為當主執行緒也就是程式中唯一的前臺執行緒執行結東後,程式會主動退出,所以後臺執行緒也就被結東了。從上面的程式可以看出,主執行緒預設是前臺執行緒,t執行緒預設也是前臺執行緒。但並不是所有的執行緒預設都是前臺執行緒,有些執行緒預設就是後臺執行緒一一前臺執行緒建立的子執行緒預設是前臺執行緒,後臺執行緒建立的子執行緒預設是後臺執行緒 可見,建立後臺執行緒有兩種方式。 1. 主動將執行緒的 daemon屬性設定為True 2. 後臺執行緒啟動的執行緒預設是後臺執行緒。 ### 以下看一個簡單的多執行緒程式: ``` import threading import time def coding(): for x in range(3): print('%s正在寫程式碼' % x) time.sleep(1) def drawing(): for x in range(3): print('%s正在畫圖' % x) time.sleep(1) def single_thread(): coding() drawing() def multi_thread(): t1 = threading.Thread(target=coding) t2 = threading.Thread(target=drawing) t1.start() t2.start() if __name__ == '__main__': multi_thread() ``` ### 繼承自threading.Thread類: 為了讓執行緒程式碼更好的封裝。可以使用threading模組下的Thread類,繼承自這個類,然後實現run方法,執行緒就會自動執行run方法中的程式碼。示例程式碼如下: ``` import threading import time class CodingThread(threading.Thread): def run(self): for x in range(3): print('%s正在寫程式碼' % threading.current_thread()) time.sleep(1) class DrawingThread(threading.Thread): def run(self): for x in range(3): print('%s正在畫圖' % threading.current_thread()) time.sleep(1) def multi_thread(): t1 = CodingThread() t2 = DrawingThread() t1.start() t2.start() if __name__ == '__main__': multi_thread() ``` ### start()和run() **start()** start()方法來啟動執行緒,真正實現了多執行緒執行。這時無需等待run方法體程式碼執行完畢,可以直接繼續執行下面的程式碼;通過呼叫Thread類的start()方法來啟動一個執行緒, 這時此執行緒是處於就緒狀態, 並沒有執行。 然後通過此Thread類呼叫方法run()來完成其執行操作的, 這裡方法run()稱為執行緒體,它包含了要執行的這個執行緒的內容, Run方法執行結束, 此執行緒終止。然後CPU再排程其它執行緒。run() **run()** run()方法當作普通方法的方式呼叫。程式還是要順序執行,要等待run方法體執行完畢後,才可繼續執行下面的程式碼; 程式中只有主執行緒——這一個執行緒, 其程式執行路徑還是隻有一條, 這樣就沒有達到寫執行緒的目的。 記住:多執行緒就是分時利用CPU,巨集觀上讓所有執行緒一起執行 ,也叫併發。start() 和 run()的區別說明 start() : 它的作用是啟動一個新執行緒,新執行緒會執行相應的run()方法。start()不能被重複呼叫。 run() : run()就和普通的成員方法一樣,可以被重複呼叫。單獨呼叫run()的話,會在當前執行緒中執行run(),而並不會啟動新執行緒! # Lock版本生產者和消費者模型 生產者和消費者模式是多執行緒開發中經常見到的一種模式。生產者的執行緒專門用來生產一些資料,然後存放到一箇中間的變數中。消費者再從這個中間的變數中取出資料進行消費。但是因為要使用中間變數,中間變數經常是一些全域性變數,因此需要使用鎖來保證資料完整性。以下是使用threading.Lock鎖實現的“生產者與消費者模式”的一個例子: ``` import threading import random import time gMoney = 1000 glo = threading.Lock() gTotaltime = 10 gTime = 0 class Consumer(threading.Thread): def run(self): global gMoney global gTime while True: money = random.randint(100,1000) glo.acquire() if gMoney>= money: gMoney -= money print("{}消費了{}元,當前剩餘{}元".format(threading.current_thread(),money,gMoney)) else: print("{}準備消費{}元,當前剩餘{}元,不足,不能消費".format(threading.current_thread(),money,gMoney)) if gTime >= gTotaltime and money > gMoney: glo.release() break glo.release() time.sleep(0.7) class Porducer(threading.Thread): def run(self): global gMoney global gTime while True: Money = random.randint(100,700) glo.acquire() if gTime == gTotaltime: glo.release() break gMoney += Money print("{}生產了{}元錢,剩餘{}元錢".format(threading.current_thread(),Money,gMoney)) gTime += 1 glo.release() time.sleep(0.5) def main(): for x in range(3): t1 = Porducer(name="生產者") t1.start() for i in range(5): t = Consumer(name="消費者") t.start() if __name__ == '__main__': main() ``` # queue執行緒安全佇列 線上程中,訪問一些全域性變數,加鎖是一個經常的過程。如果你是想把一些資料儲存到某個佇列中,那麼Python內建了一個執行緒安全的模組叫做queue模組。Python中的queue模組中提供了同步的、執行緒安全的佇列類,包括FIFO(先進先出)佇列Queue,LIFO(後入先出)佇列LifoQueue。這些佇列都實現了鎖原語(可以理解為原子操作,即要麼不做,要麼都做完),能夠在多執行緒中直接使用。可以使用佇列來實現執行緒間的同步。相關的函式如下: 1. 初始化Queue(maxsize):建立一個先進先出的佇列。 2. qsize():返回佇列的大小。 3. empty():判斷佇列是否為空。 4. full():判斷佇列是否滿了。 5. get():從佇列中取最後一個數據。 6. put(item,block=Ture,timeout=None):將一個數據放到佇列中。如果佇列已滿,且block引數為Ture(阻塞),當前執行緒被阻塞,timeout指定阻塞時間,如果將timeout設定為None,則代表一直阻塞,直到有元素被放入佇列中:如果佇列已空,且block引數設定為False(不阻塞),則直接引發queue.Empty異常。 下面就可以用queue來進行執行緒通訊 ``` import queue import time import threading def set_value(q): index = 0 while True: q.put(index) index += 1 time.sleep(3) def get_value(q): index = 0 while True: print(q.get()) time.sleep(0.5) def main(): q = queue.Queue(4) t1 = threading.Thread(target=set_value,args=[q]) t2 = threading.Thread(target=get_value,args=[q]) t1.start() t2.start() if __name__ == '__main__': mai