並發編程之多進程2
一:multiprocessing模塊介紹
用來開啟子進程,並在子進程中執行定制的任務(比如函數)。該模塊功能眾多:支持子進程、通信和共享數據、執行不同形式的同步,提供了Process、Queue|Pipe、Lock等組件。
需要再次強調的一點是:與線程不同,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內。
二:Process類的介紹
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化得到的對象,表示一個子進程中的任務(尚未啟動) 強調: 1. 需要使用關鍵字的方式來指定參數 2. args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號 參數介紹:具體介紹1 group參數未使用,值始終為None 2 3 target表示調用對象,即子進程要執行的任務 4 5 args表示調用對象的位置參數元組,args=(1,2,‘egon‘,) 6 7 kwargs表示調用對象的字典,kwargs={‘name‘:‘egon‘,‘age‘:18} 8 9 name為子進程的名稱 方法介紹: 1 p.start():啟動進程,並調用該子進程中的p.run() 2 p.run():進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法 3 p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵屍進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那麽也將不會被釋放,進而導致死鎖4 p.is_alive():如果p仍然運行,返回True 5 p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能join住run開啟的進程 屬性介紹: 1 p.daemon:默認值為False,如果設為True,代表p為後臺運行的守護進程,當p的父進程終止時,p也隨之終止,並且設定為True後,p不能創建自己的新進程,必須在p.start()之前設置 2 p.name:進程的名稱 3 p.pid:進程的pid 4 p.exitcode:進程在運行時為None、如果為–N,表示被信號N結束(了解即可)5 p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是為涉及網絡連接的底層進程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)
三:僵屍進程與孤兒進程
1.僵屍進程(有害):一個進程使用fork創建子進程,如果子進程退出,而父進程並沒有調用wait或waitpid獲取子進程的狀態信息,那麽子進程的進程描述符仍然保存在系統中。
簡單來說就是指子進程執行完所有任務,已經終止了,但是還殘留一些信息(進程id,進程名),但是父進程沒有去處理這些殘留信息,導致殘留信息占用系統資源。
當出現大量的僵屍進程時,會占用系統資源,可以把它父進程殺掉,僵屍就成了孤兒,操作系統會負責回收。
import time from multiprocessing import Process def task1(): print("子進程 run") if __name__ == ‘__main__‘: for i in range(10): p = Process(target=task1) p.start() time.sleep(100000)僵屍進程
2.孤兒進程(無害):一個父進程結束,而它的一個或者多個子進程還在運行,那麽那些進程將成為孤兒進程。孤兒進程將被init進程所收養,並且由init進程對它們完成狀態收集工作。
import os import sys import time pid = os.getpid() ppid = os.getppid() print ‘im father‘, ‘pid‘, pid, ‘ppid‘, ppid pid = os.fork() #執行pid=os.fork()則會生成一個子進程 #返回值pid有兩種值: # 如果返回的pid值為0,表示在子進程當中 # 如果返回的pid值>0,表示在父進程當中 if pid > 0: print ‘father died..‘ sys.exit(0) # 保證主線程退出完畢 time.sleep(1) print ‘im child‘, os.getpid(), os.getppid() 執行文件,輸出結果: im father pid 32515 ppid 32015 father died.. im child 32516 1孤兒進程
四:守護進程
主進程創建守護進程:
其一:守護進程會在主進程代碼執行完畢後就終止
其二:守護進程內無法再開啟子進程,否則拋異常(AssertionError: daemonic processes are not allowed to have children)
註意:進程之間是相互獨立的,主進程代碼運行結束,守護進程隨機終止
from multiprocessing import Process import time def task(): print(‘小主的一生‘) time.sleep(2) print(‘小主涼了‘) #守護進程運行的話,此行代碼就不會運行 if __name__ == ‘__main__‘: xiaozhu=Process(target=task) xiaozhu.daemon=True #守護進程,默認為False,意味著不守護,改為True表示是守護進程 xiaozhu.start() print(‘皇帝登基‘) time.sleep(1) print(‘hello‘) print(‘皇帝薨‘)守護進程
五:進程同步
進程之間數據不共享,但是共享同一套文件系統,所以訪問同一個文件,或同一個打印終端是沒有問題的。但是共享帶來的就是競爭,競爭帶來的結果就是錯亂,如何控制,加鎖處理。
使用鎖將需要共享的數據加鎖,在執行代碼之前會先判斷這個值。要註意:在使用鎖時,必須保證鎖是同一個。
互斥鎖:保證了每次只有一個進程進入改都拿程序的操作,從而保證了多進程情況下數據的正確性。
from multiprocessing import Process,Lock import random import time def task1(lock): lock.acquire() print(‘hello 1‘) print(‘1 How are you!‘) time.sleep(random.randint(1,2)) print(‘bye 1‘) lock.release() def task2(lock): lock.acquire() print(‘hello 2‘) print(‘2 How are you!‘) print(‘bye 2‘) lock.release() def task3(lock): lock.acquire() print(‘hello 3‘) print(‘3 How are you!‘) print(‘bye 3‘) lock.release() if __name__ == ‘__main__‘: lock=Lock() p1=Process(target=task1,args=(lock,)) p1.start() p2=Process(target=task2,args=(lock,)) p2.start() p3=Process(target=task3,args=(lock,)) p3.start()加鎖
import json from multiprocessing import Process,Lock import time import random """ join和鎖的區別 1.join中順序是固定的 不公平 2.join是完全串行 而 鎖可以使部分代碼串行 其他代碼還是並發 """ # 查看剩余票數 def check_ticket(usr): time.sleep(random.randint(1,3)) with open("ticket.json","r",encoding="utf-8") as f: dic = json.load(f) print("%s查看 剩余票數:%s" % (usr,dic["count"])) def buy_ticket(usr): with open("ticket.json","r",encoding="utf-8") as f: dic = json.load(f) if dic["count"] > 0: time.sleep(random.randint(1,3)) dic["count"] -= 1 with open("ticket.json", "w", encoding="utf-8") as f2: json.dump(dic,f2) print("%s 購票成功!" % usr) def task(usr,lock): check_ticket(usr) lock.acquire() buy_ticket(usr) lock.release() if __name__ == ‘__main__‘: lock = Lock() for i in range(10): p = Process(target=task,args=("用戶%s" % i,lock)) p.start() #p.join() # 只有第一個整個必須完畢 別人才能買 這是不公平的互斥鎖搶票
加鎖可以保證多個進程修改同一個數據時,同一時間只能有一個任務可以進行修改,即串行的修改,降低了速度,但是保證了數據安全。
死鎖:指的是鎖無法打開導致程序卡死,首先要明確有一把鎖的時候是不會卡死的,正常開發時一把鎖足夠使用,不要打開多把鎖。
from multiprocessing import Process,Lock import time def task1(l1,l2,i): l1.acquire() print("盤子被%s搶走了" % i) time.sleep(1) l2.acquire() print("筷子被%s搶走了" % i) print("吃飯..") l1.release() l2.release() pass def task2(l1,l2,i): l2.acquire() print("筷子被%s搶走了" % i) l1.acquire() print("盤子被%s搶走了" % i) print("吃飯..") l1.release() l2.release() if __name__ == ‘__main__‘: l1 = Lock() l2 = Lock() Process(target=task1,args=(l1,l2,1)).start() Process(target=task2,args=(l1,l2,2)).start() ============================================= 運行結果卡住>>>: 盤子被1搶走了 筷子被2搶走了死鎖
六:IPC(進程間的通訊)
由於進程之間的內存都是相互獨立的,所以需要對應的解決方案,能夠使得進程之間可以相互傳遞數據。
有三種方案:
1.使用共享文件,多個進程同時讀寫同一個文件(I/O速度慢,傳輸數據大小不受限制)
2.管道是基於內存的,速度快,但是是單向的,用起來麻煩(了解 )
3.申請共享內存空間,多個進程可以共享這個內存區域(重點)
速度快,但是數據量不能太大
from multiprocessing import Manager,Process,Lock def work(d): # with lock: d[‘count‘]-=1 if __name__ == ‘__main__‘: with Manager() as m: dic=m.dict({‘count‘:100}) #創建一個共享的字典 p_l=[] for i in range(100): p=Process(target=work,args=(dic,)) p_l.append(p) p.start() for p in p_l: p.join() print(dic)IPC
七:隊列(推薦使用)
進程之間彼此隔離,要實現進程間通信,multiprocessing模塊支持兩種形式:隊列和管道。
隊列特點:先進先出
優點:可以保證數據不會錯亂,即使在多進程下,因為其put和get默認都是阻塞的。
Queue([maxsize]):創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。maxsize是隊列中允許最大項數,省略則無大小限制。
1 q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。如果blocked為True(默認值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。 2 q.get方法可以從隊列讀取並且刪除一個元素。同樣,get方法有兩個可選參數:blocked和timeout。如果blocked為True(默認值),並且timeout為正值,那麽在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常. 3 q.get_nowait():同q.get(False) 4 q.put_nowait():同q.put(False) 5 q.empty():調用此方法時q為空則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中又加入了項目。 6 q.full():調用此方法時q已滿則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中的項目被取走。 7 q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣主要方法介紹
from multiprocessing import Queue # 例1: # q = Queue(1) # 創建一個隊列 最多可以存一個數據 # # q.put("張三") # print(q.get()) # # q.put("李四") # put默認會阻塞 當容器中已經裝滿了 # # print(q.get()) # print(q.get()) # get默認會阻塞 當容器中已經沒有數據了 # # print("over") # 例2: q = Queue(1) # 創建一個隊列 最多可以存一個數據 # q.put("張三") # q.put("李四",False) # 第二個參數 設置為False表示不會阻塞 無論容器是滿了 都會強行塞 如果滿了就拋異常 print(q.get()) print(q.get(timeout=3)) # timeout 僅用於阻塞時 # q.put("李四") # put默認會阻塞 當容器中已經裝滿了 # # print(q.get()) # print(q.get()) # get默認會阻塞 當容器中已經沒有數據了 # # print("over")隊列
八:生產者消費者模型
1.什麽是生產者消費者模型?
生產者消費者模式是通過一個容器來解決生產者與消費者的強耦合問題,生產者和消費者彼此之間不直接通訊,而是通過阻塞隊列來進行通訊,所以生產者生產完數據之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就相當於一個緩沖區,平衡了生產者與消費者的處理能力。
import random from multiprocessing import Process,Queue import time # 爬數據 def get_data(q): for num in range(5): print("正在爬取第%s個數據" % num) time.sleep(random.randint(1,2)) print("第%s個數據 爬取完成" % num) # 把數據裝到隊列中 q.put("第%s個數據" % num) def parse_data(q): for num in range(5): # 取出數據 data = q.get() print("正在解析%s" % data) time.sleep(random.randint(1, 2)) print("%s 解析完成" % data) if __name__ == ‘__main__‘: # 共享數據容器 q = Queue(5) #生產者進程 produce = Process(target=get_data,args=(q,)) produce.start() #消費者進程 customer = Process(target=parse_data,args=(q,)) customer.start()生產者消費者模型
from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print(‘\033[45m%s 吃 %s\033[0m‘ %(os.getpid(),res)) def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res=‘包子%s‘ %i q.put(res) print(‘\033[44m%s 生產了 %s\033[0m‘ %(os.getpid(),res)) if __name__ == ‘__main__‘: q=Queue() #生產者們:即廚師們 p1=Process(target=producer,args=(q,)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,)) #開始 p1.start() c1.start() print(‘主‘)例二
總結:程序中有兩類角色
一類負責生產數據(生產者)
一類負責處理數據(消費者)
解決的問題:平衡生產者與消費者之間的工作能力,從而提高程序整體出口ishuju的速度。
並發編程之多進程2