併發程式設計(五)
前言
上篇部落格的內容是守護程序,對於作業系統來說可以在後臺執行一些程式.這篇的內容是互斥鎖,在上上篇部落格上說到程序記憶體空間互相隔離,所以可以通過共享檔案來操作同一個檔案,那麼這樣操作的話會發生什麼呢?
鎖
互斥鎖
多個程序需要共享資料時,先將其鎖定,此時資源狀態為'鎖定',其他程序不能更改;知道該程序釋放資源,將資源的狀態變成非'鎖定',其他的執行緒才能再次鎖定該資源.互斥鎖保證了每次只有一個程序進入寫入操作,從而保證了多程序情況下資料的正確性.
我們使用一個demo 來模擬多個程序操作同一個檔案:
import json import time,random from multiprocessing import Process def show_tickets(name): time.sleep(random.randint(1,3)) with open('ticket.json', 'rt', encoding='utf-8') as f: data = json.load(f) print('%s 檢視 剩餘票數: %s' % (name, data['count'])) def buy_ticket(name): with open('ticket.json', 'rt', encoding='utf-8') as f: dic = json.load(f) if dic['count'] > 0: dic['count'] -= 1 time.sleep(random.randint(1,3)) with open('ticket.json', 'wt', encoding='utf-8') as f: json.dump(dic, f) print('%s: 購票成功' % name) def task(name): show_tickets(name) buy_ticket(name) if __name__ == '__main__': for i in range(1,11): p = Process(target=task, args=(i,)) p.start()
執行結果:
在 ticket.json 裡面只有一張票,結果卻造成多個使用者購買成功,這很顯然是不符合實際情況的.
那麼怎麼解決呢?如果多個程序對同一個檔案進行讀操作可以不進行限制,但是對同一個檔案進行寫操作就必要要進行限制,不可以同時多個人對同一個檔案進行寫操作.python 在多程序模組裡提供一個類, Lock 類,當程序獲取到鎖的時候其他的程序就必須要等待鎖釋放才可以進行爭搶,在這個例子裡面就可以加上一把鎖來保護資料安全.
from multiprocessing import Process, Lock import json,time,random def show_tickets(name): time.sleep(random.randint(1,3)) with open('ticket.json', 'rt', encoding='utf-8') as f: data = json.load(f) print('%s 檢視 剩餘票數: %s' % (name, data['count'])) def buy_ticket(name): time.sleep(random.randint(1,3)) with open('ticket.json', 'rt', encoding='utf-8') as f: dic = json.load(f) if dic['count'] > 0: dic['count'] -= 1 time.sleep(random.randint(1,3)) with open('ticket.json', 'wt', encoding='utf-8') as f: json.dump(dic, f) print('%s: 購票成功' % name) def task(name,lock): show_tickets(name) lock.acquire() buy_ticket(name) lock.release() if __name__ == '__main__': mutex = Lock() for i in range(1,11): p = Process(target=task, args=(i,mutex)) p.start()
執行結果:
這樣加了鎖(互斥鎖)就可以解決同時操作同一個檔案造成的資料混亂問題了.
當使用多程序開發時,如果多個程序同時讀寫同一個資源,可能會造成資料的混亂,為了防止發生問題,使用鎖,或者使用 Process 的方法 join 將並行變為序列.
join 和鎖的區別
- join 人為控制程序的執行順序
- join 把整個程序全部序列,而鎖可以指定部分程式碼序列
一旦序列,效率就會降低,一旦並行,資料就可能會出錯.
程序間通訊
程序間通訊( internal-process communication),我們在開啟子程序是希望子程序幫助完成任務,很多情況下需要將資料返回給父程序,然而程序間記憶體是物理隔離的.
解決辦法:
- 將共享資料放到檔案中
- 管道 多程序模組中的一個類,需要有父子關係
- 共享一快記憶體區域 需要作業系統分配
管道通訊
Pipe類返回一個由管道連線的連線物件,預設情況下為雙工:
from multiprocessing import Process,Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) p.join()
執行結果:
[42, None, 'hello']
例項化 Pipe 類會返回兩個連線物件表示管道的兩端.每個連線物件都有 send() 和 recv() 方法(及其他).請注意,如果兩個程序同時嘗試讀寫管道的同一端,則管道中的資料可能會損壞.當然,同時使用管道的不同端部的過程不存在損壞的風險.
共享記憶體通訊
Queue 通訊
Queue類會生成一個先進先出的容器,通過往佇列中存取資料而進行程序間通訊.
from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) p.join()
執行結果:
[42, None, 'hello']
佇列其他特性
# 阻塞操作 必須掌握 q = Queue(3) # # 存入資料 q.put("hello",block=False) q.put(["1","2","3"],block=False) q.put(1,block=False) # 當容量滿的時候 再執行put 預設會阻塞直到執行力了get為止 # 如果修改block=False 直接報錯 因為沒地方放了 # q.put({},block=False) # # # 取出資料 print(q.get(block=False)) print(q.get(block=False)) print(q.get(block=False)) # 對於get當佇列中中沒有資料時預設是阻塞的直達執行了put # 如果修改block=False 直接報錯 因為沒資料可取了 print(q.get(block=False)) # 瞭解 q = Queue(3) q.put("q",timeout=3) q.put("q2",timeout=3) q.put("q3",timeout=3) # 如果滿了 願意等3秒如果3秒後還存不進去 就炸 # q.put("q4",timeout=3) print(q.get(timeout=3)) print(q.get(timeout=3)) print(q.get(timeout=3)) # 如果沒了 願意等3秒如果3秒後還取不到資料 就炸 print(q.get(timeout=3))
Manager 通訊
demo
from multiprocessing importProcess,Manager import time def task(dic): print("子程序xxxxx") # li[0] = 1 # print(li[0]) dic["name"] = "xx" if __name__ == '__main__': m = Manager() # li = m.list([100]) dic = m.dict({}) # 開啟子程序 p = Process(target=task,args=(dic,)) p.start() time.sleep(3)
可以建立一片共享記憶體區域用來存取資料.
生產者消費者模型
什麼是生產者消費者模型
在軟體開發過程中,經常碰到這樣的場景:
某些模組負責生產資料,這些資料由其他模組來負責處理(此處的模組可能是:函式,執行緒,程序等).生產資料的模組稱為生產者,而處理資料的模組稱為消費者.在生產者與消費者之間的緩衝區稱之為倉庫.生產者負責往倉庫運輸商品,而消費者負責從倉庫裡取出商品,這就構成了生產者消費者模型.
結構圖如下:
為了便於理解,我們舉一個寄信的例子。假設你要寄一封信,大致過程如下:
- 你把信寫好——相當於生產者生產資料;
- 你把信放入郵箱——相當於生產者把資料放入緩衝區;
- 郵遞員把信從郵箱取出,做相應處理——相當於消費者把資料取出緩衝區,處理資料.
生產者消費者模型的優點
- 解耦
假設生產者和消費者分別是兩個執行緒.如果讓生產者直接呼叫消費者的某個方法,那麼生產者對於消費者就會產生依賴(耦合).如果未來消費者的程式碼發生改變,可能會影響到生產者的程式碼.而如果兩者都依賴於某個緩衝區,兩者之間不直接依賴,耦合也就相應降低了.
舉個例子,我們去郵局投遞信件,如果不使用郵箱(也就是緩衝區,你必須得把信直接交給郵遞員.有同學會說,直接給郵遞員不是挺簡單的嘛?其實不簡單,你必須 得認識誰是郵遞員,才能把信給他.這就產生了你和郵遞員之間的依賴(相當於生產者和消費者的強耦合).萬一哪天郵遞員換人了,你還要重新認識一下(相當於消費者變化導致修改生產者程式碼).而郵箱相對來說比較固定,你依賴它的成本就比較低(相當於和緩衝區之間的弱耦合).
- 併發
由於生產者與消費者是兩個獨立的併發體,它們之間是使用緩衝區通訊的,生產者只需要往緩衝區裡丟資料,就可以接著生產下一個資料了,而消費者只需要從緩衝區拿資料即可,這樣就不會因為彼此的處理速度而發生阻塞.
繼續上面的例子,如果沒有郵箱,就得在郵局等郵遞員,知道他回來,把信交給他,這期間我們什麼事都幹不了(生產者阻塞).或者郵遞員挨家挨戶問,誰要寄信(消費者阻塞).
- 支援忙閒不均
當生產者製造資料快的時候,消費者來不及處理,為處理的資料可以暫時存在緩衝區中,慢慢處理,而不至於因為消費者的效能過慢造成資料丟失或影響生產者生產資料.
再拿寄信的例子,假設郵遞員一次只能帶走1000封信,萬一碰上情人節或者其他的緊急任務,需要寄出的信超過了1000封,這個時候郵箱作為緩衝區就派上用場了.郵遞員把來不及帶走的信暫存在郵箱中,等下次過來時在拿走.
使用
from multiprocessing import Process, Queue import time, random def producer(name, food, q): for i in range(10): res = '%s %s' % (food, i) time.sleep(random.randint(1,3)) q.put(res) print('%s 生產了 %s' % (name, res)) def consumer(name, q): while True: res = q.get() time.sleep(random.randint(1,3)) print('%s 消費了 %s' % (name, res)) if __name__ == '__main__': q = Queue() p = Process(target=producer, args=('musibii', ':hamburger:', q)) c = Process(target=consumer, args=('thales', q)) p.start() c.start() p.join() c.join() print('主程序')
執行結果:
這樣的話該程序並不會結束,因為 get 方法是阻塞的,資料消費完就會一直等待知道生產者生產新的資料,而生產者只能生產9個數據.所以會一直阻塞.
改進使用
我們需要在消費者消費的時候知道佇列裡面有多少資料,應該什麼時候消費完了,所以可以在生產者裡面生產結束後新增一個標誌,比如 None.
import time, random from multiprocessing import Process, Queue # 製作熱狗 def make_hotdog(queue, name): for i in range(1, 4): time.sleep(random.randint(1, 2)) print("%s 製作了一個%s" % (name, i)) # 生產得到的資料 data = "%s 生產的%s" % (name, i) # 存到佇列中 queue.put(data) # 裝入一個特別的資料 告訴消費方 沒有了 # queue.put(None) # 吃熱狗 def eat_hotdog(queue, name): while True: data = queue.get() if not data: break time.sleep(random.randint(1, 2)) print("%s 吃了 %s" % (name, data)) if __name__ == '__main__': # 建立佇列 q = Queue() p1 = Process(target=make_hotdog, args=(q, "musibii的熱狗店")) p2 = Process(target=make_hotdog, args=(q, "egon的熱狗店")) p3 = Process(target=make_hotdog, args=(q, "eureka的熱狗店")) c1 = Process(target=eat_hotdog, args=(q, "thales")) c2 = Process(target=eat_hotdog, args=(q, "maffia")) p1.start() p2.start() p3.start() c1.start() c2.start() # 讓主程序等三家店全都做完後.... p1.join() p2.join() p3.join() # 新增結束標誌注意這種方法有幾個消費者就加幾個None 不太合適 不清楚將來有多少消費者 q.put(None) q.put(None) # 現在 需要知道什麼時候做完熱狗了 生產者不知道消費者也不知道 # 只有佇列知道 print("主程序over") # 生產方不生產了 然而消費方不知道 所以已知等待get函式阻塞 # 三家店都放了一個空表示沒熱狗了但是消費者只有兩個 他們只要看見None 就認為沒有了 # 於是程序也就結束了造成一些資料沒有被處理 # 等待做有店都做完熱狗在放None
執行結果:
這樣就解決了最初版本消費之因為沒有資料而阻塞的問題了,但是這裡還是有問題,因為不知道到底有多少消費者,因為想讓消費者知道資料已經結束了的話,需要給每個消費者一個標誌位,這樣是不現實的.
完美使用
python 多程序模組提供了一個JoinableQueue類,追根溯源繼承於 Queue,原始碼看的頭疼.
import time, random from multiprocessing import Process, JoinableQueue # 製作熱狗 def make_hotdog(queue, name): for i in range(1,4): time.sleep(random.randint(1, 2)) print("%s 製作的%s" % (name, i)) # 生產得到的資料 data = "%s 生產的%s" % (name, i) # 存到佇列中 queue.put(data) # 裝入一個特別的資料 告訴消費方 沒有了 # queue.put(None) # 吃熱狗 def eat_hotdog(queue, name): while True: data = queue.get() time.sleep(random.randint(1, 2)) print("%s 吃了%s" % (name, data)) # 該函式就是用來記錄一共給消費方多少資料了 就是get次數 queue.task_done() if __name__ == '__main__': # 建立佇列 q = JoinableQueue() p1 = Process(target=make_hotdog, args=(q, "musibii的熱狗店")) p2 = Process(target=make_hotdog, args=(q, "egon的熱狗店")) p3 = Process(target=make_hotdog, args=(q, "eureka的熱狗店")) c1 = Process(target=eat_hotdog, args=(q, "thales")) c2 = Process(target=eat_hotdog, args=(q, "maffia")) p1.start() p2.start() p3.start() # 將消費者作為主程序的守護程序 c1.daemon = True c2.daemon = True c1.start() c2.start() # 讓主程序等三家店全都做完後.... p1.join() p2.join() p3.join() # 如何知道生產方生產完了 並且 消費方也吃完了 # 方法一:等待做有店都做完熱狗在放None # # 新增結束標誌注意這種方法有幾個消費者就加幾個None 不太合適 不清楚將來有多少消費者 # q.put(None) # q.put(None) # 主程序等到佇列結束時再繼續那佇列什麼時候算結束? 生產者已經生產完了 並且消費者把資料全取完了 q.join()# 已經明確生產放一共有多少資料 # 現在 需要知道什麼時候做完熱狗了 生產者不知道消費者也不知道 # 只有佇列知道 print("主程序over") # 生產方不生產了 然而消費方不知道 所以一直等待get函式阻塞 # 三家店都放了一個空表示沒熱狗了但是消費者只有兩個 他們只要看見None 就認為沒有了 # 於是程序也就結束了造成一些資料沒有被處理
執行結果:
檢視 JoinableQueue 類方法 task_done 原始碼:
看不懂.........