1. 程式人生 > >Python-multiprocessing程序管理

Python-multiprocessing程序管理

multiprocessing模組包含一個API,它基於threading API可以在多個程序間劃分工作。有些情況下,multiprocessing可以作為臨時替換,取代threading來利用多個CPU核心,避免全域性直譯器鎖帶來的效能瓶頸。

1. multiprocessing基礎

建立程序(MP.Process)

要建立第二個程序,最簡單的方法是例項化一個Process物件,並呼叫start()讓其工作。
import multiprocessing

def worker():
    print 'Worker'
    return

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target = worker)
        jobs.append(p)
        p.start()

執行結果將會列印5次‘Worker',不過不清楚孰先孰後,這取決於具體的執行順序,因為每個程序都在競爭訪問輸出流。更有用的做法是,建立一個程序時可以提供引數。與threading不同,要向一個multiprocessing Process傳遞引數,這個引數必須能夠使用pickle序列化。下面的例子向各個工作程序傳遞一個要列印的數。
import multiprocessing
import time

def worker(num):
    print "Worker", num
    time.sleep(0.1)
    return

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target = worker, args = (i,))
        jobs.append(p)
        p.start()
Worker 0 Worker 2 Worker 3 Worker 4 Worker 1

可匯入的目標函式

threading與multiprocessing例子之間有一個區別,multiprocessing例子中對__main__使用了額外的保護。對於新程序的啟動方式,要求子程序能夠匯入包含目標函式的指令碼。可以講應用的主要部分包裝在一個__main__檢查中,確保模組匯入時不會在各個子程序中遞迴地執行。另外一個方法是從一個單獨的指令碼中匯入目標函式,下面例子中程序的工作函式是simple.py中worker函式:
import multiprocessing
import simple

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(
                target = simple.worker,
                )
        jobs.append(p)
        p.start()

確定當前程序(MP.current_process().name)

每個Process例項都有一個名稱,其預設值可以在建立程序時改變。給程序命名對於跟蹤程序很有用,特別是在當前應用中有多種型別的程序同時執行時。
import multiprocessing
import time

def worker():
    name = multiprocessing.current_process().name
    print name, 'Starting'
    time.sleep(2)
    print name, 'Exiting'

def my_device():
    name = multiprocessing.current_process().name
    print name, 'Starting'
    time.sleep(3)
    print name, 'Exiting'

if __name__ == '__main__':
    service = multiprocessing.Process(name = 'my_service',target = my_device)
    worker_1 = multiprocessing.Process(name = 'worker 1', target = worker)
    worker_2 = multiprocessing.Process(target = worker) #default name, or set name by worker_2.name = 'worker_2'

    worker_1.start()
    worker_2.start()
    service.start()
程序名稱為Process-3的行對應未命名的程序worker_2。 worker 1 Starting worker 1 Exiting Process-3 Starting Process-3 Exiting my_service Starting my_service Exiting

守護程序(P.daemon=True)

要標誌一個程序為守護程序,可以將其daemon屬性設定為True,預設情況下不為守護程序。
service = multiprocessing.Process(name = 'my_service',target = my_device)
service.daemon = True
主程序可以使用join()等待守護程序退出,也可以給join傳入一個超時引數(浮點數,單位為秒),即使程序在這個超時範圍內沒有完成,join()也會返回,此時daemon程序會繼續執行,不會終結。情況和threading一樣。

終止程序(P.terminate())

儘管最好使用“毒丸”(posion pill)方法向程序發出訊號讓其退出,但是如果一個程序看起來已經掛起或者陷入死鎖,則需要能夠強制性的結束。對一個程序呼叫terminate()會結束子程序。
import multiprocessing
import time

def slow_worker():
    print 'Starting worker'
    time.sleep(10)
    print 'finished worker'

if __name__ == '__main__':
    p = multiprocessing.Process(target = slow_worker)
    print "Befor:", p, p.is_alive()
    p.start()
    print 'During:', p, p.is_alive()
    p.terminate()
    print 'Terminate:', p, p.is_alive()
    p.join()
    print 'Joined:', p, p.is_alive()
Befor: <Process(Process-1, initial)> False During: <Process(Process-1, started)> True Terminate: <Process(Process-1, started)> True Joined: <Process(Process-1, stopped[SIGTERM])> False 注意:終止程序後要使用join()退出程序,使程序管理程式碼有時間更新物件的狀態,以反映程序已經終止。

