1. 程式人生 > >python-並發編程之多進程

python-並發編程之多進程

upper 類的方法 cli 需要 super 處理 如果 star client

multiprocessing模塊

創建進程的類

技術分享圖片
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化得到的對象,表示一個子進程中的任務(尚未啟動)

強調:
1. 需要使用關鍵字的方式來指定參數
2. args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號

1 group參數未使用,值始終為None
2 
3 target表示調用對象,即子進程要執行的任務
4 
5 args表示調用對象的位置參數元組,args=(1,2,egon,)
6 
7 kwargs表示調用對象的字典,kwargs={
name:egon,age:18} 8 9 name為子進程的名稱
View Code

方法介紹

技術分享圖片
1 p.start():啟動進程,並調用該子進程中的p.run() 
 2 p.run():進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法  
 3 
 4 p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵屍進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那麽也將不會被釋放,進而導致死鎖
 5 p.is_alive():如果p仍然運行,返回True
 6 
 7 p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能join住run開啟的進程  
View Code

屬性介紹

技術分享圖片
1 p.daemon:默認值為False,如果設為True,代表p為後臺運行的守護進程,當p的父進程終止時,p也隨之終止,並且設定為True後,p不能創建自己的新進程,必須在p.start()之前設置
2 
3 p.name:進程的名稱
4 
5 p.pid:進程的pid
6 
7 p.exitcode:進程在運行時為None、如果為–N,表示被信號N結束(了解即可)
8 
9 p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是為涉及網絡連接的底層進程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)
View Code

註意:Process類,在windows中Process()必須放到#if __name__==‘__main__‘下

技術分享圖片在windows中Process()必須放到#if __name__==‘__main__‘下

開啟子進程的兩種方式

技術分享圖片
開啟進程的兩種方式:

方法一:
from multiprocessing import Process
import time
def task(name):
    print(%s是好人%name)
    time.sleep(2)
    print(你說的啥)
if __name__ == __main__:
    p = Process(target=task,args=(tom,))
    p.start()
    print(你是壞人)
# 方法二:
from multiprocessing import Process
import time

class Myprocess(Process):
    def __init__(self,name):    #定義自己的方法
        super().__init__()  #繼承父類的方法
        self.name = name
    def run(self):  #這個函數的函數名必須是run,不能是其他的,如果是其他的不會被執行該方法下面的東西
        time.sleep(3)
        print(%s is running%self.name)
        time.sleep(3)
if __name__ == __main__:
    p = Myprocess(alex)
    p.start()       #p.run()    #默認是調用的run方法
    print()
View Code

將socket通信變成並發形式

技術分享圖片
服務端
from socket import *
from multiprocessing import Process


def server(ip,port):
    server = socket(AF_INET, SOCK_STREAM)
    server.bind((ip,port))
    server.listen(5)
    while True:
        conn,addr = server.accept()
        p = Process(target=communicate,args=(conn,))
        p.start()
    server.close()
def communicate(conn):
    while True:
        try:
            data = conn.recv(1024)
            if not data:break
            conn.send(data.upper())
        except ConnectionResetError:
            break
    conn.close()
if __name__ == __main__:
    server(127.0.0.1,8090)


客戶端
from socket import *
client = socket(AF_INET,SOCK_STREAM)
client.connect((127.0.0.1,8090))
while True:
    msg = input(>>)
    if not msg :continue
    client.send(msg.encode(utf-8))
    date = client.recv(1024)
    print(date.decode(utf-8))
client.close()
View Code

雖然上面方法可以實現socket通信的並發,但是每來一個客戶端,都在服務端開啟一個進程,如果並發來一萬個客戶端,要開啟一萬個進程嗎,你自己嘗試著在你自己的機器上開一萬個試試.(解決方法就是使用進程池)

process對象的join方法

技術分享圖片join:主進程等待子進程結束

process對象的其他方法或屬性

技術分享圖片
from multiprocessing import Process
import time,random
def task():
    print(孫子運行了)
    time.sleep(3)
def piao(name):
    print(%s is piaoing %name)
    time.sleep(random.randint(1,3))
    print(%s is done %name)
    p = Process(target=task)
    p.start()
if __name__ == __main__:
    p1 = Process(target=piao,args=(alex,),name = ***)
    p1.start()
    p1.join()
    print(p1.name)  #-->查看進程的名稱
    print(p1.pid)       #查看進程的pid
    # p.terminate() -->強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵屍進程,用該方法需要特別小心這種情況。如果p保存了一個鎖那麽也將不會被釋放,進而導致死鎖
    print()
