1. 程式人生 > >Python 學習筆記 多程序 multiprocessing

Python 學習筆記 多程序 multiprocessing

Python 直譯器有一個全域性直譯器鎖(PIL),導致每個 Python 程序中最多同時執行一個執行緒,因此 Python 多執行緒程式並不能改善程式效能,不能發揮多核系統的優勢,可以通過這篇文章瞭解。

但是多程序程式不受此影響, Python 2.6 引入了 multiprocessing 來解決這個問題。這裡介紹 multiprocessing 模組下的程序,程序同步,程序間通訊和程序管理四個方面的內容。 這裡主要講解多程序的典型使用,multiprocessing 的 API 幾乎是完複製了 threading 的API, 因此只需花少量的時間就可以熟悉 threading 程式設計了。

Process

先來看一段程式碼

1234567891011 from multiprocessing import Process, current_processdef func(): time.sleep(1) proc = current_process() proc.name, proc.pidsub_proc = Process(target=func, args=())sub_proc.start()sub_proc.join()proc = current_process()proc.name, proc.pid

這是在主程序中建立子程序,然後啟動(start) 子程序,等待(join) 子程序執行完,再繼續執行主程序的整個的執行流程。

那麼,一個程序應該是用來做什麼的,它應該儲存一些什麼狀態,它的生命週期是什麼樣的呢?

一個程序需要處理一些不同任務,或者處理不同的物件。建立程序需要一個 function 和相關引數,引數可以是dictProcess(target=func, args=(), kwargs = {})name 可以用來標識程序。

控制子程序進入不同階段的是 start()join()is_alive()terminate()exitcode 方法。這些方法只能在建立子程序的程序中執行。

程序同步

Lock

鎖是為了確保資料一致性,比如讀寫鎖,每個程序給一個變數增加 1 ,但是如果在一個程序讀取但還沒有寫入的時候,另外的程序也同時讀取了,並寫入該值,則最後寫入的值是錯誤的,這時候就需要鎖。

123456789 def func(lock): lock.acquire() # do mysql query select update ... lock.release()lock = Lock()for i in xrange(4): proc = Process(target=func, args=(lock)) proc.start()

Lock 同時也實現了 ContextManager API, 可以結合 with 語句使用, 關於 ContextManager, 請移步 Python 學習實踐筆記 裝飾器 與 context 檢視。

Semaphore

Semaphore 和 Lock 稍有不同,Semaphore 相當於 N 把鎖,獲取其中一把就可以執行了。 訊號量的總數 N 在構造時傳入,s = Semaphore(N)。 和 Lock 一樣,如果訊號量為0,則程序堵塞,直到訊號大於0。

Pipes

Pipe 是在兩個程序之間通訊的工具,Pipe Constructor 會返回兩個端

1 conn1, conn2 = Pipe(True)

如果是全雙工的(建構函式引數為True),則雙埠都可接收發送,否則前面的埠用於接收,後面的埠用於傳送。

1234567891011 def proc1(pipe): for i in xrange(10000): pipe.send(i)def proc2(pipe): while True: print "proc2 rev:", pipe.recv()pipe = Pipe()Process(target=proc1, args=(pipe[0],)).start()Process(target=proc2, args=(pipe[1],)).start()

Pipe 的每個埠同時最多一個程序讀寫,否則資料會出各種問題

Queues

multiprocessing.Queue 與 Queue.Queue 非常相似。其 API 列表如下

  • qsize()
  • empty()
  • full()
  • put()
  • put_nowait()
  • get()
  • get_nowait()
  • close()
  • join_thread()
  • cancel_join_thread()

當 Queue 為 Queue.Full 狀態時,再 put() 會堵塞,當狀態為 Queue.Empty 時,再 get() 也是。當 put() 或 get() 設定了超時引數,而超時的時候,會丟擲異常。

Queue 主要用於多個程序產生和消費,一般使用情況如下

123456789101112 def producer(q): for i in xrange(10): q.put(i)def consumer(q): while True: print "consumer", q.get()q = Queue(40)for i in xrange(10): Process(target=producer, args=(q,)).start()Process(target=consumer, args=(q,)).start()

十個生產者程序,一個消費者程序,共用同一個佇列進行同步。

有一個簡化版本的 multiprocessing.queues.SimpleQueue, 只支援3個方法 empty(), get(), put()。

也有一個強化版本的 JoinableQueue, 新增兩個方法 task_done() 和 join()。 task_done() 是給消費者使用的,每完成佇列中的一個任務,呼叫一次該方法。當所有的 tasks 都完成之後,交給呼叫 join() 的程序執行。

123456789101112131415 def consumer(q): while True: print "consumer", q.get() q.task_done()jobs = JoinableQueue()for i in xrange(10): jobs.put(i)for i in xrange(10): p = Process(target=consumer, args=(jobs,)) p.daemon = True p.start()jobs.join()

