python-並發編程之多進程
阿新 • • 發佈:2017-11-30
upper 類的方法 cli 需要 super 處理 如果 star client
View Code
View Code
multiprocessing模塊
創建進程的類
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化得到的對象,表示一個子進程中的任務(尚未啟動) 強調: 1. 需要使用關鍵字的方式來指定參數 2. args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號 1 group參數未使用,值始終為None 2 3 target表示調用對象,即子進程要執行的任務 4 5 args表示調用對象的位置參數元組,args=(1,2,‘egon‘,) 6 7 kwargs表示調用對象的字典,kwargs={‘View Codename‘:‘egon‘,‘age‘:18} 8 9 name為子進程的名稱
方法介紹
1 p.start():啟動進程,並調用該子進程中的p.run() 2 p.run():進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法 3 4 p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵屍進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那麽也將不會被釋放,進而導致死鎖 5 p.is_alive():如果p仍然運行,返回True 6 7 p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能join住run開啟的進程
屬性介紹
1 p.daemon:默認值為False,如果設為True,代表p為後臺運行的守護進程,當p的父進程終止時,p也隨之終止,並且設定為True後,p不能創建自己的新進程,必須在p.start()之前設置 2 3 p.name:進程的名稱 4 5 p.pid:進程的pid 6 7 p.exitcode:進程在運行時為None、如果為–N,表示被信號N結束(了解即可) 8 9 p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是為涉及網絡連接的底層進程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)
註意:Process類,在windows中Process()必須放到#if __name__==‘__main__‘下
在windows中Process()必須放到#if __name__==‘__main__‘下開啟子進程的兩種方式
開啟進程的兩種方式: 方法一: from multiprocessing import Process import time def task(name): print(‘%s是好人‘%name) time.sleep(2) print(‘你說的啥‘) if __name__ == ‘__main__‘: p = Process(target=task,args=(‘tom‘,)) p.start() print(‘你是壞人‘) # 方法二: from multiprocessing import Process import time class Myprocess(Process): def __init__(self,name): #定義自己的方法 super().__init__() #繼承父類的方法 self.name = name def run(self): #這個函數的函數名必須是run,不能是其他的,如果是其他的不會被執行該方法下面的東西 time.sleep(3) print(‘%s is running‘%self.name) time.sleep(3) if __name__ == ‘__main__‘: p = Myprocess(‘alex‘) p.start() #p.run() #默認是調用的run方法 print(‘主‘)View Code
將socket通信變成並發形式
服務端 from socket import * from multiprocessing import Process def server(ip,port): server = socket(AF_INET, SOCK_STREAM) server.bind((ip,port)) server.listen(5) while True: conn,addr = server.accept() p = Process(target=communicate,args=(conn,)) p.start() server.close() def communicate(conn): while True: try: data = conn.recv(1024) if not data:break conn.send(data.upper()) except ConnectionResetError: break conn.close() if __name__ == ‘__main__‘: server(‘127.0.0.1‘,8090) 客戶端 from socket import * client = socket(AF_INET,SOCK_STREAM) client.connect((‘127.0.0.1‘,8090)) while True: msg = input(‘>>‘) if not msg :continue client.send(msg.encode(‘utf-8‘)) date = client.recv(1024) print(date.decode(‘utf-8‘)) client.close()View Code
雖然上面方法可以實現socket通信的並發,但是每來一個客戶端,都在服務端開啟一個進程,如果並發來一萬個客戶端,要開啟一萬個進程嗎,你自己嘗試著在你自己的機器上開一萬個試試.(解決方法就是使用進程池)
process對象的join方法
join:主進程等待子進程結束process對象的其他方法或屬性
from multiprocessing import Process import time,random def task(): print(‘孫子運行了‘) time.sleep(3) def piao(name): print(‘%s is piaoing‘ %name) time.sleep(random.randint(1,3)) print(‘%s is done‘ %name) p = Process(target=task) p.start() if __name__ == ‘__main__‘: p1 = Process(target=piao,args=(‘alex‘,),name = ‘***‘) p1.start() p1.join() print(p1.name) #-->查看進程的名稱 print(p1.pid) #查看進程的pid # p.terminate() -->強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵屍進程,用該方法需要特別小心這種情況。如果p保存了一個鎖那麽也將不會被釋放,進而導致死鎖 print(‘主‘)View Code
#進程對象的其他方法一:terminate,is_alive from multiprocessing import Process import time import random class Piao(Process): def __init__(self,name): self.name=name super().__init__() def run(self): print(‘%s is piaoing‘ %self.name) time.sleep(random.randrange(1,5)) print(‘%s is piao end‘ %self.name) p1=Piao(‘egon1‘) p1.start() p1.terminate()#關閉進程,不會立即關閉,所以is_alive立刻查看的結果可能還是存活 print(p1.is_alive()) #結果為True print(‘開始‘) print(p1.is_alive()) #結果為FalseView Code
方法總結:
p.name #-->查看進程的名稱 p.pid #查看進程的pid p.terminate() -->強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵屍進程,用該方法需要特別小心這種情況。如果p保存了一個鎖那麽也將不會被釋放,進而導致死鎖 p.join() -->主進程等子進程執行完之後再結束 - --> 等的時間就是執行時間最長的 p.is_alive()查看子進程是否還存活
進程池
提交/調用任務的方式有兩種:
同步調用:提交/調用一個任務,然後就在原地等著,等到該任務執行完畢拿到結果,再執行下一行代碼
異步調用:提交/調用一個任務,不在原地等著,直接執行下一行代碼
這裏引入進程池的概念,是為了限制並發數很多時服務器被卡死,所以就用一個池子來限制並發的數量,所有連接數都會被接收,但不會立即去執行,而是先執行進程池中的對上面描述的一個例題1
比如進程池規定一次可以並發四個任務,也就是一次可以同時處理四個任務,當這四個任務中的任何一個被處理完後,進程池中就會立馬填補一個然後再進行處理
當任務數很少時還是不使用進程池比較好
時間是3秒多一些 from multiprocessing import Process,Pool該模塊中的方法比較麻煩,所以使用下面的模塊實現 from concurrent.futures import ProcessPoolExecutor import time,random,os def piao(name,n): print(‘%s is piaoing %s‘ %(name,os.getpid())) time.sleep(1) return n**2 if __name__ == ‘__main__‘: p = ProcessPoolExecutor(4) #規定每次可以並發四個任務 objs = [] start = time.time() for i in range(10): #這裏有10個任務. # res = p.submit(piao,‘alex %s‘%i,i).result() #同步調用 # print(res) obj = p.submit(piao,‘alex %s‘%i,i) #異步調用 #發出指令要開啟子進程 objs.append(obj) for obj in objs: print(obj.result()) #result是取子進程中的結果 stop = time.time() print(stop-start) #關門+等 # pool.close() # pool.join() p.shutdown(wait=True) # 關門+等 print(‘主‘,os.getpid())
p.submit()是發出開啟子進程的指令,obj.result()是取子進程中產生的結果
p.shutdown(wait=True) #任務數是一定的,不會再增加,也就是關門,並等待子進程結束後再執行主程序
from multiprocessing import Process import time,random,os def piao(name,n): print(‘%s is piaoing %s‘ %(name,os.getpid())) time.sleep(1) return n**2 if __name__ == ‘__main__‘: start=time.time() p_l=[] for i in range(10): p=Process(target=piao,args=(‘xx‘,1)) p_l.append(p) p.start() for p in p_l: p.join() stop=time.time() print(stop-start)2
例題1和例題2中在連接數比較少時使用2會更好,原因是2中的是所有任務一起去執行,而1中是每四個任務一起執行,要分三次,
每次使用一秒,總共要三秒多,但2中是一起執行的,是 使用了一秒多.要根據不同的環境選擇不同的方法使用
多進程是實現並發的手段之一,但需要註意的問題是:
1.很明顯需要並發執行的任務通常要遠大於核數
2.一個操作系統不可能無限開啟進程,通常有幾個核就開幾個進程
3.進程開啟過多,效率反而會下降(開啟進程是需要占用操作系統資源的,而且開啟多余核數目的進程也無法做到並行)
當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態動態生成多個進程,十幾個還好,如果是上百個,上千個,手動的去限制進程數量卻又太過繁瑣,此時使用程序池是最好的
對於遠程過程調用高級應用程序而言,應該使用進程池,pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,就會創建一個新的
進程來執行該請求,但如果池中的進程數已經達到規定最大值,那麽該請求就會等待,直到翅中有進程結束,就重用進程池中的進程
python-並發編程之多進程