1. 程式人生 > >python中的Queue與多程序(multiprocessing)

python中的Queue與多程序(multiprocessing)

最近接觸一個專案,要在多個虛擬機器中執行任務,參考別人之前專案的程式碼,採用了多程序來處理,於是上網查了查python中的多程序

一、先說說Queue(佇列物件)

Queue是python中的標準庫,可以直接import 引用,之前學習的時候有聽過著名的“先吃先拉”與“後吃先吐”,其實就是這裡說的佇列,佇列的構造的時候可以定義它的容量,別吃撐了,吃多了,就會報錯,構造的時候不寫或者寫個小於1的數則表示無限多

import Queue

q = Queue.Queue(10)

向佇列中放值(put)

q.put(‘yang’)

q.put(4)

q.put([‘yan’,’xing’])

在佇列中取值get()

預設的佇列是先進先出的

>>> q.get() 
'yang' 
>>> q.get() 

>>> q.get() 
['yan', 'xing'] 
>>>

當一個佇列為空的時候如果再用get取則會堵塞,所以取佇列的時候一般是用到

get_nowait()方法,這種方法在向一個空佇列取值的時候會拋一個Empty異常

所以更常用的方法是先判斷一個佇列是否為空,如果不為空則取值

佇列中常用的方法

Queue.qsize() 返回佇列的大小  
Queue.empty() 如果佇列為空,返回True,反之False  
Queue.full() 如果佇列滿了,返回True,反之False 
Queue.get([block[, timeout]]) 獲取佇列,timeout等待時間  
Queue.get_nowait() 相當Queue.get(False) 
非阻塞 Queue.put(item) 寫入佇列,timeout等待時間  
Queue.put_nowait(item) 相當Queue.put(item, False)

二、multiprocessing中使用子程序概念

from multiprocessing import Process

可以通過Process來構造一個子程序

p = Process(target=fun,args=(args))

再通過p.start()來啟動子程序

再通過p.join()方法來使得子程序執行結束後再執行父程序

from multiprocessing import Process
import os

# 子程序要執行的程式碼
defrun_proc(name):
    print 'Run child process %s (%s)...' % (name, os.getpid())

if
__name__=='__main__': print 'Parent process %s.' % os.getpid() p = Process(target=run_proc, args=('test',)) print 'Process will start.' p.start() p.join() print 'Process end.'

image

三、在multiprocessing中使用pool

如果需要多個子程序時可以考慮使用程序池(pool)來管理

from multiprocessing import Pool
from multiprocessing import Pool
import os, time

deflong_time_task(name):
    print 'Run task %s (%s)...' % (name, os.getpid())
    start = time.time()
    time.sleep(3)
    end = time.time()
    print 'Task %s runs %0.2f seconds.' % (name, (end - start))

if __name__=='__main__':
    print 'Parent process %s.' % os.getpid()
    p = Pool()
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print 'Waiting for all subprocesses done...'
    p.close()
    p.join()
    print 'All subprocesses done.'

pool建立子程序的方法與Process不同,是通過

p.apply_async(func,args=(args))實現,一個池子裡能同時執行的任務是取決你電腦的cpu數量,如我的電腦現在是有4個cpu,那會子程序task0,task1,task2,task3可以同時啟動,task4則在之前的一個某個程序結束後才開始

image

上面的程式執行後的結果其實是按照上圖中1,2,3分開進行的,先列印1,3秒後列印2,再3秒後列印3

程式碼中的p.close()是關掉程序池子,是不再向裡面新增程序了,對Pool物件呼叫join()方法會等待所有子程序執行完畢,呼叫join()之前必須先呼叫close(),呼叫close()之後就不能繼續新增新的Process了。

當時也可以是例項pool的時候給它定義一個程序的多少

如果上面的程式碼中p=Pool(5)那麼所有的子程序就可以同時進行

三、多個子程序間的通訊

多個子程序間的通訊就要採用第一步中說到的Queue,比如有以下的需求,一個子程序向佇列中寫資料,另外一個程序從佇列中取資料,

#coding:gbk

from multiprocessing import Process, Queue
import os, time, random

# 寫資料程序執行的程式碼:
defwrite(q):
    for value in ['A', 'B', 'C']:
        print 'Put %s to queue...' % value
        q.put(value)
        time.sleep(random.random())

# 讀資料程序執行的程式碼:
defread(q):
    while True:
        if not q.empty():
            value = q.get(True)
            print 'Get %s from queue.' % value
            time.sleep(random.random())
        else:
            break

if __name__=='__main__':
    # 父程序建立Queue,並傳給各個子程序:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 啟動子程序pw,寫入:
    pw.start()    
    # 等待pw結束:
    pw.join()
    # 啟動子程序pr,讀取:
    pr.start()
    pr.join()
    # pr程序裡是死迴圈,無法等待其結束,只能強行終止:
    print
    print '所有資料都寫入並且讀完'

四、關於上面程式碼的幾個有趣的問題

if __name__=='__main__':    
    # 父程序建立Queue,並傳給各個子程序:
    q = Queue()
    p = Pool()
    pw = p.apply_async(write,args=(q,))    
    pr = p.apply_async(read,args=(q,))
    p.close()
    p.join()
    
    print
    print '所有資料都寫入並且讀完'

如果main函式寫成上面的樣本,本來我想要的是將會得到一個佇列,將其作為引數傳入程序池子裡的每個子程序,但是卻得到

RuntimeError: Queue objects should only be shared between processes through inheritance

的錯誤,查了下,大意是佇列物件不能在父程序與子程序間通訊,這個如果想要使用程序池中使用佇列則要使用multiprocess的Manager類

if __name__=='__main__':
    manager = multiprocessing.Manager()
    # 父程序建立Queue,並傳給各個子程序:
    q = manager.Queue()
    p = Pool()
    pw = p.apply_async(write,args=(q,))
    time.sleep(0.5)
    pr = p.apply_async(read,args=(q,))
    p.close()
    p.join()
    
    print
    print '所有資料都寫入並且讀完'

這樣這個佇列物件就可以在父程序與子程序間通訊,不用池則不需要Manager,以後再擴充套件multiprocess中的Manager類吧

關於鎖的應用,在不同程式間如果有同時對同一個佇列操作的時候,為了避免錯誤,可以在某個函式操作佇列的時候給它加把鎖,這樣在同一個時間內則只能有一個子程序對佇列進行操作,鎖也要在manager物件中的鎖

#coding:gbk

from multiprocessing import Process,Queue,Pool
import multiprocessing
import os, time, random

# 寫資料程序執行的程式碼:
defwrite(q,lock):
    lock.acquire() #加上鎖
    for value in ['A', 'B', 'C']:
        print 'Put %s to queue...' % value        
        q.put(value)        
    lock.release() #釋放鎖  

# 讀資料程序執行的程式碼:
defread(q):
    while True:
        if not q.empty():
            value = q.get(False)
            print 'Get %s from queue.' % value
            time.sleep(random.random())
        else:
            break

if __name__=='__main__':
    manager = multiprocessing.Manager()
    # 父程序建立Queue,並傳給各個子程序:
    q = manager.Queue()
    lock = manager.Lock() #初始化一把鎖
    p = Pool()
    pw = p.apply_async(write,args=(q,lock))    
    pr = p.apply_async(read,args=(q,))
    p.close()
    p.join()
    
    print
    print '所有資料都寫入並且讀完'

參考文章: