1. 程式人生 > >快速瞭解Python併發程式設計的工程實現(下)

快速瞭解Python併發程式設計的工程實現(下)

關於我
一個有思想的程式猿,終身學習實踐者,目前在一個創業團隊任team lead,技術棧涉及Android、Python、Java和Go,這個也是我們團隊的主要技術棧。
Github:https://github.com/hylinux1024
微信公眾號:終身開發者(angrycode)

0x00 使用程序實現併發

上一篇文章介紹了執行緒的使用。然而Python中由於Global Interpreter Lock(全域性解釋鎖GIL)的存在,每個執行緒在在執行時需要獲取到這個GIL,在同一時刻中只有一個執行緒得到解釋鎖的執行,Python中的執行緒並沒有真正意義上的併發執行,多執行緒的執行效率也不一定比單執行緒的效率更高。

如果要充分利用現代多核CPU的併發能力,就要使用multipleprocessing模組了。

0x01 multipleprocessing

與使用執行緒的threading模組類似,multipleprocessing模組提供許多高階API。最常見的是Pool物件了,使用它的介面能很方便地寫出併發執行的程式碼。

from multiprocessing import Pool

def f(x):
    return x * x

if __name__ == '__main__':
    with Pool(5) as p:
        # map方法的作用是將f()方法併發地對映到列表中的每個元素
        print(p.map(f, [1, 2, 3]))

# 執行結果
# [1, 4, 9]

關於Pool下文中還會提到,這裡我們先來看Process

Process

要建立一個程序可以使用Process類,使用start()方法啟動程序。

from multiprocessing import Process
import os

def echo(text):
    # 父程序ID
    print("Process Parent ID : ", os.getppid())
    # 程序ID
    print("Process PID : ", os.getpid())
    print('echo : ', text)

if __name__ == '__main__':
    p = Process(target=echo, args=('hello process',))
    p.start()
    p.join()
    
# 執行結果
# Process Parent ID :  27382
# Process PID :  27383
# echo :  hello process
程序池

正如開篇提到的multiprocessing模組提供了Pool類可以很方便地實現一些簡單多程序場景。
它主要有以下介面

  • apply(func[, args[, kwds]])
    執行func(args,kwds)方法,在方法結束返回前會阻塞。
  • apply_async(func[, args[, kwds[, callback[, error_callback]]]])
    非同步執行func(args,kwds),會立即返回一個result物件,如果指定了callback引數,結果會通過回撥方法返回,還可以指定執行出錯的回撥方法error_callback()
  • map(func, iterable[, chunksize])
    類似內建函式map(),可以併發執行func,是同步方法
  • map_async(func, iterable[, chunksize[, callback[, error_callback]]])
    非同步版本的map
  • close()
    關閉程序池。當池中的所有工作程序都執行完畢時,程序會退出。
  • terminate()
    終止程序池
  • join()
    等待工作程序執行完,必需先呼叫close()或者terminate()
from multiprocessing import Pool

def f(x):
    return x * x

if __name__ == '__main__':
    with Pool(5) as p:
        # map方法的作用是將f()方法併發地對映到列表中的每個元素
        a = p.map(f, [1, 2, 3])
        print(a)
        # 非同步執行map
        b = p.map_async(f, [3, 5, 7, 11])
        # b 是一個result物件,代表方法的執行結果
        print(b)
        # 為了拿到結果,使用join方法等待池中工作程序退出
        p.close()
        # 呼叫join方法前,需先執行close或terminate方法
        p.join()
        # 獲取執行結果
        print(b.get())

# 執行結果
# [1, 4, 9]
# <multiprocessing.pool.MapResult object at 0x10631b710>
# [9, 25, 49, 121]

map_async()apply_async()執行後會返回一個class multiprocessing.pool.AsyncResult物件,通過它的get()可以獲取到執行結果,ready()可以判斷AsyncResult的結果是否準備好。

程序間資料的傳輸

multiprocessing模組提供了兩種方式用於程序間的資料共享:佇列(Queue)和管道(Pipe)

Queue是執行緒安全,也是程序安全的。使用Queue可以實現程序間的資料共享,例如下面的demo中子程序put一個物件,在主程序中就能get到這個物件。
任何可以序列化的物件都可以通過Queue來傳輸。

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    # 使用Queue進行資料通訊
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    # 主程序取得子程序中的資料
    print(q.get())  # prints "[42, None, 'hello']"
    p.join()

# 執行結果
# [42, None, 'hello']

Pipe()返回一對通過管道連線的Connection物件。這兩個物件可以理解為管道的兩端,它們通過send()recv()傳送和接收資料。

from multiprocessing import Process, Pipe

def write(conn):
    # 子程序中傳送一個物件
    conn.send([42, None, 'hello'])
    conn.close()

def read(conn):
    # 在讀的程序中通過recv接收物件
    data = conn.recv()
    print(data)

if __name__ == '__main__':
    # Pipe()方法返回一對連線物件
    w_conn, r_conn = Pipe()

    wp = Process(target=write, args=(w_conn,))
    rp = Process(target=read, args=(r_conn,))

    wp.start()
    rp.start()

# 執行結果
# [42, None, 'hello']

需要注意的是,兩個程序不能同時對一個連線物件進行sendrecv操作。

同步

我們知道執行緒間的同步是通過鎖機制來實現的,程序也一樣。

from multiprocessing import Process, Lock
import time

def print_with_lock(l, i):
    l.acquire()
    try:
        time.sleep(1)
        print('hello world', i)
    finally:
        l.release()

def print_without_lock(i):
    time.sleep(1)
    print('hello world', i)

if __name__ == '__main__':
    lock = Lock()

    # 先執行有鎖的
    for num in range(5):
        Process(target=print_with_lock, args=(lock, num)).start()
    # 再執行無鎖的
    # for num in range(5):
    #     Process(target=print_without_lock, args=(num,)).start()

有鎖的程式碼將每秒依次列印

hello world 0
hello world 1
hello world 2
hello world 3
hello world 4

如果執行無鎖的程式碼,則在我的電腦上執行結果是這樣的

hello worldhello world  0
1
hello world 2
hello world 3
hello world 4

除了Lock,還包括RLockConditionSemaphoreEvent等程序間的同步原語。其用法也與執行緒間的同步原語很類似。API使用可以參考文末中引用的文件連結。
在工程中實現程序間的資料共享應當優先使用佇列或管道。

0x02 總結

本文對multiprocessing模組中常見的API作了簡單的介紹。講述了ProcessPool的常見用法,同時介紹了程序間的資料方式:佇列和管道。最後簡單瞭解了程序間的同步原語。
通過與上篇的對比學習,本文的內容應該是更加容易掌握的。

0x03 引用

  • https://python-parallel-programmning-cookbook.readthedocs.io
  • https://docs.python.org/3/library/threading.html
  • https://docs.python.org/3.7/library/multiprocessing.html
  • https://docs.python.org/3/glossary.html#term-global-interpreter-lock
  • https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures