python併發程式設計基礎之守護程序、佇列、鎖
什麼是守護程序?
表示程序A守護程序B,當被守護程序B結束後,程序A也就結束。
from multiprocessing import Process import time def task(): print('妃子的一生') time.sleep(15) print('妃子死了') if __name__ == '__main__': fz = Process(target=task) fz.daemon = True #將子程序作為主程序的守護程序。必須在開啟之前設定 fz.start()print('皇帝登基了') time.sleep(1) print('做了10年皇帝') time.sleep(5) print('皇帝駕崩了!')
上面的程式碼說明了什麼叫守護程序,需要注意的是,守護程序須在程序開啟前設定,就是改變了,本質其實就是改變了Process類裡面的daemon屬性,預設是False
應用場景:之所以開啟子程序,是為了幫助主程序完成某個任務,然而主程序認為自己的事情一旦完成,子程序就沒必要存在,那就可以將子程序設定未主程序的守護程序。
例如:QQ下載檔案,QQ退出,檔案就終止下載。
2、互斥鎖
1.什麼是鎖(Lock)?
鎖是multiprocessing模組中的一個類(Lock)。
2.為什麼要有鎖?
序列效率低,但是資料不會出現問題。併發效率高,但是資料可能出錯。
當多程序共享一個數據時,可能造成資料錯亂。
1.使用join方法可以讓資料不會錯亂,但是會讓程序變為序列。效率不高。
2.使用lock可以將資料加鎖,其他程式使用時需等待當前程式使用完成。lock只是使部分程式碼序列,其他部分還是併發。
3.互斥鎖的示例
Lock類的兩個方法acquire(鎖)和release(釋放),acquire將狀態變為True, release是將狀態變為False
鎖的本質就是一個bool型別,在執行前會先判斷這個值。
def acquire(self, blocking=True, timeout=-1): pass def release(self): pass #鎖的使用 #注意:使用鎖的必須保證鎖是同一個 from multiprocessing import Process,Lock import random import time def task1(lock): lock.acquire() print('1111') time.sleep(random.randint(1, 3)) print('11111111') lock.release() def task2(lock): lock.acquire() print('222222') time.sleep(random.randint(1,2)) print('2222222222222') lock.release() def task3(lock): lock.acquire() print('3333333333') time.sleep(random.randint(1, 3)) print('33333333333333333333333') lock.release() if __name__ == '__main__': lock = Lock() p1 = Process(target=task1, args=(lock,)) p1.start() p2 = Process(target=task2, args=(lock,)) p2.start() p3 = Process(target=task3, args=(lock,)) p3.start() ###輸出結果除了1的位置固定,後兩個不固定,但是如果出現了2,則必須執行完2 1111 11111111 3333333333 33333333333333333333333 222222 2222222222222
4.鎖的應用
應用場景:購票軟體的購票過程。
邏輯分析:1.使用者都可以檢視餘票
2.但是其中一個使用者發起請求購票,其他使用者不能發起請求購票(鎖狀態)
import json from multiprocessing import Process,Lock import random import time #檢視餘票功能 def check_ticket(name): time.sleep(random.randint(1, 3)) with open('ticket.json', mode='rt', encoding='utf-8')as f: dic = json.load(f) print('%s正在檢視餘票, 餘票:%s 張' % (name, dic['count'])) #購票功能 def buy_ticket(name): with open('ticket.json', mode='rt', encoding='utf-8')as f: dic = json.load(f) if dic['count'] > 0: time.sleep(random.randint(1, 3)) dic['count'] -= 1 print('%s購票成功! 餘票:%s張' % (name, dic['count'])) with open('ticket.json', mode='wt', encoding='utf-8')as f: json.dump(dic, f) #使用者購票流程 def task(name,lock): check_ticket(name) lock.acquire() buy_ticket(name) lock.release() if __name__ == '__main__': lock = Lock() for i in range(1, 11): p = Process(target=task, args=('使用者%s' % i, lock)) p.start()
5.重入鎖(RLock)
RLock類,可重入的鎖。Lock類只能有一個acquire方法,RLock可以有多個,如果在多程序中使用Rlock,並且一個程序A執行了多次acquire,其他程序B想要獲取鎖,則需將所有的acquire釋放可以獲取。
from multiprocessing import RLock lock = RLock() lock.acquire() lock.acquire() lock.acquire() print('haha') lock.release() lock.release() lock.release() #注意重入鎖,鎖幾次就需要開幾次
6.死鎖
1.什麼是死鎖?
死鎖是指鎖無法開啟,導致程式卡死。必須是多把鎖才能形成死鎖。正常開發時,要避免這種情況。通常一把鎖可以解決大多數問題。
##以一個吃飯需要盤子l1 和筷子 l2 二者缺一不可來說明死鎖 from multiprocessing import Process,Lock import time def task1(l1, l2, i): l1.acquire() print('盤子被%s搶走了'% i) time.sleep(2) l2.acquire() print('筷子被%s搶走了'% i) print('吃飯!') l1.release() l2.release() def task2(l1, l2, i): l2.acquire() ###task2先搶了筷子,導致task1無法完成後續動作 print('筷子被%s搶走了' % i) l1.acquire() print('盤子被%s搶走了'% i) print('吃飯!') l1.release() l2.release() if __name__ == '__main__': l1 = Lock() l2 = Lock() Process(target=task1, args=(l1, l2, 1)).start() #開啟兩個程序 Process(target=task2, args=(l1, l2, 2)).start() #在開發中要避免這種問題。如果有使用則應該注意將task2調整鎖的位置。 def task2(l1, l2, i): l1.acquire() print('盤子被%s搶走了'% i) l2.acquire() print('筷子被%s搶走了' % i) print('吃飯!') l1.release() l2.release()
7.ICP程序間通訊
由於程序間記憶體是相互獨立的,所以需要對應積極的方案能夠使得程序之間可以相互傳遞資料。
解決方案:
1.使用共享檔案。多個程序同時讀寫同一個檔案。但是IO速度慢,傳輸的資料大小不受限制。
2.管道。基於記憶體,速度塊,但是是單向的,用起來麻煩。
3.佇列。申請共享記憶體空間,多個程序可以共享這個記憶體區域。
#建立共享檔案 from multiprocessing import Manager,Process,Lock def work(d): # with lock: d['count']-=1 if __name__ == '__main__': with Manager() as m: dic=m.dict({'count':100}) #建立一個共享的字典 p_l=[] for i in range(100): p=Process(target=work,args=(dic,)) p_l.append(p) p.start() for p in p_l: p.join() print(dic)
8.佇列
程序彼此之間互相隔離,要實現程序間通訊(IPC),multiprocessing模組支援兩種形式:佇列和管道,這兩種方式都是使用訊息傳遞的。
佇列也是一種資料容器,其特點:先進先出。
與堆疊相反:先進後出。 (函式的呼叫就是一種類似堆疊方式)
建立佇列的類(底層就是以管道和鎖定的方式實現):
1.Queue([maxsize]):建立共享的程序佇列,Queue是多程序安全的佇列,可以使用Queue實現多程序之間的資料傳遞
2.maxsize是佇列中允許最大項數,省略則無大小限制。
主要方法
q.put方法用以插入資料到佇列中,put方法還有兩個可選引數:blocked和timeout。如果blocked為True(預設值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該佇列有剩餘的空間。如果超時,會丟擲Queue.Full異常。如果blocked為False,但該Queue已滿,會立即丟擲Queue.Full異常。
q.get方法可以從佇列讀取並且刪除一個元素。同樣,get方法有兩個可選引數:blocked和timeout。如果blocked為True(預設值),並且timeout為正值,那麼在等待時間內沒有取到任何元素,會丟擲Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果佇列為空,則立即丟擲Queue.Empty異常.
q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)
q.empty():呼叫此方法時q為空則返回True,該結果不可靠,比如在返回True的過程中,如果佇列中又加入了專案。
q.full():呼叫此方法時q已滿則返回True,該結果不可靠,比如在返回True的過程中,如果佇列中的專案被取走。
q.qsize():返回佇列中目前專案的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣
佇列的優點是:保證資料不會錯亂,即使在多程序中。因為put和get都是阻塞
from multiprocessing import Queue q = Queue(3) #建立一個佇列,佇列最多存三個資料 q.put('ming') #存資料 q.put(123) print(q.get()) #put預設會阻塞,當佇列裝滿時程式會卡住 print(q.get()) q.put('xian') # q.put('daidai',False) #put第二個引數設定為False會強行將資料放入佇列 q.put('j', True, timeout=3) #timeout等待時間
9.生產者消費者模型
1.什麼是生產者消費者模型?
生產者:生產資料的一方。
消費者:處理使用資料的一方。
例如:一個爬蟲程式:1.爬去資料 2.解析資料
2.怎麼實現?
1.將兩個不同任務分配給不同的程序。
2.提供一個程序共享的資料容器
#模型示例: from multiprocessing import Process, Queue import time import random def get_data(q): for num in range(1, 6): print('正在爬取第%s個數據' % num) time.sleep(random.randint(1, 2)) print('第%s個數據爬取完成。' % num) q.put('第%s個數據' % num) def parse_data(q): for num in range(1, 6): data = q.get() print('正在解析%s' % data) time.sleep(random.randint(1, 2)) print('%s解析完成。' % data) if __name__ == '__main__': q = Queue(5) # 建立一個可以存放5個數據的佇列 #生產者模型 producer = Process(target=get_data, args=(q,)) producer.start() #消費者模型 consumer = Process(target=parse_data, args=(q,)) consumer.start() ###輸出結果 正在爬取第1個數據 第1個數據爬取完成。 正在爬取第2個數據 正在解析第1個數據 第2個數據爬取完成。 正在爬取第3個數據 第1個數據解析完成。 正在解析第2個數據 第3個數據爬取完成。 正在爬取第4個數據 第2個數據解析完成。 正在解析第3個數據 第3個數據解析完成。 第4個數據爬取完成。 正在爬取第5個數據 正在解析第4個數據 第5個數據爬取完成。 第4個數據解析完成。 正在解析第5個數據 第5個數據解析完成。 ###可以看出兩個模型之間互不干擾,相互獨立。