程序的退出狀態(P.exitcode)

程序退出時聲稱的狀態碼可以通過exitcode屬性訪問。狀態碼的範圍是:
  • == 0 : 未生成任何錯誤
  • >0     : 程序有一個錯誤,並以該錯誤碼退出
  • <0     :程序由一個-1*exitcode訊號結束

2. 日誌

除錯併發問題時,能夠訪問multiprocessing提供的物件的內部狀態很有用。可以使用一個方便的模組級函式來啟動日誌記錄,名為log_to_stderr()。它使用logging建立一個日誌記錄器物件,並增加一個處理程式,使得日誌訊息將傳送到標準錯誤通道,日誌預設格式為 '[%(levelname)s/%(processName)s] %(message)s’
import multiprocessing
import logging
import sys

def worker():
    print "Doing some work"
    sys.stdout.flush()

if __name__ == '__main__':
    multiprocessing.log_to_stderr(logging.DEBUG)
    p = multiprocessing.Process(target = worker)
    p.start()
    p.join()
預設情況下,日誌級別設定為NOTSET,即不產生任何訊息。通過傳入一個不同的日誌級別,可以初始化日誌記錄器,制定所需的詳細程度。 [INFO/Process-1] child process calling self.run() Doing some work [INFO/Process-1] process shutting down [DEBUG/Process-1] running all "atexit" finalizers with priority >= 0 [DEBUG/Process-1] running the remaining "atexit" finalizers [INFO/Process-1] process exiting with exitcode 0 [INFO/MainProcess] process shutting down [DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0 [DEBUG/MainProcess] running the remaining "atexit" finalizers 要直接處理日誌記錄器(修改日誌級別或新增處理程式),可以使用get_logger()
import multiprocessing
import logging
import sys

def worker():
    print 'Doing some work'
    sys.stdout.flush()

if __name__ == '__main__':
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    p = multiprocessing.Process(target = worker)
    p.start()
    p.join()
[INFO/Process-1] child process calling self.run() Doing some work [INFO/Process-1] process shutting down [INFO/Process-1] process exiting with exitcode 0 [INFO/MainProcess] process shutting down

3. 派生程序

名字聽上去挺唬人的,其實就是把multiprocessing.Process作為基類來建立新的class。
import multiprocessing

class Worker(multiprocessing.Process):
def run(self):
	print "In %s' % self.name
	return

if __name__ == '__main__':
	jobs = []
	for i in range(5):
		p = Worker()
		jobs.append(p)
		p.start()
	for j in jobs:
		j.join()
In Worker-1 In Worker-2 In Worker-3 In Worker-5 In Worker-4
派生程序應當覆蓋run()來完成工作

4. 訊息傳遞

類似於執行緒,對於多個程序,一種常用的模式是將一個工作劃分為多個工作程序並行地執行。要想有效地使用多個程序,通常它們之間有某種通訊,這樣才能有效分解工作,並完成結果的彙總。利用multiprocessing完成程序通訊的一種簡單方法是使用一個Queue來傳遞訊息。能夠用pickle序列化的任何物件都可以通過Queue傳遞。
import multiprocessing

class MyFancyClass(object):

    def __init__(self, name):
        self.name = name

    def do_something(self):
        proc_name = multiprocessing.current_process().name
        print 'Doing something fancy in %s for %s' % (proc_name, self.name)

def worker(q):
    obj = q.get() # get a object from the queue
    obj.do_something() # doing the task of the object

if __name__ == '__main__':
    queue = multiprocessing.Queue() # the queue
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()

    queue.put(MyFancyClass('Fancy Dan')) # put one object to the queue

    # Wait for the worker to finish
    queue.close()
    queue.join_thread()
    p.join()
這個例子只是像一個工作程序傳遞一個訊息,然後主程序等待這個工作程序完成。結果為: Doing something fancy in Process-1 for Fancy Dan

5. 執行緒池

Multiprocessing同樣提供了執行緒池,可設定執行緒池大小等。
from multiprocessing import Pool

def my_func(x):
    print x**2

pool = Pool(processes=5)
target = range(10)
pool.map(my_func, target)
上述程式碼定義了一個5個執行緒大小的執行緒池,用以計算每個數的平方。