1. 程式人生 > >52.[Python]使用threading進行多執行緒程式設計

52.[Python]使用threading進行多執行緒程式設計

基礎知識

使用Python進行多程序程式設計歡迎參考我的前一篇部落格,本文介紹的threading模組在我的github專案上有對應的原始碼,歡迎閱讀時參考。

程序是資源的擁有者,程序的建立、切換和銷燬代價較高,所以引入了輕量程序——執行緒。

執行緒

現代作業系統中,程序不是一個可執行的實體,執行緒才是獨立執行和獨立排程的基本單位。程序是執行緒的容器,每個程序中至少包含一個執行緒。執行緒本身只擁有較少的在執行中必不可少的資源,與同屬一個程序的其它執行緒共享程序所擁有的全部資源(執行緒與資源分配無關)。由於執行緒比較“輕”,所以執行緒的建立、切換和銷燬的代價小,效率更高。同一程序中多個執行緒可以併發執行,互相約束,執行緒也有“執行”、“就緒”、“阻塞”三種狀態。

執行緒與CPU的關係

Linux系統可以使用cat /proc/cpuinfo檢視CPU資訊。物理CPU的個數等於伺服器主機板上插入的CPU個數(physical id的去重個數)。通常物理CPU內部會包含幾個擁有獨立硬體單元的物理核心(cpu cores),每個物理核心擁有編號core id。核心讀取CPU資訊時無法知曉CPU的物理結構,只關注CPU對外暴露的邏輯核個數(siblings),對於採用了超執行緒的CPU,邏輯核的個數一般是物理核數的2倍,如果沒有采用超執行緒技術,邏輯核的個數與物理核個數相等。

由於執行緒是程式執行的最小單位,所以每個CPU邏輯核排程的是執行緒。比如一臺CPU擁有8個邏輯核的計算機,開機後可能會同時執行幾十個程序,每個程序包含不同數量的執行緒,看起來像是在同時執行,其實內部在進行著高速的程序及執行緒的上下文切換。

IO密集與CPU密集

如果一個系統的處理速度受到CPU速度的制約,稱該系統是CPU密集的。
如果一個系統的處理速度受到IO(硬碟讀寫、網路讀寫)速度的制約,稱該系統是IO密集的。
如果一個系統的處理速度受到記憶體讀寫速度的制約,稱該系統是記憶體密集的。
如果一個系統的處理速度受到快取讀寫速度的制約,稱該系統是快取密集的。
一般來說,上述四種系統的處理速度從慢到快依次為:IO密集型 <記憶體密集型<快取密集型<CPU密集型。可以類比主機板結構推算,CPU與南橋的網絡卡、鍵盤等裝置通訊最慢,與北橋的記憶體通訊略快,與快取通訊更快,依賴自身運算最快。

GIL

我們常聽說Python的GIL(

Global Interpreter Lock)問題,其實並不是Python語言本身的問題,而是CPython直譯器的問題。CPython直譯器加了一把全域性的鎖阻止了多執行緒的併發執行,這是CPython直譯器的歷史問題。由於CPython的流行程度以及大量的庫依賴GIL特性,使得這把鎖不得不繼續存在。

如果要避開GIL問題,可以換用JPython、IronPython等沒有GIL限制的直譯器,使用這種小眾直譯器意味著放棄成熟的CPython社群其他大量的庫,所以需要使用者自己權衡。

對於IO密集型任務,使用Python的threading包也可以完成任務,因為等待IO時間遠比程序的切換更浪費時間。對於CPU密集型任務,如果真的追求極致,放棄Python採用執行效率更高的語言(比如C)或許更好。

threading

建立新執行緒並執行的程式碼如下:

# -*- encoding: utf-8 -*-
# Written by CSDN: Mars Loo的部落格

from threading import Thread
import time

def func():
    print "Child thread start, %s" % time.ctime()
    time.sleep(2)
    print "Child thread end, %s" % time.ctime()


if __name__ == "__main__":
    print "Parent thread start, %s" % time.ctime()
    p = Thread(target=func)
    # 開始執行子執行緒
    p.start()
    # 等待子執行緒結束
    p.join()
    time.sleep(1)
    print "Parent thread end, %s" % time.ctime()

上述程式碼的輸出為:

Parent thread start, Thu Oct  6 12:39:30 2016
Child thread start, Thu Oct  6 12:39:30 2016
Child thread end, Thu Oct  6 12:39:32 2016
Parent thread end, Thu Oct  6 12:39:33 2016

也可以建立守護執行緒、獲取執行緒名字:

# Written by CSDN: Mars Loo的部落格

import threading
import time

def func():
    print "Child thread name:", threading.current_thread().name
    print "Child thread start, %s" % time.ctime()
    time.sleep(2)
    print "Child thread end, %s" % time.ctime()


if __name__ == "__main__":
    print "Parent thread name:", threading.current_thread().name
    print "Parent thread start, %s" % time.ctime()
    p = threading.Thread(target=func)
    # 設定為守護執行緒
    p.daemon = True
    # 開始執行子執行緒
    p.start()
    time.sleep(1)
    print "Parent thread end, %s" % time.ctime()

程式碼輸出如下:

Parent thread name: MainThread
Parent thread start, Thu Oct  6 12:50:57 2016
Child thread name: Thread-1
Child thread start, Thu Oct  6 12:50:57 2016
Parent thread end, Thu Oct  6 12:50:58 2016

Thread物件

Thread物件的初始化方法為Thread(group=None, target=None, name=None, args=(), kwargs={}),各個引數的含義與在multiprocessing模組中一致,不再贅述(其實multiprocessing模組是按照threading模組設計的)。

Thread物件含有如下方法及屬性:

  • start():建立一個子執行緒並執行繫結物件,同一個執行緒物件不能呼叫兩次start方法,否則會拋RuntimeError錯誤。
  • run():在子執行緒中執行繫結物件,通常在子類化Thread物件時重定義該方法(子類化時還應該重定義__init__()方法)。
  • join([timeout]):等待子執行緒結束或超時時間到(如果提供了timeout引數的話)。
  • name屬性:返回執行緒名字。
  • is_alive():判斷執行緒是否在執行中。
  • daemon屬性:設定或獲取執行緒的守護程序屬性(需要在呼叫start()方法之前設定)。

互斥鎖與死鎖

不同程序由於有不同的地址空間,所以記憶體是不能共享的,不過同一個程序內的多個執行緒之間是可以共享程序內的記憶體的,但是在訪問時要加鎖:

# -*- encoding:utf-8 -*-
# Written by CSDN: Mars Loo的部落格

import threading
import time

counter = 0

class MyThread(threading.Thread):
    def __init__(self, lock):
        super(MyThread, self).__init__()
        self.lock = lock

    def run(self):
        global counter
        self.lock.acquire()
        counter += 1
        self.lock.release()

if __name__ == "__main__":
    thread_number = 1000000
    threads = []
    mutex = threading.Lock()
    st = time.time()
    for i in range(thread_number):
        t = MyThread(mutex)
        threads.append(t)
        t.start()

    for thread in threads:
        thread.join()
    et = time.time()
    print "Counter in Main thread:", counter
    print "Time used:", et-st

上述程式碼在我的機器上執行結果如下:

Counter in Main thread: 1000000
Time used: 227.931274891

考慮一種場景:執行緒A和B共享資源x和y,執行緒A擁有資源x後等待資源y,執行緒B擁有資源y後等待資源x,如果沒有外力作用,系統會永遠掛在這一狀態無法繼續執行,這種現象稱為死鎖。比如:

# -*- encoding:utf-8 -*-
# Written by CSDN: Mars Loo的部落格

import threading
import time

counterA = 0
counterB = 0
lockA = threading.Lock()
lockB = threading.Lock()

def funca():
    global lockA, lockB
    while True:
        lockA.acquire()
        print "func a acquire lockA"
        lockB.acquire()
        print "func a acquire lockB"
        lockB.release()
        print "func a release lockB"
        lockA.release()
        print "func a release lockA"

def funcb():
    global lockA, lockB
    while True:
        lockB.acquire()
        print "func b acquire lockB"
        lockA.acquire()
        print "func b acquire lockA"
        lockA.release()
        print "func b release lockA"
        lockB.release()
        print "func b release lockB"

if __name__ == "__main__":
    ta = threading.Thread(target=funca)
    tb = threading.Thread(target=funcb)
    ta.start()
    tb.start()
    ta.join()
    tb.join()

大神迪傑斯特拉發明的銀行家演算法可以避免死鎖。

可重入鎖

可重入鎖(reentrant lock)用來避免這樣一種簡單的死鎖場景:已經獲得鎖的執行緒再次請求相同的鎖,比如:

# Written by CSDN: Mars Loo的部落格
import threading

counterA = 0

lock = threading.RLock()
# 如果使用普通互斥鎖,或造成死鎖
# lock = threading.Lock()

def func():
    global lock

    lock.acquire()
    print "I acquire lock-1"
    lock.acquire()
    print "I acquire lock-2"
    lock.release()
    print "I release lock-2"
    lock.release()
    print "I release lock-1"


if __name__ == "__main__":
    t = threading.Thread(target=func)
    t.start()
    t.join()

可重入鎖對於鎖的擁有者會建立一個計數器,擁有者acquire了n次鎖就必須release鎖n次,這樣該鎖才能被其他執行緒重新acquire

條件物件

threading.Condition條件物件是一種特殊的鎖,用來完成複雜條件的鎖。以常見的“生產者-消費者”模型為例,生產者在生產時會判斷產量是否超過了10,如果超過,將不再生產,消費者在消費時,會判斷產品數量是否小於3,如果小於3,將不再消耗:

# -*- encoding: utf-8 -*-
# Written by CSDN: Mars Loo的部落格

import threading

products = 0
c = threading.Condition()

