1. 程式人生 > >Python 多執行緒、多程序 (三)之 執行緒程序對比、多程序

Python 多執行緒、多程序 (三)之 執行緒程序對比、多程序

Python 多執行緒、多程序 (一)之 原始碼執行流程、GIL
Python 多執行緒、多程序 (二)之 多執行緒、同步、通訊
Python 多執行緒、多程序 (三)之 執行緒程序對比、多執行緒

一、多執行緒與多程序的對比

在之前簡單的提過,CPython中的GIL使得同一時刻只能有一個執行緒執行,即併發執行。並且即使是多核CPU,GIL使得同一個程序中的多個執行緒也無法對映到多個CPU上執行,這麼做最初是為了安全著想,慢慢的也成為了限制CPython效能的問題。
一個執行緒想要執行,就必須得到GIL,否則就不能拿到CPU資源。但是也不是說一個執行緒在拿到CPU資源後就一勞永逸,在執行的過程中GIL可能會釋放並被其他執行緒獲取,所以說其它的執行緒會與本執行緒競爭CPU資源,執行緒是搶佔式執行的。具體可在 understand GIL

中看到,[傳送門]
多執行緒在python2中:當一個執行緒進行I/O的時候會釋放鎖,另外當ticks計數達到100(ticks可以看作是Python自身的一個計數器,也可對比著位元組碼指令理解,專門做用於GIL,每次釋放後歸零,這個計數可以通過 sys.setcheckinterval 來調整)。鎖釋放之後,就涉及到執行緒的排程,執行緒的鎖進行,執行緒的切換。這是會消耗CPU資源,因此會造成程式效能問題和等待時延。另外由於執行緒共享記憶體的問題,沒有程序安全性高。
但是對於多程序,GIL就無法限制,多個程序可以再多個CPU上執行,充分利用多核優勢。事情往往是相對的,雖然可以充分利用多核優勢,但是程序之的建立和排程卻比執行緒的代價更高。
所以選擇多執行緒還是多程序,主要還是看怎樣權衡代價,什麼樣的情況。

1、CPU密集程式碼

下面來利用斐波那契數列模擬CPU密集運算。

def fib(n):
    # 求斐波那契數列的第n個值
    if n<=2:
        return 1
    return fib(n-1)+fib(n-2)

<1>、多程序

列印第25到35個斐波那契數,並計算程式執行時間

import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import ProcessPoolExecutor


def fib(n):
    if n<=2:
        return 1
    return fib(n-1)+fib(n-2)

if __name__ == "__main__":
    with ProcessPoolExecutor(3) as executor:  # 使用程序池控制  每次執行3個程序
        all_task = [executor.submit(fib, (num)) for num in range(25,35)]
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            print("exe result: {}".format(data))

        print("last time is: {}".format(time.time()-start_time))

# 輸出
exe result: 75025
exe result: 121393
exe result: 196418
exe result: 317811
exe result: 514229
exe result: 832040
exe result: 1346269
exe result: 2178309
exe result: 3524578
exe result: 5702887
last time is: 4.457437038421631

輸出結果,每次列印三個exe result,總重列印十個結果,多程序執行時間為4.45秒

<2>、多執行緒

import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import ProcessPoolExecutor


def fib(n):
    if n<=2:
        return 1
    return fib(n-1)+fib(n-2)

if __name__ == "__main__":
    with ThreadPoolExecutor(3) as executor:  # 使用執行緒池控制  每次執行3個執行緒
        all_task = [executor.submit(fib, (num)) for num in range(25,35)]
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            print("exe result: {}".format(data))

        print("last time is: {}".format(time.time()-start_time))

# 輸出
exe result: 121393
exe result: 75025
exe result: 196418
exe result: 317811
exe result: 514229
exe result: 832040
exe result: 1346269
exe result: 2178309
exe result: 3524578
exe result: 5702887
last time is: 7.3467772006988525

最終程式執行時間為7.34秒

程式的執行之間與計算機的效能有關,每天計算機的執行時間都會有差異。從上述結果中看顯然多執行緒比多程序要耗費時間。這就是因為對於密集程式碼(密集運算,迴圈語句等),tick計數很快達到100,GIL來回的釋放競爭,執行緒之間頻繁切換,所以對於密集程式碼的執行中,多執行緒效能不如對程序。

2、I/O密集程式碼

一個執行緒在I/O阻塞的時候,會釋放GIL,掛起,然後其他的執行緒會競爭CPU資源,涉及到執行緒的切換,但是這種代價與較高時延的I/O來說是不足為道的。
下面用sleep函式模擬密集I/O

def random_sleep(n):
    time.sleep(n)
    return n

<1>、 多程序

def random_sleep(n):
    time.sleep(n)
    return n

if __name__ == "__main__":
    with ProcessPoolExecutor(5) as executor:
        all_task = [executor.submit(random_sleep, (num)) for num in [2]*30]
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            print("exe result: {}".format(data))

        print("last time is: {}".format(time.time()-start_time))
#  輸出
exe result: 2
exe result: 2
......(30個)
exe result: 2
exe result: 2
last time is: 12.412866353988647

每次列印5個結果,總共二十個列印結果,多程序執行時間為12.41秒

<2>、多執行緒

def random_sleep(n):
    time.sleep(n)
    return n

if __name__ == "__main__":
    with ThreadPoolExecutor(5) as executor:
        all_task = [executor.submit(random_sleep, (num)) for num in [2]*30]
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            print("exe result: {}".format(data))

        print("last time is: {}".format(time.time()-start_time))

#  輸出
exe result: 2
exe result: 2
......(30個)
exe result: 2
exe result: 2
last time is: 12.004231214523315

I/O密集多執行緒情況下,程式的效能較多程序有了略微的提高。IO密集型程式碼(檔案處理、網路爬蟲等),多執行緒能夠有效提升效率(單執行緒下有IO操作會進行IO等待,造成不必要的時間浪費,而開啟多執行緒能線上程A等待時,自動切換到執行緒B,可以不浪費CPU的資源,從而能提升程式執行效率)。所以python的多執行緒對IO密集型程式碼比較友好

3、執行緒程序對比

  • CPU密集型程式碼(各種迴圈處理、計數等等),多執行緒效能不如多程序。
  • I/O密集型程式碼(檔案處理、網路爬蟲等),多程序不如多執行緒。

二、多程序

在python 程序、執行緒 (一)已經有簡單的程序介紹。
不過與多執行緒程式設計相比,最需要注意的是這裡多程序由併發執行變成了真正意義上的並行執行。

1、fork()呼叫

Unix/Linux作業系統提供了一個fork()系統呼叫,它非常特殊。普通的函式呼叫,呼叫一次,返回一次,但是fork()呼叫一次,返回兩次,因為作業系統自動把當前程序(稱為父程序)複製了一份(稱為子程序),然後,分別在父程序和子程序內返回。子程序永遠返回0,而父程序返回子程序的ID。這樣做的理由是,一個父程序可以fork出很多子程序,所以,父程序要記下每個子程序的ID,而子程序只需要呼叫getppid()就可以拿到父程序的ID。Python的os模組封裝了常見的系統呼叫,其中就包括fork,可以在Python程式中輕鬆建立子程序,但是還是要有Unix/Linux系統支援,windows沒有系統呼叫fork(),可以在本地虛擬機器或者雲伺服器嘗試,預設liunx發行版中是有python2.X的。
情況一

import os

print("Lanyu")  # 只打印一次
pid = os.fork()

if pid == 0:
  print('子程序 {} ,父程序是: {}.' .format(os.getpid(), os.getppid()))
else:
  print('我是父程序:{}.'.format(os.getpid())

# 輸出
Lanyu
我是父程序:2993
子程序2994,父程序2993

fork()呼叫複製了一個程序,然後程式中就有兩個程序,父程序的pid不為0,所以先列印子程序2994,父程序2993。然後子程序pid=0,列印我是父程序:2993。這裡的Lanyu列印一次
情況二

import os


pid = os.fork()
print("Lanyu")  # 這裡列印兩次
if pid == 0:
  print('子程序 {} ,父程序是: {}.' .format(os.getpid(), os.getppid()))
else:
  print('我是父程序:{}.'.format(os.getpid())

# 輸出
Lanyu
我是父程序:2993
Lanyu
子程序2994,父程序2993

這裡的Lanyu列印兩次是因為,由於fork()函式呼叫之後,程式立即成成一個子程序,主程序列印一次,子程序再列印一次。因此這裡的Lanyu列印兩次

情況三
還記得作業系統專業課的時候,老師講的一道考研題

int main{
    fork();
    fork();
    fork():
    printf('process')
    return 0;
}

三次fork(),問此程式最終列印幾個次process,關鍵在於fork()函式的用途,每一次都會複製一次程序,則最終,一個父程序被複製成8個程序,列印8次。

2、python多程序

雖然python中沒有提供直接的程序呼叫函式,但是標準庫中的模組提供能更多更方便的選擇。 ProcessPoolExecutor程序池,與 multiprocessing標準的多程序模組。其實ProcessPoolExecutor也是對multiprocessing的封裝呼叫,並且與ThreadPoolExecutor執行緒池提供的介面類似。而multiprocessing則更加底層。

<1>、程序程式設計

import time
import multiprocessing

def get_html(n):
    time.sleep(n)
    print("sub_progress success")
    return n

if __name__ == "__main__":
    progress = multiprocessing.Process(target=get_html, args=(2,))
    print(progress.pid)  # 列印結果為None,因為這個時候程序還未開啟
    progress.start()  # 程序開啟
    print(progress.pid)
    progress.join()
    print("main progress end")

# 輸出
None
5056
sub_progress success
main progress end

<2>、使用程序池

import time
import multiprocessing

def get_html(n):
    time.sleep(n)
    print("sub_progress success")
    return n


if __name__ == "__main__":
    #使用程序池
    pool = multiprocessing.Pool(multiprocessing.cpu_count())  # 可以指明程序數,預設等於CPU數
    result = pool.apply_async(get_html, args=(3,))

    #等待所有任務完成
    pool.close()
    pool.join()

    print(result.get())

# 輸出
sub_progress success
3

<3>、imap 介面

例項一

import time
import multiprocessing

def get_html(n):
    time.sleep(n)
    print("sub_progress success")
    return n


if __name__ == "__main__":

    # imap
    for result in pool.imap(get_html, [1,5,3]):
        print("{} sleep success".format(result))

# 輸出
sub_progress success
1 sleep success
sub_progress success
sub_progress success
5 sleep success
3 sleep success

imap有點像python提供的內建函式map,講[1,5,3]這個列表中的值一個一個傳遞給get_html函式物件,並按照傳值的先後順序,一一執行輸出程序結果。

例項二:

import multiprocessing  

import time
def get_html(n):
    time.sleep(n)
    print("sub_progress success")
    return n


if __name__ == "__main__":

    pool = multiprocessing.Pool(multiprocessing.cpu_count())  # 可以程序數,不過最好是等於CPU數,這裡也是程序數

    for result in pool.imap_unordered(get_html, [1,5,3]):
        print("{} sleep success".format(result))
# 輸出
sub_progress success
1 sleep success
sub_progress success
3 sleep success
sub_progress success
5 sleep success

與imap方法不同的是imap_unordered方法,imap_unordered是按照程序的執行完成的先後順序,列印程序執行結果,而不是依照列表中的先後順序。可以依照需要呼叫。

劃重點**多程序程式設計中,需要在__name__ == __main__下編寫**

更多API參考傳送門

3、程序通訊

<1>、共享變數通訊

類比執行緒之間的通訊,首先想到的就是共享變數通訊。但是在多程序中,一個程序都有自的隔離區,導致變數不能共享。
情況一

def producer(a):
    a += 100
    time.sleep(2)

def consumer(a):
    time.sleep(2)
    print(a)

if __name__ == "__main__":
    a = 1
    my_producer = Process(target=producer, args=(a,))
    my_consumer = Process(target=consumer, args=(a,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()

# 輸出
1

結果程序沒有共享變數。

但是Python的標準模組提供了Manager()在記憶體中劃出一塊單獨的記憶體區,供所有的程序使用,共享變數。
情況二

from multiprocessing import Process, Manager

def add_data(p_dict, key, value):
    p_dict[key] = value

if __name__ == "__main__":
    progress_dict = Manager().dict()

    first_progress = Process(target=add_data, args=(progress_dict, "666", 666))  # 更新progress_dict
    second_progress = Process(target=add_data, args=(progress_dict, "999", 999))  # 更新progress_dict

    first_progress.start()
    second_progress.start()
    first_progress.join()
    second_progress.join()

    print(progress_dict)

# 列印結果
{'666': 666, '999': 999}  # 實現了變數的共享

在Manager中還可以有其它的資料結構,例如列表陣列等可共享使用。

因此,在使用多程序程式設計的時候,如果像情況二共享全域性變數,就仍舊需要加鎖實現程序同步。

<2>、Queue佇列通訊

在multiprocessing模組中有Queue類安全的佇列,也可以實現通訊,不過在這種情況下無法聯通執行緒池。

import time
from multiprocessing import Process, Queue, Pool, Manager

def producer(queue):
    queue.put("a")
    time.sleep(2)

def consumer(queue):
    time.sleep(2)
    data = queue.get()
    print(data)

if __name__ == "__main__":
    queue = Queue(10)  # 使用普通的Queue
    pool = Pool(2)

    pool.apply_async(producer, args=(queue,))
    pool.apply_async(consumer, args=(queue,))

    pool.close()
    pool.join()

# 無輸出

想要使用程序池又實現訊息佇列通訊就需要用到Manager管理者

import time
from multiprocessing import Process, Queue, Pool, Manager

def producer(queue):
    queue.put("a")
    time.sleep(2)

def consumer(queue):
    time.sleep(2)
    data = queue.get()
    print(data)

if __name__ == "__main__":
    queue = Manager().Queue(10)  # 在使用Manger的時候需要先將Manager例項化在呼叫Queue
    pool = Pool(2)

    pool.apply_async(producer, args=(queue,))
    pool.apply_async(consumer, args=(queue,))

    pool.close()
    pool.join()

# 輸出
正常列印字元a

<3>、pipe管道通訊

pipe也用於程序通訊,從功能上說,提供的介面應該是queue的子集。但是queue為了更好的控制,所以內部加了很多的鎖,而pipe在兩個程序通訊的時候效能會比queue更好一些。

def producer(pipe):
    pipe.send("Lanyu")

def consumer(pipe):
    print(pipe.recv())

if __name__ == "__main__":
    recevie_pipe, send_pipe = Pipe()
    #pipe只能適用於兩個程序
    my_producer= Process(target=producer, args=(send_pipe, ))
    my_consumer = Process(target=consumer, args=(recevie_pipe,))

    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()

# 輸出
Lanyu

三、總結

最開始為了引出GIL,簡單輸了python原始碼的執行流程,也是先編譯成位元組碼再執行。在CPython中,為了資料完整性和狀態同步才有GIL,GIL同樣使得多執行緒不能利用CPU多核優勢,所以效能低部分是因為GIL。

執行緒需要加上GIL才能獲取CPU資源,才能執行。執行緒通訊的時候,可以用訊息佇列Queue和全域性變數,但是對於全域性變數這種通訊方式,在執行位元組碼一定數量之後,會釋放GIL,執行緒搶佔式執行同樣導致變數的混亂,所以我們加上了使用者級別的互斥鎖Lock,或者迭代鎖Rlock保證了執行緒的狀態同步。condition幫我們實現了執行緒的複雜通訊,而semaphore訊號量,使得我們在多個執行緒的情況下,控制併發執行緒的數量。執行緒池進一步的封裝,提供了對執行緒的狀態,非同步控制等操作。

對於多程序,可以利用多核CPU優勢,但是使用多執行緒和多程序還需要進一步根據密集I/O和密集運算型程式碼等具體情況。多程序標準模組中提供的介面與多執行緒類似,可相互參照。

陸陸續續總結關於這篇博文也有一個多星期了,但是還是感覺有說不清楚的地方邏輯不通,希望讀者能在評論區指出。期間參閱了很多的文件,部落格,教程。
印象最深刻的還是Understand GIL: [傳送門]這篇關於GIL的解釋,雖然是英文文件,但是作者總是能以最精煉的句子表達最清晰的觀點。

上一篇:Python 多執行緒、多程序 (二)之 多執行緒、同步、通訊