1. 程式人生 > >第三十一天- 程序序列(鎖) 佇列 生成者消費者模型

第三十一天- 程序序列(鎖) 佇列 生成者消費者模型

 

1.程序同步/序列(鎖)

  程序之間資料不共享,但共享同一套檔案系統,所以訪問同一個檔案,或同一個列印終端,沒有問題,但共享帶來的是競爭容易錯亂,如搶票時。這就需讓程序一個個的進去保證資料安全,也就是加鎖處理,Lock

  併發,效率高,但是競爭同一個檔案時,導致資料混亂

  加鎖,由併發改成了序列,犧牲了執行效率,但避免資料競爭

  

以模擬搶票為例:

 1 # 注意:首先在當前檔案目錄下建立一個名為db的檔案
 2 # 檔案db的內容為:{"count":1},只有這一行資料,並且注意,每次執行完了之後,檔案中的1變成了0,你需要手動將0改為1,然後在去執行程式碼。
3 # 注意一定要用雙引號,不然json無法識別 4 # 加鎖保證資料安全,不出現混亂 5 from multiprocessing import Process,Lock 6 import time,json,random 7 8 9 # 檢視剩餘票數 10 def search(i): 11 dic=json.load(open('db')) # 開啟檔案,直接load檔案中的內容,拿到檔案中的包含剩餘票數的字典 12 print('客戶%s檢視剩餘票數%s' %(i,dic['count'])) 13 14 15 # 搶票 16 def get(i):
17 dic = json.load(open('db')) 18 time.sleep(0.1) # 模擬讀資料的網路延遲,那麼程序之間的切換,所有人拿到的字典都是{"count": 1},也就是每個人都拿到了這一票。 19 if dic['count'] > 0: 20 dic['count'] -= 1 21 time.sleep(random.randint(0,1)) # 模擬寫資料的網路延遲 22 json.dump(dic,open('db','w')) 23 # 若不加限制最終導致,每個人顯示都搶到了票,這就出現了問題
24 print('客戶%s購票成功'%i) 25 else: 26 print('sorry,客戶%s 沒票了親!'%i) 27 28 29 def task(i,lock): 30 search(i) 31 # 搶票時是發生資料變化的時候,所以我們將鎖加到這裡,讓程序序列執行 32 lock.acquire() # 加鎖 33 get(i) 34 lock.release() # 解鎖 35 36 37 if __name__ == '__main__': 38 lock = Lock() # 建立一個鎖 39 for i in range(10): # 模擬併發10個客戶端搶票 40 p = Process(target=task,args=(i,lock,)) # 將鎖作為引數傳給task函式 41 p.start()
加鎖模擬搶票

 

總結:

  加鎖可以保證多個程序修改同一塊資料時,同一時間只能有一個任務可以進行修改,即序列的修改,速度是慢了,但犧牲了速度卻保證了資料安全。

  因此需一種解決方案能夠兼顧:1、效率高(多個程序共享一塊記憶體的資料)2、幫我們處理好鎖問題。這就是 mutiprocessing 模組提供的基於訊息的IPC通訊機制:佇列和管道(見後續)。

  

 

2.程序守護

  子程序是不會隨著主程序結束而結束,子程序全部執行完後,程式才結束,那如果需求主程序結束,子程序必須跟著結束,怎麼辦?這就需要用到守護程序了!

  運用:如,系統關機,其他一切都要跟著結束

 1 import time
 2 from multiprocessing import Process
 3 
 4 def func1(m):
 5     time.sleep(1)
 6     print('我是func1',m)
 7 
 8 
 9 # 注意:程序之間是互相獨立的,主程序程式碼執行結束,不管有沒有執行完,守護程序隨即終止
10 if __name__ == '__main__':
11     p = Process(target=func1,args=(666,))
12     p.daemon = True  # 守護程序,在start之前
13     p.start()
14 
15     print('主程序執行結束')

 

總結:

  其一:守護程序會在主程序程式碼執行結束後就終止
  其二:守護程序內無法再開啟子程序,否則出異常

 

 