class Producer(threading.Thread):
    def __init__(self):
        super(Producer, self).__init__()

    def run(self):
        global products, c
        while True:
            # 獲取條件物件鎖
            c.acquire()
            if products >= 10:
                print "Enough products.."
                # 進入等待池等待重新獲取鎖,但是不自動釋放已經獲得的鎖
                c.wait()
            else:
                products += 1
                print "Produce 1 product, now we have %d product(s)." % products
                # 在等待池內選一個物件通知其重新獲取鎖
                # 但是不自動釋放已經獲得的鎖
                c.notify()
            # 主動釋放條件物件鎖
            c.release()

class Consumer(threading.Thread):
    def __init__(self):
        super(Consumer, self).__init__()

    def run(self):
        global products, c
        while True:
            c.acquire()
            if products <= 2:
                print "Low inventory.."
                c.wait()
            else:
                products -= 1
                print "Consum 1 product, now we have %d product(s)." % products
                c.notify()
            c.release()

if __name__ == "__main__":
    producer_number = 4
    consumer_number = 5
    producers = [Producer() for i in xrange(producer_number)]
    consumers = [Consumer() for i in xrange(consumer_number)]
    for producer in producers:
        producer.start()
    for consumer in consumers:
        consumer.start()

預設情況下,Condition物件底層使用RLock物件,如果宣告c = threading.Condition(threading.Lock())的條件物件,其底層將會使用普通互斥鎖。

Condition物件還有一個notify_all()方法,在覺得有必要通知等待池中所有執行緒準備獲取Condition物件時,可以使用這個方法。

訊號量

threading.Semaphore物件可以看做特殊的鎖,比如某個飯店只有10張桌子,則只能同時有10桌客人吃飯,其他客人只能等其中一桌客人吃完飯離開後再上桌吃飯:

# -*- encoding: utf-8 -*-
# Written by CSDN: Mars Loo的部落格

import threading
import time
import random

s = threading.Semaphore(3)


def func():
    while True:
        s.acquire()
        name = threading.current_thread().name
        st = random.choice([0.1, 0.2, 0.3])
        print "%s will sleep %f seconds.." % (name, st)
        time.sleep(st)
        s.release()

if __name__ == "__main__":
    user_number = 6
    users = [threading.Thread(target=func) for i in xrange(user_number)]
    for user in users:
        user.start()

訊號量常用於訪問容量控制場景,比如資料庫連線限制、HTTP連線限制。

事件物件

事件物件threading.Event用於執行緒間通訊的場景,Event物件內部有表示True/False的標誌位(初始狀態為False),呼叫其set()方法會將標記為置為Trueclear()方法會將標記置為Falseis_set()方法用於判斷標記是否被置為Trueblock([timeout])方法會阻塞執行緒直到標識被置為True,如果設定了timeout引數,則最長等待timeout時間後停止阻塞。
假設一種場景,服務A, B, C依賴服務X,服務X發生重啟比較耗時,其他服務需要等到其重新啟動完成的訊息後才可以開始工作:

# -*- encoding: utf-8 -*-
# Written by CSDN: Mars Loo的部落格

import threading
import time

e = threading.Event()


def funcx():
    print "%s X is restarting ..." % time.ctime()
    time.sleep(3)
    # 底層被依賴執行緒重啟3秒後置位為True
    e.set()
    print "%s X restarted" % time.ctime()


def func():
    # 迴圈檢測,直到底層服務重啟完成
    while not e.is_set():
        print "%s waiting for X ..." % time.ctime()
        # 如果底層服務未重啟完成,阻塞最多等待1秒
        e.wait(1)
    # 底層服務重啟完成後開始工作
    print "%s %s start working ..." % (time.ctime(),
                                       threading.current_thread().name)


if __name__ == "__main__":
    x = threading.Thread(target=funcx)
    abc = [threading.Thread(target=func) for i in xrange(3)]
    x.start()
    for t in abc:
        t.start()
    x.join()
    for t in abc:
        t.join()
    print "Existing..."
    # 退出前將Event置位為False
    e.clear()

定時器

定時器threading.Timer物件用來定時執行某個執行緒,比如:

# Written by CSDN: Mars Loo的部落格

import threading
import time


def func():
    print "%s I started" % time.ctime()

if __name__ == "__main__":
    print "%s Program started" % time.ctime()
    t = threading.Timer(3.0, func)
    t.start()

上述程式碼輸出為:

Thu Oct  6 18:27:02 2016 Program started
Thu Oct  6 18:27:05 2016 I started

如果定時器在等待階段被cancel,則其繫結的函式也不會被執行:

# Written by CSDN: Mars Loo的部落格

import threading
import time


def func():
    print "%s I started" % time.ctime()

if __name__ == "__main__":
    print "%s Program started" % time.ctime()
    t = threading.Timer(3.0, func)
    t.start()
    time.sleep(2)
    t.cancel()

如果覺得我的文章對您有幫助,歡迎關注我(CSDN:Mars Loo的部落格)或者為這篇文章點贊,謝謝!