1. 程式人生 > >一文看懂Python多程序與多執行緒程式設計(工作學習面試必讀)

一文看懂Python多程序與多執行緒程式設計(工作學習面試必讀)

程序(process)和執行緒(thread)是非常抽象的概念, 也是程式設計師必需掌握的核心知識。多程序和多執行緒程式設計對於程式碼的併發執行,提升程式碼效率和縮短執行時間至關重要。小編我今天就來嘗試下用一文總結下Python多程序和多執行緒的概念和區別, 並詳細介紹如何使用python的multiprocess和threading模組進行多執行緒和多程序程式設計。

重要知識點 - 什麼是程序(process)和執行緒(thread)

  • 程序是作業系統分配資源的最小單元, 執行緒是作業系統排程的最小單元。

  • 一個應用程式至少包括1個程序,而1個程序包括1個或多個執行緒,執行緒的尺度更小。

  • 每個程序在執行過程中擁有獨立的記憶體單元,而一個執行緒的多個執行緒在執行過程中共享記憶體。

網上有篇阮一峰的部落格曾對程序和執行緒做出了一個非常淺顯的解釋,我在這裡貼出來方便大家理解。

  •  計算機的核心是CPU,它承擔了所有的計算任務。它就像一座工廠,時刻在執行。

  • 假定工廠的電力有限,一次只能供給一個車間使用。也就是說,一個車間開工的時候,其他車間都必須停工。背後的含義就是,單個CPU一次只能執行一個任務。編者注: 多核的CPU就像有了多個發電廠,使多工廠(多程序)實現可能。

  • 程序就好比工廠的車間,它代表CPU所能處理的單個任務。任一時刻,CPU總是執行一個程序,其他程序處於非執行狀態。

  • 一個車間裡,可以有很多工人。他們協同完成一個任務。

  • 執行緒就好比車間裡的工人。一個程序可以包括多個執行緒。

  • 車間的空間是工人們共享的,比如許多房間是每個工人都可以進出的。這象徵一個程序的記憶體空間是共享的,每個執行緒都可以使用這些共享記憶體。

  • 可是,每間房間的大小不同,有些房間最多隻能容納一個人,比如廁所。裡面有人的時候,其他人就不能進去了。這代表一個執行緒使用某些共享記憶體時,其他執行緒必須等它結束,才能使用這一塊記憶體。

  • 一個防止他人進入的簡單方法,就是門口加一把鎖。先到的人鎖上門,後到的人看到上鎖,就在門口排隊,等鎖開啟再進去。這就叫"互斥鎖"(Mutual exclusion,縮寫 Mutex),防止多個執行緒同時讀寫某一塊記憶體區域。

  • 還有些房間,可以同時容納n個人,比如廚房。也就是說,如果人數大於n,多出來的人只能在外面等著。這好比某些記憶體區域,只能供給固定數目的執行緒使用。

  • 這時的解決方法,就是在門口掛n把鑰匙。進去的人就取一把鑰匙,出來時再把鑰匙掛回原處。後到的人發現鑰匙架空了,就知道必須在門口排隊等著了。這種做法叫做"訊號量"(Semaphore),用來保證多個執行緒不會互相沖突。

  • 不難看出,mutex是semaphore的一種特殊情況(n=1時)。也就是說,完全可以用後者替代前者。但是,因為mutex較為簡單,且效率高,所以在必須保證資源獨佔的情況下,還是採用這種設計。

原文地址見

  • http://www.ruanyifeng.com/blog/2013/04/processes_and_threads.html

Python的多程序程式設計與multiprocess模組

python的多程序程式設計主要依靠multiprocess模組。我們先對比兩段程式碼,看看多程序程式設計的優勢。我們模擬了一個非常耗時的任務,計算8的20次方,為了使這個任務顯得更耗時,我們還讓它sleep 2秒。第一段程式碼是單程序計算(程式碼如下所示),我們按順序執行程式碼,重複計算2次,並打印出總共耗時。

import time
import os

def long_time_task():
    print('當前程序: {}'.format(os.getpid()))
    time.sleep(2)
    print("結果: {}".format(8 ** 20))

if __name__ == "__main__":
    print('當前母程序: {}'.format(os.getpid()))
    start = time.time()
    for i in range(2):
        long_time_task()

    end = time.time()
    print("用時{}秒".format((end-start)))

輸出結果如下,總共耗時4秒,至始至終只有一個程序14236。看來電腦計算8的20次方基本不費時。

當前母程序: 14236
當前程序: 14236
結果: 1152921504606846976
當前程序: 14236
結果: 1152921504606846976
用時4.01080060005188秒

第2段程式碼是多程序計算程式碼。我們利用multiprocess模組的Process方法建立了兩個新的程序p1和p2來進行平行計算。Process方法接收兩個引數, 第一個是target,一般指向函式名,第二個時args,需要向函式傳遞的引數。對於建立的新程序,呼叫start()方法即可讓其開始。我們可以使用os.getpid()打印出當前程序的名字。

from multiprocessing import Process
import os
import time


def long_time_task(i):
    print('子程序: {} - 任務{}'.format(os.getpid(), i))
    time.sleep(2)
    print("結果: {}".format(8 ** 20))


if __name__=='__main__':
    print('當前母程序: {}'.format(os.getpid()))
    start = time.time()
    p1 = Process(target=long_time_task, args=(1,))
    p2 = Process(target=long_time_task, args=(2,))
    print('等待所有子程序完成。')
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    end = time.time()
    print("總共用時{}秒".format((end - start)))

輸出結果如下所示,耗時變為2秒,時間減了一半,可見併發執行的時間明顯比順序執行要快很多。你還可以看到儘管我們只建立了兩個程序,可實際執行中卻包含裡1個母程序和2個子程序。之所以我們使用join()方法就是為了讓母程序阻塞,等待子程序都完成後才打印出總共耗時,否則輸出時間只是母程序執行的時間。

當前母程序: 6920
等待所有子程序完成。
子程序: 17020 - 任務1
子程序: 5904 - 任務2
結果: 1152921504606846976
結果: 1152921504606846976
總共用時2.131091356277466秒

知識點:

  • 新建立的程序與程序的切換都是要耗資源的,所以平時工作中程序數不能開太大。

  • 同時可以執行的程序數一般受制於CPU的核數。

  • 除了使用Process方法,我們還可以使用Pool類建立多程序。

利用multiprocess模組的Pool類建立多程序

很多時候系統都需要建立多個程序以提高CPU的利用率,當數量較少時,可以手動生成一個個Process例項。當程序數量很多時,或許可以利用迴圈,但是這需要程式設計師手動管理系統中併發程序的數量,有時會很麻煩。這時程序池Pool就可以發揮其功效了。可以通過傳遞引數限制併發程序的數量,預設值為CPU的核數。 

Pool類可以提供指定數量的程序供使用者呼叫,當有新的請求提交到Pool中時,如果程序池還沒有滿,就會建立一個新的程序來執行請求。如果池滿,請求就會告知先等待,直到池中有程序結束,才會建立新的程序來執行這些請求。 

下面介紹一下multiprocessing 模組下的Pool類的幾個方法:

1.apply_async

函式原型:apply_async(func[, args=()[, kwds={}[, callback=None]]])

其作用是向程序池提交需要執行的函式及引數, 各個程序採用非阻塞(非同步)的呼叫方式,即每個子程序只管執行自己的,不管其它程序是否已經完成。

2.map()

函式原型:map(func, iterable[, chunksize=None])

Pool類中的map方法,與內建的map函式用法行為基本一致,它會使程序阻塞直到結果返回。 注意:雖然第二個引數是一個迭代器,但在實際使用中,必須在整個佇列都就緒後,程式才會執行子程序。

3.map_async()

函式原型:map_async(func, iterable[, chunksize[, callback]])
與map用法一致,但是它是非阻塞的。其有關事項見apply_async。

4.close()

關閉程序池(pool),使其不在接受新的任務。

5. terminate()

結束工作程序,不在處理未處理的任務。

6.join()

主程序阻塞等待子程序的退出, join方法要在close或terminate之後使用。

下例是一個簡單的multiprocessing.Pool類的例項。因為小編我的CPU是4核的,一次最多可以同時執行4個程序,所以我開啟了一個容量為4的程序池。4個程序需要計算5次,你可以想象4個程序並行4次計算任務後,還剩一次計算任務(任務4)沒有完成,系統會等待4個程序完成後重新安排一個程序來計算。

from multiprocessing import Pool, cpu_count
import os
import time


def long_time_task(i):
    print('子程序: {} - 任務{}'.format(os.getpid(), i))
    time.sleep(2)
    print("結果: {}".format(8 ** 20))


if __name__=='__main__':
    print("CPU核心數:{}".format(cpu_count()))
    print('當前母程序: {}'.format(os.getpid()))
    start = time.time()
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('等待所有子程序完成。')
    p.close()
    p.join()
    end = time.time()
    print("總共用時{}秒".format((end - start)))

知識點:  

  • 對Pool物件呼叫join()方法會等待所有子程序執行完畢,呼叫join()之前必須先呼叫close()或terminate()方法,讓其不再接受新的Process了。

輸出結果如下所示,5個任務(每個任務大約耗時2秒)使用多程序平行計算只需4.37秒,, 耗時減少了60%。

CPU核心數:4
當前母程序: 2556
等待所有子程序完成。
子程序: 16480 - 任務0
子程序: 15216 - 任務1
子程序: 15764 - 任務2
子程序: 10176 - 任務3
結果: 1152921504606846976
結果: 1152921504606846976
子程序: 15216 - 任務4
結果: 1152921504606846976
結果: 1152921504606846976
結果: 1152921504606846976
總共用時4.377134561538696秒

相信大家都知道python直譯器中存在GIL(全域性直譯器鎖), 它的作用就是保證同一時刻只有一個執行緒可以執行程式碼。由於GIL的存在,很多人認為python中的多執行緒其實並不是真正的多執行緒,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多程序。然而這並意味著python多執行緒程式設計沒有意義哦,請繼續閱讀下文。

 多程序間的資料共享與通訊

通常,程序之間是相互獨立的,每個程序都有獨立的記憶體。通過共享記憶體(nmap模組),程序之間可以共享物件,使多個程序可以訪問同一個變數(地址相同,變數名可能不同)。多程序共享資源必然會導致程序間相互競爭,所以應該盡最大可能防止使用共享狀態。還有一種方式就是使用佇列queue來實現不同程序間的通訊或資料共享,這一點和多執行緒程式設計類似。

下例這段程式碼中中建立了2個獨立程序,一個負責寫(pw), 一個負責讀(pr), 實現了共享一個佇列queue。

from multiprocessing import Process, Queue
import os, time, random

# 寫資料程序執行的程式碼:
def write(q):
    print('Process to write: {}'.format(os.getpid()))
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# 讀資料程序執行的程式碼:
def read(q):
    print('Process to read:{}'.format(os.getpid()))
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__=='__main__':
    # 父程序建立Queue,並傳給各個子程序:
   q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 啟動子程序pw,寫入:
    pw.start()
    # 啟動子程序pr,讀取:
    pr.start()
    # 等待pw結束:
    pw.join()
    # pr程序裡是死迴圈,無法等待其結束,只能強行終止:
    pr.terminate()

輸出結果如下所示:

Process to write: 3036
Put A to queue...
Process to read:9408
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

Python的多執行緒程式設計與threading模組

python 3中的多程序程式設計主要依靠threading模組。建立新執行緒與建立新程序的方法非常類似。threading.Thread方法可以接收兩個引數, 第一個是target,一般指向函式名,第二個時args,需要向函式傳遞的引數。對於建立的新執行緒,呼叫start()方法即可讓其開始。我們還可以使用current_thread().name打印出當前執行緒的名字。 下例中我們使用多執行緒技術重構之前的計算程式碼。

import threading
import time


def long_time_task(i):
    print('當前子執行緒: {} - 任務{}'.format(threading.current_thread().name, i))
    time.sleep(2)
    print("結果: {}".format(8 ** 20))


if __name__=='__main__':
    start = time.time()
    print('這是主執行緒:{}'.format(threading.current_thread().name))
    t1 = threading.Thread(target=long_time_task, args=(1,))
    t2 = threading.Thread(target=long_time_task, args=(2,))
    t1.start()
    t2.start()

    end = time.time()
    print("總共用時{}秒".format((end - start)))

下面是輸出結果。為什麼總耗時居然是0秒? 我們可以明顯看到主執行緒和子執行緒其實是獨立執行的,主執行緒根本沒有等子執行緒完成,而是自己結束後就列印了消耗時間。主執行緒結束後,子執行緒仍在獨立執行,這顯然不是我們想要的。

這是主執行緒:MainThread
當前子執行緒: Thread-1 - 任務1
當前子執行緒: Thread-2 - 任務2
總共用時0.0017192363739013672秒
結果: 1152921504606846976
結果: 1152921504606846976

如果要實現主執行緒和子執行緒的同步,我們必需使用join方法(程式碼如下所示)。

import threading
import time


def long_time_task(i):
    print('當前子執行緒: {} 任務{}'.format(threading.current_thread().name, i))
    time.sleep(2)
    print("結果: {}".format(8 ** 20))


if __name__=='__main__':
    start = time.time()
    print('這是主執行緒:{}'.format(threading.current_thread().name))
    thread_list = []
    for i in range(1, 3):
        t = threading.Thread(target=long_time_task, args=(i, ))
        thread_list.append(t)

    for t in thread_list:
        t.start()

    for t in thread_list:
        t.join()

    end = time.time()
    print("總共用時{}秒".format((end - start)))

修改程式碼後的輸出如下所示。這時你可以看到主執行緒在等子執行緒完成後才答應出總消耗時間(2秒),比正常順序執行程式碼(4秒)還是節省了不少時間。

這是主執行緒:MainThread
當前子執行緒: Thread - 1 任務1
當前子執行緒: Thread - 2 任務2
結果: 1152921504606846976
結果: 1152921504606846976
總共用時2.0166890621185303秒

當我們設定多執行緒時,主執行緒會建立多個子執行緒,在python中,預設情況下主執行緒和子執行緒獨立執行互不干涉。如果希望讓主執行緒等待子執行緒實現執行緒的同步,我們需要使用join()方法。如果我們希望一個主執行緒結束時不再執行子執行緒,我們應該怎麼辦呢? 我們可以使用t.setDaemon(True),程式碼如下所示。

import threading
import time


def long_time_task():
    print('當子執行緒: {}'.format(threading.current_thread().name))
    time.sleep(2)
    print("結果: {}".format(8 ** 20))


if __name__=='__main__':
    start = time.time()
    print('這是主執行緒:{}'.format(threading.current_thread().name))
    for i in range(5):
        t = threading.Thread(target=long_time_task, args=())
        t.setDaemon(True)
        t.start()

    end = time.time()
    print("總共用時{}秒".format((end - start)))

通過繼承Thread類重寫run方法建立新程序

除了使用Thread()方法建立新的執行緒外,我們還可以通過繼承Thread類重寫run方法建立新的執行緒,這種方法更靈活。下例中我們自定義的類為MyThread, 隨後我們通過該類的例項化建立了2個子執行緒。

#-*- encoding:utf-8 -*-
import threading
import time


def long_time_task(i):
    time.sleep(2)
    return 8**20


class MyThread(threading.Thread):
    def __init__(self, func, args , name='', ):
        threading.Thread.__init__(self)
        self.func = func
        self.args = args
        self.name = name
        self.result = None

    def run(self):
        print('開始子程序{}'.format(self.name))
        self.result = self.func(self.args[0],)
        print("結果: {}".format(self.result))
        print('結束子程序{}'.format(self.name))


if __name__=='__main__':
    start = time.time()
    threads = []
    for i in range(1, 3):
        t = MyThread(long_time_task, (i,), str(i))
        threads.append(t)

    for t in threads:
        t.start()
    for t in threads:
        t.join()

    end = time.time()
    print("總共用時{}秒".format((end - start)))

輸出結果如下所示:

開始子程序1
開始子程序2
結果: 1152921504606846976
結果: 1152921504606846976
結束子程序1
結束子程序2
總共用時2.005445718765259秒

不同執行緒間的資料共享

一個程序所含的不同執行緒間共享記憶體,這就意味著任何一個變數都可以被任何一個執行緒修改,因此執行緒之間共享資料最大的危險在於多個執行緒同時改一個變數,把內容給改亂了。如果不同執行緒間有共享的變數,其中一個方法就是在修改前給其上一把鎖lock,確保一次只有一個執行緒能修改它。threading.lock()方法可以輕易實現對一個共享變數的鎖定,修改完後release供其它執行緒使用。比如下例中賬戶餘額balance是一個共享變數,使用lock可以使其不被改亂。

# -*- coding: utf-8 -*

