1. 程式人生 > >day031同步鎖、訊號量、事件、佇列、生成者消費者模型、Jionablequeue

day031同步鎖、訊號量、事件、佇列、生成者消費者模型、Jionablequeue

multiprocessing模組主要內容:

1、守護程序
2、程序同步
    1.同步鎖(*****)
    2.訊號量
    3.事件
3、程序通訊
    1.佇列(*****)
    2.生產者消費者模型,JoinableQueue, 主要在佇列的基礎上多了兩個功能:q.task_done,q.join

一、multiprocessing模組的介紹

仔細說來,multiprocess不是一個模組而是python中一個操作、管理程序的包。
之所以叫multi是取自multiple的多功能的意思,在這個包中幾乎包含了和程序有關的所有子模組。
由於提供的子模組非常多,為了方便大家歸類記憶,

我將這部分大致分為四個部分:
建立程序部分,
程序同步部分,
程序池部分,
程序之間資料共享。

重點強調:程序沒有任何共享狀態,程序修改的資料,改動僅限於該程序內,但是通過一些特殊的方法,可以實現程序之間資料的共享

1、process模組介紹

process模組是一個建立程序的模組,藉助這個模組,就可以完成程序的建立。
Process([group [, target [, name [, args [, kwargs]]]]]),由該類例項化得到的物件,表示一個子程序中的任務(尚未啟動)

強調:
1. 需要使用關鍵字的方式來指定引數
2. args指定的為傳給target函式的位置引數,是一個元組形式,必須有逗號

fe:建立一個程序

# 當前檔名稱為test.py
from multiprocessing import Process

def func(): print(12345) if __name__ == '__main__': #windows 下才需要寫這個,這和系統建立程序的機制有關係,不用深究,記著windows下要寫就好啦 #首先我運行當前這個test.py檔案,執行這個檔案的程式,那麼就產生了程序,這個程序我們稱為主程序 p = Process(target=func,) #將函式註冊到一個程序中,p是一個程序物件,此時還沒有啟動程序,只是建立了一個程序物件。並且func是不加括號的,因為加上括號這個函式就直接運行了對吧。 p.start() #告訴作業系統,給我開啟一個程序,func這個函式就被我們新開的這個程序執行了,而這個程序是我主程序執行過程中創建出來的,所以稱這個新建立的程序為主程序的子程序,而主程序又可以稱為這個新程序的父程序。           #而這個子程序中執行的程式,相當於將現在這個test.py檔案中的程式copy到一個你看不到的python檔案中去執行了,就相當於當前這個檔案,被另外一個py檔案import過去並執行了。           #start並不是直接就去執行了,我們知道程序有三個狀態,程序會進入程序的三個狀態,就緒,(被排程,也就是時間片切換到它的時候)執行,阻塞,並且在這個三個狀態之間不斷的轉換,等待cpu執行時間片到了。 print('*' * 10) #這是主程序的程式,上面開啟的子程序的程式是和主程序的程式同時執行的,我們稱為非同步 
Python Copy

fe:證明子程序和主程序的關係

我們通過主程序建立的子程序是非同步執行的,那麼我們就驗證一下,並且看一下子程序和主程序(也就是父程序)的ID號(講一下pid和ppid,使用pycharm舉例),來看看是否是父子關係。

子程序與主程序的關係

1.看一個問題,說明linux和windows兩個不同的作業系統建立程序的不同機制導致的不同結果:

import time
import os
from multiprocessing import Process

def func(): print('aaaa') time.sleep(1) print('子程序>>',os.getpid()) print('該子程序的父程序>>',os.getppid()) print(12345) print('太白老司機~~~~') # 如果我在這裡加了一個列印,你會發現執行結果中會出現兩次打印出來的太白老司機,因為我們在主程序中開了一個子程序, # 子程序中的程式相當於import的主程序中的程式,那麼import的時候會不會執行你import的那個檔案的程式啊,前面學的,是會執行的,所以出現了兩次列印 # 其實是因為windows開起程序的機制決定的,在linux下是不存在這個效果的,因為windows使用的是process方法來開啟程序,他就會拿到主程序中的所有程式, # 而linux下只是去執行我子程序中註冊的那個函式,不會執行別的程式,這也是為什麼在windows下要加上執行程式的時候, 要加上if __name__ == '__main__':,否則會出現子程序中執行的時候還開啟子程序,那就出現無限迴圈的建立程序了,就報錯了 
Python Copy