這個 join 函式等待 JoinableQueue 為空的時候,等待就結束,外面的程序可以繼續執行了,但是那10個程序幹嘛去了呢,他們還在等待呀,上面是設定了 p.daemon = True, 子程序才隨著主程序結束的,如果沒有設定,它們還是會一直等待的呢。

Lock、Pipe、Queue 和 Pipe 需要注意的是:儘量避免使用 Process.terminate 來終止程式,否則將會導致很多問題, 詳情請移步python 官方文件檢視。

程序間資料共享

前一節中, Pipe、Queue 都有一定資料共享的功能,但是他們會堵塞程序, 這裡介紹的兩種資料共享方式都不會堵塞程序, 而且都是多程序安全的。

共享記憶體

共享記憶體有兩個結構,一個是 Value, 一個是 Array,這兩個結構內部都實現了鎖機制,因此是多程序安全的。 用法如下:

1234567891011 def func(n, a): n.value = 50 for i in range(len(a)): a[i] += 10num = Value('d', 0.0)ints= Array('i', range(10))p = Process(target=func, args=(num, ints))p.start()p.join()

Value 和 Array 都需要設定其中存放值的型別,d 是 double 型別,i 是 int 型別,具體的對應關係在Python 標準庫的 sharedctypes 模組中檢視。

服務程序 Manager

上面的共享記憶體支援兩種結構 Value 和 Array, 這些值在主程序中管理,很分散。 Python 中還有一統天下,無所不能的 Server process,專門用來做資料共享。 其支援的型別非常多,比如list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value 和 Array 用法如下:

123456789101112131415 from multiprocessing import Process, Managerdef func(dct, lst): dct[88] = 88 lst.reverse()manager = Manager()dct = manager.dict()lst = manager.list(range(5,10))p = Process(target=func, args=(dct, lst))p.start()p.join()print dct, '|', lstOut: {88: 88} | [9, 8, 7, 6, 5]

一個 Manager 物件是一個服務程序,推薦多程序程式中,資料共享就用一個 manager 管理。

程序管理

如果有50個任務要執行, 但是 CPU 只有4核, 你可以建立50個程序來做這個事情。但是大可不必,徒增管理開銷。如果你只想建立4個程序,讓他們輪流替你完成任務,不用自己去管理具體的程序的建立銷燬,那 Pool 是非常有用的。

Pool 是程序池,程序池能夠管理一定的程序,當有空閒程序時,則利用空閒程序完成任務,直到所有任務完成為止,用法如下

12345 def func(x): return x*xpool = Pool(processes=4)print pool.map(func, range(8))

Pool 程序池建立4個程序,不管有沒有任務,都一直在程序池中等候,等到有資料的時候就開始執行。
Pool 的 API 列表如下:

  • apply(func[, args[, kwds]])
  • apply_async(func[, args[, kwds[, callback]]])
  • map(func, iterable[, chunksize])
  • map_async(func, iterable[, chunksize[, callback]])
  • imap(func, iterable[, chunksize])
  • imap_unordered(func, iterable[, chunksize])
  • close()
  • terminate()
  • join()

非同步執行

apply_async 和 map_async 執行之後立即返回,然後非同步返回結果。 使用方法如下

123456789101112 def func(x): return x*xdef callback(x): print x, 'in callback'pool = Pool(processes=4)result = pool.map_async(func, range(8), 8, callback)print result.get(), 'in main'Out:[0, 1, 4, 9, 16, 25, 36, 49] in callback[0, 1, 4, 9, 16, 25, 36, 49] in main

有兩個值得提到的,一個是 callback,另外一個是 multiprocessing.pool.AsyncResult。 callback 是在結果返回之前,呼叫的一個函式,這個函式必須只有一個引數,它會首先接收到結果。callback 不能有耗時操作,因為它會阻塞主執行緒。

AsyncResult 是獲取結果的物件,其 API 如下

  • get([timeout])
  • wait([timeout])
  • ready()
  • successful()

如果設定了 timeout 時間,超時會丟擲 multiprocessing.TimeoutError 異常。wait 是等待執行完成。 ready 測試是否已經完成,successful 是在確定已經 ready 的情況下,如果執行中沒有丟擲異常,則成功,如果沒有ready 就呼叫該函式,會得到一個 AssertionError 異常。

Pool 管理

這裡不再繼續講 map 的各種變體了,因為從上面的 API 一看便知。

然後我們來看看 Pool 的執行流程,有三個階段。第一、一個程序池接收很多工,然後分開執行任務;第二、不再接收任務了;第三、等所有任務完成了,回家,不幹了。

這就是上面的方法,close 停止接收新的任務,如果還有任務來,就會丟擲異常。 join 是等待所有任務完成。 join 必須要在 close 之後呼叫,否則會丟擲異常。terminate 非正常終止,記憶體不夠用時,垃圾回收器呼叫的就是這個方法。