1. 程式人生 > >Python學習【第20篇】:互斥鎖以及程序之間的三種通訊方式(IPC)以及生產者個消費者模型 python併發程式設計之多程序1-----------互斥鎖與程序間的通訊

Python學習【第20篇】:互斥鎖以及程序之間的三種通訊方式(IPC)以及生產者個消費者模型 python併發程式設計之多程序1-----------互斥鎖與程序間的通訊

python併發程式設計之多程序1-----------互斥鎖與程序間的通訊

一、互斥鎖

程序之間資料隔離,但是共享一套檔案系統,因而可以通過檔案來實現程序直接的通訊,但問題是必須自己加鎖處理。

注意:加鎖的目的是為了保證多個程序修改同一塊資料時,同一時間只能有一個修改,即序列的修改,沒錯,速度是慢了,犧牲了速度而保證了資料安全。

1.上廁所的小例子:你上廁所的時候肯定得鎖門吧,有人來了看見門鎖著,就會在外面等著,等你吧門開開出來的時候,下一個人才去上廁所。

複製程式碼
 1 from multiprocessing import Process,Lock
 2 import os
 3 import time
 4 def work(mutex):
 5     mutex.acquire()
 6     print('task[%s] 上廁所'%os.getpid()) 7 time.sleep(3) 8 print('task[%s] 上完廁所'%os.getpid()) 9  mutex.release() 10 if __name__ == '__main__': 11 mutex = Lock() 12 p1 = Process(target=work,args=(mutex,)) 13 p2 = Process(target=work,args=(mutex,)) 14 p3 = Process(target=work,args=(mutex,)) 15  p1.start() 16  p2.start() 17  p3.start() 18  p1.join() 19  p2.join() 20  p3.join() 21 print('主')
複製程式碼

二、模擬搶票(也是利用了互斥鎖的原理  :LOCK互斥鎖)

複製程式碼
 1 import json
 2 import time
 3 import random
 4 import os
 5 from multiprocessing import Process,Lock
 6 def chakan(): 7 dic = json.load(open('piao',)) # 先檢視票數,也就是開啟那個檔案 8 print('剩餘票數:%s' % dic['count']) # 檢視剩餘的票數 9 def buy(): 10 dic = json.load(open('piao',)) 11 if dic['count']>0: #如果還有票 12 dic['count']-=1 #就修改裡面的值-1 13 time.sleep(random.randint(1,3)) #執行裡面買票的一系列操作就先不執行了,讓睡一會代替(並且隨機的睡) 14 json.dump(dic,open('piao','w')) 15 print('%s 購票成功' % os.getpid()) # 當前的那個id購票成功 16 def task(mutex): #搶票 17 chakan() #因為檢視的時候大家都可以看到,不需要加鎖 18 mutex.acquire() #加鎖 19 buy() #買的時候必須一個一個的買,先等一個人買完了,後面的人在買 20 mutex.release() #取消鎖 21 if __name__ == '__main__': 22 mutex = Lock() 23 for i in range(50):#讓50個人去訪問那個票數 24 p = Process(target=task,args=(mutex,)) 25 p.start()
複製程式碼

三、Process物件的其他屬性

p.daemon :守護程序(必須在開啟之前設定守護程序):如果父程序死,子程序p也死了

p.join:父程序等p執行完了才執行主程序,是父程序阻塞在原地,而p仍然在後臺執行。

terminate:強制關閉。(確保p裡面沒有其他子程序的時候關閉,如果裡面有子程序,你去用這個方法強制關閉了就會產生殭屍程序(打個比方:如果你老子掛了,你還沒掛,那麼就沒人給你收屍了,啊哈哈))

is_alive:關閉程序的時候,不會立即關閉,所以is_alive立刻檢視的結果可能還是存活

p.join():父程序在等p的結束,是父程序阻塞在原地,而p仍然在後臺執行

p.name:檢視名字

p.pid :檢視id

我們可以簡單介紹一下殭屍程序:

子程序執行完成,但是父程序遲遲沒有進行回收,此時子程序實際上並沒有退出,其仍然佔用著系統資源,這樣的⼦程序稱為殭屍程序

因為殭屍程序的資源一直未被回收,造成了系統資源的浪費,過多的殭屍程序將造成系統性能下降,所以應避免出現僵⼫程序。

複製程式碼
 1 from multiprocessing import Process
 2 import os
 3 import time
 4 def work():
 5     print('%s is working'%os.getpid())
 6     time.sleep(3) 7 if __name__ == '__main__': 8 p1 =Process(target=work) 9 p2 =Process(target=work) 10 p3 =Process(target=work) 11 # p1.daemon = True 12 # p2.daemon = True #守護程序(守護他爹) 13 # p3.daemon = True #主程序死了子程序也死了(就不會執行子程序了) 14  p1.start() 15  p2.start() 16  p3.start() 17 18  p3.join() 19  p2.join() 20 p1.join() #多個join就是在等花費時間最長的那個執行完就執行主程式了 21 print('主程式') 22 23 # -瞭解方法--------------- 24 # p1.terminate() #強制關閉程序 25 # time.sleep(3) 26 # print(p1.is_alive()) #看是不是還活著 27 # print(p1.name) #檢視程序名字 28 # print(p1.pid) #檢視id號 29 # print('主程式')
複製程式碼

三、程序間的三種通訊(IPC)方式:

  方式一:佇列(推薦使用)

  程序彼此之間互相隔離,要實現程序間通訊(IPC),multiprocessing模組支援兩種形式:佇列和管道,這兩種方式都是使用訊息傳遞的

1.佇列:佇列類似於一條管道,元素先進先出
需要注意的一點是:佇列都是在記憶體中操作,程序退出,佇列清空,另外,佇列也是一個阻塞的形態
2.佇列分類
佇列有很多種,但都依賴與模組queue
queue.Queue() #先進先出
queue.LifoQueue() #後進先出
queue.PriorityQueue() #優先順序佇列
queue.deque() #雙線佇列

建立佇列的類(底層就是以管道和鎖定的方式實現):

?
1 2 Queue([maxsize]):建立共享的程序佇列,Queue是多程序安全的佇列, 可以使用Queue實現多程序之間的資料傳遞。

引數介紹:

?
1 1  maxsize是佇列中允許最大項數,省略則無大小限制。

方法介紹:

?
1 2 3 4 5 6 7 8 9 q.put方法用以插入資料到佇列中,put方法還有兩個可選引數:blocked和timeout。如果blocked為 True (預設值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該佇列有剩餘的空間。如果超時,會丟擲Queue.Full異常。如果blocked為 False ,但該Queue已滿,會立即丟擲Queue.Full異常。 q.get方法可以從佇列讀取並且刪除一個元素。同樣,get方法有兩個可選引數:blocked和timeout。如果blocked為 True (預設值),並且timeout為正值,那麼在等待時間內沒有取到任何元素,會丟擲Queue.Empty異常。如果blocked為 False ,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果佇列為空,則立即丟擲Queue.Empty異常.    q.get_nowait():同q.get( False ) q.put_nowait():同q.put( False )   q.empty():呼叫此方法時q為空則返回 True ,該結果不可靠,比如在返回 True 的過程中,如果佇列中又加入了專案。 q.full():呼叫此方法時q已滿則返回 True ,該結果不可靠,比如在返回 True 的過程中,如果佇列中的專案被取走。 q.qsize():返回佇列中目前專案的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣

應用:

複製程式碼
 1 # 1.可以往佇列裡放任意型別的
 2 # 2.先進先出
 3 from multiprocessing import Process,Queue
 4 q= Queue(3)
 5 q.put('first')  #預設block=True
 6 q.put('second')
 7 q.put('third')
 8 
 9 print(q.get())
10 print(q.get()) 11 print(q.get())
複製程式碼

生產者和消費者模型

在併發程式設計中使用生產者和消費者模式能夠解決絕大多數併發問題。該模式通過平衡生產執行緒和消費執行緒的工作能力來提高程式的整體處理資料的速度。

為什麼要使用生產者和消費者模式

線上程世界裡,生產者就是生產資料的執行緒,消費者就是消費資料的執行緒。在多執行緒開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產資料。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

什麼是生產者消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,所以生產者生產完資料之後不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。

基於佇列實現生產者消費者模型

一個生產者和一個消費者(有兩種方式)

1、q.put(None):生產者給放一個None進去

複製程式碼
 1 from multiprocessing import Process,Queue
 2 import os
 3 import time
 4 import random
 5 #首先得有生產者和消費者
 6 # 生產者製造包子
 7 '''這種用 q.put(None)放進去一個None的方法雖然解決了問題
 8 但是如果有多個生產者多個消費者,或許框裡面沒有包子了但是
 9 還有其他的食物呢,你就已經顯示空著,這樣也可以解決,就是不完美,
10 還可以用到JoinableQueue去解決'''
11 def producter(q):
12     for i in range(10): 13 time.sleep(2) #生產包子得有個過程,就先讓睡一會 14 res = '包子%s'%i #生產了這麼多的包子 15 q.put(res) #吧生產出來的包子放進框裡面去 16 print('\033[44m%s製造了%s\033[0m'%(os.getpid(),res)) 17 q.put(None) #只有生產者才知道什麼時候就生產完了(放一個None進去說明此時已經生產完了) 18 # 消費者吃包子 19 def consumer(q): 20 while True:#假如消費者不斷的吃 21 res = q.get() 22 if res is None:break #如果吃的時候框裡面已經空了,就直接break了 23 time.sleep(random.randint(1,3)) 24 print('\033[41m%s吃了%s\033[0m' % (os.getpid(),res)) 25 if __name__ == '__main__': 26 q = Queue() 27 p1 = Process(target=producter,args=(q,)) 28 p2 = Process(target=consumer,args=(q,)) 29  p1.start() 30  p2.start() 31  p1.join() 32 p2.join() #等待執行完上面的程序,在去執行主 33 print('主')
複製程式碼

2、利用JoinableQueue

複製程式碼
 1 from multiprocessing import Process,JoinableQueue
 2 import os
 3 import time
 4 import random
 5 #首先得有生產者和消費者
 6 # 消費者吃包子
 7 def consumer(q):
 8     '''消費者'''
 9     while True:#假如消費者不斷的吃
10         res = q.get() 11 time.sleep(random.randint(1,3)) 12 print('\033[41m%s吃了%s\033[0m' % (os.getpid(),res)) 13 q.task_done() #任務結束了(消費者告訴生產者,我已經吧東西取走了) 14 # 生產者製造包子 15 def producter(q): 16 '''生產者''' 17 for i in range(5): 18 time.sleep(2) #生產包子得有個過程,就先讓睡一會 19 res = '包子%s'%i #生產了這麼多的包子 20 q.put(res) #吧生產出來的包子放進框裡面去 21 print('\033[44m%s製造了%s\033[0m'%(os.getpid(),res)) 22  q.join() 23 24 if __name__ == '__main__': 25 q = JoinableQueue() 26 p1 = Process(target=producter,args=(q,)) 27 p2 = Process(target=consumer,args=(q,)) 28 p2.daemon = True #在啟動之前吧消費者設定成守護程序,p1結束了p2也就結束了 29  p1.start() 30  p2.start() 31 p1.join() #在等生產者結束(生產者結束後,就不製造包子了,那消費者一直在吃,就卡住了 32 #都不生產了還吃啥,就把消費者也結束了 ) 33 #等待執行完上面的程序,在去執行主 34 print('主')
複製程式碼

多個生產者和多個消費者(有兩種方式)

 1、q.put(None):生產者給放一個None進去

多生產者與多消費者1

 2、利用JoinableQueue

複製程式碼
 1 from multiprocessing import Process,JoinableQueue
 2 import os
 3 import time
 4 import random
 5 #首先得有生產者和消費者
 6 # 消費者吃包子
 7 def consumer(q):
 8     while True: 9 res = q.get() 10 time.sleep(random.randint(1,3)) 11 print('\033[41m%s吃了%s\033[0m' % (os.getpid(),res)) 12 q.task_done() #任務結束了(消費者告訴生產者,我已經吧東西取走了) 13 def product_baozi(q): 14 for i in range(5): 15 time.sleep(2) 16 res = '包子%s' % i 17  q.put(res) 18 print('\033[44m%s製造了%s\033[0m' % (os.getpid(), res)) 19 q.join() #不用put(None) 了,在等q被取完。(如果資料沒有被取完,生產者就不會結束掉) 20 def product_gutou(q): 21 for i in range(5): 22 time.sleep(2) 23 res = '骨頭%s' % i 24  q.put(res) 25 print('\033[44m%s製造了%s\033[0m' % (os.getpid(), res)) 26  q.join() 27 def product_doujiang(q): 28 for i in range(5): 29 time.sleep(2) 30 res = '豆漿%s' % i 31  q.put(res) 32 print('\033[44m%s製造了%s\033[0m' % (os.getpid(), res)) 33  q.join() 34 35 if __name__ == '__main__': 36 q = JoinableQueue() 37 # 生產者們:廚師們 38 p1 = Process(target=product_baozi,args=(q,)) 39 p2 = Process(target=product_doujiang,args=(q,)) 40 p3 = Process(target=product_gutou,args=(q,)) 41 42 #消費者們:吃貨們 43 p4 = Process(target=consumer,args=(q,)) 44 p5 = Process(target=consumer,args=(q,)) 45 p4.daemon = True 46 p5.daemon = True 47 # p1.start() 48 # p2.start() 49 # p3.start() 50 # p4.start() 51 # p5.start() 52 li = [p1,p2,p3,p4,p5] 53 for i in li: 54  i.start() 55  p1.join() 56  p2.join() 57  p3.join() 58 print('主')
複製程式碼

  方式二:管道(不推薦使用,瞭解即可)

管道相當於佇列,但是管道不自動加鎖

  方式三:共享資料(不推薦使用,瞭解即可)

共享資料也沒有自動加鎖的功能,所以還是推薦用佇列的。感興趣的可以研究研究管道和共享資料

 

一、互斥鎖

程序之間資料隔離,但是共享一套檔案系統,因而可以通過檔案來實現程序直接的通訊,但問題是必須自己加鎖處理。

注意:加鎖的目的是為了保證多個程序修改同一塊資料時,同一時間只能有一個修改,即序列的修改,沒錯,速度是慢了,犧牲了速度而保證了資料安全。

1.上廁所的小例子:你上廁所的時候肯定得鎖門吧,有人來了看見門鎖著,就會在外面等著,等你吧門開開出來的時候,下一個人才去上廁所。

複製程式碼
 1 from multiprocessing import Process,Lock
 2 import os
 3 import time
 4 def work(mutex):
 5     mutex.acquire()
 6     print('task[%s] 上廁所'%os.getpid()) 7 time.sleep(3) 8 print('task[%s] 上完廁所'%os.getpid()) 9  mutex.release() 10 if __name__ == '__main__': 11 mutex = Lock() 12 p1 = Process(target=work,args=(mutex,)) 13 p2 = Process(target=work,args=(mutex,)) 14 p3 = Process(target=work,args=(mutex,)) 15  p1.start() 16  p2.start() 17  p3.start() 18  p1.join() 19  p2.join() 20  p3.join() 21 print('主')
複製程式碼

二、模擬搶票(也是利用了互斥鎖的原理  :LOCK互斥鎖)

複製程式碼
 1 import json
 2 import time
 3 import random
 4 import os
 5 from multiprocessing import Process,Lock
 6 def chakan(): 7 dic = json.load(open('piao',)) # 先檢視票數,也就是開啟那個檔案 8 print('剩餘票數:%s' % dic['count']) # 檢視剩餘的票數 9 def buy(): 10 dic = json.load(open('piao',)) 11 if dic['count']>0: #如果還有票 12 dic['count']-=1 #就修改裡面的值-1 13 time.sleep(random.randint(1,3)) #執行裡面買票的一系列操作就先不執行了,讓睡一會代替(並且隨機的睡) 14 json.dump(dic,open('piao','w')) 15 print('%s 購票成功' % os.getpid()) # 當前的那個id購票成功 16 def task(mutex): #搶票 17 chakan() #因為檢視的時候大家都可以看到,不需要加鎖 18 mutex.acquire() #加鎖 19 buy() #買的時候必須一個一個的買,先等一個人買完了,後面的人在買 20 mutex.release() #取消鎖 21 if __name__ == '__main__': 22 mutex = Lock() 23 for i in range(50):#讓50個人去訪問那個票數 24 p = Process(target=task,args=(mutex,)) 25 p.start()
複製程式碼

三、Process物件的其他屬性

p.daemon :守護程序(必須在開啟之前設定守護程序):如果父程序死,子程序p也死了

p.join:父程序等p執行完了才執行主程序,是父程序阻塞在原地,而p仍然在後臺執行。

terminate:強制關閉。(確保p裡面沒有其他子程序的時候關閉,如果裡面有子程序,你去用這個方法強制關閉了就會產生殭屍程序(打個比方:如果你老子掛了,你還沒掛,那麼就沒人給你收屍了,啊哈哈))

is_alive:關閉程序的時候,不會立即關閉,所以is_alive立刻檢視的結果可能還是存活

p.join():父程序在等p的結束,是父程序阻塞在原地,而p仍然在後臺執行

p.name:檢視名字

p.pid :檢視id

我們可以簡單介紹一下殭屍程序:

子程序執行完成,但是父程序遲遲沒有進行回收,此時子程序實際上並沒有退出,其仍然佔用著系統資源,這樣的⼦程序稱為殭屍程序

因為殭屍程序的資源一直未被回收,造成了系統資源的浪費,過多的殭屍程序將造成系統性能下降,所以應避免出現僵⼫程序。

複製程式碼
 1 from multiprocessing import Process
 2 import os
 3 import time
 4 def work():
 5     print('%s is working'%os.getpid())
 6     time.sleep(3) 7 if __name__ == '__main__': 8 p1 =Process(target=work) 9 p2 =Process(target=work) 10 p3 =Process(target=work) 11 # p1.daemon = True 12 # p2.daemon = True #守護程序(守護他爹) 13 # p3.daemon = True #主程序死了子程序也死了(就不會執行子程序了) 14  p1.start() 15  p2.start() 16  p3.start() 17 18  p3.join() 19  p2.join() 20 p1.join() #多個join就是在等花費時間最長的那個執行完就執行主程式了 21 print('主程式') 22 23 # -瞭解方法--------------- 24 # p1.terminate() #強制關閉程序 25 # time.sleep(3) 26 # print(p1.is_alive()) #看是不是還活著 27 # print(p1.name) #檢視程序名字 28 # print(p1.pid) #檢視id號 29 # print('主程式')
複製程式碼

三、程序間的三種通訊(IPC)方式:

  方式一:佇列(推薦使用)

  程序彼此之間互相隔離,要實現程序間通訊(IPC),multiprocessing模組支援兩種形式:佇列和管道,這兩種方式都是使用訊息傳遞的

1.佇列:佇列類似於一條管道,元素先進先出
需要注意的一點是:佇列都是在記憶體中操作,程序退出,佇列清空,另外,佇列也是一個阻塞的形態
2.佇列分類
佇列有很多種,但都依賴與模組queue
queue.Queue() #先進先出
queue.LifoQueue() #後進先出
queue.PriorityQueue() #優先順序佇列
queue.deque() #雙線佇列

建立佇列的類(底層就是以管道和鎖定的方式實現):

?
1 2 Queue([maxsize]):建立共享的程序佇列,Queue是多程序安全的佇列, 可以使用Queue實現多程序之間的資料傳遞。

引數介紹:

?
1 1  maxsize是佇列中允許最大項數,省略則無大小限制。

方法介紹:

?
1 2 3 4 5 6 7 8 9 q.put方法用以插入資料到佇列中,put方法還有兩個可選引數:blocked和timeout。如果blocked為 True (預設值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該佇列有剩餘的空間。如果超時,會丟擲Queue.Full異常。如果blocked為 False ,但該Queue已滿,會立即丟擲Queue.Full異常。 q.get方法可以從佇列讀取並且刪除一個元素。同樣,get方法有兩個可選引數:blocked和timeout。如果blocked為 True (預設值),並且timeout為正值,那麼在等待時間內沒有取到任何元素,會丟擲Queue.Empty異常。如果blocked為 False ,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果佇列為空,則立即丟擲Queue.Empty異常.    q.get_nowait():同q.get( False ) q.put_nowait():同q.put( False )   q.empty():呼叫此方法時q為空則返回 True ,該結果不可靠,比如在返回 True 的過程中,如果佇列中又加入了專案。 q.full():呼叫此方法時q已滿則返回 True ,該結果不可靠,比如在返回 True 的過程中,如果佇列中的專案被取走。 q.qsize():返回佇列中目前專案的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣

應用:

複製程式碼
 1 # 1.可以往佇列裡放任意型別的
 2 # 2.先進先出
 3 from multiprocessing import Process,Queue
 4 q= Queue(3)
 5 q.put('first')  #預設block=True
 6 q.put('second')
 7 q.put('third')
 8 
 9 print(q.get())
10 print(q.get()) 11 print(q.get())
複製程式碼

生產者和消費者模型

在併發程式設計中使用生產者和消費者模式能夠解決絕大多數併發問題。該模式通過平衡生產執行緒和消費執行緒的工作能力來提高程式的整體處理資料的速度。

為什麼要使用生產者和消費者模式

線上程世界裡,生產者就是生產資料的執行緒,消費者就是消費資料的執行緒。在多執行緒開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產資料。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

什麼是生產者消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,所以生產者生產完資料之後不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。

基於佇列實現生產者消費者模型

一個生產者和一個消費者(有兩種方式)

1、q.put(None):生產者給放一個None進去

複製程式碼
 1 from multiprocessing import Process,Queue
 2 import os
 3 import time
 4 import random
 5 #首先得有生產者和消費者
 6 # 生產者製造包子
 7 '''這種用 q.put(None)放進去一個None的方法雖然解決了問題
 8 但是如果有多個生產者多個消費者,或許框裡面沒有包子了但是
 9 還有其他的食物呢,你就已經顯示空著,這樣也可以解決,就是不完美,
10 還可以用到JoinableQueue去解決'''
11 def producter(q):
12     for i in range(10): 13 time.sleep(2) #生產包子得有個過程,就先讓睡一會 14 res = '包子%s'%i #生產了這麼多的包子 15 q.put(res) #吧生產出來的包子放進框裡面去 16 print('\033[44m%s製造了%s\033[0m'%(os.getpid(),res)) 17 q.put(None) #只有生產者才知道什麼時候就生產完了(放一個None進去說明此時已經生產完了) 18 # 消費者吃包子 19 def consumer(q): 20 while True:#假如消費者不斷的吃 21 res = q.get() 22 if res is None:break #如果吃的時候框裡面已經空了,就直接break了 23 time.sleep(random.randint(1,3)) 24 print('\033[41m%s吃了%s\033[0m' % (os.getpid(),res)) 25 if __name__ == '__main__': 26 q = Queue() 27 p1 = Process(target=producter,args=(q,)) 28 p2 = Process(target=consumer,args=(q,)) 29  p1.start() 30  p2.start() 31  p1.join() 32 p2.join() #等待執行完上面的程序,在去執行主 33 print('主')
複製程式碼

2、利用JoinableQueue

複製程式碼
 1 from multiprocessing import Process,JoinableQueue
 2 import os
 3 import time
 4 import random
 5 #首先得有生產者和消費者
 6 # 消費者吃包子
 7 def consumer(q):
 8     '''消費者'''
 9     while True:#假如消費者不斷的吃
10         res = q.get() 11 time.sleep(random.randint(1,3)) 12 print('\033[41m%s吃了%s\033[0m' % (os.getpid(),res)) 13 q.task_done() #任務結束了(消費者告訴生產者,我已經吧東西取走了) 14 # 生產者製造包子 15 def producter(q): 16 '''生產者''' 17 for i in range(5): 18 time.sleep(2) #生產包子得有個過程,就先讓睡一會 19 res = '包子%s'%i #生產了這麼多的包子 20 q.put(res) #吧生產出來的包子放進框裡面去 21 print('\033[44m%s製造了%s\033[0m'%(os.getpid(),res)) 22  q.join() 23 24 if __name__ == '__main__': 25 q = JoinableQueue() 26 p1 = Process(target=producter,args=(q,)) 27 p2 = Process(target=consumer,args=(q,)) 28 p2.daemon = True #在啟動之前吧消費者設定成守護程序,p1結束了p2也就結束了 29  p1.start() 30  p2.start() 31 p1.join() #在等生產者結束(生產者結束後,就不製造包子了,那消費者一直在吃,就卡住了 32 #都不生產了還吃啥,就把消費者也結束了 ) 33 #等待執行完上面的程序,在去執行主 34 print('主')
複製程式碼

多個生產者和多個消費者(有兩種方式)

 1、q.put(None):生產者給放一個None進去

多生產者與多消費者1

 2、利用JoinableQueue

複製程式碼
 1 from multiprocessing import Process,JoinableQueue
 2 import os
 3 import time
 4 import random
 5 #首先得有生產者和消費者
 6 # 消費者吃包子
 7 def consumer(q):
 8     while True: 9 res = q.get() 10 time.sleep(random.randint(1,3)) 11 print('\033[41m%s吃了%s\033[0m' % (os.getpid(),res)) 12 q.task_done() #任務結束了(消費者告訴生產者,我已經吧東西取走了) 13 def product_baozi(q): 14 for i in range(5): 15 time.sleep(2) 16 res = '包子%s' % i 17  q.put(res) 18 print('\033[44m%s製造了%s\033[0m' % (os.getpid(), res)) 19 q.join() #不用put(None) 了,在等q被取完。(如果資料沒有被取完,生產者就不會結束掉) 20 def product_gutou(q): 21 for i in range(5): 22 time.sleep(2) 23 res = '骨頭%s' % i 24  q.put(res) 25 print('\033[44m%s製造了%s\033[0m' % (os.getpid(), res)) 26  q.join() 27 def product_doujiang(q): 28 for i in range(5): 29 time.sleep(2) 30 res = '豆漿%s' % i 31  q.put(res) 32 print('\033[44m%s製造了%s\033[0m' % (os.getpid(), res)) 33  q.join() 34 35 if __name__ == '__main__': 36 q = JoinableQueue() 37 # 生產者們:廚師們 38 p1 = Process(target=product_baozi,args=(q,)) 39 p2 = Process(target=product_doujiang,args=(q,)) 40 p3 = Process(target=product_gutou,args=(q,)) 41 42 #消費者們:吃貨們 43 p4 = Process(target=consumer,args=(q,)) 44 p5 = Process(target=consumer,args=(q,)) 45 p4.daemon = True 46 p5.daemon = True 47 # p1.start() 48 # p2.start() 49 # p3.start() 50 # p4.start() 51 # p5.start() 52 li = [p1,p2,p3,p4,p5] 53 for i in li: 54  i.start() 55  p1.join() 56  p2.join() 57  p3.join() 58 print('主')
複製程式碼

  方式二:管道(不推薦使用,瞭解即可)

管道相當於佇列,但是管道不自動加鎖

  方式三:共享資料(不推薦使用,瞭解即可)

共享資料也沒有自動加鎖的功能,所以還是推薦用佇列的。感興趣的可以研究研究管道和共享資料