1. 程式人生 > >python並行程式設計 - 執行緒篇

python並行程式設計 - 執行緒篇

目錄1

介紹篇
執行緒篇
程序篇
非同步篇
GPU篇
分散式篇


基本使用

python執行緒使用的兩個模組為: _thread (不推薦再使用)threading
(檢視threading的原始碼可以發現,threading實際是對_thread進一步的封裝,官方將其稱為 Low-level threading API,下面簡單嘗試使用_thread)

呼叫start_new_thread()函式生成新執行緒
函式宣告:_thread.start_new_thread(function, args[, kwargs])
function: 子執行緒所執行的函式
args: 傳遞的引數,引數型別必須是元組


kwargs:可選引數

示例:

#!usr/bin/env python
#coding=utf-8

import _thread
import time

def func(t_name):
    time.sleep(1)
    print(t_name, 'end')

_thread.start_new_thread(func, ('my_thread_1',))    # 傳遞的引數必須是元組型別
print('main thread end')
time.sleep(2)   # 暫停一下等待子執行緒,避免主執行緒結束後直接退出,看不到子執行緒的輸出

輸出


main thread end
my_thread_1 end

更多_thread — Low-level threading API

threading模組

需要 import threading
threading模組提供了比_thread模組更加高級別的方法,如下:

  • threading.active_count(): 返回當前執行的執行緒數量
  • threading.current_thread(): 返回當前執行的執行緒物件
  • threading.get_ident(): 返回當前執行的執行緒標識碼
  • threading.enumerate
    (): 獲取運作著的執行緒物件的列表(包含設定了daemon屬性的後臺執行緒)
  • threading.main_thread(): 獲取主執行緒物件

threading模組包含Thread類來處理執行緒
函式宣告:class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
(group: 官方預留的引數)
target: 子執行緒要執行的函式
name: 給子執行緒命名
args: 傳遞引數到要執行的函式中 (型別為元組)
daemon: 將執行緒設定為後臺執行緒2

