1. 程式人生 > >python之路-day31-守護程序、鎖、佇列、生產者消費者模型

python之路-day31-守護程序、鎖、佇列、生產者消費者模型

 

一、守護程序

    之前我們講的子程序是不會隨著主程序而結束的,子程序全部執行完之後,程式才結束,那麼如果有

  一天我們的需求是我的主程序結束了,由主程序建立的子程序必須跟著結束,怎麼辦?守護程序就來了!

    主程序建立守護程序

      其一:守護程序會在主程序程式碼執行結束後就終止

      其二:守護程序內無法再開啟子程序,否則丟擲異常:AssertionError: daemonic processes are not allowed to have children

    注意:程序之間是互相獨立的,主程式程式碼執行結束,守護程序隨即終止

 1 import os
 2 import time
 3 from multiprocessing import Process
 4 
 5 class Myprocess(Process):
 6     def __init__(self,person):
 7         super().__init__()
 8         self.person = person
 9     def run(self):
10         print(os.getpid(),self.name)
11         print('%s正在和女主播聊天' %self.person)
12 time.sleep(3) 13 if __name__ == '__main__': 14 p=Myprocess('太白') 15 p.daemon=True #一定要在p.start()前設定,設定p為守護程序,禁止p建立子程序,並且父程序程式碼執行結束,p即終止執行 16 p.start() 17 # time.sleep(1) # 在sleep時linux下檢視程序id對應的程序ps -ef|grep id 18 print('')
守護程序

 

二、程序同步(鎖)

    我們之前實現了程序的非同步,讓多個任務可以同時在幾個程序中併發處理,他們之間的執行時沒有順序的,一旦

  開啟也不受我們控制。儘管併發程式設計讓我們能更加充分的利用IO資源,但是也給我們帶來了新的問題:程序之間資料

  不共享。雖然可以共享同一臺文件系統,所以訪問同一個檔案,或者同一個列印紅緞,是沒有問題的,而共享帶來的是

  競爭,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理。

 1 import os
 2 import time
 3 import random
 4 from multiprocessing import Process
 5 
 6 def work(n):
 7     print('%s: %s is running' %(n,os.getpid()))
 8     time.sleep(random.random())
 9     print('%s:%s is done' %(n,os.getpid()))
10 
11 if __name__ == '__main__':
12     for i in range(5):
13         p=Process(target=work,args=(i,))
14         p.start()
15 
16 # 看結果:通過結果可以看出兩個問題:問題一:每個程序中work函式的第一個列印就不是按照我們for迴圈的0-4的順序來列印的
17 #問題二:我們發現,每個work程序中有兩個列印,但是我們看到所有程序中第一個列印的順序為0-2-1-4-3,但是第二個列印沒有按照這個順序,變成了2-1-0-3-4,說明我們一個程序中的程式的執行順序都混亂了。
18 #問題的解決方法,第二個問題加鎖來解決,第一個問題是沒有辦法解決的,因為程序開到了核心,有作業系統來決定程序的排程,我們自己控制不了
19 # 0: 9560 is running
20 # 2: 13824 is running
21 # 1: 7476 is running
22 # 4: 11296 is running
23 # 3: 14364 is running
24 
25 # 2:13824 is done
26 # 1:7476 is done
27 # 0:9560 is done
28 # 3:14364 is done
29 # 4:11296 is done
多程序搶佔輸出資源,導致列印混亂
 1 #由併發變成了序列,犧牲了執行效率,但避免了競爭
 2 from multiprocessing import Process,Lock
 3 import os,time
 4 def work(n,lock):
 5     #加鎖,保證每次只有一個程序在執行鎖裡面的程式,這一段程式對於所有寫上這個鎖的程序,大家都變成了序列
 6     lock.acquire()
 7     print('%s: %s is running' %(n,os.getpid()))
 8     time.sleep(1)
 9     print('%s:%s is done' %(n,os.getpid()))