3.佇列

  程序之間互相隔離,要實現程序間通訊(IPC),multiprocessing模組支援 佇列和管道,這兩種方式都是使用訊息傳遞佇列就像一個特殊的列表,但是可以設定固定長度,並且從前面插入資料,從後面取出資料,先進先出,取出就沒有這個資料了。

  方法介紹:

 1 '''
 2 q = Queue([maxsize]) 
 3 建立共享的程序佇列。maxsize是佇列中允許的最大項數。如果省略此引數,則無大小限制。底層佇列使用管道和鎖定實現。另外,還需要執行支援執行緒以便佇列中的資料傳輸到底層管道中。 
 4 Queue的例項q具有以下方法:
 5 
 6 q.get( [ block [ ,timeout ] ] ) 
 7 返回q中的一個專案。如果q為空,此方法將阻塞,直到佇列中有專案可用為止。block用於控制阻塞行為,預設為True. 如果設定為False,將引發Queue.Empty異常(定義在Queue模組中)。timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有專案變為可用,將引發Queue.Empty異常。
 8 
 9 q.get_nowait( ) 
10 同q.get(False)方法。
11 
12 q.put(item [, block [,timeout ] ] ) 
13 將item放入佇列。如果佇列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,預設為True。如果設定為False,將引發Queue.Empty異常(定義在Queue庫模組中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引發Queue.Full異常。
14 
15 q.qsize() 
16 返回佇列中目前專案的正確數量。此函式的結果並不可靠,因為在返回結果和在稍後程式中使用結果之間,佇列中可能新增或刪除了專案。在某些系統上,此方法可能引發NotImplementedError異常。
17 
18 
19 q.empty() 
20 如果呼叫此方法時 q為空,返回True。如果其他程序或執行緒正在往佇列中新增專案,結果是不可靠的。也就是說,在返回和使用結果之間,佇列中可能已經加入新的專案。
21 
22 q.full() 
23 如果q已滿,返回為True. 由於執行緒的存在,結果也可能是不可靠的(參考q.empty()方法)
24 
25 '''

  示例程式碼:

 1 from multiprocessing import Process,Queue
 2 # Queue 先進先出 fifo first in first out,佇列裡面的資料,只能取一次,取出就沒了
 3 
 4 q = Queue(3)  # Queue(引數)可理解成一個可限制長度(引數)的列表
 5 # 新增資料
 6 # print(q.full())
 7 q.put(4)
 8 q.put(3)
 9 q.put(2)
10 # print(q.full())  # 檢視序列是否滿了,但不可信的(如多程序時)
11 
12 # 取出資料
13 print('---------')
14 # print(q.empty())
15 print(q.get())
16 print(q.get())
17 print(q.get())
18 # print(q.empty())  # 檢視序列是否空了,但是不可信的(如多程序時)
19 print('---------')
20 print(q.get()) # 超出長度會一直停在這等待,直到有資料給他
21 
22 # 用try優化上面程式碼
23 # for i in range(4):
24 #     try:
25 #         s = q.get_nowait()
26 #         # s = q.get(False) # 等同nowait
27 #         print('=====',s)
28 #
29 #     except:
30 #         print('沒有資料了,去幹別的吧...')
31 #
佇列參考程式碼

 

  基於佇列的程序通訊:

 1 from multiprocessing import Process,Queue
 2 
 3 
 4 def func(q):
 5     # 拿出資料
 6     res = q.get()
 7     print(res)
 8     print(q.get())
 9 
10 
11 if __name__ == '__main__':
12     q = Queue(5)
13     q.put('hello')  # 新增資料
14     q.put('emmm')
15     p = Process(target=func,args=(q,))
16     p.start()
17 
18     print('主程序結束')
19 
20 # 佇列的資料是安全的,先進先出,且取一次出來就沒有了
View Code

 

 

4.生產消費模式

  線上程世界裡,生產者就是生產資料的執行緒,消費者就是消費資料的執行緒。在多執行緒開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產資料。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這個問題我們需要通過一個容器(緩衝區)來解決生產者和消費者的強耦合問題。

  生產消費模式圖解:

 

基於佇列來實現一個生產者消費者模型:

 1 # 解耦合
 2 import time
 3 from multiprocessing import Process,Queue
 4 
 5 
 6 def producer(q):
 7     for i in range(10):
 8         time.sleep(0.5)
 9         q.put('包子%s號'%i)
10         print('包子%s號做好了'%i)
11     q.put(None)  # None表示沒有 防止後面死迴圈
12 
13 
14 def consumer(q):
15     while 1:
16         baozi = q.get()
17         if baozi == None:
18             break
19         time.sleep(1)
20         print('%s被吃掉了'%baozi)
21 
22 
23 if __name__ == '__main__':
24     q = Queue(10)  # 建立一個佇列,耦合生產者和消費者,p1和p2共享q(獨立於程序的一個空間)
25     p1 = Process(target=producer,args=(q,))
26     p2 = Process(target=consumer,args=(q,))
27     p1.start()
28     p2.start()
View Code

 

