python並行程式設計 - 執行緒篇
目錄1
基本使用
python執行緒使用的兩個模組為: 、 _thread
(不推薦再使用)threading
(檢視threading的原始碼可以發現,threading實際是對_thread進一步的封裝,官方將其稱為 Low-level threading API,下面簡單嘗試使用_thread)
呼叫start_new_thread()函式生成新執行緒
函式宣告:_thread.start_new_thread(function, args[, kwargs])
function: 子執行緒所執行的函式
args: 傳遞的引數,引數型別必須是元組
kwargs:可選引數
示例:
#!usr/bin/env python
#coding=utf-8
import _thread
import time
def func(t_name):
time.sleep(1)
print(t_name, 'end')
_thread.start_new_thread(func, ('my_thread_1',)) # 傳遞的引數必須是元組型別
print('main thread end')
time.sleep(2) # 暫停一下等待子執行緒,避免主執行緒結束後直接退出,看不到子執行緒的輸出
輸出
main thread end
my_thread_1 end
更多:_thread — Low-level threading API
threading模組
需要 import threading
threading模組提供了比_thread模組更加高級別的方法,如下:
- threading.active_count(): 返回當前執行的執行緒數量
- threading.current_thread(): 返回當前執行的執行緒物件
- threading.get_ident(): 返回當前執行的執行緒標識碼
- threading.enumerate
(): 獲取運作著的執行緒物件的列表(包含設定了daemon屬性的後臺執行緒)- threading.main_thread(): 獲取主執行緒物件
threading模組包含Thread類來處理執行緒
函式宣告:class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
(group: 官方預留的引數)
target: 子執行緒要執行的函式
name: 給子執行緒命名
args: 傳遞引數到要執行的函式中 (型別為元組)
daemon: 將執行緒設定為後臺執行緒2
Thread類包含的方法:
- start(): 開始執行緒,它會安排在單獨的控制執行緒中使該物件的
run()
方法被呼叫 (invoked)3 (如果多次呼叫,會raiseRuntimeError
)- run(): 你可以在子類中重寫這個方法,標準的
run()
方法會在構造器傳遞了target
引數後呼叫它- join(timeout=None): 阻塞當前執行緒,直到等待呼叫了
join()
的執行緒結束,或到達設定的超時timeout
的引數為止 (如果嘗試加入當前執行緒4,因為會發生死鎖,join()
會raiseRuntimeError
。線上程啟動前呼叫join()
也會報相同的錯誤)- name: 執行緒名
- ident: 執行緒標識碼 (如果執行緒未
start()
,則為None
。實測執行緒結束後,ident值還存在)- is_alive(): 判斷執行緒是否在執行
- daemon: 是否為後臺執行緒的屬性值
- isDaemon(): 判斷是否為後臺執行緒
更多:threading — Thread-based parallelism
函式形式
使用threading.Thread類例項化物件,再呼叫start()
方法執行
示例:
#!usr/bin/env python
#coding=utf-8
import threading
import time
def func():
print(threading.current_thread().name, ' start')
time.sleep(1)
print(threading.current_thread().name, ' end')
t1 = threading.Thread(target=func) # 建立執行緒
t2 = threading.Thread(target=func)
t1.start() # 開始執行緒
t2.start()
# t1.join() # 等待該執行緒結束後,再往下執行
# t2.join()
print('main thread end') # 使用threading模組Thread類的執行緒,程式需要等待全部執行緒執行完後才退出
輸出
Thread-1 start
Thread-2 start
main thread end
Thread-1 end
Thread-2 end
繼承類的形式
通過繼承threading.Thread類,可以重寫run()
方法,再例項化該類,呼叫start()
方法執行
(繼承Thread類,並不是非要重寫run()
)
示例:
#!usr/bin/env python
#coding=utf-8
import threading
import time
class MyThread(threading.Thread):
def __init__(self, name):
threading.Thread.__init__(self)
self.name = name
def run(self):
print(self.name + ' start')
time.sleep(1)
print(self.name + ' end')
t1 = MyThread('thread1')
t2 = MyThread('thread2')
t1.start()
t2.start()
# t1.join()
# t2.join()
print('main thread end')
輸出
thread1 start
thread2 start
main thread end
thread1 end
thread2 end
執行緒同步
當屬於併發執行緒的多個操作嘗試訪問共享記憶體,並且至少有一個操作能修改資料的狀態時,這時如果沒有恰當的同步機制,可能會發生意外情況或bug。使用鎖
可以解決此問題。
當一個執行緒想要訪問共享記憶體的某一部分割槽域時,它必須再使用前獲取到該部分的鎖。並在操作完後,要釋放掉之前獲取到的鎖。
注意! 要避免死鎖
5的情況發生
使用Lock實現執行緒同步
使用threading.Lock()例項化Lock鎖物件
在共享資源操作的部分,呼叫Lock的方法acquire()
獲取鎖
結束操作後,呼叫Lock的方法release()
釋放鎖,以便於其它執行緒使用該資源
(函式宣告:
acquire(blocking=True, timeout=-1)
獲取鎖,並阻塞其它執行緒訪問這部分資源
blocking6: 設定為False的執行緒不會被阻塞 (並且timeout
設定為預設值-1時,失去同步效果。設定為非-1值時,被設定為True的執行緒阻塞,則False立即返回。這2種情況都會提示錯誤資訊)
timeout: 設定等待的超時值,-1為無限等待,超時後無視阻塞
返回值為True
成功獲取鎖定,False
反之(例如超時到期)
release()
在未鎖的資源上呼叫釋放鎖方法,會引發RuntimeError
)
示例:
#!usr/bin/env python
#coding=utf-8
import threading
import time
lock = threading.Lock() # 建立Lock鎖
num = 0 # 累加這個變數,觀察不同步的情況出現
def func():
lock.acquire() # 獲取鎖
global num
for i in range(1000000): # 如果未出現不同步,是由於運算太快,加大迴圈值
num += 1
lock.release() # 釋放鎖
t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t1.start()
t2.start()
t1.join()
t2.join()
print(num)
有鎖情況下,輸出
2000000
無鎖情況下,不同步,輸出
第一次輸出
1253312
第二次輸出
1227567
第三次輸出
1309097
注! 書中並不提倡使用鎖來解決,因為可能會導致死鎖情況發生,也會對程式碼的可讀性產生影響,除錯困難
使用RLock實現執行緒同步
可重入鎖(reentrant lock)
操作方式同Lock鎖
與Lock的區別:RLock在 同一個執行緒中可以多次acquire()
獲取鎖而不發生阻塞 (這是為了解決一些特殊場景的使用)
# 部分程式碼
lock = threading.RLock() # 建立RLock鎖
def func():
lock.acquire() # 獲取鎖
lock.acquire()
global num
for i in range(1000000):
num += 1
lock.release() # 釋放鎖
lock.release()
注意! acquire()需要成對使用
使用訊號量實現執行緒同步
訊號量的提出,首次用在作業系統中。它是一個作業系統管理的抽象資料型別,用於同步多個執行緒對共享資源與資料的訪問
(本質上,訊號量是由一個內部變數構成的,它標識出了對其所關聯的資源的併發訪問量)
使用threading.Semaphore()建立物件
線上程模組中,訊號量的操作基於acquire()
與release()
當一個執行緒想使用一個資源,它需要呼叫acquire()
,會判斷訊號量內部變數值_value,如果為0則阻塞執行緒,並且進行timeout超時處理,如果_value不為0,執行緒執行 (由於訊號量的初始值為非負數,故設計中不存在負數情況的程式碼)
當一個執行緒使用完一個資源後,它需要呼叫release()
,該操作會增加訊號量的內部變數值,並通知等待的執行緒
( 注! 書中的描述和threading模組的原始碼不符,重新按原始碼的理解寫)
注意! acquire()
和release()
並不需要放在某段程式碼的前後,來鎖住某段資源
示例:
(由於書中給的示例程式碼,感覺很符合理解訊號量的特點,這裡也採用生產者和消費者的關係編寫程式碼)
#!usr/bin/env python
#coding=utf-8
import threading
import time
import random
# 可選引數為內部變數_value賦初值,預設為1
# 如果賦的值小於0,會raise ValueError異常
sem = threading.Semaphore(0)
def producer():
global item
time.sleep(1)
item = random.randint(0, 1000)
print('producer: produced', item)
sem.release() # 釋放訊號量,將內部_value加1,並通知其它等待的執行緒
def consumer():
print('consumer is waiting')
sem.acquire() # 獲取訊號量,值等於0則阻塞執行緒,否則內部_value減1,並繼續執行
print('consumer: sonsumed', item)
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
輸出
consumer is waiting
producer: produced 295
consumer: sonsumed 295
分析:多執行幾次,邏輯上可以發現消費者總需要等待生產者生產出產品後,才能消費
可以看出訊號量很適合這樣的場景,下面可以測試,生產者可以多生產幾個,消費者再消費
>>> import threading
>>> sem = threading.Semaphore()
>>> sem.acquire() # 獲取初始化的訊號量
True
>>> sem.release() # 訊號量+1
>>> sem.release() # 訊號量+1
>>> sem.release() # 訊號量+1
>>> sem.acquire() # 訊號量-1
True
>>> sem.acquire() # 訊號量-1
True
>>> sem.acquire() # 訊號量-1
True
>>> sem.acquire() # 由於訊號量=0
# 所以陷入了阻塞
使用條件實現執行緒同步
使用threading.Condition()建立物件
檢視Condition的原始碼,發現可以傳入一個鎖作為初始化引數。如果不傳,預設會賦值RLock
鎖來進行後續的鎖的操作 (acquire()
、release()
)
# Condition類的初始化部分原始碼
def __init__(self, lock=None):
if lock is None:
lock = RLock()
self._lock = lock
# Export the lock's acquire() and release() methods
self.acquire = lock.acquire
self.release = lock.release
...
示例:
(下例還是使用生產者和消費者的關係編寫示例程式碼,用items作為儲存容器,以滿了(10個)就不能再生產作為條件,以沒了(0個)就不能再消費作為條件)
#!usr/bin/env python
#coding=utf-8
import threading
import time
condition = threading.Condition() # 建立條件
items = [] # 作為產品的儲存容器,設達到10個為滿了,就不能再生產了
def producer():
global items
condition.acquire() # 獲取鎖
if len(items) == 10:
print('producer: stop produce')
condition.wait() # 等待(items達到10,等待消費者消費)
items.append('')
print('producer: produced', len(items))
condition.notify() # 通知等待的執行緒
condition.release() # 釋放鎖
def consumer():
global items
condition.acquire() # 獲取鎖
if len(items) == 0:
print('consumer: waiting')
condition.wait() # 等待(items為0,等待生產者生產)
items.pop() # 注! 註釋掉這行,會發現,等待wait()在接收到通知notify()後,並沒有再次判斷條件,直接就接著運行了
print('consumer: sonsumed', len(items))
condition.notify() # 通知等待的執行緒
condition.release() # 釋放鎖
# producer_loop()和consumer_loop()用來多次迴圈執行,為了達到items為0或10的情況
def producer_loop():
for i in range(20):
time.sleep(1)
producer()
def consumer_loop():
for i in range(20):
time.sleep(4)
consumer()
t1 = threading.Thread(target=producer_loop)
t2 = threading.Thread(target=consumer_loop)
t1.start()
t2.start()
輸出
producer: produced 1
producer: produced 2
producer: produced 3
consumer: sonsumed 2
producer: produced 3
producer: produced 4
producer: produced 5
producer: produced 6
consumer: sonsumed 5
producer: produced 6
producer: produced 7
producer: produced 8
producer: produced 9
consumer: sonsumed 8
producer: produced 9
producer: produced 10
producer: stop produce
consumer: sonsumed 9
producer: produced 10
producer: stop produce
consumer: sonsumed 9
producer: produced 10
producer: stop produce
consumer: sonsumed 9
producer: produced 10
producer: stop produce
consumer: sonsumed 9
producer: produced 10
producer: stop produce
consumer: sonsumed 9
producer: produced 10
producer: stop produce
consumer: sonsumed 9
producer: produced 10
producer: stop produce
consumer: sonsumed 9
producer: produced 10
consumer: sonsumed 9
consumer: sonsumed 8
consumer: sonsumed 7
consumer: sonsumed 6
consumer: sonsumed 5
consumer: sonsumed 4
consumer: sonsumed 3
consumer: sonsumed 2
consumer: sonsumed 1
consumer: sonsumed 0
分析:觀察可以發現,生產者生產滿了10個就會進入等待wait()
,直到消費者通知notify()
為了觀察消費者消費到0個的情況,可以將生產者和消費者迴圈的等待時間做調整
使用事件實現執行緒同步
檢視Event類的原始碼,發現內部使用的是條件Condition
類實現,並傳入了Lock
鎖。事件通過對內部的標誌_flag進行管理來實現執行緒同步
使用set()
方法可以將標誌設為True
使用clear()
方法將其重置為False
使用wait()
方法阻塞執行緒
# Event類的初始化原始碼
def __init__(self):
self._cond = Condition(Lock())
self._flag = False
注意! 並不存在set()
和clear()
放在某段程式碼的前後,來鎖住某段資源
示例:
(還是採用生產者和消費者的關係編寫,生產者做完工作,呼叫set()
設定_flag
標誌,並通知wait()
等待的消費者執行緒執行;再使用clear()
清除_flag
標誌,以便後面的消費者能正確的進入等待。這個過程類似於事件的觸發)
#!usr/bin/env python
#coding=utf-8
import threading
import time
import random
event = threading.Event() # 建立事件
items = [1,2,3] # 設定個初值,便於後面註釋event.clear()後的測試(可以發現,消費者不等待了)
def producer():
global items
print('producer: start')
items.append(random.randint(0,100))
event.set() # 將內部標誌_flag設為True,並通知所有等待的執行緒(類似於觸發事件)
print('producer: notify')
event.clear() # 將內部標誌_flag設為False(只有清除了_flag,消費者下一次的wait()操作才會正常進入等待)(clear()操作也可以交給消費者呼叫,不過為了簡化消費者的操作,讓消費者只需要等待通知即可)
print('producer: end')
def consumer():
global items
print('consumer: waiting')
event.wait() # 等待(等待生產者通知,根據_flag標誌判斷是否進入等待)
print('consumer:', items.pop())
# producer_loop()和consumer_loop()用來多次迴圈執行
def producer_loop():
for i in range(3):
time.sleep(1)
producer()
def consumer_loop():
while True: # 消費者有wait()等待,就不用執行緒休眠了,以免錯過生產者的set()通知
consumer()
t1 = threading.Thread(target=producer_loop)
t2 = threading.Thread(target=consumer_loop)
t1.start()
t2.start()
輸出
consumer: waiting
producer: start
producer: notify
consumer: 57
consumer: waiting
producer: end
producer: start
producer: notify
consumer: 10
producer: end
consumer: waiting
producer: start
producer: notify
consumer: 24
producer: end
consumer: waiting
如果註釋掉程式碼中的# event.clear()
一行,會出現如下輸出
consumer: waiting
producer: start
producer: notify
consumer: 19
producer: end
consumer: waiting
consumer: 3
consumer: waiting
consumer: 2
consumer: waiting
consumer: 1
consumer: waiting
Exception in thread Thread-2:
Traceback (most recent call last):
File “D:\app\Python\Python37\lib\threading.py”, line 917, in _bootstrap_inner
self.run()
File “D:\app\Python\Python37\lib\threading.py”, line 865, in run
self._target(*self._args, **self._kwargs)
File “g:/tmp/code.py”, line 35, in consumer_loop
consumer()
File “g:/tmp/code.py”, line 24, in consumer
print(‘consumer:’, items.pop())
IndexError: pop from empty listproducer: start
producer: notify
producer: end
producer: start
producer: notify
producer: end
分析:會發現,消費者沒有等待(打印出的consumer: waiting,只是給自己的提示,實際沒有等待),直接進行了列表的pop()
操作,直到後面列表為空再彈出時報錯為止
附:其它
使用with語句1
(由於沒有理解部分,這一部分基本就是書中原文)
with: 是Python 2.5中引入的。當有兩個相關的操作需要對一個程式碼塊承兌執行時,with語句的作用就彰顯出來了。它可以再自動精確的分配或釋放資源 (因此也被稱為上下文管理器)。如執行緒模組中,使用到acquire()
和release()
方法的地方,都可以採用with
語句塊,如下:
- Lock
- RLock
- 條件
- 訊號量 (感覺這用了
with
就不太靈活了)
示例:
(會測試上述列表中的with操作)
#!usr/bin/env python
#coding=utf-8
import threading
def func_with(statement):
with statement: # 會自動進行acquire()和release()
print('//todo1')
def func_not_with(statement):
statement.acquire()
try: # 為了避免出現異常,導致沒有release()釋放
print('//todo2')
finally:
statement.release()
lock = threading.Lock()
rlock = threading.RLock()
condition = threading.Condition()
sem = threading.Semaphore(1) # 採用with,需要初始至少有一個訊號量值(因為需要先acquire())
li = [lock, rlock, condition, sem]
for statement in li:
t1 = threading.Thread(target=func_with, args=(statement,))
t2 = threading.Thread(target=func_not_with, args=(statement,))
t1.start()
t2.start()
使用佇列實現執行緒通訊
雖然python執行緒模組提供了很多同步原語 (鎖
、訊號量
、條件
、事件
),但有時候,在使用場景中,可能採用佇列模組會是個最佳選擇。它使得執行緒程式設計變得更加容易和安全
使用佇列Queue
的方式處理,儘管它不屬於threading
模組,但檢視其原始碼,發現佇列的功能實現有用到threading
模組
# Queue的初始化原始碼
def __init__(self, maxsize=0):
self.maxsize = maxsize
self._init(maxsize)
# mutex must be held whenever the queue is mutating. All methods
# that acquire mutex must release it before returning. mutex
# is shared between the three conditions, so acquiring and
# releasing the conditions also acquires and releases mutex.
self.mutex = threading.Lock()
# Notify not_empty whenever an item is added to the queue; a
# thread waiting to get is notified then.
self.not_empty = threading.Condition(self.mutex)
# Notify not_full whenever an item is removed from the queue;
# a thread waiting to put is notified then.
self.not_full = threading.Condition(self.mutex)
# Notify all_tasks_done whenever the number of unfinished tasks
# drops to zero; thread waiting to join() is notified to resume
self.all_tasks_done = threading.Condition(self.mutex)
self.unfinished_tasks = 0
Queue
會用到如下方法:
- put(): 新增一個專案到佇列
- get(): 從佇列中取出一個專案
- task_done(): 每處理完一個專案,需要呼叫該方法
- join(): 阻塞執行緒,等待全部的任務完成
附:檢視原始碼,分析可得Queue內部的操作是這樣的:(這段可以不用看)
- 呼叫
put()
-> with佇列滿(條件鎖not_full
) -> 能阻塞?(引數block) & 能超時?(引數timeout) & 需要等待?(條件鎖not_full.wait()
) -> 新增資料(內部方法_put()
) -> 任務計數加1(內部計數變數unfinished_tasks += 1
) -> 發起佇列非空的通知(條件鎖not_empty.notify()
)- 呼叫
get()
-> with佇列空(條件鎖not_empty
) -> 能阻塞?(引數block) & 能超時?(引數timeout) & 需要等待?(條件鎖not_empty.wait()
) -> 取出資料(內部方法_get()
) -> 發起佇列非滿的通知(條件鎖not_full.notify()
) ( 注意!get()
與put()
操作相比並沒有對任務計數操作,需要通過後面task_done()
完成任務方法來減少任務數)- *呼叫
task_done()
-> with任務完成(條件鎖all_tasks_done
) -> 判斷全部任務完成了?是的話發起通知(條件鎖all_tasks_done.notify_all()
) -> 任務計數減1(內部計數變數unfinished_tasks -= 1
) *- 呼叫
join()
-> with任務完成(條件鎖all_tasks_done
) -> 迴圈未完成的任務計數變數(內部計數變數unfinished_tasks
) -> 還有沒完成的任務,等待(條件鎖all_tasks_done.wait()
| 全部完成,退出迴圈,解除執行緒阻塞)
示例:
#!usr/bin/env python
#coding=utf-8
import threading
from queue import Queue
import time
import random
queue = Queue()
def producer():
for i in range(5):
item = random.randint(0, 100)
queue.put(item)
print('producer:', 'put', item)
time.sleep(1)
def consumer():
while True:
item = queue.get()
print('consumer:', 'get', item)
queue.task_done()
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
輸出
producer: put 47
consumer: get 47
producer: put 71
consumer: get 71
producer: put 99
consumer: get 99
producer: put 30
consumer: get 30
producer: put 75
consumer: get 75
後臺執行緒在主執行緒停止後就直接停止執行。他們資源(如開啟的檔案,資料庫事務等)可能不會被正確的釋放。如果你想要你的執行緒優美的停止,讓他們不要變為後臺和使用一個合適的訊號機制如事件
Event
↩︎官方文件中使用invoke一詞,我並沒有更好的翻譯,因為其它語言中invoke反射是一種技術手段,但Google的翻譯中,將此解釋為呼叫 (是我想多了) ↩︎
加入執行緒?不理解如何能加入執行緒,並且官方文件說會死鎖。實測,建立2個執行緒互相join(),雖然陷入死迴圈,但並沒丟擲錯誤 // TODO: 不知理解有偏差沒有 ↩︎
多個物件,互持對方所需資源的鎖,導致都無法訪問 ↩︎
鎖的acquire()方法的引數,有點難理解,文中所寫是結合官方文件和實測的結果描述所得。不過一般,我們都不用改變它的預設值 // TODO: 沒有從原始碼分析(找不到原始碼) ↩︎