1. 程式人生 > >Python進程之multiprocessing模塊

Python進程之multiprocessing模塊

進程鎖 未使用 這就是 實例 成功 平臺 釋放 信號量 阻塞

python的multiprocessing模塊是跨平臺的多進程模塊,multiprocessing具有創建子進程,進程間通信,隊列,事件,鎖等功能,multiprocessing模塊包含Process,Queue,Pipe,Lock等多個組件。

1、Process

創建進程的類

Process([group [, target [, name [, args [, kwargs]]]]])

參數介紹:
group參數未使用,值始終為None
target表示調用對象,即子進程要執行的任務
args表示調用對象的位置參數元組,args=()
kwargs表示調用對象的字典,kwargs={‘key‘:‘value‘}

name為子進程的名稱

Note:需要使用關鍵字方式指定參數

示例1:創建單進程

from multiprocessing import Process

def func():
    print("first process")

if __name__ == ‘__main__‘:
    # 創建進程對象,主進程和子進程是異步執行的
    p = Process(target=func)
    # 開啟進程
    p.start()

示例2:傳參

from multiprocessing import Process

def func(*args,**kwargs):
    print("IPADDR:%s PORT:%d"%args)
    for k in kwargs:
        print("%s --> %s"%(k,kwargs[k]))

if __name__ == ‘__main__‘:
    # 創建進程對象,並傳遞參數
    p = Process(target=func,args=(‘127.0.0.1‘,8080),kwargs={‘key‘:‘value‘})
       # 如果主進程中的代碼已經結束了,子進程還沒結束,主進程會等待子進程
    # 開啟進程
    p.start()

示例3:創建多進程

import os
from multiprocessing import Process

def func():
    # os模塊的getpid方法可以獲取當前進程的pid,getppid方法可以獲取當前進程的父進程的pid
    print("子進程pid:%s,父進程pid:%s"%(os.getpid(),os.getppid()))

if __name__ == ‘__main__‘:
    p_l = []
    # 創建多個進程
    for i in range(10):
        p = Process(target=func)
        p.start()
        p_l.append(p)

    # 異步執行子進程,最後執行主進程中的代碼
    for p in p_l:
        p.join()   # 阻塞,使主進程等待子進程結束
    print("------主進程------")
結果:
子進程pid:9944,父進程pid:1484
子進程pid:8932,父進程pid:1484
子進程pid:8504,父進程pid:1484
子進程pid:14884,父進程pid:1484
子進程pid:4828,父進程pid:1484
子進程pid:14644,父進程pid:1484
子進程pid:14908,父進程pid:1484
子進程pid:1980,父進程pid:1484
子進程pid:14604,父進程pid:1484
子進程pid:10008,父進程pid:1484
------主進程------

Note :因為在windows操作系統中,沒有fork(),在創建子進程的時候會自動運行啟動它的文件中的所有代碼,因此必須將創建子進程的語句寫在ifname==‘main‘:條件語句下。

示例4:使用類繼承的方式創建進程

import os
from multiprocessing import Process

class MyProcess(Process):   # 必須繼承Process類
    def __init__(self,arg1,arg2,arg3):
        ‘‘‘
        繼承父類的初始化方法,加上自己需要的參數
        :param arg1:
        :param arg2:
        :param arg3:
        ‘‘‘
        super().__init__()
        self.arg1 = arg1
        self.arg2 = arg2
        self.arg3 = arg3

    def run(self):
        ‘‘‘
        必須要有run方法的實現
        :return:
        ‘‘‘
        print(‘子進程:%d ,父進程:%s ‘%(os.getpid(),os.getppid()),self.arg1,self.arg2,self.arg3)
        self.walk()  # walk方法在子進程中執行

    def walk(self):
        print(‘子進程:%d‘%os.getpid())

if __name__ == ‘__main__‘:
    p = MyProcess(1,2,3)
    p.start()  # 會默認調用run方法
    p.walk()   # walk方法直接在主進程中調用,並沒有在子進程中執行

    print(‘父進程:%d ‘%os.getpid())

結果:
子進程:1220
父進程:1220 
子進程:2164 ,父進程:1220  1 2 3
子進程:2164

示例5:守護進程

在為開啟daemon前,主進程會等待子進程結束在結束;
開啟daemon後,程序會在主進程結束時結束子進程

import time
from multiprocessing import Process

def cal_time(second):
    while True:
        print("current time:%s"%time.ctime())
        time.sleep(second)

if __name__ == ‘__main__‘:
    p = Process(target=cal_time,args=(1,))
    ‘‘‘
    守護進程的作用:會隨著主進程代碼執行結束而結束
    守護進程要在start前設置
    守護進程中不能再開啟子進程
    ‘‘‘
    p.daemon = True
    p.start()
    for i in range(10):
        time.sleep(0.2)
        print(‘*‘*i)

未開啟daemon結果:子進程一直在運行
current time:Tue Feb 12 17:48:44 2019

*
**
***
****
current time:Tue Feb 12 17:48:45 2019
*****
******
*******
********
*********
current time:Tue Feb 12 17:48:46 2019
current time:Tue Feb 12 17:48:47 2019
current time:Tue Feb 12 17:48:48 2019
current time:Tue Feb 12 17:48:49 2019

開啟daemon後結果:主進程結束程序就結束了
current time:Tue Feb 12 17:49:14 2019

*
**
***
****
current time:Tue Feb 12 17:49:15 2019
*****
******
*******
********
*********

示例6:屬性:name,pid 方法:is_alive(),terminate()

name:查看進程名
pid:查看進程id
is_alive:查看進程是否正在運行
terminate:結束進程


import time
from multiprocessing import Process

def func():
    print("start")
    time.sleep(3)
    print("end")
if __name__ == ‘__main__‘:
    p = Process(target=func)
    p.start()
    time.sleep(3)
    print("進程名:%s,進程id:%s"%(p.name,p.pid))
    # is_alive方法查看進程是否正在運行
    print(p.is_alive())
    # terminate方法結束進程
    p.terminate()
    time.sleep(3)
    print(p.is_alive())
結果:
start
進程名:Process-1,進程id:17564
True
False

2、Lock

進程鎖:當多個進程訪問共享資源時,進程鎖保證同一時間只能有一個任務可以進行修改,程序的運行方法有並發改為串行,這樣速度慢了,但是保證了數據的安全

示例:

import os
import time
import random
from multiprocessing import Process,Lock

def func(lock,n):
    lock.acquire() #加鎖
    print(‘%s: %s is running‘ % (n, os.getpid()))
    time.sleep(random.random())
    print(‘%s: %s is done‘ % (n, os.getpid()))
    lock.release() #釋放
if __name__ == ‘__main__‘:
    lock=Lock()
    for i in range(3):
        p=Process(target=func,args=(lock,i))
        p.start()

3、Semaphore

信號量:Lock(鎖)可以保證同一時間只能有一個任務對共享數據進行操作,而Semaphore(信號量)可以在同一時間讓指定數量的進程操作共享數據。

示例:迷你唱吧

import time
import random
from multiprocessing import Process
from multiprocessing import Semaphore

‘‘‘
迷你唱吧,20個人,同一時間只能有4個人進去
‘‘‘

def sing(i,sem):
    sem.acquire() # 加鎖
    print(‘%s enter the ktv‘%i)
    time.sleep(random.randint(1,10))
    print(‘%s leave the ktv‘%i)
    sem.release() # 釋放

if __name__ == ‘__main__‘:
    sem = Semaphore(4)
    for i in range(20):
        p = Process(target=sing,args=(i,sem))
        p.start()

4、Event

事件:Event是進程之間的狀態標記通信,因為進程不共享數據,所以事件對象需要以參數形式傳遞到函數中使用。

主要方法:

e = Event() # 實例化一個事件對象
e.set() # 標記變為非阻塞
e.wait() # 默認標記為阻塞,在等待的過程中,遇到非阻塞信號就繼續執行
e.clear() # 標記變為阻塞
e.is_set() # 是否阻塞 True就是非阻塞,False是阻塞

示例:紅綠燈

import time
import random
from multiprocessing import Event
from multiprocessing import Process

def traffic_light(e):
    while True:
        if e.is_set(): # True為綠燈
            time.sleep(3)  # 等3秒後變為紅燈
            print("紅燈亮")
            e.clear()
        else: # False為紅燈,等3秒後變為綠燈
            time.sleep(3)
            print("綠燈亮")
            e.set()

def car(i,e):
    e.wait() # 默認是紅燈
    print("%s 車通過"%i)

if __name__ == ‘__main__‘:
    e = Event()

    # 控制紅綠燈的進程
    tra = Process(target=traffic_light,args=(e,))
    tra.start()

    for i in range(100):
        if i%6 == 0:
            time.sleep(random.randint(1,3))
        p = Process(target=car,args=(i,e))
        p.start()

5、Pipe

管道是進程間通信(IPC)的一種,管道是雙向通信的,但它不保證數據安全
創建管道:p1,p2=Pipe()

主要方法:

send():發送數據
recv():接收數據
close():關閉

示例:

def func(p):
    foo,son = p
    foo.close() # 不使用主進程的管道一端,先行關閉
    while True:
        try:
            print(son.recv())
            # 子進程在結束數據時,如果管道無數據,且對端沒有close,就會報EOFError;如果管道無數據,對端沒close,進程會阻塞
        except EOFError:
            break

if __name__ == ‘__main__‘:
    foo,son = Pipe()
    p = Process(target=func,args=((foo,son),))
    p.start()
    son.close() # 不使用子進程的管道一端,先行關閉
    foo.send("hello")
    foo.send("hello")
    foo.close()

6、Queue

隊列:進程之間是獨立的,要實現進程間通信(IPC);multiprocessing模塊支持兩種形式:隊列(queue)和管道(pipe),這兩種方式都是使用消息傳遞的,且都是雙向通信的,Queue = Pipe+Lock。

隊列的兩種創建方式:

q = Queue() # 創建隊列對象,無長度限制
q1 = Queue(3) # 傳參數,創建一個有最大長度限制的隊列

隊列的主要方法:

q.put(1) # 放入一個數據,對於無長度限制的隊列來說,永不阻塞;對於有長度限制的隊列來說,放滿就阻塞

q.get() # 隊列中有數據就取出一個數據,隊列中無數據就會阻塞;遵循先進先出原則

q.qsize() # 查看隊列的數據大小,不一定準確

示例1:主進程與子進程之間的通信

from multiprocessing import Process
from multiprocessing import Queue

def queue_put(q):
    q.put("123") # 子進程隊列中放入一個變量

if __name__ == ‘__main__‘:
    q = Queue()
    p = Process(target=queue_put,args=(q,))
    p.start()
    print(q.get()) # 主進程獲取到變量

示例2:子進程與子進程之間的通信
from multiprocessing import Process
from multiprocessing import Queue

def queue_put(q):
    q.put("123") # 子進程隊列中放入一個變量

def queue_get(q):
    print(q.get()) # 另一個子進程獲取到隊列中的數據

if __name__ == ‘__main__‘:
    q = Queue()
    p = Process(target=queue_put,args=(q,))
    p.start()
    p1 = Process(target=queue_get,args=(q,))
    p1.start()

7、JoinableQueue

JoinableQueue也是multiprocessing模塊的一種隊列的實現,但它與Queue不同的是JoinableQueue允許項目的使用者通知生成者項目已經被成功處理。創建方式同Queue。
主要方法:put與get與Queue一致
? ? q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。如果調用此方法的次數大於從隊列中刪除項目的數量,將引發ValueError異常
? ? q.join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止

示例:生產者消費者模型

import time
import random
from multiprocessing import JoinableQueue
from multiprocessing import Process

‘‘‘
程序執行流程
1、生產者生產的數據全部被消費 --> 2、生產者進程結束 --> 3、主進程代碼執行結束 --> 4、消費者守護進程結束
‘‘‘
def producer(q,food):
    for i in range(5):
        q.put("%s -- %s"%(i,food))
        print("生產了 %s"%(food))
        time.sleep(random.random())
    q.join() # 2、等待消費者消費完所有數據

def consumer(q,name):
    while True:
        food = q.get()
        if food == None:break
        print("%s 吃了 %s"%(name,food))
        q.task_done() # 1、消費者每消費一個數據就返回一個task_done給生產者

if __name__ == ‘__main__‘:
    q = JoinableQueue()
    p1 = Process(target=producer,args=(q,‘youtiao‘))
    p1.start()
    p2 = Process(target=producer,args=(q,‘baozi‘))
    p2.start()
    c1 = Process(target=consumer,args=(q,‘daxiong‘))
    c1.daemon = True # 4、消費者守護進程結束
    c1.start()
    c2 = Process(target=consumer,args=(q,‘chenglei‘))
    c2.daemon = True
    c2.start()
    c3 = Process(target=consumer,args=(q,‘niu‘))
    c3.daemon = True
    c3.start()

    p1.join() # 3、等待p1執行完畢
    p2.join() # 3、等待p2執行完畢

8、Manager

Manager也是multiprocessing模塊的一個類,這個類主要提供了進程間通信(IPC)的一個機制,它支持Python所有的數據類型,但不提供數據安全的機制。

示例:

from multiprocessing import Manager
from multiprocessing import Process

def func(d):
    print(d)
    d[‘num‘] -= 10

if __name__ == ‘__main__‘:

    m = Manager()
    d = m.dict({‘num‘:100})
    l = []
    for i in range(10):
        p = Process(target=func,args=(d,))

        p.start()
        # p.join() # 同步
        l.append(p)
    for j in l:
        j.join()   # 異步

結果:
{‘num‘: 100}
{‘num‘: 90}
{‘num‘: 80}
{‘num‘: 70}
{‘num‘: 60}
{‘num‘: 50}
{‘num‘: 40}
{‘num‘: 30}
{‘num‘: 20}
{‘num‘: 10}

9、Pool

