Python學習【第20篇】:互斥鎖以及進程之間的三種通信方式(IPC)以及生產者個消費者模型
一、互斥鎖
進程之間數據隔離,但是共享一套文件系統,因而可以通過文件來實現進程直接的通信,但問題是必須自己加鎖處理。
註意:加鎖的目的是為了保證多個進程修改同一塊數據時,同一時間只能有一個修改,即串行的修改,沒錯,速度是慢了,犧牲了速度而保證了數據安全。
1.上廁所的小例子:你上廁所的時候肯定得鎖門吧,有人來了看見門鎖著,就會在外面等著,等你吧門開開出來的時候,下一個人才去上廁所。
1 from multiprocessing import Process,Lock
2 import os
3 import time
4 def work(mutex):
5 mutex.acquire()
6 print(‘task[%s] 上廁所‘%os.getpid())
7 time.sleep(3)
8 print(‘task[%s] 上完廁所‘%os.getpid())
9 mutex.release()
10 if __name__ == ‘__main__‘:
11 mutex = Lock()
12 p1 = Process(target=work,args=(mutex,))
13 p2 = Process(target=work,args=(mutex,))
14 p3 = Process(target=work,args=(mutex,))
15 p1.start()
16 p2.start()
17 p3.start()
18 p1.join()
19 p2.join()
20 p3.join()
21 print(‘主‘)
二、模擬搶票(也是利用了互斥鎖的原理 :LOCK互斥鎖)
1 import json
2 import time
3 import random
4 import os
5 from multiprocessing import Process,Lock
6 def chakan():
7 dic = json.load(open(‘piao‘,)) # 先查看票數,也就是打開那個文件
8 print(‘剩余票數:%s‘ % dic[‘count‘]) # 查看剩余的票數
9 def buy():
10 dic = json.load(open(‘piao‘,))
11 if dic[‘count‘]>0: #如果還有票
12 dic[‘count‘]-=1 #就修改裏面的值-1
13 time.sleep(random.randint(1,3)) #執行裏面買票的一系列操作就先不執行了,讓睡一會代替(並且隨機的睡)
14 json.dump(dic,open(‘piao‘,‘w‘))
15 print(‘%s 購票成功‘ % os.getpid()) # 當前的那個id購票成功
16 def task(mutex): #搶票
17 chakan() #因為查看的時候大家都可以看到,不需要加鎖
18 mutex.acquire() #加鎖
19 buy() #買的時候必須一個一個的買,先等一個人買完了,後面的人在買
20 mutex.release() #取消鎖
21 if __name__ == ‘__main__‘:
22 mutex = Lock()
23 for i in range(50):#讓50個人去訪問那個票數
24 p = Process(target=task,args=(mutex,))
25 p.start()
三、Process對象的其他屬性
p.daemon :守護進程(必須在開啟之前設置守護進程):如果父進程死,子進程p也死了
p.join:父進程等p執行完了才運行主進程,是父進程阻塞在原地,而p仍然在後臺運行。
terminate:強制關閉。(確保p裏面沒有其他子進程的時候關閉,如果裏面有子進程,你去用這個方法強制關閉了就會產生僵屍進程(打個比方:如果你老子掛了,你還沒掛,那麽就沒人給你收屍了,啊哈哈))
is_alive:關閉進程的時候,不會立即關閉,所以is_alive立刻查看的結果可能還是存活
p.join():父進程在等p的結束,是父進程阻塞在原地,而p仍然在後臺運行
p.name:查看名字
p.pid :查看id
我們可以簡單介紹一下僵屍進程:
子進程運行完成,但是父進程遲遲沒有進行回收,此時子進程實際上並沒有退出,其仍然占用著系統資源,這樣的?進程稱為僵屍進程。
因為僵屍進程的資源一直未被回收,造成了系統資源的浪費,過多的僵屍進程將造成系統性能下降,所以應避免出現僵?進程。
1 from multiprocessing import Process
2 import os
3 import time
4 def work():
5 print(‘%s is working‘%os.getpid())
6 time.sleep(3)
7 if __name__ == ‘__main__‘:
8 p1 =Process(target=work)
9 p2 =Process(target=work)
10 p3 =Process(target=work)
11 # p1.daemon = True
12 # p2.daemon = True #守護進程(守護他爹)
13 # p3.daemon = True #主進程死了子進程也死了(就不會執行子進程了)
14 p1.start()
15 p2.start()
16 p3.start()
17
18 p3.join()
19 p2.join()
20 p1.join() #多個join就是在等花費時間最長的那個運行完就執行主程序了
21 print(‘主程序‘)
22
23 # -了解方法---------------
24 # p1.terminate() #強制關閉進程
25 # time.sleep(3)
26 # print(p1.is_alive()) #看是不是還活著
27 # print(p1.name) #查看進程名字
28 # print(p1.pid) #查看id號
29 # print(‘主程序‘)
三、進程間的三種通信(IPC)方式:
方式一:隊列(推薦使用)
進程彼此之間互相隔離,要實現進程間通信(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的
1.隊列:隊列類似於一條管道,元素先進先出
需要註意的一點是:隊列都是在內存中操作,進程退出,隊列清空,另外,隊列也是一個阻塞的形態
2.隊列分類
隊列有很多種,但都依賴與模塊queue
queue.Queue() #先進先出
queue.LifoQueue() #後進先出
queue.PriorityQueue() #優先級隊列
queue.deque() #雙線隊列
創建隊列的類(底層就是以管道和鎖定的方式實現):
?1 2 |
Queue([maxsize]):創建共享的進程隊列,Queue是多進程安全的隊列,
可以使用Queue實現多進程之間的數據傳遞。
|
參數介紹:
?1 |
1 maxsize是隊列中允許最大項數,省略則無大小限制。
|
方法介紹:
?1 2 3 4 5 6 7 8 9 |
q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。如果blocked為 True (默認值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。如果超時,會拋出Queue.Full異常。如果blocked為 False ,但該Queue已滿,會立即拋出Queue.Full異常。
q.get方法可以從隊列讀取並且刪除一個元素。同樣,get方法有兩個可選參數:blocked和timeout。如果blocked為 True (默認值),並且timeout為正值,那麽在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為 False ,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常.
q.get_nowait():同q.get( False )
q.put_nowait():同q.put( False )
q.empty():調用此方法時q為空則返回 True ,該結果不可靠,比如在返回 True 的過程中,如果隊列中又加入了項目。
q.full():調用此方法時q已滿則返回 True ,該結果不可靠,比如在返回 True 的過程中,如果隊列中的項目被取走。
q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣
|
應用:
1 # 1.可以往隊列裏放任意類型的
2 # 2.先進先出
3 from multiprocessing import Process,Queue
4 q= Queue(3)
5 q.put(‘first‘) #默認block=True
6 q.put(‘second‘)
7 q.put(‘third‘)
8
9 print(q.get())
10 print(q.get())
11 print(q.get())
生產者和消費者模型
在並發編程中使用生產者和消費者模式能夠解決絕大多數並發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。
為什麽要使用生產者和消費者模式
在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麽生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那麽消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。
什麽是生產者消費者模式
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。
基於隊列實現生產者消費者模型
一個生產者和一個消費者(有兩種方式)
1、q.put(None):生產者給放一個None進去
1 from multiprocessing import Process,Queue
2 import os
3 import time
4 import random
5 #首先得有生產者和消費者
6 # 生產者制造包子
7 ‘‘‘這種用 q.put(None)放進去一個None的方法雖然解決了問題
8 但是如果有多個生產者多個消費者,或許框裏面沒有包子了但是
9 還有其他的食物呢,你就已經顯示空著,這樣也可以解決,就是不完美,
10 還可以用到JoinableQueue去解決‘‘‘
11 def producter(q):
12 for i in range(10):
13 time.sleep(2) #生產包子得有個過程,就先讓睡一會
14 res = ‘包子%s‘%i #生產了這麽多的包子
15 q.put(res) #吧生產出來的包子放進框裏面去
16 print(‘\033[44m%s制造了%s\033[0m‘%(os.getpid(),res))
17 q.put(None) #只有生產者才知道什麽時候就生產完了(放一個None進去說明此時已經生產完了)
18 # 消費者吃包子
19 def consumer(q):
20 while True:#假如消費者不斷的吃
21 res = q.get()
22 if res is None:break #如果吃的時候框裏面已經空了,就直接break了
23 time.sleep(random.randint(1,3))
24 print(‘\033[41m%s吃了%s\033[0m‘ % (os.getpid(),res))
25 if __name__ == ‘__main__‘:
26 q = Queue()
27 p1 = Process(target=producter,args=(q,))
28 p2 = Process(target=consumer,args=(q,))
29 p1.start()
30 p2.start()
31 p1.join()
32 p2.join() #等待執行完上面的進程,在去執行主
33 print(‘主‘)
2、利用JoinableQueue
1 from multiprocessing import Process,JoinableQueue
2 import os
3 import time
4 import random
5 #首先得有生產者和消費者
6 # 消費者吃包子
7 def consumer(q):
8 ‘‘‘消費者‘‘‘
9 while True:#假如消費者不斷的吃
10 res = q.get()
11 time.sleep(random.randint(1,3))
12 print(‘\033[41m%s吃了%s\033[0m‘ % (os.getpid(),res))
13 q.task_done() #任務結束了(消費者告訴生產者,我已經吧東西取走了)
14 # 生產者制造包子
15 def producter(q):
16 ‘‘‘生產者‘‘‘
17 for i in range(5):
18 time.sleep(2) #生產包子得有個過程,就先讓睡一會
19 res = ‘包子%s‘%i #生產了這麽多的包子
20 q.put(res) #吧生產出來的包子放進框裏面去
21 print(‘\033[44m%s制造了%s\033[0m‘%(os.getpid(),res))
22 q.join()
23
24 if __name__ == ‘__main__‘:
25 q = JoinableQueue()
26 p1 = Process(target=producter,args=(q,))
27 p2 = Process(target=consumer,args=(q,))
28 p2.daemon = True #在啟動之前吧消費者設置成守護進程,p1結束了p2也就結束了
29 p1.start()
30 p2.start()
31 p1.join() #在等生產者結束(生產者結束後,就不制造包子了,那消費者一直在吃,就卡住了
32 #都不生產了還吃啥,就把消費者也結束了 )
33 #等待執行完上面的進程,在去執行主
34 print(‘主‘)
多個生產者和多個消費者(有兩種方式)
1、q.put(None):生產者給放一個None進去
多生產者與多消費者12、利用JoinableQueue
1 from multiprocessing import Process,JoinableQueue
2 import os
3 import time
4 import random
5 #首先得有生產者和消費者
6 # 消費者吃包子
7 def consumer(q):
8 while True:
9 res = q.get()
10 time.sleep(random.randint(1,3))
11 print(‘\033[41m%s吃了%s\033[0m‘ % (os.getpid(),res))
12 q.task_done() #任務結束了(消費者告訴生產者,我已經吧東西取走了)
13 def product_baozi(q):
14 for i in range(5):
15 time.sleep(2)
16 res = ‘包子%s‘ % i
17 q.put(res)
18 print(‘\033[44m%s制造了%s\033[0m‘ % (os.getpid(), res))
19 q.join() #不用put(None) 了,在等q被取完。(如果數據沒有被取完,生產者就不會結束掉)
20 def product_gutou(q):
21 for i in range(5):
22 time.sleep(2)
23 res = ‘骨頭%s‘ % i
24 q.put(res)
25 print(‘\033[44m%s制造了%s\033[0m‘ % (os.getpid(), res))
26 q.join()
27 def product_doujiang(q):
28 for i in range(5):
29 time.sleep(2)
30 res = ‘豆漿%s‘ % i
31 q.put(res)
32 print(‘\033[44m%s制造了%s\033[0m‘ % (os.getpid(), res))
33 q.join()
34
35 if __name__ == ‘__main__‘:
36 q = JoinableQueue()
37 # 生產者們:廚師們
38 p1 = Process(target=product_baozi,args=(q,))
39 p2 = Process(target=product_doujiang,args=(q,))
40 p3 = Process(target=product_gutou,args=(q,))
41
42 #消費者們:吃貨們
43 p4 = Process(target=consumer,args=(q,))
44 p5 = Process(target=consumer,args=(q,))
45 p4.daemon = True
46 p5.daemon = True
47 # p1.start()
48 # p2.start()
49 # p3.start()
50 # p4.start()
51 # p5.start()
52 li = [p1,p2,p3,p4,p5]
53 for i in li:
54 i.start()
55 p1.join()
56 p2.join()
57 p3.join()
58 print(‘主‘)
方式二:管道(不推薦使用,了解即可)
管道相當於隊列,但是管道不自動加鎖
方式三:共享數據(不推薦使用,了解即可)
共享數據也沒有自動加鎖的功能,所以還是推薦用隊列的。感興趣的可以研究研究管道和共享數據
Python學習【第20篇】:互斥鎖以及進程之間的三種通信方式(IPC)以及生產者個消費者模型