Thread類包含的方法:

  • start(): 開始執行緒,它會安排在單獨的控制執行緒中使該物件的run()方法被呼叫 (invoked)3 (如果多次呼叫,會raise RuntimeError
  • run(): 你可以在子類中重寫這個方法,標準的run()方法會在構造器傳遞了target引數後呼叫它
  • join(timeout=None): 阻塞當前執行緒,直到等待呼叫了join()的執行緒結束,或到達設定的超時timeout的引數為止 (如果嘗試加入當前執行緒4,因為會發生死鎖,join()會raise RuntimeError。線上程啟動前呼叫join()也會報相同的錯誤)
  • name: 執行緒名
  • ident: 執行緒標識碼 (如果執行緒未start(),則為None。實測執行緒結束後,ident值還存在)
  • is_alive(): 判斷執行緒是否在執行
  • daemon: 是否為後臺執行緒的屬性值
  • isDaemon(): 判斷是否為後臺執行緒

更多threading — Thread-based parallelism

函式形式

使用threading.Thread類例項化物件,再呼叫start()方法執行

示例:

#!usr/bin/env python
#coding=utf-8

import threading
import time

def func():
    print(threading.current_thread().name, ' start')
    time.sleep(1)
    print(threading.current_thread().name, ' end')

t1 = threading.Thread(target=func)   # 建立執行緒
t2 = threading.Thread(target=func)

t1.start()   # 開始執行緒
t2.start()

# t1.join()    # 等待該執行緒結束後,再往下執行
# t2.join()

print('main thread end')    # 使用threading模組Thread類的執行緒,程式需要等待全部執行緒執行完後才退出

輸出
Thread-1 start
Thread-2 start
main thread end
Thread-1 end
Thread-2 end

繼承類的形式

通過繼承threading.Thread類,可以重寫run()方法,再例項化該類,呼叫start()方法執行
(繼承Thread類,並不是非要重寫run()

示例:

#!usr/bin/env python
#coding=utf-8

import threading
import time

class MyThread(threading.Thread):
    def __init__(self, name):
        threading.Thread.__init__(self)
        self.name = name
    
    def run(self):
        print(self.name + ' start')
        time.sleep(1)
        print(self.name + ' end')

t1 = MyThread('thread1')
t2 = MyThread('thread2')

t1.start()
t2.start()

# t1.join()
# t2.join()

print('main thread end')

輸出
thread1 start
thread2 start
main thread end
thread1 end
thread2 end


執行緒同步

當屬於併發執行緒的多個操作嘗試訪問共享記憶體,並且至少有一個操作能修改資料的狀態時,這時如果沒有恰當的同步機制,可能會發生意外情況或bug。使用可以解決此問題。
當一個執行緒想要訪問共享記憶體的某一部分割槽域時,它必須再使用前獲取到該部分的鎖。並在操作完後,要釋放掉之前獲取到的鎖。

注意! 要避免死鎖5的情況發生

使用Lock實現執行緒同步

使用threading.Lock()例項化Lock鎖物件
在共享資源操作的部分,呼叫Lock的方法acquire()獲取鎖
結束操作後,呼叫Lock的方法release()釋放鎖,以便於其它執行緒使用該資源

(函式宣告:
acquire(blocking=True, timeout=-1)
獲取鎖,並阻塞其它執行緒訪問這部分資源
blocking6: 設定為False的執行緒不會被阻塞 (並且timeout設定為預設值-1時,失去同步效果。設定為非-1值時,被設定為True的執行緒阻塞,則False立即返回。這2種情況都會提示錯誤資訊)
timeout: 設定等待的超時值,-1為無限等待,超時後無視阻塞
返回值為True成功獲取鎖定,False反之(例如超時到期)
release()
在未鎖的資源上呼叫釋放鎖方法,會引發RuntimeError

示例:

#!usr/bin/env python
#coding=utf-8

import threading
import time

lock = threading.Lock()     # 建立Lock鎖
num = 0     # 累加這個變數,觀察不同步的情況出現

def func():
    lock.acquire()  # 獲取鎖
    global num
    for i in range(1000000):    # 如果未出現不同步,是由於運算太快,加大迴圈值
        num += 1
    lock.release()  # 釋放鎖

t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)

t1.start()
t2.start()

t1.join()
t2.join()

print(num)

有鎖情況下,輸出
2000000

無鎖情況下,不同步,輸出
第一次輸出
1253312
第二次輸出
1227567
第三次輸出
1309097

注! 書中並不提倡使用鎖來解決,因為可能會導致死鎖情況發生,也會對程式碼的可讀性產生影響,除錯困難

使用RLock實現執行緒同步

可重入鎖(reentrant lock)
操作方式同Lock鎖

與Lock的區別:RLock在 同一個執行緒中可以多次acquire()獲取鎖而不發生阻塞 (這是為了解決一些特殊場景的使用)

# 部分程式碼
lock = threading.RLock()     # 建立RLock鎖

def func():
    lock.acquire()  # 獲取鎖
    lock.acquire()
    global num
    for i in range(1000000):
        num += 1
    lock.release()  # 釋放鎖
    lock.release()

注意! acquire()需要成對使用

使用訊號量實現執行緒同步

訊號量的提出,首次用在作業系統中。它是一個作業系統管理的抽象資料型別,用於同步多個執行緒對共享資源與資料的訪問
(本質上,訊號量是由一個內部變數構成的,它標識出了對其所關聯的資源的併發訪問量)

使用threading.Semaphore()建立物件
線上程模組中,訊號量的操作基於acquire()release()

當一個執行緒想使用一個資源,它需要呼叫acquire(),會判斷訊號量內部變數值_value,如果為0則阻塞執行緒,並且進行timeout超時處理,如果_value不為0,執行緒執行 (由於訊號量的初始值為非負數,故設計中不存在負數情況的程式碼)
當一個執行緒使用完一個資源後,它需要呼叫release(),該操作會增加訊號量的內部變數值,並通知等待的執行緒
注! 書中的描述和threading模組的原始碼不符,重新按原始碼的理解寫)

注意! acquire()release()並不需要放在某段程式碼的前後,來鎖住某段資源

示例:

(由於書中給的示例程式碼,感覺很符合理解訊號量的特點,這裡也採用生產者和消費者的關係編寫程式碼)

#!usr/bin/env python
#coding=utf-8

import threading
import time
import random

# 可選引數為內部變數_value賦初值,預設為1
# 如果賦的值小於0,會raise ValueError異常
sem = threading.Semaphore(0)

def producer():
    global item
    time.sleep(1)
    item = random.randint(0, 1000)
    print('producer: produced', item)
    sem.release()   # 釋放訊號量,將內部_value加1,並通知其它等待的執行緒

def consumer():
    print('consumer is waiting')
    sem.acquire()   # 獲取訊號量,值等於0則阻塞執行緒,否則內部_value減1,並繼續執行
    print('consumer: sonsumed', item)

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)

t1.start()
t2.start()

輸出
consumer is waiting
producer: produced 295
consumer: sonsumed 295