在執行大量並發任務時,多進程是行之有效的手段之一,但是多進程需要註意幾個問題,一是操作系統不可能無限開啟進程,一般是有幾個核開啟幾個進程,二是開啟進程過多,系統資源占用過多,會導致系統運行速度變慢;那麽遇到這種情況時pool(進程池)便是最好的解決方案。
Pool可以指定開啟一定數量的進程(一般為CPU核數+1個)等待用戶使用,當有新的請求進入時,如果池中有空閑進程,便直接開啟;如果池中的進程都在使用,那麽該請求就會等待,直到池中有進程結束,重用該進程。

示例1:多進程與進程池效率對比

import time
from multiprocessing import Process
from multiprocessing import Pool
def func(i):
    i -= 1

if __name__ == ‘__main__‘:
    # 計算進程池所需事件
    start1_time = time.time()  # 開始時間
    p = Pool(5)  # 進程池中創建5個進程
    p.map(func,range(100)) # 調用進程執行任務,target = func args = (1,2,3...),第二個參數要是可叠代對象
    p.close() # 不允許再向進程池中添加任務
    p.join() # 等待進程池中所有進程執行結束
    stop1_time = time.time() - start1_time # 結束時間
    print("進程池所需時間: %s "%stop1_time)

    # 計算多進程所需時間
    start2_time = time.time()  # 開始時間
    l = []
    for i in range(100):
        p1 = Process(target=func,args=(i,))
        p1.start()
        l.append(p)
    for j in l:
        j.join()
    stop2_time = time.time() - start2_time
    print("多進程所需時間: %s"%stop2_time)
結果:
進程池所需時間: 0.19990277290344238 
多進程所需時間: 1.7190303802490234

由上可知,進程池在執行大量並發任務時的效率。

主要方法:

map(self, func, iterable, chunksize=None):將func應用於iterable中的每個元素,收集結果在返回的列表中。
map_async(self, func, iterable, chunksize=None, callback=None,error_callback=None):異步的map
apply_async(self,func,args=(),kwds={},callback=None,error_callback=None):異步提交任務的機制
apply(self, func, args=(), kwds={}):同步提交任務的機制
close():不允許再提交新的任務
join():等待進程池中的進程執行結束在往下執行,此方法只能在close()或teminate()之後調用

執行apply或apply_async方法時,會返回ApplyResult類的實例對象
ApplyResult類有以下方法:
obj.get():獲取進程的返回值
obj.ready():調用完成時,返回True
obj.successful():如果調用完成且沒有引發異常,返回True,如果在結果就緒之前調用此方法,引發ValueError異常
obj.wait([timeout]):等待結果變為可用

示例2:apply與apply_async方法

import time
from multiprocessing import Pool

‘‘‘
apply:同步提交任務的機制
apply_async:異步提交任務機制
‘‘‘

def func(i):
    time.sleep(1)
    i += 1
    print(i)

if __name__ == ‘__main__‘:
    p = Pool(5)
    res_l = []
    for i in range(20):
        # p.apply(func,args=(i,)) # 同步,執行完畢立即獲取到返回值
        res = p.apply_async(func,args=(i,)) # 異步,通過get獲取返回值
        res_l.append(res)
    p.close() # 不允許再提交新的任務
    p.join() # 等待進程池中的進程執行結束在往下執行
    for res in res_l:
        print(res.get()) # 使用get來獲取apply_aync的結果

10、pool的call函數

在進程池中,一個進程任務結束就會返回一個結果,主進程則調用一個函數去處理這個結果,這就是回調函數。回調函數是在主進程中完成的,不能傳參數,只能接受多進程中函數的返回值;

示例:請求網頁

在爬蟲中,使用回調比較多,爬蟲將訪問網頁、下載網頁的過程放到子進程中去做,分析數據,處理數據讓回調函數去做,因為訪問網頁與下載網頁有網絡延時,而處理數據只占用很小的時間


import requests
from multiprocessing import Pool
def get(url):
    ret = requests.get(url)
    return {‘url‘:url,
            ‘status_code‘:ret.status_code,
            ‘content‘:ret.text}
def parser(dic):
    print(dic[‘url‘],len(dic[‘content‘]))
    parse_url = "URL:%s  Size:%s"%(dic[‘url‘],len(dic[‘content‘]))
    with open(‘url.txt‘,‘a‘) as f:
        f.write(parse_url+‘\n‘)

if __name__ == ‘__main__‘:
    url_l = [
        ‘http://www.baidu.com‘,
        ‘http://www.google.com‘,
        ‘https://zh.wikipedia.org/wiki/Wikipedia:%E9%A6%96%E9%A1%B5‘,
        ‘https://www.youtube.com/?app=desktop‘,
        ‘https://www.facebook.com/‘
    ]
    p = Pool(5)
    for i in url_l:
        p.apply_async(get,args=(i,),callback=parser)

    p.close()
    p.join()

Python進程之multiprocessing模塊