10     #解鎖,解鎖之後其他程序才能去執行自己的程式
11     lock.release()
12 if __name__ == '__main__':
13     lock=Lock()
14     for i in range(5):
15         p=Process(target=work,args=(i,lock))
16         p.start()
17 
18 #列印結果:
19 # 2: 10968 is running
20 # 2:10968 is done
21 # 0: 7932 is running
22 # 0:7932 is done
23 # 4: 4404 is running
24 # 4:4404 is done
25 # 1: 12852 is running
26 # 1:12852 is done
27 # 3: 980 is running
28 # 3:980 is done
29 
30 #結果分析:(自己去多次執行一下,看看結果,我拿出其中一個結果來看)通過結果我們可以看出,多程序剛開始去執行的時候,每次執行,首先打印出來哪個程序的程式是不固定的,但是我們解決了上面列印混亂示例程式碼的第二個問題,那就是同一個程序中的兩次列印都是先完成的,然後才切換到下一個程序去,列印下一個程序中的兩個列印結果,說明我們控制住了同一程序中的程式碼執行順序,如果涉及到多個程序去操作同一個資料或者檔案的時候,就不擔心資料算錯或者檔案中的內容寫入混亂了。
加鎖:由併發改成了序列,犧牲了執行效率,但避免了競爭

    上面這種情況雖然用加鎖的形式是險惡熟悉怒的執行,但是程式又重新變成串行了,這樣確實會浪費了時間

  但是卻保證了資料安全

    接下來,模擬搶票為例,來看看資料安全的重要性。

 1 #注意:首先在當前檔案目錄下建立一個名為db的檔案
 2 #檔案db的內容為:{"count":1},只有這一行資料,並且注意,每次執行完了之後,檔案中的1變成了0,你需要手動將0改為1,然後在去執行程式碼。
 3 #注意一定要用雙引號,不然json無法識別
 4 #併發執行,效率高,但競爭寫同一檔案,資料寫入錯亂
 5 from multiprocessing import Process,Lock
 6 import time,json,random
 7 
 8 #檢視剩餘票數
 9 def search():
10     dic=json.load(open('db')) #開啟檔案,直接load檔案中的內容,拿到檔案中的包含剩餘票數的字典
11     print('\033[43m剩餘票數%s\033[0m' %dic['count'])
12 
13 #搶票
14 def get():
15     dic=json.load(open('db'))
16     time.sleep(0.1)       #模擬讀資料的網路延遲,那麼程序之間的切換,導致所有人拿到的字典都是{"count": 1},也就是每個人都拿到了這一票。
17     if dic['count'] >0:
18         dic['count']-=1
19         time.sleep(0.2)   #模擬寫資料的網路延遲
20         json.dump(dic,open('db','w'))
21         #最終結果導致,每個人顯示都搶到了票,這就出現了問題~
22         print('\033[43m購票成功\033[0m')
23 
24 def task():
25     search()
26     get()
27 
28 if __name__ == '__main__':
29     for i in range(3): #模擬併發100個客戶端搶票
30         p=Process(target=task)
31         p.start()
32 
33 #看結果分析:由於網路延遲等原因使得程序切換,導致每個人都搶到了這最後一張票
34 # 剩餘票數1
35 # 剩餘票數1
36 # 剩餘票數1
37 # 購票成功
38 # 購票成功
39 # 購票成功
併發執行,效率高,但是競爭同一個檔案,導致資料混亂
 1 #注意:首先在當前檔案目錄下建立一個名為db的檔案
 2 #檔案db的內容為:{"count":1},只有這一行資料,並且注意,每次執行完了之後,檔案中的1變成了0,你需要手動將0改為1,然後在去執行程式碼。
 3 #注意一定要用雙引號,不然json無法識別
 4 #加鎖保證資料安全,不出現混亂
 5 from multiprocessing import Process,Lock
 6 import time,json,random
 7 
 8 #檢視剩餘票數
 9 def search():
10     dic=json.load(open('db')) #開啟檔案,直接load檔案中的內容,拿到檔案中的包含剩餘票數的字典
11     print('\033[43m剩餘票數%s\033[0m' %dic['count'])
12 
13 #搶票
14 def get():
15     dic=json.load(open('db'))
16     time.sleep(0.1)       #模擬讀資料的網路延遲,那麼程序之間的切換,導致所有人拿到的字典都是{"count": 1},也就是每個人都拿到了這一票。
17     if dic['count'] >0:
18         dic['count']-=1
19         time.sleep(0.2)   #模擬寫資料的網路延遲
20         json.dump(dic,open('db','w'))
21         #最終結果導致,每個人顯示都搶到了票,這就出現了問題~
22         print('\033[43m購票成功\033[0m')
23     else:
24         print('sorry,沒票了親!')
25 def task(lock):
26     search()
27     #因為搶票的時候是發生資料變化的時候,所有我們將鎖加加到這裡
28     lock.acquire()
29     get()
30     lock.release()
31 if __name__ == '__main__':
32     lock = Lock() #建立一個鎖
33     for i in range(3): #模擬併發100個客戶端搶票
34         p=Process(target=task,args=(lock,)) #將鎖作為引數傳給task函式
35         p.start()
36 
37 #看結果分析:只有一個人搶到了票
38 # 剩餘票數1
39 # 剩餘票數1
40 # 剩餘票數1
41 # 購票成功   #幸運的人兒
42 # sorry,沒票了親!
43 # sorry,沒票了親!
加鎖:購票行為由併發變成了序列,犧牲了效率,但是保證了資料安全

 

  程序鎖的總結:

