5 並發編程--隊列&生產者消費者模型
阿新 • • 發佈:2018-09-30
如果 消費者模式 view 解決 多進程 produce 2.4 color __name__
1、隊列的介紹
進程彼此之間互相隔離,要實現進程間通信(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的
創建隊列的類(底層就是以管道和鎖定的方式實現):
Queue([maxsize]):創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。
參數介紹:
maxsize是隊列中允許最大項數,省略則無大小限制。
但需要明確:
1、隊列內存放的是消息而非大數據
2、隊列占用的是內存空間,因而maxsize即便是無大小限制也受限於內存大小
主要方法:
q.put方法用以插入數據到隊列中。 q.get方法可以從隊列讀取並且刪除一個元素。
from multiprocessing import Process,Queue q=Queue(3) #put ,get ,put_nowait,get_nowait,full,empty q.put(1) q.put(2) q.put(3) print(q.full()) #滿了 # q.put(4) #再放就阻塞住了 print(q.get()) print(q.get()) print(q.get()) print(q.empty()) #空了 # print(q.get()) #再取就阻塞住了 True 1 2 3 True
二、生產者消費者模型
1、生產者消費者模型介紹
為什麽要使用生產者消費者模型
生產者指的是生產數據的任務,消費者指的是處理數據的任務,在並發編程中,
如果生產者處理速度很快,而消費者處理速度很慢,那麽生產者就必須等待消費者處理完,
才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那麽消費者就必須等待生產者。
為了解決這個問題於是引入了生產者和消費者模式。
什麽是生產者和消費者模式
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。
這個阻塞隊列就是用來給生產者和消費者解耦的
2、生產者消費者模型實現
2.1 引入模型(生產一個消費一個)
import time def producer(): for i in range(3): res = f"包子 {i}" time.sleep(0.5) print(f"生產者生產了{res}") consumer(res) def consumer(res): time.sleep(1) print(f"消費者吃了{res}") if __name__ == ‘__main__‘: producer() 生產者生產了包子 0 消費者吃了包子 0 生產者生產了包子 1 消費者吃了包子 1 生產者生產了包子 2 消費者吃了包子 2View Code
2.2 實現生產者消費者模型,但有小問題主進程永遠不會結束
消費者不知道生產者已經完畢,一直處於等待狀態,死循環
from multiprocessing import Process,Queue import time def producer(q): for i in range(3): res = f"包子 {i}" time.sleep(0.5) print(f"生產者生產了{res}") # 把生產的給隊列保存 q.put(res) def consumer(q): while True:# 消費者一直接收 res = q.get() time.sleep(1) print(f"消費者吃了{res}") if __name__ == ‘__main__‘: q = Queue() p1 = Process(target=producer,args=(q,)) p2 = Process(target=consumer,args=(q,)) p1.start() p2.start() print(‘主‘) 主 生產者生產了包子 0 生產者生產了包子 1 生產者生產了包子 2 消費者吃了包子 0 消費者吃了包子 1 消費者吃了包子 2View Code
此時的問題是主進程永遠不會結束,原因是:生產者p在生產完後就結束了,但是消費者c在取空了q之後,則一直處於死循環中且卡在q.get()這一步。
解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就可以break出死循環
2.3、解決辦法--其實我們的思路無非是發送結束信號而已
隊列先進先出
from multiprocessing import Process,Queue import time def producer(q): for i in range(3): res = f"包子 {i}" time.sleep(0.5) print(f"生產者生產了{res}") # 把生產的給隊列保存 q.put(res) def consumer(q): while True:# 消費者一直接收 res = q.get() if res == None: break time.sleep(1) print(f"消費者吃了{res}") if __name__ == ‘__main__‘: q = Queue() p1 = Process(target=producer,args=(q,)) p2 = Process(target=consumer,args=(q,)) p1.start() p2.start() p1.join()# 主進程等待p1子進程執行完畢--即生產者生產完畢 q.put(None) print(‘主‘) 生產者生產了包子 0 生產者生產了包子 1 生產者生產了包子 2 消費者吃了包子 0 主 消費者吃了包子 1 消費者吃了包子 2View Code
但上述解決方式,在有多個生產者和多個消費者時,我們則需要用一個很low的方式去解決,有幾個消費者就需要發送幾次結束信號:相當low,例如
if __name__ == ‘__main__‘: q=Queue() #生產者們:即廚師們 p1=Process(target=producer,args=(q,‘egon1‘,‘包子‘)) p2=Process(target=producer,args=(q,‘egon2‘,‘骨頭‘)) p3=Process(target=producer,args=(q,‘egon3‘,‘泔水‘)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,‘alex1‘)) c2=Process(target=consumer,args=(q,‘alex2‘)) #開始 p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join() q.put(None) q.put(None) q.put(None) print(‘主‘)
2.4 JoinableQueue([maxsize])
5 並發編程--隊列&生產者消費者模型