1. 程式人生 > >Python中的多執行緒程式設計,執行緒安全與鎖(一) 聊聊Python中的GIL 聊聊Python中的GIL python基礎之多執行緒鎖機制 python--threading多執行緒總結 Python3入門之執行緒threading常用方法

Python中的多執行緒程式設計,執行緒安全與鎖(一) 聊聊Python中的GIL 聊聊Python中的GIL python基礎之多執行緒鎖機制 python--threading多執行緒總結 Python3入門之執行緒threading常用方法

1. 多執行緒程式設計與執行緒安全相關重要概念

在我的上篇博文 聊聊Python中的GIL 中,我們熟悉了幾個特別重要的概念:GIL,執行緒,程序, 執行緒安全,原子操作

以下是簡單回顧,詳細介紹請直接看聊聊Python中的GIL 

  • GIL:  Global Interpreter Lock,全域性直譯器鎖。為了解決多執行緒之間資料完整性和狀態同步的問題,設計為在任意時刻只有一個執行緒在直譯器中執行。
  • 執行緒:程式執行的最小單位。
  • 程序:系統資源分配的最小單位。
  • 執行緒安全:多執行緒環境中,共享資料同一時間只能有一個執行緒來操作。
  • 原子操作:原子操作就是不會因為程序併發或者執行緒併發而導致被中斷的操作。

還有一個重要的結論:當對全域性資源存在寫操作時,如果不能保證寫入過程的原子性,會出現髒讀髒寫的情況,即執行緒不安全。Python的GIL只能保證原子操作的執行緒安全,因此在多執行緒程式設計時我們需要通過加鎖來保證執行緒安全。

最簡單的鎖是互斥鎖(同步鎖),互斥鎖是用來解決io密集型場景產生的計算錯誤,即目的是為了保護共享的資料,同一時間只能有一個執行緒來修改共享的資料。

下面我們會來介紹如何使用互斥鎖。

 

2. Threading.Lock實現互斥鎖的簡單示例

我們通過Threading.Lock()來實現鎖。

以下是執行緒不安全的例子:

>>> import threading
>>> import time
>>> def sub1():
    global count
    tmp = count
    time.sleep(0.001)
    count = tmp + 1
    time.sleep(2)

    
>>> count = 0
>>> def verify(sub):
    global count
    thread_list = []
    for i in range(100):
        t 
= threading.Thread(target=sub,args=()) t.start() thread_list.append(t) for j in thread_list: j.join() print(count) >>> verify(sub1) 14

在這個例子中,我們把

count+=1

代替為

tmp = count
time.sleep(0.001)
count = tmp + 1

是因為,儘管count+=1是非原子操作,但是因為CPU執行的太快了,比較難以復現出多程序的非原子操作導致的程序不安全。經過代替之後,儘管只sleep了0.001秒,但是對於CPU的時間來說是非常長的,會導致這個程式碼塊執行到一半,GIL鎖就釋放了。即tmp已經獲取到count的值了,但是還沒有將tmp + 1賦值給count。而此時其他執行緒如果執行完了count = tmp + 1, 當返回到原來的執行緒執行時,儘管count的值已經更新了,但是count = tmp + 1是個賦值操作,賦值的結果跟count的更新的值是一樣的。最終導致了我們累加的值有很多丟失。

下面是執行緒安全的例子,我們可以用threading.Lock()獲得鎖

>>> count = 0
>>> def sub2():
    global count
    if lock.acquire(1):
    #acquire()是獲取鎖,acquire(1)返回獲取鎖的結果,成功獲取到互斥鎖為True,如果沒有獲取到互斥鎖則返回False tmp
= count time.sleep(0.001) count = tmp + 1 time.sleep(2) lock.release() 一系列操作結束之後需要釋放鎖 >>> def verify(sub): global count thread_list = [] for i in range(100): t = threading.Thread(target=sub,args=()) t.start() thread_list.append(t) for j in thread_list: j.join() print(count) >>> verify(sub2) 100

 

 獲取鎖和釋放鎖的語句也可以用Python的with來實現,這樣更簡潔

>>> count = 0
>>> def sub3():
    global count
    with lock:
        tmp = count
        time.sleep(0.001)
        count = tmp + 1
        time.sleep(2)

>>> def verify(sub):
    global count
    thread_list = []
    for i in range(100):
        t = threading.Thread(target=sub,args=())
        t.start()
        thread_list.append(t)
    for j in thread_list:
        j.join()
    print(count)
        
>>> verify(sub3)
100

 

3.  兩種死鎖情況及處理

死鎖產生的原因

兩種死鎖:

3.1 迭代死鎖與遞迴鎖(RLock)

該情況是一個執行緒“迭代”請求同一個資源,直接就會造成死鎖。這種死鎖產生的原因是我們標準互斥鎖threading.Lock的缺點導致的。標準的鎖物件(threading.Lock)並不關心當前是哪個執行緒佔有了該鎖;如果該鎖已經被佔有了,那麼任何其它嘗試獲取該鎖的執行緒都會被阻塞,包括已經佔有該鎖的執行緒也會被阻塞

