併發程式設計之多程序
一、什麼是程序
一個正在執行的程式稱之為程序 是一種抽象概念 表示一個執行某件事情的過程,程序的概念 起源於作業系統
第一代計算機 程式是固定 無法修改 某種計算機只能幹某種活
第二代批處理系統 需要人工參與 將程式攢成一批 統一執行,序列執行 提高計算機的的利用率 但是除錯麻煩
第三代計算機 為了更好利用計算機資源,產生了
多道技術: (重點) 1.空間複用 記憶體分割為多個區域 每個區域儲存不同的應用程式
2.時間的複用 1.當一個程式遇到了I/O操作時 會切換到其他程式 (切換前需要儲存當前執行狀態 以便恢復執行)
2.當你的應用程式執行時間過長 作業系統會強行切走 以保證其他程式也能正常執行 當然因為cpu速度賊快 使用者感覺不到,降低效率
3.有一個優先順序更高的任務需要處理 此時也會切走,降低了效率
我們編寫程式時 只能儘量減少I/O操作
總的來說 有了多道技術之後 作業系統可以同時執行多個程式吧 這種情形稱之為併發 但是本質好 這些程式還是一個一個排隊執行。
#一 作業系統的作用: 1:隱藏醜陋複雜的硬體介面,提供良好的抽象介面 2:管理、排程程序,並且將多個程序對硬體的競爭變得有序 #二 多道技術: 1.產生背景:針對單核,實現併發 ps: 現在的主機一般是多核,那麼每個核都會利用多道技術 有4個cpu,運行於cpu1的某個程式遇到io阻塞,會等到io結束再重新排程,會被排程到4個 cpu中的任意一個,具體由作業系統排程演算法決定。2.空間上的複用:如記憶體中同時有多道程式 3.時間上的複用:複用一個cpu的時間片 強調:遇到io切,佔用cpu時間過長也切,核心在於切之前將程序的狀態儲存下來,這樣 才能保證下次切換回來時,能基於上次切走的位置繼續執行
程序的並行與併發
併發 看起來像是同時執行中 本質是不停切換執行 多個程序隨機執行並行 同一時刻 多個程序 同時進行 只有多核處理器才有真正的並行
區別:
並行是從微觀上,也就是在一個精確的時間片刻,有不同的程式在執行,這就要求必須有多個處理器。併發是從巨集觀上,在一個時間段上可以看出是同時執行的,比如一個伺服器同時處理多個session。
序列 一個一個 依次執行排隊 阻塞 遇到了I/O操作 看起來就是程式碼卡住了,非阻塞 不會卡住程式碼的執行 阻塞 和 非阻塞 說的是同一個程序的情況下
同步 一個呼叫必須獲得返回結果才能繼續執行 非同步 一個呼叫發起後 發起方不需要等待它的返回結果
同步和非同步 必須存在多個程序(執行緒) 無論是程序還是執行緒都是兩條獨立的執行路徑
多程序的執行順序 主程序必然先執行,子程序應該在主程序執行後執行,一旦子程序啟動了 後續的順序就無法控制了
python如何使用多程序 1.直接建立Process物件 同時傳入要做的事情就是一個函式
p = Process(taget=一個函式,args=(函式的引數))p.start() 讓作業系統啟動這個程序 2.建立一個類 繼承自Process 把要做的任務放在run方法中
常用屬性 start 開啟程序 join 父程序等待子程序 name 程序名稱 is_alive是否存活 terminate 終止程序 pid 獲取程序id
啟動程序的方式 1.系統初始化 會產生一個根程序 2.使用者的互動請求 滑鼠雙擊某個程式 3.在一個程序中 發起了系統呼叫啟動了另一個程序 4.批處理作業開始 某些專用計算機可能還在使用
不同作業系統建立程序的方式不同 unix < centos mac linux 完全拷貝父程序的所有資料 子程序可以訪問父程序的資料嗎?不可以 但是可以訪問拷貝過來資料副本 windows 建立子程序 載入父程序中所有可執行的檔案
二、在python程式中的程序操作
一 multiprocessing模組介紹
python中的多執行緒無法利用多核優勢,如果想要充分地使用多核CPU的資源(os.cpu\_count\(\)檢視),在python中大部分情況需要使用多程序。
Python提供了multiprocessing。 multiprocessing模組用來開啟子程序,並在子程序中執行我們定製的任務(比如函式),該模組與多執行緒模組threading的程式設計介面類似。multiprocessing模組的功能眾多:支援子程序、通訊和共享資料、執行不同形式的同步,>提供了Process、Queue、Pipe、Lock等元件。
需要再次強調的一點是:與執行緒不同,程序沒有任何共享狀態,程序修改的資料,改動僅限於該程序內
二 Process類的介紹
Process([group [, target [, name [, args [, kwargs]]]]]),由該類例項化得到的物件,可用來開啟一個子程序 強調: 1. 需要使用關鍵字的方式來指定引數 2. args指定的為傳給target函式的位置引數,是一個元組形式,必須有逗號
引數介紹:
group引數未使用,值始終為None target表示呼叫物件,即子程序要執行的任務 args表示呼叫物件的位置引數元組,args=(1,2,'egon',) kwargs表示呼叫物件的字典,kwargs={'name':'egon','age':18} name為子程序的名稱
方法介紹:
p.start():啟動程序,並呼叫該子程序中的p.run()
p.run():程序啟動時執行的方法,正是它去呼叫target指定的函式,我們自定義類的類中一定要實現該方法
p.terminate():強制終止程序p,不會進行任何清理操作,如果p建立了子程序,該子程序就成了殭屍程序,使用該方法需要特別小心這種情況。如果p還儲存了一個鎖那麼也將不會被釋放,進而導致死鎖
p.is_alive():如果p仍然執行,返回True
p.join([timeout]):主執行緒等待p終止(強調:是主執行緒處於等的狀態,而p是處於執行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的程序,而不能join住run開啟的程序
屬性介紹:
p.daemon:預設值為False,如果設為True,代表p為後臺執行的守護程序,當p的父程序終止時,p也隨之終止,並且設定為True後,p不能建立自己的新程序,必須在p.start()之前設定
p.name:程序的名稱
p.pid:程序的pid
三 Process類的使用
注意:在windows中Process()必須放到# if __name__ == '__main__':下
建立並開啟子程序的方式
import time import random from multiprocessing import Process def piao(name): print('%s piaoing' %name) time.sleep(random.randrange(1,5)) print('%s piao end' %name) if __name__ == '__main__': #例項化得到四個物件 p1=Process(target=piao,args=('egon',)) #必須加,號 p2=Process(target=piao,args=('alex',)) p3=Process(target=piao,args=('wupeqi',)) p4=Process(target=piao,args=('yuanhao',)) #呼叫物件下的方法,開啟四個程序 p1.start() p2.start() p3.start() p4.start() print('主')方式一
import time import random from multiprocessing import Process class Piao(Process): def __init__(self,name): super().__init__() self.name=name def run(self): print('%s piaoing' %self.name) time.sleep(random.randrange(1,5)) print('%s piao end' %self.name) if __name__ == '__main__': #例項化得到四個物件 p1=Piao('egon') p2=Piao('alex') p3=Piao('wupeiqi') p4=Piao('yuanhao') #呼叫物件下的方法,開啟四個程序 p1.start() #start會自動呼叫run p2.start() p3.start() p4.start() print('主')方式二
四、Process物件的join方法
在主程序執行過程中如果想併發地執行其他的任務,我們可以開啟子程序,此時主程序的任務與子程序的任務分兩種情況
情況一:在主程序的任務與子程序的任務彼此獨立的情況下,主程序的任務先執行完畢後,主程序還需要等待子程序執行完畢,然後統一回收資源。
情況二:如果主程序的任務在執行到某一個階段時,需要等待子程序執行完畢後才能繼續執行,就需要有一種機制能夠讓主程序檢測子程序是否執行完畢,在子程序執行完畢後才繼續執行,否則一直在原地阻塞,這就是join方法的作用
import time from multiprocessing import Process def f(name): print('hello', name) time.sleep(1) if __name__ == '__main__': p_lst = [] for i in range(5): p = Process(target=f, args=('bob',)) p.start() p_lst.append(p) p.join() # [p.join() for p in p_lst] print('父程序在執行')多個程序同時執行,再談join方法(1)
import time from multiprocessing import Process def f(name): print('hello', name) time.sleep(1) if __name__ == '__main__': p_lst = [] for i in range(5): p = Process(target=f, args=('bob',)) p.start() p_lst.append(p) # [p.join() for p in p_lst] print('父程序在執行')多個程序同時執行,再談join方法(2)
除了上面這些開啟程序的方法,還有一種以繼承Process類的形式開啟程序的方式
import os from multiprocessing import Process class MyProcess(Process): def __init__(self,name): super().__init__() self.name=name def run(self): print(os.getpid()) print('%s 正在和女主播聊天' %self.name) p1=MyProcess('wupeiqi') p2=MyProcess('yuanhao') p3=MyProcess('nezha') p1.start() #start會自動呼叫run p2.start() # p2.run() p3.start() p1.join() p2.join() p3.join() print('主執行緒')通過繼承Process類開啟程序
程序之間的資料隔離問題
from multiprocessing import Process def work(): global n n=0 print('子程序內: ',n) if __name__ == '__main__': n = 100 p=Process(target=work) p.start() print('主程序內: ',n)程序之間的資料隔離問題
三、守護程序
主程序建立子程序,然後將該程序設定成守護自己的程序,守護程序就好比崇禎皇帝身邊的老太監,崇禎皇帝已死老太監就跟著殉葬了。
關於守護程序需要強調兩點:
其一:守護程序會在主程序程式碼執行結束後就終止
其二:守護程序內無法再開啟子程序,否則丟擲異常:AssertionError: daemonic processes are not allowed to have children
如果我們有兩個任務需要併發執行,那麼開一個主程序和一個子程序分別去執行就ok了,如果子程序的任務在主程序任務結束後就沒有存在的必要了,那麼該子程序應該在開啟前就被設定成守護程序。主程序程式碼執行結束,守護程序隨即終止
from multiprocessing import Process import time import random def task(name): print('%s is piaoing' %name) time.sleep(random.randrange(1,3)) print('%s is piao end' %name) if __name__ == '__main__': p=Process(target=task,args=('egon',)) p.daemon=True #一定要在p.start()前設定,設定p為守護程序,禁止p建立子程序,並且父程序程式碼執行結束,p即終止執行 p.start() print('主') #只要終端打印出這一行內容,那麼守護程序p也就跟著結束掉了守護程序
守護 就是看著 陪著 在程式碼中 程序只能由程序類守護 一個程序守護者另一個程序 指的是兩個程序之間的關聯關係 特點:守護程序(妃子) 在被守護程序(皇帝)死亡時 會跟隨被守護程序死亡
什麼時候需要使用守護程序?例如: qq中有個下載視訊 應該用子程序去做 但是 下載的過程中 qq退出 那麼下載也沒必要繼續了
四、互斥鎖
from multiprocessing import Process,Lock # 程序間 記憶體空間是相互獨立的 def task1(lock): lock.acquire() for i in range(10000): print("===") lock.release() def task2(lock): lock.acquire() for i in range(10000): print("===============") lock.release() def task3(lock): lock.acquire() for i in range(10000): print("======================================") lock.release() if __name__ == '__main__': # 買了一把鎖 mutex = Lock() # for i in range(10): # p = Process(target=) p1 = Process(target=task1,args=(mutex,)) p2 = Process(target=task2,args=(mutex,)) p3 = Process(target=task3,args=(mutex,)) # p1.start() # p1.join() # p2.start() # p2.join() # p3.start() # p3.join() p1.start() p2.start() p3.start() print("over!")互斥鎖案例
# 什麼時候用鎖? # 當多個程序 同時讀寫同一份資料 資料很可能就被搞壞了 # 第一個程序寫了一箇中文字元的一個位元組 cpu被切到另一個程序 # 另一個程序也寫了一箇中文字元的一個位元組 # 最後檔案解碼失敗 # 問題之所以出現 是因為併發 無法控住順序 # 目前可以使用join來將所有程序併發改為序列 # 與join的區別? # 多個程序併發的訪問了同一個資源 將導致資源競爭(同時讀取不會產生問題 同時修改才會出問題) # 第一個方案 加上join 但是這樣就導致了 不公平 相當於 上廁所得按照顏值來 # 第二個方案 加鎖 誰先搶到資源誰先處理[ # 相同點: 都變成了序列 # 不同點: # 1.join順序固定 鎖順序不固定! # 2.join使整個程序的任務全部序列 而鎖可以指定哪些程式碼要序列 # 鎖使是什麼? # 鎖本質上就是一個bool型別的識別符號 大家(多個程序) 在執行任務之前先判斷識別符號 # 互斥鎖 兩個程序相互排斥 # 注意 要想鎖住資源必須保證 大家拿到鎖是同一把 # 怎麼使用? # 在需要加鎖的地方 lock.acquire() 表示鎖定 # 在程式碼執行完後 一定要lock.release() 表示釋放鎖 # lock.acquire() # 放需要競爭資源的程式碼 (同時寫入資料) # lock.release()
程序之間資料不共享,但是共享同一套檔案系統,所以訪問同一個檔案,或者列印終端是沒有問題的,但是帶來的是競爭,競爭帶來的結果是錯亂,如下:多個程序模擬多個人執行搶票任務
#檔案db.txt的內容為:{"count":1} #注意一定要用雙引號,不然json無法識別 from multiprocessing import Process import time,json def search(name): dic=json.load(open('db.txt')) time.sleep(1) print('\033[43m%s 查到剩餘票數%s\033[0m' %(name,dic['count'])) def get(name): dic=json.load(open('db.txt')) time.sleep(1) #模擬讀資料的網路延遲 if dic['count'] >0: dic['count']-=1 time.sleep(1) #模擬寫資料的網路延遲 json.dump(dic,open('db.txt','w')) print('\033[46m%s 購票成功\033[0m' %name) def task(name): search(name) get(name) if __name__ == '__main__': for i in range(10): #模擬併發10個客戶端搶票 name='<路人%s>' %i p=Process(target=task,args=(name,)) p.start()
併發執行,效率高,但競爭寫同一檔案,資料寫入錯亂,只有一張票,賣成功給了10個人
<路人0> 查到剩餘票數1 <路人1> 查到剩餘票數1 <路人2> 查到剩餘票數1 <路人3> 查到剩餘票數1 <路人4> 查到剩餘票數1 <路人5> 查到剩餘票數1 <路人6> 查到剩餘票數1 <路人7> 查到剩餘票數1 <路人8> 查到剩餘票數1 <路人9> 查到剩餘票數1 <路人0> 購票成功 <路人4> 購票成功 <路人1> 購票成功 <路人5> 購票成功 <路人3> 購票成功 <路人7> 購票成功 <路人2> 購票成功 <路人6> 購票成功 <路人8> 購票成功 <路人9> 購票成功執行結果
加鎖處理:購票行為由併發變成了序列,犧牲了執行效率,但保證了資料安全
#把檔案db.txt的內容重置為:{"count":1} from multiprocessing import Process,Lock import time,json def search(name): dic=json.load(open('db.txt')) time.sleep(1) print('\033[43m%s 查到剩餘票數%s\033[0m' %(name,dic['count'])) def get(name): dic=json.load(open('db.txt')) time.sleep(1) #模擬讀資料的網路延遲 if dic['count'] >0: dic['count']-=1 time.sleep(1) #模擬寫資料的網路延遲 json.dump(dic,open('db.txt','w')) print('\033[46m%s 購票成功\033[0m' %name) def task(name,lock): search(name) with lock: #相當於lock.acquire(),執行完自程式碼塊自動執行lock.release() get(name) if __name__ == '__main__': lock=Lock() for i in range(10): #模擬併發10個客戶端搶票 name='<路人%s>' %i p=Process(target=task,args=(name,lock)) p.start()
<路人0> 查到剩餘票數1 <路人1> 查到剩餘票數1 <路人2> 查到剩餘票數1 <路人3> 查到剩餘票數1 <路人4> 查到剩餘票數1 <路人5> 查到剩餘票數1 <路人6> 查到剩餘票數1 <路人7> 查到剩餘票數1 <路人8> 查到剩餘票數1 <路人9> 查到剩餘票數1 <路人0> 購票成功執行結果
加鎖可以保證多個程序修改同一塊資料時,同一時間只能有一個任務可以進行修改,即序列地修改,沒錯,速度是慢了,但犧牲了速度卻保證了資料安全。
雖然可以用檔案共享資料實現程序間通訊,但問題是:
1、效率低(共享資料基於檔案,而檔案是硬碟上的資料)
2、需要自己加鎖處理
因此我們最好找尋一種解決方案能夠兼顧:
1、效率高(多個程序共享一塊記憶體的資料)
2、幫我們處理好鎖問題。
這就是mutiprocessing模組為我們提供的基於訊息的IPC通訊機制:佇列和管道。
佇列和管道都是將資料存放於記憶體中,而佇列又是基於(管道+鎖)實現的,可以讓我們從複雜的鎖問題中解脫出來,因而佇列才是程序間通訊的最佳選擇。
我們應該儘量避免使用共享資料,儘可能使用訊息傳遞和佇列,避免處理複雜的同步和鎖問題,而且在程序數目增多時,往往可以獲得更好的可獲展性。
五、程序間的通訊
IPC 指的是程序間通訊 之所以開啟子程序 肯定需要它幫我們完成任務 很多情況下 需要將資料返回給父程序 然而 程序記憶體是物理隔離的 解決方案: 1.將共享資料放到檔案中 就是慢 2.管道 subprocess中的那個 管道只能單向通訊 必須存在父子關係 3.共享一塊記憶體區域 得作業系統幫你分配 速度快
from multiprocessing import Process,Manager import time def task(dic): print("子程序xxxxx") # li[0] = 1 # print(li[0]) dic["name"] = "xx" if __name__ == '__main__': m = Manager() # li = m.list([100]) dic = m.dict({}) # 開啟子程序 p = Process(target=task,args=(dic,)) p.start() time.sleep(3) print(dic)
六、殭屍程序和孤兒程序
一個程序任務執行完就死亡了 但是作業系統不會立即將其清理 為的是 開啟這個子程序的父程序可以訪問到這個子程序的資訊這樣的 任務完成的 但是沒有被作業系統清理的程序稱為殭屍程序 越少越好
孤兒程序 無害! 沒有爹的稱為孤兒 一個父程序已經死亡 然而他的子孫程序 還在執行著 這時候 作業系統會接管這些孤兒程序
七、佇列介紹
程序彼此之間互相隔離,要實現程序間通訊(IPC),multiprocessing模組支援兩種形式:佇列和管道,這兩種方式都是使用訊息傳遞的
建立佇列的類(底層就是以管道和鎖定的方式實現)
Queue([maxsize]):建立共享的程序佇列,Queue是多程序安全的佇列,可以使用Queue實現多程序之間的資料傳遞。
引數介紹:
maxsize是佇列中允許最大項數,省略則無大小限制。 但需要明確: 1、佇列記憶體放的是訊息而非大資料 2、佇列佔用的是記憶體空間,因而maxsize即便是無大小限制也受限於記憶體大小
主要方法介紹:
q.put方法用以插入資料到佇列中。
q.get方法可以從佇列讀取並且刪除一個元素。
佇列使用:
from multiprocessing import Process,Queue q=Queue(3) #put ,get ,put_nowait,get_nowait,full,empty q.put(1) q.put(2) q.put(3) print(q.full()) #滿了 # q.put(4) #再放就阻塞住了 print(q.get()) print(q.get()) print(q.get()) print(q.empty()) #空了 # print(q.get()) #再取就阻塞住了
""" 程序間通訊的另一種方式 使用queue queue 佇列 佇列的特點: 先進的先出 後進後出 就像扶梯 """ from multiprocessing import Process,Queue # 基礎操作 必須要掌握的 # 建立一個佇列 # q = Queue() # # 存入資料 # q.put("hello") # q.put(["1","2","3"]) # q.put(1) # # 取出資料 # print(q.get()) # print(q.get()) # print(q.get()) # print(q.get()) # 阻塞操作 必須掌握 # q = Queue(3) # # # 存入資料 # q.put("hello",block=False) # q.put(["1","2","3"],block=False) # q.put(1,block=False) # 當容量滿的時候 再執行put 預設會阻塞直到執行力了get為止 # 如果修改block=False 直接報錯 因為沒地方放了 # q.put({},block=False) # # 取出資料 # print(q.get(block=False)) # print(q.get(block=False)) # print(q.get(block=False)) # # 對於get 當佇列中中沒有資料時預設是阻塞的 直達執行了put # # 如果修改block=False 直接報錯 因為沒資料可取了 # print(q.get(block=False))
八、生產消費者模型:
1.生產者消費者模型 模型 設計模式 三層結構 等等表示的都是一種程式設計套路 生產者指的是能夠產生資料的一類任務 消費者指的是處理資料的一類任務
需求: 資料夾裡有十個文字文件 要求你找出檔案中包含習大大關鍵字的檔案,開啟並讀取檔案資料就是生產者,查詢關鍵字的過程就是消費者
生產者消費者模型為什麼出現? 生產者的處理能力與消費者的處理能力 不匹配不平衡 導致了一方等待另一方 浪費時間 目前我們通過多程序將生產 和 消費 分開處理 然後將生產者生產的資料通過佇列交給消費者
總結一下在生產者消費者模型中 不僅需要生產者消費者 還需要一個共享資料區域 1.將生產方和消費方耦合度降低 2.平衡雙方的能力 提高整體效率
上程式碼 : 搞兩個程序 一個負責生產 一個負責消費
from multiprocessing import Process,Queue # 製作熱狗 def make_hotdog(queue,name): for i in range(3): time.sleep(random.randint(1,2)) print("%s 製作了一個熱狗 %s" % (name,i)) # 生產得到的資料 data = "%s生產的熱狗%s" % (name,i) # 存到佇列中 queue.put(data) # 裝入一個特別的資料 告訴消費方 沒有了 #queue.put(None) # 吃熱狗 def eat_hotdog(queue,name): while True: data = queue.get() if not data:break time.sleep(random.randint(1, 2)) print("%s 吃了%s" % (name,data)) if __name__ ==