import threading


class Account:
    def __init__(self):
        self.balance = 0

    def add(self, lock):
        # 獲得鎖
        lock.acquire()
        for i in range(0, 100000):
            self.balance += 1
        # 釋放鎖
        lock.release()

    def delete(self, lock):
        # 獲得鎖
        lock.acquire()
        for i in range(0, 100000):
            self.balance -= 1
            # 釋放鎖
        lock.release()


if __name__ == "__main__":
    account = Account()
    lock = threading.Lock()
    # 建立執行緒
   thread_add = threading.Thread(target=account.add, args=(lock,), name='Add')
    thread_delete = threading.Thread(target=account.delete, args=(lock,), name='Delete')

    # 啟動執行緒
   thread_add.start()
    thread_delete.start()

    # 等待執行緒結束
   thread_add.join()
    thread_delete.join()

    print('The final balance is: {}'.format(account.balance))

另一種實現不同執行緒間資料共享的方法就是使用訊息佇列queue。不像列表,queue是執行緒安全的,可以放心使用,見下文。

使用queue佇列通訊-經典的生產者和消費者模型

下例中建立了兩個執行緒,一個負責生成,一個負責消費,所生成的產品存放在queue裡,實現了不同執行緒間溝通。

from queue import Queue
import random, threading, time


# 生產者類
class Producer(threading.Thread):
    def __init__(self, name, queue):
        threading.Thread.__init__(self, name=name)
        self.queue = queue

    def run(self):
        for i in range(1, 5):
            print("{} is producing {} to the queue!".format(self.getName(), i))
            self.queue.put(i)
            time.sleep(random.randrange(10) / 5)
        print("%s finished!" % self.getName())


# 消費者類
class Consumer(threading.Thread):
    def __init__(self, name, queue):
        threading.Thread.__init__(self, name=name)
        self.queue = queue

    def run(self):
        for i in range(1, 5):
            val = self.queue.get()
            print("{} is consuming {} in the queue.".format(self.getName(), val))
            time.sleep(random.randrange(10))
        print("%s finished!" % self.getName())


def main():
    queue = Queue()
    producer = Producer('Producer', queue)
    consumer = Consumer('Consumer', queue)

    producer.start()
    consumer.start()

    producer.join()
    consumer.join()
    print('All threads finished!')


if __name__ == '__main__':
    main()

佇列queue的put方法可以將一個物件obj放入佇列中。如果佇列已滿,此方法將阻塞至佇列有空間可用為止。queue的get方法一次返回佇列中的一個成員。如果佇列為空,此方法將阻塞至佇列中有成員可用為止。queue同時還自帶emtpy(), full()等方法來判斷一個佇列是否為空或已滿,但是這些方法並不可靠,因為多執行緒和多程序,在返回結果和使用結果之間,佇列中可能新增/刪除了成員。

Python多程序和多執行緒哪個快?

由於GIL的存在,很多人認為Python多程序程式設計更快,針對多核CPU,理論上來說也是採用多程序更能有效利用資源。網上很多人已做過比較,我直接告訴你結論吧。

  • 對CPU密集型程式碼(比如迴圈計算) - 多程序效率更高

  • 對IO密集型程式碼(比如檔案操作,網路爬蟲) - 多執行緒效率更高。

為什麼是這樣呢?其實也不難理解。對於IO密集型操作,大部分消耗時間其實是等待時間,在等待時間中CPU是不需要工作的,那你在此期間提供雙CPU資源也是利用不上的,相反對於CPU密集型程式碼,2個CPU幹活肯定比一個CPU快很多。那麼為什麼多執行緒會對IO密集型程式碼有用呢?這時因為python碰到等待會釋放GIL供新的執行緒使用,實現了執行緒間的切換。

小結

本文總結了多程序和多執行緒的概念和區別, 並詳細介紹如何使用python的multiprocess和threading模組進行多執行緒和多程序程式設計。我們還簡單介紹了不同程序和執行緒間的通訊和資料共享。如果您能熟練掌握本文中的所有知識點,那麼你已經足以應付大部分面試和工作需求了。如果喜歡本文,就加入微信收藏常來看看吧。

我們後續會對比單執行緒爬蟲和多執行緒爬蟲的效率,歡迎關注我們的微信。

大江狗

2018.10.4