52.[Python]使用threading進行多執行緒程式設計
基礎知識
使用Python進行多程序程式設計歡迎參考我的前一篇部落格,本文介紹的threading模組在我的github專案上有對應的原始碼,歡迎閱讀時參考。
程序是資源的擁有者,程序的建立、切換和銷燬代價較高,所以引入了輕量程序——執行緒。
執行緒
現代作業系統中,程序不是一個可執行的實體,執行緒才是獨立執行和獨立排程的基本單位。程序是執行緒的容器,每個程序中至少包含一個執行緒。執行緒本身只擁有較少的在執行中必不可少的資源,與同屬一個程序的其它執行緒共享程序所擁有的全部資源(執行緒與資源分配無關)。由於執行緒比較“輕”,所以執行緒的建立、切換和銷燬的代價小,效率更高。同一程序中多個執行緒可以併發執行,互相約束,執行緒也有“執行”、“就緒”、“阻塞”三種狀態。
執行緒與CPU的關係
Linux系統可以使用cat /proc/cpuinfo
檢視CPU資訊。物理CPU的個數等於伺服器主機板上插入的CPU個數(physical id的去重個數)。通常物理CPU內部會包含幾個擁有獨立硬體單元的物理核心(cpu cores),每個物理核心擁有編號core id。核心讀取CPU資訊時無法知曉CPU的物理結構,只關注CPU對外暴露的邏輯核個數(siblings),對於採用了超執行緒的CPU,邏輯核的個數一般是物理核數的2倍,如果沒有采用超執行緒技術,邏輯核的個數與物理核個數相等。
由於執行緒是程式執行的最小單位,所以每個CPU邏輯核排程的是執行緒。比如一臺CPU擁有8個邏輯核的計算機,開機後可能會同時執行幾十個程序,每個程序包含不同數量的執行緒,看起來像是在同時執行,其實內部在進行著高速的程序及執行緒的上下文切換。
IO密集與CPU密集
如果一個系統的處理速度受到CPU速度的制約,稱該系統是CPU密集的。
如果一個系統的處理速度受到IO(硬碟讀寫、網路讀寫)速度的制約,稱該系統是IO密集的。
如果一個系統的處理速度受到記憶體讀寫速度的制約,稱該系統是記憶體密集的。
如果一個系統的處理速度受到快取讀寫速度的制約,稱該系統是快取密集的。
一般來說,上述四種系統的處理速度從慢到快依次為:IO密集型 <
記憶體密集型<
快取密集型<
CPU密集型。可以類比主機板結構推算,CPU與南橋的網絡卡、鍵盤等裝置通訊最慢,與北橋的記憶體通訊略快,與快取通訊更快,依賴自身運算最快。
GIL
我們常聽說Python的GIL(
如果要避開GIL問題,可以換用JPython、IronPython等沒有GIL限制的直譯器,使用這種小眾直譯器意味著放棄成熟的CPython社群其他大量的庫,所以需要使用者自己權衡。
對於IO密集型任務,使用Python的threading包也可以完成任務,因為等待IO時間遠比程序的切換更浪費時間。對於CPU密集型任務,如果真的追求極致,放棄Python採用執行效率更高的語言(比如C)或許更好。
threading
建立新執行緒並執行的程式碼如下:
# -*- encoding: utf-8 -*-
# Written by CSDN: Mars Loo的部落格
from threading import Thread
import time
def func():
print "Child thread start, %s" % time.ctime()
time.sleep(2)
print "Child thread end, %s" % time.ctime()
if __name__ == "__main__":
print "Parent thread start, %s" % time.ctime()
p = Thread(target=func)
# 開始執行子執行緒
p.start()
# 等待子執行緒結束
p.join()
time.sleep(1)
print "Parent thread end, %s" % time.ctime()
上述程式碼的輸出為:
Parent thread start, Thu Oct 6 12:39:30 2016
Child thread start, Thu Oct 6 12:39:30 2016
Child thread end, Thu Oct 6 12:39:32 2016
Parent thread end, Thu Oct 6 12:39:33 2016
也可以建立守護執行緒、獲取執行緒名字:
# Written by CSDN: Mars Loo的部落格
import threading
import time
def func():
print "Child thread name:", threading.current_thread().name
print "Child thread start, %s" % time.ctime()
time.sleep(2)
print "Child thread end, %s" % time.ctime()
if __name__ == "__main__":
print "Parent thread name:", threading.current_thread().name
print "Parent thread start, %s" % time.ctime()
p = threading.Thread(target=func)
# 設定為守護執行緒
p.daemon = True
# 開始執行子執行緒
p.start()
time.sleep(1)
print "Parent thread end, %s" % time.ctime()
程式碼輸出如下:
Parent thread name: MainThread
Parent thread start, Thu Oct 6 12:50:57 2016
Child thread name: Thread-1
Child thread start, Thu Oct 6 12:50:57 2016
Parent thread end, Thu Oct 6 12:50:58 2016
Thread物件
Thread物件的初始化方法為Thread(group=None, target=None, name=None, args=(), kwargs={})
,各個引數的含義與在multiprocessing模組中一致,不再贅述(其實multiprocessing模組是按照threading模組設計的)。
Thread物件含有如下方法及屬性:
start()
:建立一個子執行緒並執行繫結物件,同一個執行緒物件不能呼叫兩次start
方法,否則會拋RuntimeError
錯誤。run()
:在子執行緒中執行繫結物件,通常在子類化Thread物件時重定義該方法(子類化時還應該重定義__init__()
方法)。join([timeout])
:等待子執行緒結束或超時時間到(如果提供了timeout
引數的話)。name
屬性:返回執行緒名字。is_alive()
:判斷執行緒是否在執行中。daemon
屬性:設定或獲取執行緒的守護程序屬性(需要在呼叫start()
方法之前設定)。
互斥鎖與死鎖
不同程序由於有不同的地址空間,所以記憶體是不能共享的,不過同一個程序內的多個執行緒之間是可以共享程序內的記憶體的,但是在訪問時要加鎖:
# -*- encoding:utf-8 -*-
# Written by CSDN: Mars Loo的部落格
import threading
import time
counter = 0
class MyThread(threading.Thread):
def __init__(self, lock):
super(MyThread, self).__init__()
self.lock = lock
def run(self):
global counter
self.lock.acquire()
counter += 1
self.lock.release()
if __name__ == "__main__":
thread_number = 1000000
threads = []
mutex = threading.Lock()
st = time.time()
for i in range(thread_number):
t = MyThread(mutex)
threads.append(t)
t.start()
for thread in threads:
thread.join()
et = time.time()
print "Counter in Main thread:", counter
print "Time used:", et-st
上述程式碼在我的機器上執行結果如下:
Counter in Main thread: 1000000
Time used: 227.931274891
考慮一種場景:執行緒A和B共享資源x和y,執行緒A擁有資源x後等待資源y,執行緒B擁有資源y後等待資源x,如果沒有外力作用,系統會永遠掛在這一狀態無法繼續執行,這種現象稱為死鎖。比如:
# -*- encoding:utf-8 -*-
# Written by CSDN: Mars Loo的部落格
import threading
import time
counterA = 0
counterB = 0
lockA = threading.Lock()
lockB = threading.Lock()
def funca():
global lockA, lockB
while True:
lockA.acquire()
print "func a acquire lockA"
lockB.acquire()
print "func a acquire lockB"
lockB.release()
print "func a release lockB"
lockA.release()
print "func a release lockA"
def funcb():
global lockA, lockB
while True:
lockB.acquire()
print "func b acquire lockB"
lockA.acquire()
print "func b acquire lockA"
lockA.release()
print "func b release lockA"
lockB.release()
print "func b release lockB"
if __name__ == "__main__":
ta = threading.Thread(target=funca)
tb = threading.Thread(target=funcb)
ta.start()
tb.start()
ta.join()
tb.join()
大神迪傑斯特拉發明的銀行家演算法可以避免死鎖。
可重入鎖
可重入鎖(reentrant lock)用來避免這樣一種簡單的死鎖場景:已經獲得鎖的執行緒再次請求相同的鎖,比如:
# Written by CSDN: Mars Loo的部落格
import threading
counterA = 0
lock = threading.RLock()
# 如果使用普通互斥鎖,或造成死鎖
# lock = threading.Lock()
def func():
global lock
lock.acquire()
print "I acquire lock-1"
lock.acquire()
print "I acquire lock-2"
lock.release()
print "I release lock-2"
lock.release()
print "I release lock-1"
if __name__ == "__main__":
t = threading.Thread(target=func)
t.start()
t.join()
可重入鎖對於鎖的擁有者會建立一個計數器,擁有者acquire
了n次鎖就必須release
鎖n次,這樣該鎖才能被其他執行緒重新acquire
。
條件物件
threading.Condition條件物件是一種特殊的鎖,用來完成複雜條件的鎖。以常見的“生產者-消費者”模型為例,生產者在生產時會判斷產量是否超過了10,如果超過,將不再生產,消費者在消費時,會判斷產品數量是否小於3,如果小於3,將不再消耗:
# -*- encoding: utf-8 -*-
# Written by CSDN: Mars Loo的部落格
import threading
products = 0
c = threading.Condition()
class Producer(threading.Thread):
def __init__(self):
super(Producer, self).__init__()
def run(self):
global products, c
while True:
# 獲取條件物件鎖
c.acquire()
if products >= 10:
print "Enough products.."
# 進入等待池等待重新獲取鎖,但是不自動釋放已經獲得的鎖
c.wait()
else:
products += 1
print "Produce 1 product, now we have %d product(s)." % products
# 在等待池內選一個物件通知其重新獲取鎖
# 但是不自動釋放已經獲得的鎖
c.notify()
# 主動釋放條件物件鎖
c.release()
class Consumer(threading.Thread):
def __init__(self):
super(Consumer, self).__init__()
def run(self):
global products, c
while True:
c.acquire()
if products <= 2:
print "Low inventory.."
c.wait()
else:
products -= 1
print "Consum 1 product, now we have %d product(s)." % products
c.notify()
c.release()
if __name__ == "__main__":
producer_number = 4
consumer_number = 5
producers = [Producer() for i in xrange(producer_number)]
consumers = [Consumer() for i in xrange(consumer_number)]
for producer in producers:
producer.start()
for consumer in consumers:
consumer.start()
預設情況下,Condition物件底層使用RLock物件,如果宣告c = threading.Condition(threading.Lock())
的條件物件,其底層將會使用普通互斥鎖。
Condition物件還有一個notify_all()
方法,在覺得有必要通知等待池中所有執行緒準備獲取Condition物件時,可以使用這個方法。
訊號量
threading.Semaphore物件可以看做特殊的鎖,比如某個飯店只有10張桌子,則只能同時有10桌客人吃飯,其他客人只能等其中一桌客人吃完飯離開後再上桌吃飯:
# -*- encoding: utf-8 -*-
# Written by CSDN: Mars Loo的部落格
import threading
import time
import random
s = threading.Semaphore(3)
def func():
while True:
s.acquire()
name = threading.current_thread().name
st = random.choice([0.1, 0.2, 0.3])
print "%s will sleep %f seconds.." % (name, st)
time.sleep(st)
s.release()
if __name__ == "__main__":
user_number = 6
users = [threading.Thread(target=func) for i in xrange(user_number)]
for user in users:
user.start()
訊號量常用於訪問容量控制場景,比如資料庫連線限制、HTTP連線限制。
事件物件
事件物件threading.Event用於執行緒間通訊的場景,Event物件內部有表示True
/False
的標誌位(初始狀態為False
),呼叫其set()
方法會將標記為置為True
,clear()
方法會將標記置為False
,is_set()
方法用於判斷標記是否被置為True
。block([timeout])
方法會阻塞執行緒直到標識被置為True
,如果設定了timeout
引數,則最長等待timeout
時間後停止阻塞。
假設一種場景,服務A, B, C依賴服務X,服務X發生重啟比較耗時,其他服務需要等到其重新啟動完成的訊息後才可以開始工作:
# -*- encoding: utf-8 -*-
# Written by CSDN: Mars Loo的部落格
import threading
import time
e = threading.Event()
def funcx():
print "%s X is restarting ..." % time.ctime()
time.sleep(3)
# 底層被依賴執行緒重啟3秒後置位為True
e.set()
print "%s X restarted" % time.ctime()
def func():
# 迴圈檢測,直到底層服務重啟完成
while not e.is_set():
print "%s waiting for X ..." % time.ctime()
# 如果底層服務未重啟完成,阻塞最多等待1秒
e.wait(1)
# 底層服務重啟完成後開始工作
print "%s %s start working ..." % (time.ctime(),
threading.current_thread().name)
if __name__ == "__main__":
x = threading.Thread(target=funcx)
abc = [threading.Thread(target=func) for i in xrange(3)]
x.start()
for t in abc:
t.start()
x.join()
for t in abc:
t.join()
print "Existing..."
# 退出前將Event置位為False
e.clear()
定時器
定時器threading.Timer物件用來定時執行某個執行緒,比如:
# Written by CSDN: Mars Loo的部落格
import threading
import time
def func():
print "%s I started" % time.ctime()
if __name__ == "__main__":
print "%s Program started" % time.ctime()
t = threading.Timer(3.0, func)
t.start()
上述程式碼輸出為:
Thu Oct 6 18:27:02 2016 Program started
Thu Oct 6 18:27:05 2016 I started
如果定時器在等待階段被cancel
,則其繫結的函式也不會被執行:
# Written by CSDN: Mars Loo的部落格
import threading
import time
def func():
print "%s I started" % time.ctime()
if __name__ == "__main__":
print "%s Program started" % time.ctime()
t = threading.Timer(3.0, func)
t.start()
time.sleep(2)
t.cancel()
如果覺得我的文章對您有幫助,歡迎關注我(CSDN:Mars Loo的部落格)或者為這篇文章點贊,謝謝!