分析:多執行幾次,邏輯上可以發現消費者總需要等待生產者生產出產品後,才能消費

可以看出訊號量很適合這樣的場景,下面可以測試,生產者可以多生產幾個,消費者再消費

>>> import threading
>>> sem = threading.Semaphore()
>>> sem.acquire()    # 獲取初始化的訊號量
True
>>> sem.release()    # 訊號量+1
>>> sem.release()    # 訊號量+1
>>> sem.release()    # 訊號量+1
>>> sem.acquire()    # 訊號量-1
True
>>> sem.acquire()    # 訊號量-1
True
>>> sem.acquire()    # 訊號量-1
True
>>> sem.acquire()    # 由於訊號量=0
    # 所以陷入了阻塞

使用條件實現執行緒同步

使用threading.Condition()建立物件

檢視Condition的原始碼,發現可以傳入一個鎖作為初始化引數。如果不傳,預設會賦值RLock鎖來進行後續的鎖的操作 acquire()release()

# Condition類的初始化部分原始碼
    def __init__(self, lock=None):
        if lock is None:
            lock = RLock()
        self._lock = lock
        # Export the lock's acquire() and release() methods
        self.acquire = lock.acquire
        self.release = lock.release
        ...

示例:

(下例還是使用生產者和消費者的關係編寫示例程式碼,用items作為儲存容器,以滿了(10個)就不能再生產作為條件,以沒了(0個)就不能再消費作為條件)

#!usr/bin/env python
#coding=utf-8

import threading
import time

condition = threading.Condition()   # 建立條件
items = []      # 作為產品的儲存容器,設達到10個為滿了,就不能再生產了

def producer():
    global items
    condition.acquire()     # 獲取鎖
    if len(items) == 10:
        print('producer: stop produce')
        condition.wait()    # 等待(items達到10,等待消費者消費)
    items.append('')
    print('producer: produced', len(items))
    condition.notify()      # 通知等待的執行緒
    condition.release()     # 釋放鎖
    
def consumer():
    global items
    condition.acquire()     # 獲取鎖
    if len(items) == 0:
        print('consumer: waiting')
        condition.wait()    # 等待(items為0,等待生產者生產)
    items.pop()       # 注! 註釋掉這行,會發現,等待wait()在接收到通知notify()後,並沒有再次判斷條件,直接就接著運行了
    print('consumer: sonsumed', len(items))
    condition.notify()      # 通知等待的執行緒
    condition.release()     # 釋放鎖


# producer_loop()和consumer_loop()用來多次迴圈執行,為了達到items為0或10的情況
def producer_loop():
    for i in range(20):
        time.sleep(1)
        producer()

def consumer_loop():
    for i in range(20):
        time.sleep(4)
        consumer()


t1 = threading.Thread(target=producer_loop)
t2 = threading.Thread(target=consumer_loop)

t1.start()
t2.start()

輸出
producer: produced 1
producer: produced 2
producer: produced 3
consumer: sonsumed 2
producer: produced 3
producer: produced 4
producer: produced 5
producer: produced 6
consumer: sonsumed 5
producer: produced 6
producer: produced 7
producer: produced 8
producer: produced 9
consumer: sonsumed 8
producer: produced 9
producer: produced 10
producer: stop produce
consumer: sonsumed 9
producer: produced 10
producer: stop produce
consumer: sonsumed 9
producer: produced 10
producer: stop produce
consumer: sonsumed 9
producer: produced 10
producer: stop produce
consumer: sonsumed 9
producer: produced 10
producer: stop produce
consumer: sonsumed 9
producer: produced 10
producer: stop produce
consumer: sonsumed 9
producer: produced 10
producer: stop produce
consumer: sonsumed 9
producer: produced 10
consumer: sonsumed 9
consumer: sonsumed 8
consumer: sonsumed 7
consumer: sonsumed 6
consumer: sonsumed 5
consumer: sonsumed 4
consumer: sonsumed 3
consumer: sonsumed 2
consumer: sonsumed 1
consumer: sonsumed 0

分析:觀察可以發現,生產者生產滿了10個就會進入等待wait(),直到消費者通知notify()
為了觀察消費者消費到0個的情況,可以將生產者和消費者迴圈的等待時間做調整

使用事件實現執行緒同步

檢視Event類的原始碼,發現內部使用的是條件Condition類實現,並傳入了Lock鎖。事件通過對內部的標誌_flag進行管理來實現執行緒同步
使用set()方法可以將標誌設為True
使用clear()方法將其重置為False
使用wait()方法阻塞執行緒

