1. 程式人生 > >python之旅:並發編程之多進程

python之旅:並發編程之多進程

全部 rep start OS 運行時間 默認 sse a star top命令

一 multiprocessing模塊介紹

python中的多線程無法利用多核優勢,如果想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分情況需要使用多進程。Python提供了multiprocessing。
multiprocessing模塊用來開啟子進程,並在子進程中執行我們定制的任務(比如函數),該模塊與多線程模塊threading的編程接口類似。

  multiprocessing模塊的功能眾多:支持子進程、通信和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。

需要再次強調的一點是:與線程不同,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內。

二 Process類的介紹

創建進程的類

Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化得到的對象,表示一個子進程中的任務(尚未啟動)

強調:
1. 需要使用關鍵字的方式來指定參數
2. args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號

參數介紹:

group參數未使用,值始終為None

target表示調用對象,即子進程要執行的任務

args表示調用對象的位置參數元組,args=(1,2,egon,)  #元組內一定要,結尾

kwargs表示調用對象的字典,kwargs
={name:egon,age:18} name為子進程的名稱

  方法介紹:

 p.start():啟動進程,並調用該子進程中的p.run() 
 p.run():進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法  
 
 p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵屍進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那麽也將不會被釋放,進而導致死鎖
 p.is_alive():如果p仍然運行,返回True
 
 p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能join住run開啟的進程  

屬性介紹:

p.daemon:默認值為False,如果設為True,代表p為後臺運行的守護進程,當p的父進程終止時,p也隨之終止,並且設定為True後,p不能創建自己的新進程,必須在p.start()之前設置

p.name:進程的名稱

p.pid:進程的pid

p.exitcode:進程在運行時為None、如果為–N,表示被信號N結束(了解即可)

p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是為涉及網絡連接的底層進程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)

三 Process類的使用

註意:在windows中Process()必須放到# if __name__ == ‘__main__‘:下

技術分享圖片
Since Windows has no fork, the multiprocessing module starts a new Python process and imports the calling module. 
If Process() gets called upon import, then this sets off an infinite succession of new processes (or until your machine runs out of resources). 
This is the reason for hiding calls to Process() inside

if __name__ == "__main__"
since statements inside this if-statement will not get called upon import.
由於Windows沒有fork,多處理模塊啟動一個新的Python進程並導入調用模塊。 
如果在導入時調用Process(),那麽這將啟動無限繼承的新進程(或直到機器耗盡資源)。 
這是隱藏對Process()內部調用的原,使用if __name__ == “__main __”,這個if語句中的語句將不會在導入時被調用。
詳細解釋

創建並開啟子進程的兩種方式

技術分享圖片
#開進程的方法一:
import time
import random
from multiprocessing import Process
def piao(name):
    print(%s piaoing %name)
    time.sleep(random.randrange(1,5))
    print(%s piao end %name)



p1=Process(target=piao,args=(egon,)) #必須加,號
p2=Process(target=piao,args=(alex,))
p3=Process(target=piao,args=(wupeqi,))
p4=Process(target=piao,args=(yuanhao,))

p1.start()
p2.start()
p3.start()
p4.start()
print(主線程)
方法一 技術分享圖片
#開進程的方法二:
import time
import random
from multiprocessing import Process