總結:

 1 # 生產者消費者模型總結
 2 
 3 # 程式中有兩類角色
 4 一類負責生產資料(生產者)
 5 一類負責處理資料(消費者)
 6 
 7 # 引入生產者消費者模型為了解決的問題是:
 8 平衡生產者與消費者之間的工作能力,從而提高程式整體處理資料的速度
 9 
10 # 如何實現:
11 生產者 < -->佇列 <—— > 消費者
12 # 生產者消費者模型實現類程式的解耦和
13 
14 生產者消費者模型總結

 

 

5.joinableQueue

  有多個生產者和多個消費者時,由於佇列是程序安全的,一個程序拿走了結束訊號,另外一個程序就拿不到了,所以使用時需要消費者傳送訊息給生產者已使用。

1 #JoinableQueue([maxsize]):這就像是一個Queue物件,但佇列允許專案的使用者通知生成者專案已經被成功處理。通知程序是使用共享的訊號和條件變數來實現的。
2 
3    #引數介紹:
4     maxsize是佇列中允許最大項數,省略則無大小限制。    
5   #方法介紹:
6     JoinableQueue的例項p除了與Queue物件相同的方法之外還具有:
7     q.task_done():使用者使用此方法發出訊號,表示q.get()的返回專案已經被處理。如果呼叫此方法的次數大於從佇列中刪除專案的數量,將引發ValueError異常
8     q.join():生產者呼叫此方法進行阻塞,直到佇列中所有的專案均被處理。阻塞將持續到佇列中的每個專案均呼叫q.task_done()方法為止,也就是佇列中的資料全部被get拿走了。

 

 1 import time
 2 from multiprocessing import Process,JoinableQueue
 3 
 4 def producer(q):
 5     for i in range(10):
 6         time.sleep(0.5)
 7         q.put('包子%s號'%i)
 8         print('包子%s號生產完畢'%i)
 9     print('aaaaaaaaaaaaa')
10     q.join()  #
11     print('包子賣完了')
12 
13 def consumer(q):
14     while 1:
15         baozi = q.get()
16         time.sleep(0.8)
17         print('%s被吃掉了'%baozi)
18         q.task_done()  # 給佇列傳送一個任務處理完了的訊號
19 
20 if __name__ == '__main__':
21 
22     q = JoinableQueue()
23     p1 = Process(target=producer,args=(q,))
24     p2 = Process(target=consumer,args=(q,))
25     p2.daemon = True
26     p1.start()
27     p2.start()
28     p1.join()  # 主程序等著生產者程序的結束才結束 ,生產者結束意味著q獲得了10個task_done的訊號,
簡單示例

 

 1 # 與queque類似,多了 q.task_done()  q.join()
 2 from multiprocessing import Process,JoinableQueue
 3 import time,random,os
 4 
 5 
 6 def consumer(q):
 7     while True:
 8         res=q.get()
 9         # time.sleep(random.randint(1,3))
10         time.sleep(random.random())
11         print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
12         q.task_done() # 向q.join()傳送一次訊號,證明一個數據已經被取走並執行完了
13 
14 
15 def producer(name,q):
16     for i in range(10):
17         # time.sleep(random.randint(1,3))
18         time.sleep(random.random())
19         res='%s%s' %(name,i)
20         q.put(res)
21         print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))
22     print('%s生產結束'%name)
23     q.join() # 生產完畢,使用此方法進行阻塞,直到佇列中所有專案均被處理。
24     print('%s生產結束~~~~~~'%name)
25 
26 
27 if __name__ == '__main__':
28     q=JoinableQueue()
29     # 生產者們:即廚師們
30     p1=Process(target=producer,args=('包子',q))
31     p2=Process(target=producer,args=('骨頭',q))
32     p3=Process(target=producer,args=('泔水',q))
33 
34     # 消費者們:即吃貨們
35     c1=Process(target=consumer,args=(q,))
36     c2=Process(target=consumer,args=(q,))
37     c1.daemon=True
38     c2.daemon=True
39     # 如果不加守護,那麼主程序結束不了,但是加了守護之後,必須確保生產者的內容生產完並且被處理完了,所有必須還要在主程序給生產者設定join,才能確保生產者生產的任務被執行完了,並且能夠確保守護程序在所有任務執行完成之後才隨著主程序的結束而結束。
40 
41     # 開始
42     p_l=[p1,p2,p3,c1,c2]
43     for p in p_l:
44         p.start()
45 
46     p1.join() # 我要確保你的生產者程序結束了,生產者程序的結束標誌著你生產的所有的人任務都已經被處理完了
47     p2.join()
48     p3.join()
49     print('主程式')
稍複雜示例參考