2.Process類中引數的介紹:

引數介紹:
1 group引數未使用,值始終為None
2 target表示呼叫物件,即子程序要執行的任務
3 args表示呼叫物件的位置引數元組,args=(1,2,'egon',)
4 kwargs表示呼叫物件的字典,kwargs={'name':'egon','age':18}
5 name為子程序的名稱
fe:給要執行的函式傳引數

給要執行的函式傳引數

3.Process類中各方法的介紹:

1、p.start():啟動程序,並呼叫該子程序中的p.run() 2、p.run():程序啟動時執行的方法,正是它去呼叫target指定的函式,我們自定義類的類中一定要實現該方法 3、p.terminate():強制終止程序p,不會進行任何清理操作,如果p建立了子程序,該子程序就成了殭屍程序,使用該方法需要特別小心這種情況。如果p還儲存了一個鎖那麼也將不會被釋放,進而導致死鎖 4、p.is_alive():如果p仍然執行,返回True, 判斷程序是否還存活 5、p.join([timeout]):主執行緒等待p終止(強調:是主執行緒處於等的狀態,而p是處於執行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的程序,而不能join住run開啟的程序 
Python Copy

join方法的例子:

讓主程序加上join的地方等待(也就是阻塞住),等待子程序執行完之後,再繼續往下執行我的主程序,
好多時候,我們主程序需要子程序的執行結果,所以必須要等待。join感覺就像是將子程序和主程序拼接起來一樣,將非同步改為同步執行。

join方法的例子

4.怎麼樣開啟多個程序呢?for迴圈。

並且我有個需求就是說,所有的子程序非同步執行,然後所有的子程序全部執行完之後,我再執行主程序,怎麼搞?看程式碼

開啟多個程序

fe:模擬兩個應用場景:1、同時對一個檔案進行寫操作 2、同時建立多個檔案

示例

5.Process類中自帶封裝的各屬性的介紹

1、p.daemon:預設值為False,如果設為True,代表p為後臺執行的守護程序,當p的父程序終止時,p也隨之終止,並且設定為True後,p不能建立自己的新程序,必須在p.start()之前設定 2、p.name:程序的名稱 3、p.pid:程序的pid 4、p.exitcode:程序在執行時為None、如果為–N,表示被訊號N結束(瞭解即可) 5、p.authkey:程序的身份驗證鍵,預設是由os.urandom()隨機生成的32字元的字串。這個鍵的用途是為涉及網路連線的底層程序間通訊提供安全性,這類連線只有在具有相同的身份驗證鍵時才能成功(瞭解即可) 
Python Copy

2、Process類的使用

注意:在windows中Process()必須放到# if name == ‘main‘:下

原因解釋

練習:那我們自己通過多程序來實現一下同時和多個客戶端進行連線通訊。

我們之前學socket的時候,知道tcp協議的socket是不能同時和多個客戶端進行連線的,(這裡先不考慮socketserver那個模組),
對不對,那我們自己通過多程序來實現一下同時和多個客戶端進行連線通訊。
服務端程式碼示例:(注意一點:通過這個是不能做qq聊天的,
因為qq聊天是qq的客戶端把資訊發給另外一個qq的客戶端,中間有一個服務端幫你轉發訊息,
而不是我們這樣的單純的客戶端和服務端對話,並且子程序開啟之後咱們是沒法操作的,
並且沒有為子程序input輸入提供控制檯,所有你再在子程序中寫上了input會報錯,EOFError錯誤,
這個錯誤的意思就是你的input需要輸入,但是你輸入不了,就會報這個錯誤。
而子程序的輸出列印之類的,是pycharm做了優化,將所有子程序中的輸出結果幫你打印出來了,但實質還是不同程序的。)

服務端程式碼客戶端程式碼

1.上面我們通過多程序實現了併發,但是有個問題

每來一個客戶端,都在服務端開啟一個程序,如果併發來一個萬個客戶端,要開啟一萬個程序嗎,你自己嘗試著在你自己的機器上開啟一萬個,10萬個程序試一試。
解決方法:程序池,本篇部落格後面會講到,大家繼續學習呀

2.殭屍程序與孤兒程序(簡單瞭解 一下就可以啦)

殭屍程序與孤兒程序

3、守護程序

之前我們講的子程序是不會隨著主程序的結束而結束,子程序全部執行完之後,程式才結束,
那麼如果有一天我們的需求是我的主程序結束了,由我主程序建立的那些子程序必須跟著結束,怎麼辦?守護程序就來了!

主程序建立守護程序

其一:守護程序會在主程序程式碼執行結束後就終止

其二:守護程序內無法再開啟子程序,否則丟擲異常:AssertionError: daemonic processes are not allowed to have children

注意:程序之間是互相獨立的,主程序程式碼執行結束,守護程序隨即終止
Python Copy
import time
from multiprocessing import Process

def func1(): time.sleep(2) print('xxxxx') if __name__ == '__main__': p = Process(target=func1,) p.daemon = True #守護程序,要寫在start之前 p.start() print('主程序程式碼執行完了') # time.sleep(3) 
Python Copy

4.程序同步(鎖)重點

保證資料安全用的,但是將鎖起來的那段程式碼的執行變成了同步\序列,犧牲了效率,保證了安全
from multiprocessing import Lock
L = Lock() # 建立鎖的物件 L.acquire() # 限定一個進入 資料操作 L.release() # 釋放 
Python Copy

fe:接下來,我們以模擬搶票為例,來看看資料安全的重要性。

併發執行,效率高,但是競爭同一個檔案,導致資料混亂加鎖:購票行為由併發變成了序列,犧牲了效率,但是保證了資料安全

程序鎖總結

程序鎖總結

5、訊號量 (瞭解)

互斥鎖同時只允許一個執行緒更改資料,而訊號量Semaphore是同時允許一定數量的執行緒更改資料 。
假設商場裡有4個迷你唱吧,所以同時可以進去4個人,如果來了第五個人就要在外面等待,等到有人出來才能再進去玩。
實現:
訊號量同步基於內部計數器,每呼叫一次acquire(),計數器減1;每呼叫一次release(),計數器加1.當計數器為0時,acquire()呼叫被阻塞。 這是迪科斯徹(Dijkstra)訊號量概念P()和V()的Python實現。訊號量同步機制適用於訪問像伺服器這樣的有限資源。 訊號量與程序池的概念很像,但是要區分開,訊號量涉及到加鎖的概念 
Python Copy

訊號量的使用

6、事件(瞭解)

事件介紹

python執行緒的事件用於主執行緒控制其他執行緒的執行,事件主要提供了三個方法 set、wait、clear。

    事件處理的機制:全域性定義了一個“Flag”,如果“Flag”值為 False,那麼當程式執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那麼event.wait 方法時便不再阻塞。 clear:將“Flag”設定為False set:將“Flag”設定為True 
Python Copy

事件方法的使用通過事件來模擬紅綠燈示例基於事件的程序通訊

7、佇列(推薦使用)重點

佇列:程序安全的,能夠保證資料安全,
程序彼此之間互相隔離,要實現程序間通訊(IPC),multiprocessing模組支援兩種形式:佇列和管道,這兩種方式都是使用訊息傳遞的。
佇列就像一個特殊的列表,但是可以設定固定長度,並且從前面插入資料,從後面取出資料,先進先出。

Queue([maxsize]) 建立共享的程序佇列。
引數 :maxsize是佇列中允許的最大項數。如果省略此引數,則無大小限制。
底層佇列使用管道和鎖實現

佇列的簡單用法示例queue的方法介紹queue的其他方法(瞭解)子程序與父程序通過佇列進行通訊

8、生產者消費者的模型 JoinableQueue

緩衝區解耦的事情
Joinablequeue,能記錄著你往佇列裡面put的資料量
其他方法和queue是一樣的,比queue多了兩個方法:q.task_done,q.join

佇列是程序安全的:同一時間只能一個程序拿到佇列中的一個數據,你拿到了一個數據,這個資料別人就拿不到了。
在併發程式設計中使用生產者和消費者模式能夠解決絕大多數併發問題。該模式通過平衡生產執行緒和消費執行緒的工作能力來提高程式的整體處理資料的速度。

為什麼要使用生產者和消費者模式

線上程世界裡,生產者就是生產資料的執行緒,消費者就是消費資料的執行緒。
在多執行緒開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產資料。
同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

什麼是生產者消費者模式
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。
生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,所以生產者生產完資料之後不用等待消費者處理,
直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,
平衡了生產者和消費者的處理能力,並且我可以根據生產速度和消費速度來均衡一下多少個生產者可以為多少個消費者提供足夠的服務,
就可以開多程序等等,而這些程序都是到阻塞佇列或者說是緩衝區中去獲取或者新增資料。
Python Copy

那麼我們基於佇列來實現一個生產者消費者模型,程式碼示例:

基於佇列的生產者消費者模型

#生產者消費者模型總結

    #程式中有兩類角色
        一類負責生產資料(生產者)
        一類負責處理資料(消費者)

    #引入生產者消費者模型為了解決的問題是:
        平衡生產者與消費者之間的工作能力,從而提高程式整體處理資料的速度

    #如何實現:
        生產者<-->佇列<——>消費者 #生產者消費者模型實現類程式的解耦和 ```` #### 9、多生產者多消費者模型 運用JoinableQueue 在有多個生產者和多個消費者時,由於佇列我們說了是程序安全的,我一個程序拿走了結束訊號, 另外一個程序就拿不到了,還需要多傳送一個結束訊號,有幾個取資料的程序就要傳送幾個結束訊號,我們則需要用一個很low的方式去解決 這樣的方法並不適合,所有這裡引入了JoinableQueue #### JoinableQueue([maxsize]) ```python #JoinableQueue([maxsize]):這就像是一個Queue物件,但佇列允許專案的使用者通知生成者專案已經被成功處理。通知程序是使用共享的訊號和條件變數來實現的。 #引數介紹: maxsize是佇列中允許最大項數,省略則無大小限制。 #方法介紹: JoinableQueue的例項p除了與Queue物件相同的方法之外還具有: q.task_done():使用者使用此方法發出訊號,表示q.get()的返回專案已經被處理。如果呼叫此方法的次數大於從佇列中刪除專案的數量,將引發ValueError異常 q.join():生產者呼叫此方法進行阻塞,直到佇列中所有的專案均被處理。阻塞將持續到佇列中的每個專案均呼叫q.task_done()方法為止,也就是佇列中的資料全部被get拿走了。 ``` ![](https://lj.fwit.win/wp-content/uploads/2018/11/a8579f17425d37b3ba954fcff35d610e.png) #### JoinableQueue佇列實現生產者消費者模型 <details> <summary>JoinableQueue佇列實現生產者消費者模型</summary> ```python # 2.基於佇列寫一個多個生產者和多個消費者的模型 from multiprocessing import Process, JoinableQueue import time def producer1(name,q): # 重複程式碼的重用就行了,不用每次都寫多一個函式,只用引數來控制 for i in range(5): time.sleep(0.5) q.put("%s號生產的%s" % (i,name)) print("%s號的%s好了" %(i,name)) q.join() # 只設置一個join就行了,這是個共享的佇列空間,q.task_done()都是發到這裡來的,#生產完畢,使用此方法進行阻塞,直到佇列中所有專案均被處理。 # print("結束了") def consumer1(q): while 1: time.sleep(1) res = q.get() print("%s被消費者吃掉了" % res) q.task_done() # 每拿一次,向q.join()傳送一次訊號,證明一個數據已經被取走並執行完了 if __name__ == '__main__': q = JoinableQueue(10) # 生產者:廚師們 p1 = Process(target=producer1, args=("包子", q,)) p2 = Process(target=producer1, args=("饅頭", q,)) # 消費者:吃貨們 c1 = Process(target=consumer1, args=(q,)) c2 = Process(target=consumer1, args= (q,)) c1.daemon = True c2.daemon = True # 消費者設定為守護程序,這樣消費者這個子程序才會結束,,但是加了守護之後,必須確保生產者的內容生產完並且被處理完了, # 所有必須還要在主程序給生產者設定join,才能確保生產者生產的任務被執行完了,並且能夠確保守護程序在所有任務執行完成之後才隨著主程序的結束而結束。 p1.start() p2.start() c1.start() c2.start() p1.join() # 等待一個子程序就行了,因為q.join是共享的 # 主程序等--->p1,p2,p3等---->c1,c2 # p1,p2,p3結束了,證明c1,c2肯定全都收完了p1,p2,p3發到佇列的資料 # 因而c1,c2也沒有存在的價值了,不需要繼續阻塞在程序中影響主程序了。應該隨著主程序的結束而結束,所以設定成守護程序就可以了。