#加鎖可以保證多個程序修改同一塊資料時,同一時間只能有一個任務可以進行修改,即序列的修改,沒錯,速度是慢了,但犧牲了速度卻保證了資料安全。
雖然可以用檔案共享資料實現程序間通訊,但問題是:
1.效率低(共享資料基於檔案,而檔案是硬碟上的資料)
2.需要自己加鎖處理

#因此我們最好找尋一種解決方案能夠兼顧:1、效率高(多個程序共享一塊記憶體的資料)2、幫我們處理好鎖問題。這就是mutiprocessing模組為我們提供的基於訊息的IPC通訊機制:佇列和管道。
佇列和管道都是將資料存放於記憶體中
佇列又是基於(管道+鎖)實現的,可以讓我們從複雜的鎖問題中解脫出來,
我們應該儘量避免使用共享資料,儘可能使用訊息傳遞和佇列,避免處理複雜的同步和鎖問題,而且在程序數目增多時,往往可以獲得更好的可獲展性。

IPC通訊機制(瞭解):IPC是intent-Process Communication的縮寫,含義為程序間通訊或者跨程序通訊,是指兩個程序之間進行資料交換的過程。IPC不是某個系統所獨有的,任何一個作業系統都需要有相應的IPC機制,
比如Windows上可以通過剪貼簿、管道和郵槽等來進行程序間通訊,而Linux上可以通過命名共享內容、訊號量等來進行程序間通訊。Android它也有自己的程序間通訊方式,Android建構在Linux基礎上,繼承了一
部分Linux的通訊方式。

  

三、佇列(推薦使用)  

    程序彼此之間互相隔離,要實現程序間通訊(IPC),multiprocessing模組支援兩種形式:對列和管道,這兩種方式都是使用訊息傳遞的。

  佇列就像一個特殊的列表,但是可以設定固定長度,並且從前面插入資料,從後面取出資料,先進先出。

Queue([maxsize])建立共享的程序佇列
引數:maxsize是佇列中允許的最大項數。如果省略此引數,則無大小限制。
底層佇列使用管道和鎖實現。

  queue的方法介紹  

q = Queue([maxsize])
建立共享的程序佇列。maxsize是佇列中允許的最大項數。如果省略此引數,則無大小限制。
底層佇列使用管道和鎖定實現。另外,還需要執行支援執行緒以便佇列中的資料傳輸到底層管道中。
Queue的例項q具有以下方法:

q.get([block[,timeout]])
返回q中的一個專案。如果q為空,此方法將阻塞,直到佇列中有專案可用為止。block用於控制阻塞行為,預設為True。如果設定為False,將引發Queue.Empty異常(定義在Queue模組中)。timeout是可選超時時間,用在阻塞模式中。如果在指定的時間間隔內沒有專案變為可用,將引發Queue.Empty異常

q.put(item [,block[,timeout]])
將item放入佇列。如果佇列已滿,此方法將阻塞至有空間可用為止。block用於控制阻塞行為,預設為True。如果設定為False,將引發Queue.Empty異常(定義在Queue庫模組中)
timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引發Queue.Full異常。

q.get_nowait()
同q.get(False)方法。

q.qsize()
返回佇列中目前專案的正確數量。此函式的結果並不可靠,因為在返回結果和在稍後程式中使用結果之間,佇列中可能新增或刪除了專案。在某些系統上,此方法可能引發NotImplementedError異常。

q.empty() 
如果呼叫此方法時 q為空,返回True。如果其他程序或執行緒正在往佇列中新增專案,結果是不可靠的。也就是說,在返回和使用結果之間,佇列中可能已經加入新的專案。

q.full() 
如果q已滿,返回為True. 由於執行緒的存在,結果也可能是不可靠的(參考q.empty()方法)。

  queue的其他方法(瞭解)

