並發編程之——多進程
一、基本概念
1.1 進程
其實進程就是正在進行的一個程序或者任務,而負責執行任務的是CPU,執行任務的地方是內存。跟程序相比,程序僅僅是一堆代碼而已,而程序運行時的過程才是進程。另外同一個程序執行兩次就是兩個進程了。
1.2 並發與並行
無論是並行還是並發,在用戶看來都是‘同時‘運行的,不管是進程還是線程,都只是一個任務而已,真是幹活的是cpu,cpu來做這些任務,而一個cpu同一時刻只能執行一個任務。對於“並發”而言,是偽並行,即看起來是同時運行,單個cpu+多道技術就可以實現並發;而“並行”才是真正意義上的“同時運行”——僅有多核才能夠實現“並行”。
需要強調的一點是:與線程不同,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內。
二、Multiprocessing模塊
python中的多線程無法利用多核優勢,如果想要充分地使用多核CPU的資源(os.cpu\_count\(\)查看),在python中大部分情況需要使用多進程。
Python提供了multiprocessing。 multiprocessing模塊用來開啟子進程,並在子進程中執行我們定制的任務(比如函數),該模塊與多線程模塊threading的編程接口類似。multiprocessing模塊的功能眾多:支持子進程、通信和共享數據、執行不同形式的同步,>提供了Process、Queue、Pipe、Lock等組件。
2.1 開啟子進程的兩種方式
2.1.1 直接在Multiprocessing模塊中導入Process類,利用這個類實例化進程對象
# -*- coding: utf-8 -*- # -*- Author: WangHW -*- #方式一 from multiprocessing import Process import time import os def task(name): print(‘%s is running...‘%name) print(‘子進程的id為:‘,os.getpid()) print(‘子進程的父進程的id為:‘,os.getppid()) time.sleep(View Code3) print(‘%s is done‘%name) if __name__ == ‘__main__‘: #Process(target=task, kwargs={‘name‘:‘子進程1‘}) #得到一個對象 p = Process(target=task,args=(‘子進程1‘,)) #start僅僅只是給操作系統發送了一個信號,發完信號以後父進程不會等子進程 #是完全獨立的兩個進程 p.start() print(‘主進程‘) print(‘主進程的id為:‘,os.getpid()) print(‘主進程的父進程id為:‘,os.getppid())
2.2.2 利用類的繼承,自己定義一個MyProcessing類,繼承自Process,但是需要註意的是:裏面必須要有一個名為run的方法去執行主體:
# -*- coding: utf-8 -*- # -*- Author: WangHW -*- from multiprocessing import Process import time #用類的繼承方式實現 class MyProcessing(Process): def __init__(self,name): super().__init__() self.name = name #註意名字必須叫run def run(self): print(‘%s is running......‘%self.name) time.sleep(3) print(‘%s is done...‘%self.name) if __name__ == ‘__main__‘: p = MyProcessing(‘進程1‘) p.start() print(‘主進程‘)View Code
2.2 Process類實例化出對象的join方法
在主進程運行過程中如果想並發地執行其他的任務,我們可以開啟子進程,此時主進程的任務與子進程的任務分兩種情況
情況一:在主進程的任務與子進程的任務彼此獨立的情況下,主進程的任務先執行完畢後,主進程還需要等待子進程執行完畢,然後統一回收資源。
情況二:如果主進程的任務在執行到某一個階段時,需要等待子進程執行完畢後才能繼續執行,就需要有一種機制能夠讓主進程檢測子進程是否運行完畢,在子進程執行完畢後才繼續執行,否則一直在原地阻塞,這就是join方法的作用
# -*- coding: utf-8 -*- # -*- Author: WangHW -*- from multiprocessing import Process import time import os def task(name,n): print(‘%s is running...‘%name) time.sleep(n) print(‘%s is done...‘% name) if __name__ == ‘__main__‘: start_time = time.time() # for i in range(5,8): # p = Process(target=task,args=(‘p%s‘%(i+1),i)) # p.start() #p.join() p1 = Process(target=task,args=(‘p1‘,5)) p2 = Process(target=task,args=(‘p2‘,2)) p3 = Process(target=task,args=(‘p3‘,3)) #start僅僅是向操作系統發出信號,具體誰先執行不一定,由操作系統決定 p1.start() p2.start() p3.start() #保證有序,看著像“串行”,但實際上還是並行:最後一行的運行時間可以驗證 p1.join() p2.join() p3.join() print(‘主進程開啟,id為:‘,os.getpid()) #打印出來的結果可知,程序仍然是並發執行的,不是串行執行的 print(‘運行時間:‘,time.time()-start_time)View Code
關於join方法,需要註意的一點是:雖然我們看著像“串行”,但實際上還是並行:由上面程序最後一行的運行時間可以驗證:
三、互斥鎖
3.1 雖然進程之間數據不共享,但是可以共享同一套文件系統,所以訪問同一個文件,或同一個打印終端,是沒有問題的,而共享帶來的是競爭,競爭帶來的結果就是錯亂。
如何控制,就是加鎖處理。而互斥鎖的意思就是互相排斥,如果把多個進程比喻為多個人,互斥鎖的工作原理就是多個人都要去爭搶同一個資源:衛生間,一個人搶到衛生間後上一把鎖,其他人都要等著,等到這個完成任務後釋放鎖,其他人才有可能有一個搶到......所以互斥鎖的原理,就是把並發改成穿行,降低了效率,但保證了數據安全不錯亂。
這裏有一個利用互斥鎖模擬搶票的程序(whw.json文件的內容為:{"count": 2}):
# -*- coding: utf-8 -*- # -*- Author: WangHW -*- from multiprocessing import Process,Lock import json import time #查票 def search(name): time.sleep(1) with open(‘whw.json‘,‘r‘) as f: ticket_dict = json.load(f) print(‘<%s>查看到余票為:<%s>‘%(name,ticket_dict[‘count‘])) #買票 def get(name): time.sleep(1) f = open(‘whw.json‘,‘r‘) ticket_dict = json.load(f) print(‘<%s>查看到余票還剩余:<%s>‘%(name,ticket_dict[‘count‘])) if ticket_dict[‘count‘] > 0: ticket_dict[‘count‘] -= 1 print(‘<%s>購票成功!‘ % name) time.sleep(1) else: print(‘余票不足~購票失敗‘) f.close() #保存 f_new = open(‘whw.json‘,‘w‘) json.dump(ticket_dict,f_new) f_new.close() def task(name,mutex): search(name) #在購票前加鎖 mutex.acquire() get(name) #釋放鎖 mutex.release() if __name__ == ‘__main__‘: mutex = Lock() for i in range(5): p = Process(target=task,args=(‘路人%s‘%(i+1),mutex)) p.start()View Code
結果展示:
3.2關於互斥鎖與join的區別:
用一句話來簡單概括:“互斥鎖”是將代碼的“局部變成串行”,而如果用join的話會整個功能代碼變為串行,所以對於本例而言互斥鎖要靈活一些。
四、隊列
4.1對於多進程有一個問題需要我們考慮:是否有一種方案能夠同時兼顧一下兩點:一是效率高(多個進程共享一塊內存數據),另外一點是能夠幫我們處理好鎖的問題。
答案就是~~利用隊列!
首先,隊列是將數據存到內存中處理,這就滿足了“效率高”這個要求,另外,隊列是基於“管道+鎖”設計的,所以另外一點也滿足了。事實上,隊列才是進程間通信(IPC)的最佳選擇!
另外需要大家註意的是:隊列是一種先進先出的數據結構。
創建隊列用以下方式:
# -*- coding: utf-8 -*- # -*- Author: WangHW -*- from multiprocessing import Queue #隊列中不應該放大文件,發的只是精簡的消息 #可以不指定大小,但最終受限於內存的大小 q = Queue(3) q.put(‘hello‘) q.put({‘a‘:1}) q.put(3333333) #判斷一下隊列滿沒有 print(q.full()) #取出來~先進先出 print(q.get()) print(q.get()) print(q.get())View Code
4.2 隊列的應用——生產者消費者模型
“生產者消費者模型”是並發編程的非常重要的一個模型,也是隊列的一個非常重要的應用之一:
上圖是一個簡單的生產者與消費者模型:生產者將生產的DATA先放到隊列裏,消費者從隊列中獲取生產者生產的數據,這樣使得程序的耦合性大大降低,而且也平衡了生產者與消費者之間的速度差:
具體代碼如下:
# -*- coding: utf-8 -*- # -*- Author: WangHW -*- from multiprocessing import Process,Queue import time def producer(q): for i in range(5): res = ‘包子%s‘%i time.sleep(0.5) print(‘生產者生產了%s‘%res) q.put(res) def consumer(q): while 1: res = q.get() if res is None: break time.sleep(1) print(‘消費者吃了%s‘%res) if __name__ == ‘__main__‘: q = Queue() p1 = Process(target=producer,args=(q,)) p2 = Process(target=producer,args=(q,)) c1 = Process(target=consumer,args=(q,)) c2 = Process(target=consumer,args=(q,)) p1.start() p2.start() c1.start() c2.start() p1.join() p2.join() #有兩個消費者,需要最後put兩次None q.put(None) q.put(None) #print(‘主進程‘.center(20,‘*‘))View Code
實現效果如下:
當然上面的代碼可以利用“守護進程”優化一下(作為了解),將消費者進程設置為守護進程,隨著主程序進程一起消除:
# -*- coding: utf-8 -*- # -*- Author: WangHW -*- from multiprocessing import Process,JoinableQueue import time def producer(q): for i in range(5): res = ‘包子%s‘%i time.sleep(0.5) print(‘生產者生產了%s‘%res) q.put(res) q.join() def consumer(q): while 1: res = q.get() if res is None: break time.sleep(1) print(‘消費者吃了%s‘%res) q.task_done() if __name__ == ‘__main__‘: q = JoinableQueue() p1 = Process(target=producer,args=(q,)) p2 = Process(target=producer,args=(q,)) c1 = Process(target=consumer,args=(q,)) c2 = Process(target=consumer,args=(q,)) #將消費者進程設置為守護進程,隨著主程序一起消除 c1.daemon = True c2.daemon = True p1.start() p2.start() c1.start() c2.start() p1.join() p2.join()View Code
五、其他補充
5.1 需要註意的一點是:進程之間的內存空間是相互隔離的,看如下程序:
from multiprocessing import Process n = 100 def work(): global n n = 0 print(‘子進程內的n為:‘,n) if __name__ == ‘__main__‘: p = Process(target=work) p.start() print(‘主進程的n為:‘,n)
運行結果為:
並發編程之——多進程