class Piao(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        print(%s piaoing %self.name)

        time.sleep(random.randrange(1,5))
        print(%s piao end %self.name)

p1=Piao(egon)
p2=Piao(alex)
p3=Piao(wupeiqi)
p4=Piao(yuanhao)

p1.start() #start會自動調用run
p2.start()
p3.start()
p4.start()
print(主線程)
方法二

進程直接的內存空間是隔離的

技術分享圖片
from multiprocessing import Process
n=100 #在windows系統中應該把全局變量定義在if __name__ == ‘__main__‘之上就可以了
def work():
    global n
    n=0
    print(子進程內: ,n)


if __name__ == __main__:
    p=Process(target=work)
    p.start()
    print(主進程內: ,n)
test

練習1:把上周所學的socket通信變成並發的形式

技術分享圖片
from socket import *
from multiprocessing import Process

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind((127.0.0.1,8080))
server.listen(5)

def talk(conn,client_addr):
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == __main__: #windows下start進程一定要寫到這下面
    while True:
        conn,client_addr=server.accept()
        p=Process(target=talk,args=(conn,client_addr))
        p.start()
server端 技術分享圖片
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect((127.0.0.1,8080))


while True:
    msg=input(>>: ).strip()
    if not msg:continue

    client.send(msg.encode(utf-8))
    msg=client.recv(1024)
    print(msg.decode(utf-8))
多個client端 技術分享圖片
每來一個客戶端,都在服務端開啟一個進程,如果並發來一個萬個客戶端,要開啟一萬個進程嗎,你自己嘗試著在你自己的機器上開啟一萬個,10萬個進程試一試。
解決方法:進程池
這麽實現有沒有問題

Process對象的join方法

技術分享圖片
from multiprocessing import Process
import time
import random

class Piao(Process):
    def __init__(self,name):
        self.name=name
        super().__init__()
    def run(self):
        print(%s is piaoing %self.name)
        time.sleep(random.randrange(1,3))
        print(%s is piao end %self.name)


p=Piao(egon)
p.start()
p.join(0.0001) #等待p停止,等0.0001秒就不再等了
print(開始)
join:主進程等,等待子進程結束 技術分享圖片
from multiprocessing import Process
import time
import random
def piao(name):
    print(%s is piaoing %name)
    time.sleep(random.randint(1,3))
    print(%s is piao end %name)

p1=Process(target=piao,args=(egon,))
p2=Process(target=piao,args=(alex,))
p3=Process(target=piao,args=(yuanhao,))
p4=Process(target=piao,args=(wupeiqi,))

p1.start()
p2.start()
p3.start()
p4.start()

#有的同學會有疑問:既然join是等待進程結束,那麽我像下面這樣寫,進程不就又變成串行的了嗎?
#當然不是了,必須明確:p.join()是讓誰等?
#很明顯p.join()是讓主線程等待p的結束,卡住的是主線程而絕非進程p,

#詳細解析如下:
#進程只要start就會在開始運行了,所以p1-p4.start()時,系統中已經有四個並發的進程了
#而我們p1.join()是在等p1結束,沒錯p1只要不結束主線程就會一直卡在原地,這也是問題的關鍵
#join是讓主線程等,而p1-p4仍然是並發執行的,p1.join的時候,其余p2,p3,p4仍然在運行,等#p1.join結束,可能p2,p3,p4早已經結束了,這樣p2.join,p3.join.p4.join直接通過檢測,無需等待
# 所以4個join花費的總時間仍然是耗費時間最長的那個進程運行的時間
p1.join()
p2.join()
p3.join()
p4.join()

print(主線程)


#上述啟動進程與join進程可以簡寫為
# p_l=[p1,p2,p3,p4]
# 
# for p in p_l:
#     p.start()
# 
# for p in p_l:
#     p.join()
有了join,程序不就是串行了嗎???

Process對象的其他方法或屬性(了解)

技術分享圖片
#進程對象的其他方法一:terminate,is_alive
from multiprocessing import Process
import time
import random

class Piao(Process):
    def __init__(self,name):
        self.name=name
        super().__init__()

    def run(self):
        print(%s is piaoing %self.name)
        time.sleep(random.randrange(1,5))
        print(%s is piao end %self.name)


p1=Piao(egon1)
p1.start()

p1.terminate()#關閉進程,不會立即關閉,所以is_alive立刻查看的結果可能還是存活
print(p1.is_alive()) #結果為True

print(開始)
print(p1.is_alive()) #結果為False
terminate與is_alive 技術分享圖片
from multiprocessing import Process
import time
import random
class Piao(Process):
    def __init__(self,name):
        # self.name=name
        # super().__init__() #Process的__init__方法會執行self.name=Piao-1,
        #                    #所以加到這裏,會覆蓋我們的self.name=name

        #為我們開啟的進程設置名字的做法
        super().__init__()
        self.name=name

    def run(self):
        print(%s is piaoing %self.name)
        time.sleep(random.randrange(1,3))
        print(%s is piao end %self.name)

p=Piao(egon)
p.start()
print(開始)
print(p.pid) #查看pid
name與pid

僵屍進程與孤兒進程(了解)

技術分享圖片
參考博客:http://www.cnblogs.com/Anker/p/3271773.html

一:僵屍進程(有害)
  僵屍進程:一個進程使用fork創建子進程,如果子進程退出,而父進程並沒有調用wait或waitpid獲取子進程的狀態信息,那麽子進程的進程描述符仍然保存在系統中。這種進程稱之為僵死進程。詳解如下

我們知道在unix/linux中,正常情況下子進程是通過父進程創建的,子進程在創建新的進程。子進程的結束和父進程的運行是一個異步過程,即父進程永遠無法預測子進程到底什麽時候結束,如果子進程一結束就立刻回收其全部資源,那麽在父進程內將無法獲取子進程的狀態信息。

因此,UNⅨ提供了一種機制可以保證父進程可以在任意時刻獲取子進程結束時的狀態信息:
1、在每個進程退出的時候,內核釋放該進程所有的資源,包括打開的文件,占用的內存等。但是仍然為其保留一定的信息(包括進程號the process ID,退出狀態the termination status of the process,運行時間the amount of CPU time taken by the process等)
2、直到父進程通過wait / waitpid來取時才釋放. 但這樣就導致了問題,如果進程不調用wait / waitpid的話,那麽保留的那段信息就不會釋放,其進程號就會一直被占用,但是系統所能使用的進程號是有限的,如果大量的產生僵死進程,將因為沒有可用的進程號而導致系統不能產生新的進程. 此即為僵屍進程的危害,應當避免。

  任何一個子進程(init除外)在exit()之後,並非馬上就消失掉,而是留下一個稱為僵屍進程(Zombie)的數據結構,等待父進程處理。這是每個子進程在結束時都要經過的階段。如果子進程在exit()之後,父進程沒有來得及處理,這時用ps命令就能看到子進程的狀態是“Z”。如果父進程能及時 處理,可能用ps命令就來不及看到子進程的僵屍狀態,但這並不等於子進程不經過僵屍狀態。  如果父進程在子進程結束之前退出,則子進程將由init接管。init將會以父進程的身份對僵屍狀態的子進程進行處理。

二:孤兒進程(無害)

  孤兒進程:一個父進程退出,而它的一個或多個子進程還在運行,那麽那些子進程將成為孤兒進程。孤兒進程將被init進程(進程號為1)所收養,並由init進程對它們完成狀態收集工作。

  孤兒進程是沒有父進程的進程,孤兒進程這個重任就落到了init進程身上,init進程就好像是一個民政局,專門負責處理孤兒進程的善後工作。每當出現一個孤兒進程的時候,內核就把孤 兒進程的父進程設置為init,而init進程會循環地wait()它的已經退出的子進程。這樣,當一個孤兒進程淒涼地結束了其生命周期的時候,init進程就會代表黨和政府出面處理它的一切善後工作。因此孤兒進程並不會有什麽危害。

我們來測試一下(創建完子進程後,主進程所在的這個腳本就退出了,當父進程先於子進程結束時,子進程會被init收養,成為孤兒進程,而非僵屍進程),文件內容

import os
import sys
import time

pid = os.getpid()
ppid = os.getppid()
print im father, pid, pid, ppid, ppid
pid = os.fork()
#執行pid=os.fork()則會生成一個子進程
#返回值pid有兩種值:
#    如果返回的pid值為0,表示在子進程當中
#    如果返回的pid值>0,表示在父進程當中
if pid > 0:
    print father died..
    sys.exit(0)

# 保證主線程退出完畢
time.sleep(1)
print im child, os.getpid(), os.getppid()

執行文件,輸出結果:
im father pid 32515 ppid 32015
father died..
im child 32516 1

看,子進程已經被pid為1的init進程接收了,所以僵屍進程在這種情況下是不存在的,存在只有孤兒進程而已,孤兒進程聲明周期結束自然會被init來銷毀。


三:僵屍進程危害場景:

  例如有個進程,它定期的產 生一個子進程,這個子進程需要做的事情很少,做完它該做的事情之後就退出了,因此這個子進程的生命周期很短,但是,父進程只管生成新的子進程,至於子進程 退出之後的事情,則一概不聞不問,這樣,系統運行上一段時間之後,系統中就會存在很多的僵死進程,倘若用ps命令查看的話,就會看到很多狀態為Z的進程。 嚴格地來說,僵死進程並不是問題的根源,罪魁禍首是產生出大量僵死進程的那個父進程。因此,當我們尋求如何消滅系統中大量的僵死進程時,答案就是把產生大 量僵死進程的那個元兇槍斃掉(也就是通過kill發送SIGTERM或者SIGKILL信號啦)。槍斃了元兇進程之後,它產生的僵死進程就變成了孤兒進 程,這些孤兒進程會被init進程接管,init進程會wait()這些孤兒進程,釋放它們占用的系統進程表中的資源,這樣,這些已經僵死的孤兒進程 就能瞑目而去了。

四:測試
#1、產生僵屍進程的程序test.py內容如下

#coding:utf-8
from multiprocessing import Process
import time,os

def run():
    print(,os.getpid())

if __name__ == __main__:
    p=Process(target=run)
    p.start()
    
    print(,os.getpid())
    time.sleep(1000)


#2、在unix或linux系統上執行
[root@vm172-31-0-19 ~]# python3  test.py &
[1] 18652
[root@vm172-31-0-19 ~]# 主 18652
子 18653

[root@vm172-31-0-19 ~]# ps aux |grep Z
USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
root     18653  0.0  0.0      0     0 pts/0    Z    20:02   0:00 [python3] <defunct> #出現僵屍進程
root     18656  0.0  0.0 112648   952 pts/0    S+   20:02   0:00 grep --color=auto Z

[root@vm172-31-0-19 ~]# top #執行top命令發現1zombie
top - 20:03:42 up 31 min,  3 users,  load average: 0.01, 0.06, 0.12
Tasks:  93 total,   2 running,  90 sleeping,   0 stopped,   1 zombie
%Cpu(s):  0.0 us,  0.3 sy,  0.0 ni, 99.7 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
KiB Mem :  1016884 total,    97184 free,    70848 used,   848852 buff/cache
KiB Swap:        0 total,        0 free,        0 used.   782540 avail Mem 

  PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND                                                                                                                                        
root      20   0   29788   1256    988 S  0.3  0.1   0:01.50 elfin                                                                                                                      


#3、
等待父進程正常結束後會調用wait/waitpid去回收僵屍進程
但如果父進程是一個死循環,永遠不會結束,那麽該僵屍進程就會一直存在,僵屍進程過多,就是有害的
解決方法一:殺死父進程
解決方法二:對開啟的子進程應該記得使用join,join會回收僵屍進程
參考python2源碼註釋
class Process(object):
    def join(self, timeout=None):
        ‘‘‘
        Wait until child process terminates
        ‘‘‘
        assert self._parent_pid == os.getpid(), can only join a child process
        assert self._popen is not None, can only join a started process
        res = self._popen.wait(timeout)
        if res is not None:
            _current_process._children.discard(self)

join方法中調用了wait,告訴系統釋放僵屍進程。discard為從自己的children中剔除

解決方法三:http://blog.csdn.net/u010571844/article/details/50419798
View Code

python之旅:並發編程之多進程