1. 程式人生 > >Python多程序Multiprocessing學習筆記

Python多程序Multiprocessing學習筆記

參考:https://morvanzhou.github.io/tutorials/python-basic/multiprocessing/1-why/

1 什麼是Multiprocessing

多核計算、多程序計算

多程序 Multiprocessing 和多執行緒 threading 類似, 他們都是在 python 中用來並行運算的. 不過既然有了 threading, 為什麼 Python 還要出一個 multiprocessing 呢? 原因很簡單, 就是用來彌補 threading 的一些劣勢, 比如在 threading 教程中提到的GIL.

2 新增程序Process

與建立執行緒的方法非常類似

p1 = multiprocessing.Process(target = job, args = (1,2))

p1.start()

p1.join()

3 儲存程序輸出Queue

Queue的功能是將每個核或執行緒的運算結果放在隊裡中, 等到每個執行緒或核執行完畢後再從佇列中取出結果, 繼續載入運算。原因很簡單, 多執行緒呼叫的函式不能有返回值, 所以使用Queue儲存多個執行緒運算的結果

把結果放在 Queue 裡
(1)定義一個被多執行緒呼叫的函式,q 就像一個佇列,用來儲存每次函式執行的結果
#該函式沒有返回值!!!
def job(q):
    res=0
    for i in range(1000):
        res+=i+i**2+i**3
    q.put(res)    #queue
(2)主函式
定義一個多執行緒佇列,用來儲存結果
if __name__=='__main__':
    q = mp.Queue()
定義兩個執行緒函式,用來處理同一個任務, args 的引數只要一個值的時候,引數後面需要加一個逗號,表示args是可迭代的,後面可能還有別的引數,不加逗號會出錯
p1 = mp.Process(target=job,args=(q,))
p2 = mp.Process(target=job,args=(q,))
分別啟動、連線兩個執行緒
p1.start()
p2.start()
p1.join()
p2.join()
上面是分兩批處理的,所以這裡分兩批輸出,將結果分別儲存
res1 = q.get()
res2 = q.get()
列印最後的運算結果
print(res1+res2)
(3)完整的程式碼
import multiprocessing as mp
def job(q):
    res=0
    for i in range(1000):
        res+=i+i**2+i**3
    q.put(res)    #queue
if __name__=='__main__':
    q = mp.Queue()
    p1 = mp.Process(target=job,args=(q,)) #注意args如果只有一個引數,要在後邊加上逗號,
    p2 = mp.Process(target=job,args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    res1 = q.get()
    res2 = q.get()
    print(res1+res2)
執行的時候還是要在terminal中,最後執行結果為
499667166000

4 效率對比threading&multiprocessing

上篇講了多程序/多核的運算,這次我們來對比下多程序,多執行緒和什麼都不做時的消耗時間,看看哪種方式更有效率。
(1)建立多程序 multiprocessing
和上節一樣,首先import multiprocessing並定義要實現的job(),同時為了容易比較,我們將計算的次數增加到1000000
import multiprocessing as mp
def job(q):
    res = 0
    for i in range(1000000):
        res += i + i**2 + i**3
    q.put(res) # queue
因為多程序是多核運算,所以我們將上節的多程序程式碼命名為multicore()
def multicore():
    q = mp.Queue()
    p1 = mp.Process(target=job, args=(q,))
    p2 = mp.Process(target=job, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    res1 = q.get()
    res2 = q.get()
    print('multicore:',res1 + res2)
(2)建立多執行緒 multithread
接下來建立多執行緒程式,建立多執行緒和多程序有很多相似的地方。首先import threading然後定義multithread()完成同樣的任務
import threading as td
def multithread():
    q = mp.Queue() # thread可放入process同樣的queue中
    t1 = td.Thread(target=job, args=(q,))
    t2 = td.Thread(target=job, args=(q,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    res1 = q.get()
    res2 = q.get()
    print('multithread:', res1 + res2)
(3)建立普通函式
最後我們定義最普通的函式。注意,在上面例子中我們建立了兩個程序或執行緒,均對job()進行了兩次運算,所以在normal()中我們也讓它迴圈兩次
def normal():
    res = 0
    for _ in range(2):
        for i in range(1000000):
            res += i + i**2 + i**3
    print('normal:', res)
(4)執行時間
最後,為了對比各函式執行時間,我們需要import time, 然後依次執行定義好函式:
import time
if __name__ == '__main__':
    st = time.time()
    normal()
    st1 = time.time()
    print('normal time:', st1 - st)
    multithread()
    st2 = time.time()
    print('multithread time:', st2 - st1)
    multicore()
    print('multicore time:', time.time() - st2)
大功告成,下面我們來看下實際執行對比。
(5)結果對比
"""
# range(1000000)
('normal:', 499999666667166666000000L)
('normal time:', 1.1306169033050537)
('thread:', 499999666667166666000000L)
('multithread time:', 1.3054230213165283)
('multicore:', 499999666667166666000000L)
('multicore time:', 0.646507978439331)
"""
普通/多執行緒/多程序的執行時間分別是1.13,1.3和0.64秒。 我們發現多核/多程序最快,說明在同時間運行了多個任務。 而多執行緒的執行時間居然比什麼都不做的程式還要慢一點,說明多執行緒還是有一定的短板的。 
我們將運算次數加十倍,再來看看三種方法的執行時間:
"""
# range(10000000)
('normal:', 4999999666666716666660000000L)
('normal time:', 40.041773080825806)
('thread:', 4999999666666716666660000000L)
('multithread time:', 41.777158975601196)
('multicore:', 4999999666666716666660000000L)
('multicore time:', 22.4337899684906)
"""
這次執行時間依然是 多程序 < 普通 < 多執行緒

,由此我們可以清晰地看出哪種方法更有效率。

5 程序池Pool

這次我們講程序池Pool。 程序池就是我們將所要執行的東西,放到池子裡,Python會自行解決多程序的問題
首先import multiprocessing和定義job()
import multiprocessing as mp
def job(x):
    return x*x
(1)程序池 Pool() 和 map()
然後我們定義一個Pool
pool = mp.Pool()
有了池子之後,就可以讓池子對應某一個函式,我們向池子裡丟資料,池子就會返回函式返回的值。 Pool和之前的Process的不同點是丟向Pool的函式有返回值,而Process的沒有返回值。
接下來用map()獲取結果,在map()中需要放入函式和需要迭代運算的值,然後它會自動分配給CPU核,返回結果
res = pool.map(job, range(10))
讓我們來執行一下
def multicore():
    pool = mp.Pool()
    res = pool.map(job, range(10))
    print(res)
    if __name__ == '__main__':
    multicore()
執行結果:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
自定義核數量
我們怎麼知道Pool是否真的呼叫了多個核呢?我們可以把迭代次數增大些,然後開啟CPU負載看下CPU執行情況
開啟CPU負載(Mac):活動監視器 > CPU > CPU負載(單擊一下即可)
Pool預設大小是CPU的核數,我們也可以通過在Pool中傳入processes引數即可自定義需要的核數量,
def multicore():
    pool = mp.Pool(processes=3) # 定義CPU核數量為3
    res = pool.map(job, range(10))
    print(res)
apply_async()
Pool除了map()外,還有可以返回結果的方式,那就是apply_async().
apply_async()中只能傳遞一個值,它只會放入一個核進行運算,但是傳入值時要注意是可迭代的,所以在傳入值後需要加逗號, 同時需要用get()方法獲取返回值
def multicore():
    pool = mp.Pool() 
    res = pool.map(job, range(10))
    print(res)
    res = pool.apply_async(job, (2,))
    # 用get獲得結果
    print(res.get())
執行結果;
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]  # map()
4 # apply_async()
用 apply_async() 輸出多個結果
那麼如何用apply_async()輸出多個迭代呢?
我們在apply_async()中多傳入幾個值試試
res = pool.apply_async(job, (2,3,4,))
結果會報錯:
TypeError: job() takes exactly 1 argument (3 given)
即apply_async()只能輸入一組引數。
在此我們將apply_async() 放入迭代器中,定義一個新的multi_res
multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
同樣在取出值時需要一個一個取出來
print([res.get() for res in multi_res])
合併程式碼
def multicore():
    pool = mp.Pool() 
    res = pool.map(job, range(10))
    print(res)
    res = pool.apply_async(job, (2,))
    # 用get獲得結果
    print(res.get())
    # 迭代器,i=0時apply一次,i=1時apply一次等等
    multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
    # 從迭代器中取出
    print([res.get() for res in multi_res])
執行結果
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # map()

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # multi_res
可以看出在apply用迭代器的得到的結果和用map得到的結果是一樣的
總結
Pool預設呼叫是CPU的核數,傳入processes引數可自定義CPU核數
map() 放入迭代引數,返回多個結果
apply_async()只能放入一組引數,並返回一個結果,如果想得到map()的效果需要通過迭代

6 共享記憶體

這節我們學習如何定義共享記憶體。只有用共享記憶體才能讓CPU之間有交流。
Shared Value
我們可以通過使用Value資料儲存在一個共享的記憶體表中。
import multiprocessing as mp
value1 = mp.Value('i', 0) 
value2 = mp.Value('d', 3.14)
其中d和i引數用來設定資料型別的,d表示一個雙精浮點型別,i表示一個帶符號的整型。更多的形式請檢視本頁最後的表.
Shared Array
在Python的mutiprocessing中,有還有一個Array類,可以和共享記憶體互動,來實現在程序之間共享資料。
array = mp.Array('i', [1, 2, 3, 4])
這裡的Array和numpy中的不同,它只能是一維的,不能是多維的。同樣和Value 一樣,需要定義資料形式,否則會報錯。 我們會在後一節舉例說明這兩種的使用方法.
錯誤形式
array = mp.Array('i', [[1, 2], [3, 4]]) # 2維list
"""
TypeError: an integer is required
"""
參考資料形式
各引數代表的資料型別
| Type code | C Type             | Python Type       | Minimum size in bytes |
| --------- | ------------------ | ----------------- | --------------------- |
| `'b'`     | signed char        | int               | 1                     |
| `'B'`     | unsigned char      | int               | 1                     |
| `'u'`     | Py_UNICODE         | Unicode character | 2                     |
| `'h'`     | signed short       | int               | 2                     |
| `'H'`     | unsigned short     | int               | 2                     |
| `'i'`     | signed int         | int               | 2                     |
| `'I'`     | unsigned int       | int               | 2                     |
| `'l'`     | signed long        | int               | 4                     |
| `'L'`     | unsigned long      | int               | 4                     |
| `'q'`     | signed long long   | int               | 8                     |
| `'Q'`     | unsigned long long | int               | 8                     |
| `'f'`     | float              | float             | 4                     |
| `'d'`     | double             | float             | 8                     |
(來源:https://docs.python.org/3/library/array.html)

7 程序鎖Lock

這次我們講程序鎖的運用。
(1)不加程序鎖
讓我們看看沒有加程序鎖時會產生什麼樣的結果。
import multiprocessing as mp
import time
def job(v, num):
    for _ in range(5):
        time.sleep(0.1) # 暫停0.1秒,讓輸出效果更明顯
        v.value += num # v.value獲取共享變數值
        print(v.value, end="")

def multicore():
    v = mp.Value('i', 0) # 定義共享變數
    p1 = mp.Process(target=job, args=(v,1))
    p2 = mp.Process(target=job, args=(v,3)) # 設定不同的number看如何搶奪記憶體
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    
if __name__ == '__main__':
    multicore()
在上面的程式碼中,我們定義了一個共享變數v,兩個程序都可以對它進行操作。 在job()中我們想讓v每隔0.1秒輸出一次累加num的結果,但是在兩個程序p1和p2 中設定了不同的累加值。所以接下來讓我們來看下這兩個程序是否會出現衝突。
執行一下:
1
4
5
8
9
12
13
16
17
20
我們可以看到,程序1和程序2在相互搶著使用共享記憶體v。
(2)加程序鎖
為了解決上述不同程序搶共享資源的問題,我們可以用加程序鎖來解決。
首先需要定義一個程序鎖
    l = mp.Lock() # 定義一個程序鎖
然後將程序鎖的資訊傳入各個程序中
    p1 = mp.Process(target=job, args=(v,1,l)) # 需要將Lock傳入
    p2 = mp.Process(target=job, args=(v,3,l)) 
在job()中設定程序鎖的使用,保證執行時一個程序的對鎖內內容的獨佔
def job(v, num, l):
    l.acquire() # 鎖住
    for _ in range(5):
        time.sleep(0.1) 
        v.value += num # v.value獲取共享記憶體
        print(v.value)
    l.release() # 釋放
完整程式碼:
def job(v, num, l):
    l.acquire() # 鎖住
    for _ in range(5):
        time.sleep(0.1) 
        v.value += num # 獲取共享記憶體
        print(v.value)
    l.release() # 釋放
def multicore():
    l = mp.Lock() # 定義一個程序鎖
    v = mp.Value('i', 0) # 定義共享記憶體
    p1 = mp.Process(target=job, args=(v,1,l)) # 需要將lock傳入
    p2 = mp.Process(target=job, args=(v,3,l)) 
    p1.start()
    p2.start()
    p1.join()
    p2.join()
if __name__ == '__main__':
    multicore()
執行一下,讓我們看看是否還會出現搶佔資源的情況:
1
2
3
4
5
8
11
14
17
20
顯然,程序鎖保證了程序p1的完整執行,然後才進行了程序p2的執行