q.close() 
關閉佇列,防止佇列中加入更多資料。呼叫此方法時,後臺執行緒將繼續寫入那些已入佇列但尚未寫入的資料,但將在此方法完成時馬上關閉。如果q被垃圾收集,將自動呼叫此方法。關閉佇列不會在佇列使用者中生成任何型別的資料結束訊號或異常。例如,如果某個使用者正被阻塞在get()操作上,關閉生產者中的佇列不會導致get()方法返回錯誤。

q.cancel_join_thread() 
不會再程序退出時自動連線後臺執行緒。這可以防止join_thread()方法阻塞。

q.join_thread() 
連線佇列的後臺執行緒。此方法用於在呼叫q.close()方法後,等待所有佇列項被消耗。預設情況下,此方法由不是q的原始建立者的所有程序呼叫。呼叫q.cancel_join_thread()方法可以禁止這種行為。

  queue的簡單用法

 1 from multiprocessing import Queue
 2 q=Queue(3) #建立一個佇列物件,佇列長度為3
 3 
 4 #put ,get ,put_nowait,get_nowait,full,empty
 5 q.put(3)   #往佇列中新增資料
 6 q.put(2)
 7 q.put(1)
 8 # q.put(4)   # 如果佇列已經滿了,程式就會停在這裡,等待資料被別人取走,再將資料放入佇列。
 9            # 如果佇列中的資料一直不被取走,程式就會永遠停在這裡。
10 try:
11     q.put_nowait(4) # 可以使用put_nowait,如果佇列滿了不會阻塞,但是會因為佇列滿了而報錯。
12 except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程式不會一直阻塞下去,但是會丟掉這個訊息。
13     print('佇列已經滿了')
14 
15 # 因此,我們再放入資料之前,可以先看一下佇列的狀態,如果已經滿了,就不繼續put了。
16 print(q.full()) #檢視是否滿了,滿了返回True,不滿返回False
17 
18 print(q.get())  #取出資料
19 print(q.get())
20 print(q.get())
21 # print(q.get()) # 同put方法一樣,如果佇列已經空了,那麼繼續取就會出現阻塞。
22 try:
23     q.get_nowait(3) # 可以使用get_nowait,如果佇列滿了不會阻塞,但是會因為沒取到值而報錯。
24 except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程式不會一直阻塞下去。
25     print('佇列已經空了')
26 
27 print(q.empty()) #空了
queue的簡單用法

  子程序與父程序通過佇列進行通訊

 1 #看下面的佇列的時候,按照編號看註釋
 2 import time
 3 from multiprocessing import Process, Queue
 4 
 5 # 8. q = Queue(2) #建立一個Queue物件,如果寫在這裡,那麼在windows還子程序去執行的時候,我們知道子程序中還會執行這個程式碼,但是子程序中不能夠再次建立了,也就是這個q就是你主程序中建立的那個q,通過我們下面在主程序中先添加了一個字串之後,在去開啟子程序,你會發現,小鬼這個字串還在佇列中,也就是說,我們使用的還是主程序中建立的這個佇列。
 6 def f(q):
 7     # q = Queue() #9. 我們在主程序中開啟了一個q,如果我們在子程序中的函式裡面再開一個q,那麼你下面q.put('姑娘,多少錢~')新增到了新建立的這q裡裡面了
 8     q.put('姑娘,多少錢~')  #4.呼叫主函式中p程序傳遞過來的程序引數 put函式為向佇列中新增一條資料。
 9     # print(q.qsize()) #6.檢視佇列中有多少條資料了
10 
11 def f2(q):
12     print('》》》》》》》》')
13     print(q.get())  #5.取資料
14 if __name__ == '__main__':
15     q = Queue() #1.建立一個Queue物件
16     q.put('小鬼')
17 
18     p = Process(target=f, args=(q,)) #2.建立一個程序
19     p2 = Process(target=f2, args=(q,)) #3.建立一個程序
20     p.start()
21     p2.start()
22     time.sleep(1) #7.如果阻塞一點時間,就會出現主程序執行太快,導致我們在子程序中檢視qsize為1個。
23     # print(q.get()) #結果:小鬼
24     print(q.get()) #結果:姑娘,多少錢~
25     p.join()
子程序與父程序通過佇列進行通訊

 

四、生產者和消費者模型

 