View Code 技術分享圖片
#進程對象的其他方法一:terminate,is_alive
from multiprocessing import Process
import time
import random

class Piao(Process):
    def __init__(self,name):
        self.name=name
        super().__init__()

    def run(self):
        print(%s is piaoing %self.name)
        time.sleep(random.randrange(1,5))
        print(%s is piao end %self.name)


p1=Piao(egon1)
p1.start()

p1.terminate()#關閉進程,不會立即關閉,所以is_alive立刻查看的結果可能還是存活
print(p1.is_alive()) #結果為True

print(開始)
print(p1.is_alive()) #結果為False
View Code

方法總結:

    p.name  #-->查看進程的名稱
    p.pid      #查看進程的pid
    p.terminate() -->強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵屍進程,用該方法需要特別小心這種情況。如果p保存了一個鎖那麽也將不會被釋放,進而導致死鎖
    p.join() -->主進程等子進程執行完之後再結束 - --> 等的時間就是執行時間最長的
    p.is_alive()查看子進程是否還存活

進程池

提交/調用任務的方式有兩種:
同步調用:提交/調用一個任務,然後就在原地等著,等到該任務執行完畢拿到結果,再執行下一行代碼
異步調用:提交/調用一個任務,不在原地等著,直接執行下一行代碼
這裏引入進程池的概念,是為了限制並發數很多時服務器被卡死,所以就用一個池子來限制並發的數量,所有連接數都會被接收,但不會立即去執行,而是先執行進程池中的
比如進程池規定一次可以並發四個任務,也就是一次可以同時處理四個任務,當這四個任務中的任何一個被處理完後,進程池中就會立馬填補一個然後再進行處理
當任務數很少時還是不使用進程池比較好

技術分享圖片對上面描述的一個例題1

時間是3秒多一些
from multiprocessing import Process,Pool該模塊中的方法比較麻煩,所以使用下面的模塊實現
from concurrent.futures import  ProcessPoolExecutor
import time,random,os
def piao(name,n):
    print(%s is piaoing %s %(name,os.getpid()))
    time.sleep(1)
    return n**2
if __name__ == __main__:
    p = ProcessPoolExecutor(4)  #規定每次可以並發四個任務
    objs = []
    start = time.time()
    for i in range(10):     #這裏有10個任務.
        # res = p.submit(piao,‘alex %s‘%i,i).result() #同步調用
        # print(res)
        obj = p.submit(piao,alex %s%i,i)  #異步調用   #發出指令要開啟子進程
        objs.append(obj)
    for obj in objs:
        print(obj.result())     #result是取子進程中的結果

    stop = time.time()
    print(stop-start)
    #關門+等
    # pool.close()
    # pool.join()
    p.shutdown(wait=True)   #   關門+等
    print(,os.getpid())


p.submit()是發出開啟子進程的指令,obj.result()是取子進程中產生的結果
p.shutdown(wait=True) #任務數是一定的,不會再增加,也就是關門,並等待子進程結束後再執行主程序

技術分享圖片

from multiprocessing import Process
import time,random,os

def piao(name,n):
    print(%s is piaoing %s %(name,os.getpid()))
    time.sleep(1)
    return n**2
if __name__ == __main__:
    start=time.time()
    p_l=[]
    for i in range(10):
        p=Process(target=piao,args=(xx,1))
        p_l.append(p)
        p.start()

    for p in p_l:
        p.join()

    stop=time.time()
    print(stop-start)
2

例題1和例題2中在連接數比較少時使用2會更好,原因是2中的是所有任務一起去執行,而1中是每四個任務一起執行,要分三次,

每次使用一秒,總共要三秒多,但2中是一起執行的,是 使用了一秒多.要根據不同的環境選擇不同的方法使用

多進程是實現並發的手段之一,但需要註意的問題是:
1.很明顯需要並發執行的任務通常要遠大於核數
2.一個操作系統不可能無限開啟進程,通常有幾個核就開幾個進程
3.進程開啟過多,效率反而會下降(開啟進程是需要占用操作系統資源的,而且開啟多余核數目的進程也無法做到並行)
當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態動態生成多個進程,十幾個還好,如果是上百個,上千個,手動的去限制進程數量卻又太過繁瑣,此時使用程序池是最好的

對於遠程過程調用高級應用程序而言,應該使用進程池,pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,就會創建一個新的
進程來執行該請求,但如果池中的進程數已經達到規定最大值,那麽該請求就會等待,直到翅中有進程結束,就重用進程池中的進程
 

python-並發編程之多進程