# Event類的初始化原始碼
    def __init__(self):
        self._cond = Condition(Lock())
        self._flag = False

注意! 並不存在set()clear()放在某段程式碼的前後,來鎖住某段資源

示例:
(還是採用生產者和消費者的關係編寫,生產者做完工作,呼叫set()設定_flag標誌,並通知wait()等待的消費者執行緒執行;再使用clear()清除_flag標誌,以便後面的消費者能正確的進入等待。這個過程類似於事件的觸發)

#!usr/bin/env python
#coding=utf-8

import threading
import time
import random

event = threading.Event()   # 建立事件
items = [1,2,3] # 設定個初值,便於後面註釋event.clear()後的測試(可以發現,消費者不等待了)

def producer():
    global items
    print('producer: start')
    items.append(random.randint(0,100))
    event.set()     # 將內部標誌_flag設為True,並通知所有等待的執行緒(類似於觸發事件)
    print('producer: notify')
    event.clear()   # 將內部標誌_flag設為False(只有清除了_flag,消費者下一次的wait()操作才會正常進入等待)(clear()操作也可以交給消費者呼叫,不過為了簡化消費者的操作,讓消費者只需要等待通知即可)
    print('producer: end')
    
def consumer():
    global items
    print('consumer: waiting')
    event.wait()    # 等待(等待生產者通知,根據_flag標誌判斷是否進入等待)
    print('consumer:', items.pop())


# producer_loop()和consumer_loop()用來多次迴圈執行
def producer_loop():
    for i in range(3):
        time.sleep(1)
        producer()

def consumer_loop():
    while True: # 消費者有wait()等待,就不用執行緒休眠了,以免錯過生產者的set()通知
        consumer()


t1 = threading.Thread(target=producer_loop)
t2 = threading.Thread(target=consumer_loop)

t1.start()
t2.start()

輸出
consumer: waiting
producer: start
producer: notify
consumer: 57
consumer: waiting
producer: end
producer: start
producer: notify
consumer: 10
producer: end
consumer: waiting
producer: start
producer: notify
consumer: 24
producer: end
consumer: waiting

如果註釋掉程式碼中的# event.clear()一行,會出現如下輸出
consumer: waiting
producer: start
producer: notify
consumer: 19
producer: end
consumer: waiting
consumer: 3
consumer: waiting
consumer: 2
consumer: waiting
consumer: 1
consumer: waiting
Exception in thread Thread-2:
Traceback (most recent call last):
File “D:\app\Python\Python37\lib\threading.py”, line 917, in _bootstrap_inner
self.run()
File “D:\app\Python\Python37\lib\threading.py”, line 865, in run
self._target(*self._args, **self._kwargs)
File “g:/tmp/code.py”, line 35, in consumer_loop
consumer()
File “g:/tmp/code.py”, line 24, in consumer
print(‘consumer:’, items.pop())
IndexError: pop from empty list

producer: start
producer: notify
producer: end
producer: start
producer: notify
producer: end

分析:會發現,消費者沒有等待(打印出的consumer: waiting,只是給自己的提示,實際沒有等待),直接進行了列表的pop()操作,直到後面列表為空再彈出時報錯為止


附:其它

使用with語句1

(由於沒有理解部分,這一部分基本就是書中原文)

with: 是Python 2.5中引入的。當有兩個相關的操作需要對一個程式碼塊承兌執行時,with語句的作用就彰顯出來了。它可以再自動精確的分配或釋放資源 (因此也被稱為上下文管理器)。如執行緒模組中,使用到acquire()release()方法的地方,都可以採用with語句塊,如下:

  • Lock
  • RLock
  • 條件
  • 訊號量 (感覺這用了with就不太靈活了)

示例:
(會測試上述列表中的with操作)

#!usr/bin/env python
#coding=utf-8

import threading

def func_with(statement):
    with statement: # 會自動進行acquire()和release()
        print('//todo1')

def func_not_with(statement):
    statement.acquire()
    try:    # 為了避免出現異常,導致沒有release()釋放
        print('//todo2')
    finally:
        statement.release()

lock = threading.Lock()
rlock = threading.RLock()
condition = threading.Condition()
sem = threading.Semaphore(1)    # 採用with,需要初始至少有一個訊號量值(因為需要先acquire())

li = [lock, rlock, condition, sem]

for statement in li:
    t1 = threading.Thread(target=func_with, args=(statement,))
    t2 = threading.Thread(target=func_not_with, args=(statement,))

    t1.start()
    t2.start()

使用佇列實現執行緒通訊

