1. 程式人生 > >python(二):使用multiprocessing中的常見問題

python(二):使用multiprocessing中的常見問題

簡介
在python的直譯器中,CPython是應用範圍最廣的一種,其具有豐富的擴充套件包,方便了開發者的使用。當然CPython也不是完美的,由於全域性解釋鎖(GIL)的存在,python的多執行緒可以近似看作單執行緒。為此,開發者推出了multiprocessing,這裡介紹一下使用中的常見問題。

環境
>>> import sys
>>> print(sys.version)
3.6.0 |Anaconda 4.3.1 (64-bit)| (default, Dec 23 2016, 12:22:00) \n[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)]
1
2
3
共享變數
任務能否切分成多個子任務是判斷一個任務能否使用多程序或多執行緒的重要標準。在任務切分時,不可避免的需要資料通訊,而共享變數是資料通訊的重要方式。在multiprocess中,共享變數有兩種方式:Shared memory和Server process。

share memory
multiprocess通過Array和Value來共享記憶體

from multiprocessing import Array, Value
num = 10
elements = Array("i", [2 * i + 1 for i in range(num)])
val = Value('d', 0.0)
1
2
3
4
然後就可以將資料同步到Process中。這裡舉一個例子,即將elements翻倍,val增加1,首先定義函式

def func(elements, val):
for i, ele in enumerate(elements):
elements[i] = ele * 2
val.value += 1
1
2
3
4
再定義Process

from multiprocessing import Process
p = Process(target=func, args=(elements, val, ))
p.start() # 執行Process
p.join() # 等待Process執行結束
1
2
3
4
最終執行結果

=====Process執行前=======
[elements]:1 3 5 7 9 11 13 15 17 19
[Value]:0.0
=====Process執行後=======
[elements]:2 6 10 14 18 22 26 30 34 38
[Value]:1.0
1
2
3
4
5
6
在某些特定的場景下要共享string型別,方式如下:

from ctypes import c_char_p
str_val = Value(c_char_p, b"Hello World")
1
2
關於Share Memory支援的更多型別,可以檢視module-multiprocessing.sharedctypes。

Server process
此種方式通過建立一個Server process來管理python object,然後其他process通過代理來訪問這些python object。相較於share memory,它支援任意型別的共享,包括:list、dict、Namespace等。這裡以dict和list舉一個例子:

from multiprocessing import Process, Manager

def func(d, l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.reverse()

if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
l = manager.list(range(10))
print("=====Process執行前=======")
print(d)
print(l)

p = Process(target=func, args=(d, l))
p.start()
p.join()

print("=====Process執行後=======")
print(d)
print(l)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
執行結果如下

=====Process執行前=======
{}
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
=====Process執行後=======
{1: '1', '2': 2, 0.25: None}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
1
2
3
4
5
6
程序間通訊
眾所周知,併發程式設計中應該儘量避免共享變數,多程序更是如此。在這種情況下,多程序間的通訊就要用到Queue和Pipe。

Queue
Queue是一種執行緒、程序安全的先進先出的佇列。使用中,首先定義Queue

from multiprocessing import Queue
queue = Queue()
1
2
然後將需要處理的資料放入Queue中

elements = [i for i in range(100)]
for i in elements:
queue.put(i)
1
2
3
然後建立子程序process

from multiprocessing import Process
process = Process(target=func, args=(queue, ))
1
2
其中func是子程序處理資料的邏輯。

from queue import Empty
def func(queue):
buff = []
while True:
try:
ele = queue.get(block=True, timeout=1)
buff.append(str(ele))
except Empty:
print(" ".join(buff))
print("Queue has been empty.....")
break
1
2
3
4
5
6
7
8
9
10
11
使用queue.get時,若Queue中沒有資料,則會丟擲queue.Empty錯誤。值得注意的是,在使用queue.get()時一定要設定block=True和timeout,否則它會一直等待,直到queue中放入資料(剛開始用的時候,我一直奇怪為什麼程式一直處在等待狀態)。執行結果

=====單程序======
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
Queue has been empty.....
1
2
3
Pipe
Pipe是一種管道,一端輸入,一端輸出。在multiprocess中,可以通過Pipe()函式來定義,返回send和recv的connection。使用中,先定義

from multiprocessing import Pipe
parent_conn, child_conn = Pipe()
1
2
然後一端放入資料,另一端就可以接受資料了

from multiprocessing import Process
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv())
p.join()
1
2
3
4
5
6
7
8
輸出結果

[42, None, 'hello']
1
另外,值得注意的是,若兩個或更多程序同時從管道一端讀或寫資料,會導致管道中的資料corrupt。為了直觀的理解這種情況,這裡舉一個例子,即在主程序將資料放入管道,在子程序從管道中讀出資料,並列印結果。區別之處在於,子程序的數量。首先將資料放入管道:

def func(conn):
a = conn.recv()
print(a)
parent, child = Pipe()
child.send("Hello world...")
1
2
3
4
5
然後開啟子程序

print("======單程序========")
p = Process(target=func, args=(parent, ))
p.start()
p.join()
print("======多程序========")
num_process = 2
ps = [Process(target=func, args=(parent, )) for _ in range(num_process)]
for p in ps:
p.start()
for p in ps:
p.join()
1
2
3
4
5
6
7
8
9
10
11
輸出結果

 

多程序並未按照預想的輸出兩個Hello World,而是處於死鎖的狀態。

例子
關於Queue和Pipe的用法講了這麼多,下面做一個小練習,內容是:利用多執行緒從檔案中讀取資料,處理後將資料儲存到另外一個檔案中。具體方法如下:
1. 開闢一個子程序,從檔案中讀取資料,並將資料存入Queue中
2. 開闢多個子程序,從Queue中讀取資料,處理資料,並將資料放入管道一端(注意加鎖)
3. 開闢一個子程序,從管道另一端獲取資料,並將資料寫入檔案中

0.導包

from multiprocessing import Process, Array, Queue, Value, Pipe, Lock
from queue import Empty
import sys
1
2
3
1.讀取資料

def read_file(fin, work_queue):
for line in fin:
i = int(line.strip("\n"))
work_queue.put_nowait(i)
1
2
3
4
其中work_queue用於連通“讀資料的程序”和“處理資料的程序”。

2.處理資料

def worker_func(work_queue, conn, lock, index):
while True:
try:
ele = work_queue.get(block=True, timeout=0.5) + 1
with lock:
conn.send(ele)
except Empty:
print("Process-{} finish...".format(index))
conn.send(-1)
break
1
2
3
4
5
6
7
8
9
10
從佇列中讀取資料,直到佇列中的資料全被取走。當Queue中不存在資料時,向queue放入終止符(-1),告訴後面的程序,前面的人任務已經完成。

3.寫資料

def write_file(conn, fout, num_workers):
record = 0
while True:
val = conn.recv()
if val == -1:
record += 1
else:
print(val, file=fout)
fout.flush()
if record == num_workers:
break
1
2
3
4
5
6
7
8
9
10
11
當寫程序收到特定數量終止符(-1)時,寫程序就終止了。

4.執行

path_file_read = "./raw_data.txt"
path_file_write = "./data.txt"

with open(path_file_read) as fin, \
open(path_file_write, "w") as fout:
queue = Queue()
parent, child = Pipe()
lock = Lock()
read_Process = Process(target=read_file, args=(fin, queue, ))
worker_Process = [Process(target=worker_func, args=(queue, parent, lock, index, ))
for index in range(3)]
write_Process = Process(
target=write_file, args=(child, fout, len(worker_Process), ))

read_Process.start()
for p in worker_Process:
p.start()
write_Process.start()
print("read....")
read_Process.join()
print("worker....")
for p in worker_Process:
p.join()
print("write....")
write_Process.join()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
輸入/輸出
列印錯行
在使用多程序中,你會發現列印的結果發生錯行。這是因為python的print函式是執行緒不安全的,從而導致錯行。解決方法也很簡單,給print加一把鎖就好了,方式如下

from multiprocessing import Process, Lock

def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()

if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
1
2
3
4
5
6
7
8
9
10
11
12
13
無法列印日誌資訊
剛開始用多程序時,經常會出現日誌資訊無法列印的情況。其實問題很簡單。在多程序中,列印內容會存在快取中,直到達到一定數量才會列印。解決這個問題,只需要加上

import sys
sys.stdout.flush()
sys.stderr.flush()
1
2
3
例如上面的例子,應該寫成

import sys

def f(l, i):
l.acquire()
try:
print('hello world', i)
sys.stdout.flush() # 加入flush
finally:
l.release()
1
2
3
4
5
6
7
8
9
總結
以上就是我在使用multiprocessing遇到的問題。
---------------------
作者:cptu
來源:CSDN
原文:https://blog.csdn.net/AckClinkz/article/details/78457045
版權宣告:本文為博主原創文章,轉載請附上博文連結!