在併發程式設計中使用生產者和消費者模式能夠解決絕大多數併發問題。該模式通過平衡生產執行緒和消費執行緒的工作能力來提高程式的整體處理資料的速度。

    為什麼要使用生產者和消費者模式

      線上程世界裡,生產者就是生產資料的執行緒,消費者就是消費資料的執行緒。在多執行緒開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產資料。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

    什麼是生產者消費者模式

      生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,所以生產者生產完資料之後不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力,並且我可以根據生產速度和消費速度來均衡一下多少個生產者可以為多少個消費者提供足夠的服務,就可以開多程序等等,而這些程序都是到阻塞佇列或者說是緩衝區中去獲取或者新增資料。

    通俗的解釋:看圖說話。。背景有點亂,等我更新~~

 

 

 1 from multiprocessing import Process,Queue
 2 import time,random,os
 3 def consumer(q):
 4     while True:
 5         res=q.get()
 6         time.sleep(random.randint(1,3))
 7         print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
 8 
 9 def producer(q):
10     for i in range(10):
11         time.sleep(random.randint(1,3))
12         res='包子%s' %i
13         q.put(res)
14         print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))
15 
16 if __name__ == '__main__':
17     q=Queue()
18     #生產者們:即廚師們
19     p1=Process(target=producer,args=(q,))
20 
21     #消費者們:即吃貨們
22     c1=Process(target=consumer,args=(q,))
23 
24     #開始
25     p1.start()
26     c1.start()
27     print('')
基於佇列的生產者消費模型
 1 #生產者消費者模型總結
 2 
 3     #程式中有兩類角色
 4         一類負責生產資料(生產者)
 5         一類負責處理資料(消費者)
 6         
 7     #引入生產者消費者模型為了解決的問題是:
 8         平衡生產者與消費者之間的工作能力,從而提高程式整體處理資料的速度
 9         
10     #如何實現:
11         生產者<-->佇列<——>消費者
12     #生產者消費者模型實現類程式的解耦和
生產者消費者模型總結

    通過上面基於佇列的生產者消費者程式碼示例,我們發現一個問題:主程序永遠不會結束,原因是:生產者p在生產完後就結束了,但是消費者c在取空了q之後,則一直處於死迴圈中且卡在q.get()這一步。

    解決方式無非是讓生產者在生產完畢後,往佇列中再發一個結束訊號,這樣消費者在接收到結束訊號後就可以break出死迴圈

 1 from multiprocessing import Process,Queue
 2 import time,random,os
 3 def consumer(q):
 4     while True:
 5         res=q.get()
 6         if res is None:break #收到結束訊號則結束
 7         time.sleep(random.randint(1,3))
 8         print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
 9 
10 def producer(q):
11     for i in range(5):
12         time.sleep(random.randint(1,3))
13         res='包子%s' %i
14         q.put(res)
15         print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))
16     q.put(None) #在自己的子程序的最後加入一個結束訊號
17 if __name__ == '__main__':
18     q=Queue()
19     #生產者們:即廚師們
20     p1=Process(target=producer,args=(q,))
21 
22     #消費者們:即吃貨們
23     c1=Process(target=consumer,args=(q,))
24 
25     #開始
26     p1.start()
27     c1.start()
28 
29     print('')
子程序生產者在生產完畢後傳送結束訊號None

注意:結束訊號None,不一定要由生產者發,主程序裡同樣可以發,但主程序需要等生產者結束後才應該傳送該訊號

 1 from multiprocessing import Process,Queue
 2 import time,random,os
 3 def consumer(q):
 4     while True:
 5         res=q.get()
 6         if res is None:break #收到結束訊號則結束
 7         time.sleep(random.randint(1,3))
 8         print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
 9 
10 def producer(q):
11     for i in range(2):
12         time.sleep(random.randint(1,3))
13         res='包子%s' %i
14         q.put(res)
15         print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))
16 
17 if __name__ == '__main__':
18     q=Queue()
19     #生產者們:即廚師們
20     p1=Process(target=producer,args=(q,))
21 
22     #消費者們:即吃貨們
23     c1=Process(target=consumer,args=(q,))
24 
25     #開始
26     p1.start()
27     c1.start()
28 
29     p1.join() #等待生產者程序結束
30     q.put(None) #傳送結束訊號
31     print('')
主程序在生產者生產完後傳送結束訊號

  

   但上述解決方式,在有多個生產者和多個消費者時,由於佇列我們說了是程序安全的,我一個程序拿走了結束訊號,另外一個程序就拿不到了,還需要多傳送一個結束訊號,有幾個取資料的程序就要傳送幾個結束訊號,我們則需要用一個很low的方式去解決

 1 from multiprocessing import Process,Queue
 2 import time,random,os
 3 def consumer(q):
 4     while True:
 5         res=q.get()
 6         if res is None:break #收到結束訊號則結束
 7         time.sleep(random.randint(1,3))
 8         print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
 9 
10 def producer(name,q):
11     for i in range(2):
12         time.sleep(random.randint(1,3))
13         res='%s%s' %(name,i)
14         q.put(res)
15         print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))
16 
17 
18 
19 if __name__ == '__main__':
20     q=Queue()
21     #生產者們:即廚師們
22     p1=Process(target=producer,args=('包子',q))
23     p2=Process(target=producer,args=('骨頭',q))
24     p3=Process(target=producer,args=('泔水',q))
25 
26     #消費者們:即吃貨們
27     c1=Process(target=consumer,args=(q,))
28     c2=Process(target=consumer,args=(q,))
29 
30     #開始
31     p1.start()
32     p2.start()
33     p3.start()
34     c1.start()
35 
36     p1.join() #必須保證生產者全部生產完畢,才應該傳送結束訊號
37     p2.join()
38     p3.join()
39     q.put(None) #有幾個消費者就應該傳送幾次結束訊號None
40     q.put(None) #傳送結束訊號
41     print('')
有多個消費者和生產者的時候需要傳送多次結束訊號

 

 

其實我們的思路無非是傳送結束訊號而已,有另外一種佇列提供了這種機制

 JoinableQueue([maxsize]) 

 

#JoinableQueue([maxsize]):這就像是一個Queue物件,但佇列允許專案的使用者通知生成者專案已經被成功處理。通知程序是使用共享的訊號和條件變數來實現的。

   #引數介紹:
    maxsize是佇列中允許最大項數,省略則無大小限制。    
  #方法介紹:
    JoinableQueue的例項p除了與Queue物件相同的方法之外還具有:
    q.task_done():使用者使用此方法發出訊號,表示q.get()的返回專案已經被處理。如果呼叫此方法的次數大於從佇列中刪除專案的數量,將引發ValueError異常
    q.join():生產者呼叫此方法進行阻塞,直到佇列中所有的專案均被處理。阻塞將持續到佇列中的每個專案均呼叫q.task_done()方法為止,也就是佇列中的資料全部被get拿走了。

 

 1 from multiprocessing import Process,JoinableQueue
 2 import time,random,os
 3 def consumer(q):
 4     while True:
 5         res=q.get()
 6         # time.sleep(random.randint(1,3))
 7         time.sleep(random.random())
 8         print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
 9         q.task_done() #向q.join()傳送一次訊號,證明一個數據已經被取走並執行完了
10 
11 def producer(name,q):
12     for i in range(10):
13         # time.sleep(random.randint(1,3))
14         time.sleep(random.random())
15         res='%s%s' %(name,i)
16         q.put(res)
17         print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))
18     print('%s生產結束'%name)
19     q.join() #生產完畢,使用此方法進行阻塞,直到佇列中所有專案均被處理。
20     print('%s生產結束~~~~~~'%name)
21 
22 if __name__ == '__main__':
23     q=JoinableQueue()
24     #生產者們:即廚師們
25     p1=Process(target=producer,args=('包子',q))
26     p2=Process(target=producer,args=('骨頭',q))
27     p3=Process(target=producer,args=('泔水',q))
28 
29     #消費者們:即吃貨們
30     c1=Process(target=consumer,args=(q,))
31     c2=Process(target=consumer,args=(q,))
32     c1.daemon=True #如果不加守護,那麼主程序結束不了,但是加了守護之後,必須確保生產者的內容生產完並且被處理完了,所有必須還要在主程序給生產者設定join,才能確保生產者生產的任務被執行完了,並且能夠確保守護程序在所有任務執行完成之後才隨著主程序的結束而結束。
33     c2.daemon=True
34 
35     #開始
36     p_l=[p1,p2,p3,c1,c2]
37     for p in p_l:
38         p.start()
39 
40     p1.join() #我要確保你的生產者程序結束了,生產者程序的結束標誌著你生產的所有的人任務都已經被處理完了
41     p2.join()
42     p3.join()
43     print('')
44     
45     # 主程序等--->p1,p2,p3等---->c1,c2
46     # p1,p2,p3結束了,證明c1,c2肯定全都收完了p1,p2,p3發到佇列的資料
47     # 因而c1,c2也沒有存在的價值了,不需要繼續阻塞在程序中影響主程序了。應該隨著主程序的結束而結束,所以設定成守護程序就可以了。
JoinableQueue佇列實現生產者消費者模型

 

 

 

 

 

 

 

 

 

 

 

 

 

同一個