下面是例子,

#/usr/bin/python3
# -*- coding: utf-8 -*-

import threading
import time

count_list = [0,0]
lock = threading.Lock()

def change_0():
    global count_list
    with lock:
        tmp = count_list[0]
        time.sleep(0.001)
        count_list[0] = tmp + 1
        time.sleep(2)
        print("Done. count_list[0]:%s" % count_list[0])
        
def change_1():
    global count_list
    with lock:
        tmp = count_list[1]
        time.sleep(0.001)
        count_list[1] = tmp + 1
        time.sleep(2)
        print("Done. count_list[1]:%s" % count_list[1])
        
def change():
    with lock:
        change_0()
time.sleep(0.001) change_1()
def verify(sub): global count_list thread_list = [] for i in range(100): t = threading.Thread(target=sub, args=()) t.start() thread_list.append(t) for j in thread_list: j.join() print(count_list) if __name__ == "__main__": verify(change)

示例中,我們有一個共享資源count_list,有兩個分別取這個共享資源第一部分和第二部分的數字(count_list[0]和count_list[1])。兩個訪問函式都使用了鎖來確保在獲取資料時沒有其它執行緒修改對應的共享資料。
現在,如果我們思考如何新增第三個函式來獲取兩個部分的資料。一個簡單的方法是依次呼叫這兩個函式,然後返回結合的結果。

這裡的問題是,如有某個執行緒在兩個函式呼叫之間修改了共享資源,那麼我們最終會得到不一致的資料。

最明顯的解決方法是在這個函式中也使用lock。然而,這是不可行的。裡面的兩個訪問函式將會阻塞,因為外層語句已經佔有了該鎖

結果是沒有任何輸出,死鎖。

為了解決這個問題,我們可以用threading.RLock代替threading.Lock

#/usr/bin/python3
# -*- coding: utf-8 -*-

import threading
import time

count_list = [0,0]
lock = threading.RLock()

def change_0():
    global count_list
    with lock:
        tmp = count_list[0]
        time.sleep(0.001)
        count_list[0] = tmp + 1
        time.sleep(2)
        print("Done. count_list[0]:%s" % count_list[0])
        
def change_1():
    global count_list
    with lock:
        tmp = count_list[1]
        time.sleep(0.001)
        count_list[1] = tmp + 1
        time.sleep(2)
        print("Done. count_list[1]:%s" % count_list[1])
        
def change():
    with lock:
        change_0()
time.sleep(0.001) change_1()
def verify(sub): global count_list thread_list = [] for i in range(100): t = threading.Thread(target=sub, args=()) t.start() thread_list.append(t) for j in thread_list: j.join() print(count_list) if __name__ == "__main__": verify(change)

 

3.2 互相等待死鎖與鎖的升序使用

死鎖的另外一個原因是兩個程序想要獲得的鎖已經被對方程序獲得,只能互相等待又無法釋放已經獲得的鎖,而導致死鎖。假設銀行系統中,使用者a試圖轉賬100塊給使用者b,與此同時使用者b試圖轉賬500塊給使用者a,則可能產生死鎖。
2個執行緒互相等待對方的鎖,互相佔用著資源不釋放。

下面是一個互相呼叫導致死鎖的例子:

#/usr/bin/python3
# -*- coding: utf-8 -*-

import threading
import time

class Account(object):
    def __init__(self, name, balance, lock):
        self.name = name
        self.balance = balance
        self.lock = lock
        
    def withdraw(self, amount):
        self.balance -= amount
        
    def deposit(self, amount):
        self.balance += amount
        
def transfer(from_account, to_account, amount):
    with from_account.lock:
        from_account.withdraw(amount)
        time.sleep(1)
        print("trying to get %s's lock..." % to_account.name)
        with to_account.lock:
            to_account_deposit(amount)
    print("transfer finish")
    
if __name__ == "__main__":
    a = Account('a',1000, threading.Lock())
    b = Account('b',1000, threading.Lock())
    thread_list = []
    thread_list.append(threading.Thread(target = transfer, args=(a,b,100)))
    thread_list.append(threading.Thread(target = transfer, args=(b,a,500)))
    for i in thread_list:
        i.start()
    for j in thread_list:
        j.join()
    

最終的結果是死鎖:

trying to get account a's lock...
trying to get account b's lock...

即我們的問題是:

你正在寫一個多執行緒程式,其中執行緒需要一次獲取多個鎖,此時如何避免死鎖問題。
解決方案:
多執行緒程式中,死鎖問題很大一部分是由於執行緒同時獲取多個鎖造成的。舉個例子:一個執行緒獲取了第一個鎖,然後在獲取第二個鎖的 時候發生阻塞,那麼這個執行緒就可能阻塞其他執行緒的執行,從而導致整個程式假死。 其實解決這個問題,核心思想也特別簡單:目前我們遇到的問題是兩個執行緒想獲取到的鎖,都被對方執行緒拿到了,那麼我們只需要保證在這兩個執行緒中,獲取鎖的順序保持一致就可以了。舉個例子,我們有執行緒thread_a, thread_b, 鎖lock_1, lock_2。只要我們規定好了鎖的使用順序,比如先用lock_1,再用lock_2,當執行緒thread_a獲得lock_1時,其他執行緒如thread_b就無法獲得lock_1這個鎖,也就無法進行下一步操作(獲得lock_2這個鎖),也就不會導致互相等待導致的死鎖。簡言之,解決死鎖問題的一種方案是為程式中的每一個鎖分配一個唯一的id,然後只允許按照升序規則來使用多個鎖,這個規則使用上下文管理器 是非常容易實現的,示例如下:

 

#/usr/bin/python3
# -*- coding: utf-8 -*-

import threading
import time
from contextlib import contextmanager

thread_local = threading.local()

@contextmanager
def acquire(*locks):
    #sort locks by object identifier
    locks = sorted(locks, key=lambda x: id(x))
    
    #make sure lock order of previously acquired locks is not violated
    acquired = getattr(thread_local,'acquired',[])
    if acquired and (max(id(lock) for lock in acquired) >= id(locks[0])):
        raise RuntimeError('Lock Order Violation')
    
    # Acquire all the locks
    acquired.extend(locks)
    thread_local.acquired = acquired
    
    try:
        for lock in locks:
            lock.acquire()
        yield
    finally:
        for lock in reversed(locks):
            lock.release()
        del acquired[-len(locks):]

class Account(object):
    def __init__(self, name, balance, lock):
        self.name = name
        self.balance = balance
        self.lock = lock
        
    def withdraw(self, amount):
        self.balance -= amount
        
    def deposit(self, amount):
        self.balance += amount
        
def transfer(from_account, to_account, amount):
    print("%s transfer..." % amount)
    with acquire(from_account.lock, to_account.lock):
        from_account.withdraw(amount)
        time.sleep(1)
        to_account.deposit(amount)
    print("%s transfer... %s:%s ,%s: %s" % (amount,from_account.name,from_account.balance,to_account.name, to_account.balance))
    print("transfer finish")
    
if __name__ == "__main__":
    a = Account('a',1000, threading.Lock())
    b = Account('b',1000, threading.Lock())
    thread_list = []
    thread_list.append(threading.Thread(target = transfer, args=(a,b,100)))
    thread_list.append(threading.Thread(target = transfer, args=(b,a,500)))
    for i in thread_list:
        i.start()
    for j in thread_list:
        j.join()
    

我們獲得的結果是

100 transfer...
500 transfer...
100 transfer... a:900 ,b:1100
transfer finish
500 transfer... b:600, a:1400
transfer finish

成功的避免了互相等待導致的死鎖問題。

在上述程式碼中,有幾點語法需要解釋:

  • 1. 裝飾器@contextmanager是用來讓我們能用with語句呼叫鎖的,從而簡化鎖的獲取和釋放過程。關於with語句,大家可以參考淺談 Python 的 with 語句(https://www.ibm.com/developerworks/cn/opensource/os-cn-pythonwith/)。簡言之,with語句在呼叫時,先執行 __enter__()方法,然後執行with結構體內的語句,最後執行__exit__()語句。有了裝飾器@contextmanager. 生成器函式中 yield 之前的語句在 __enter__() 方法中執行,yield 之後的語句在 __exit__() 中執行,而 yield 產生的值賦給了 as 子句中的 value 變數。
  • 2. try和finally語句中實現的是鎖的獲取和釋放。
  • 3. try之前的語句,實現的是對鎖的排序,以及鎖排序是否被破壞的判斷。

今天我們主要討論了Python多執行緒中如何保證執行緒安全,互斥鎖的使用方法。另外著重討論了兩種導致死鎖的情況:迭代死鎖與互相等待死鎖,以及這兩種死鎖的解決方案:遞迴鎖(RLock)的使用和鎖的升序使用。

對於多執行緒程式設計,我們將在下一篇文章討論執行緒同步(Event)問題,以及對Python多執行緒模組(threading)進行總結。

 

參考文獻:

1. 深入理解 GIL:如何寫出高效能及執行緒安全的 Python 程式碼 http://python.jobbole.com/87743/

2. Python中的原子操作 https://www.jianshu.com/p/42060299c581

3. 詳解python中的Lock與RLock https://blog.csdn.net/ybdesire/article/details/80294638

4. 深入解析Python中的執行緒同步方法 https://www.jb51.net/article/86599.htm

5.  Python中死鎖的形成示例及死鎖情況的防止 https://www.jb51.net/article/86617.htm

6.  舉例講解 Python 中的死鎖、可重入鎖和互斥鎖 http://python.jobbole.com/82723/

7.  python基礎之多執行緒鎖機制

8. python--threading多執行緒總結

9. Python3入門之執行緒threading常用方法

10. 淺談 Python 的 with 語句 https://www.ibm.com/developerworks/cn/opensource/os-cn-pythonwith/