雖然python執行緒模組提供了很多同步原語 訊號量條件事件,但有時候,在使用場景中,可能採用佇列模組會是個最佳選擇。它使得執行緒程式設計變得更加容易和安全

使用佇列Queue的方式處理,儘管它不屬於threading模組,但檢視其原始碼,發現佇列的功能實現有用到threading模組

# Queue的初始化原始碼
    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)

        # mutex must be held whenever the queue is mutating.  All methods
        # that acquire mutex must release it before returning.  mutex
        # is shared between the three conditions, so acquiring and
        # releasing the conditions also acquires and releases mutex.
        self.mutex = threading.Lock()

        # Notify not_empty whenever an item is added to the queue; a
        # thread waiting to get is notified then.
        self.not_empty = threading.Condition(self.mutex)

        # Notify not_full whenever an item is removed from the queue;
        # a thread waiting to put is notified then.
        self.not_full = threading.Condition(self.mutex)

        # Notify all_tasks_done whenever the number of unfinished tasks
        # drops to zero; thread waiting to join() is notified to resume
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0

Queue會用到如下方法:

  • put(): 新增一個專案到佇列
  • get(): 從佇列中取出一個專案
  • task_done(): 每處理完一個專案,需要呼叫該方法
  • join(): 阻塞執行緒,等待全部的任務完成

附:檢視原始碼,分析可得Queue內部的操作是這樣的:(這段可以不用看)

  • 呼叫put() -> with佇列滿(條件鎖not_full) -> 能阻塞?(引數block) & 能超時?(引數timeout) & 需要等待?(條件鎖not_full.wait()) -> 新增資料(內部方法_put()) -> 任務計數加1(內部計數變數unfinished_tasks += 1) -> 發起佇列非空的通知(條件鎖not_empty.notify())
  • 呼叫get() -> with佇列空(條件鎖not_empty) -> 能阻塞?(引數block) & 能超時?(引數timeout) & 需要等待?(條件鎖not_empty.wait()) -> 取出資料(內部方法_get()) -> 發起佇列非滿的通知(條件鎖not_full.notify()) ( 注意! get()put()操作相比並沒有對任務計數操作,需要通過後面task_done()完成任務方法來減少任務數)
  • *呼叫task_done() -> with任務完成(條件鎖all_tasks_done) -> 判斷全部任務完成了?是的話發起通知(條件鎖all_tasks_done.notify_all()) -> 任務計數減1(內部計數變數unfinished_tasks -= 1) *
  • 呼叫join() -> with任務完成(條件鎖all_tasks_done) -> 迴圈未完成的任務計數變數(內部計數變數unfinished_tasks) -> 還有沒完成的任務,等待(條件鎖all_tasks_done.wait() | 全部完成,退出迴圈,解除執行緒阻塞)

示例:

#!usr/bin/env python
#coding=utf-8

import threading
from queue import Queue
import time
import random

queue = Queue()

def producer():
    for i in range(5):
        item = random.randint(0, 100)
        queue.put(item)
        print('producer:', 'put', item)
        time.sleep(1)

def consumer():
    while True:
        item = queue.get()
        print('consumer:', 'get', item)
        queue.task_done()

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)

t1.start()
t2.start()

輸出
producer: put 47
consumer: get 47
producer: put 71
consumer: get 71
producer: put 99
consumer: get 99
producer: put 30
consumer: get 30
producer: put 75
consumer: get 75



  1. 參考書籍:《Python並行程式設計手冊》 ↩︎ ↩︎

  2. 後臺執行緒在主執行緒停止後就直接停止執行。他們資源(如開啟的檔案,資料庫事務等)可能不會被正確的釋放。如果你想要你的執行緒優美的停止,讓他們不要變為後臺和使用一個合適的訊號機制如事件Event ↩︎

  3. 官方文件中使用invoke一詞,我並沒有更好的翻譯,因為其它語言中invoke反射是一種技術手段,但Google的翻譯中,將此解釋為呼叫 (是我想多了) ↩︎

  4. 加入執行緒?不理解如何能加入執行緒,並且官方文件說會死鎖。實測,建立2個執行緒互相join(),雖然陷入死迴圈,但並沒丟擲錯誤 // TODO: 不知理解有偏差沒有 ↩︎

  5. 多個物件,互持對方所需資源的鎖,導致都無法訪問 ↩︎

  6. 鎖的acquire()方法的引數,有點難理解,文中所寫是結合官方文件和實測的結果描述所得。不過一般,我們都不用改變它的預設值 // TODO: 沒有從原始碼分析(找不到原始碼) ↩︎