1. 程式人生 > >Python3 與 C# 併發程式設計之~ 執行緒上篇

Python3 與 C# 併發程式設計之~ 執行緒上篇

2.2.加強篇

其實以前的Linux中是沒有執行緒這個概念的,Windows程式設計師經常使用執行緒,這一看~方便啊,然後可能是當時程式設計師偷懶了,就把程序模組改了改(這就是為什麼之前說Linux下的多程序程式設計其實沒有Win下那麼“重量級”),弄了個精簡版程序==>執行緒(核心是分不出程序和執行緒的,反正PCB個數都是一樣)

多執行緒和多程序最大的不同在於,多程序中,同一個變數,各自有一份拷貝存在於每個程序中,互不影響,而多執行緒中,所有變數都由所有執行緒共享(全域性變數和堆 ==> 執行緒間共享。程序的棧 ==> 執行緒平分而獨佔

還記得通過current_thread()

獲取的執行緒資訊嗎?難道執行緒也沒個id啥的?一起看看:(通過ps -Lf pid 來檢視LWP

1.執行緒ID.png

  1. 程式碼(.text)
  2. 檔案描述符(fd)
  3. 記憶體對映(mmap)

2.2.1.執行緒同步~互斥鎖Lock

執行緒之間共享資料的確方便,但是也容易出現數據混亂的現象,來看個例子:

from multiprocessing.dummy import threading

num = 0  # def global num

def test(i):
    print(f"子程序:{i}")
    global num
    for i in range
(100000): num += 1 def main(): p_list = [threading.Thread(target=test, args=(i, )) for i in range(5)] for i in p_list: i.start() for i in p_list: i.join() print(num) # 應該是500000,發生了資料混亂,結果少了很多 if __name__ == '__main__': main()

輸出:(應該是500000,發生了資料混亂,只剩下358615

子程序:0
子程序:1
子程序:2
子程序:3
子程序:4
452238

Lock案例

共享資源+CPU排程==>資料混亂==解決==>執行緒同步 這時候Lock就該上場了

互斥鎖是實現執行緒同步最簡單的一種方式,讀寫都加鎖(讀寫都會序列)

先看看上面例子怎麼解決調:

from multiprocessing.dummy import threading, Lock

num = 0  # def global num

def test(i, lock):
    print(f"子程序:{i}")
    global num
    for i in range(100000):
        with lock:
            num += 1

def main():
    lock = Lock()
    p_list = [threading.Thread(target=test, args=(i, lock)) for i in range(5)]
    for i in p_list:
        i.start()
    for i in p_list:
        i.join()
    print(num)

if __name__ == '__main__':
    main()

輸出:time python3 1.thread.2.py

子程序:0
子程序:1
子程序:2
子程序:3
子程序:4
500000

real    0m2.846s
user    0m1.897s
sys 0m3.159s

優化下

lock設定為全域性或者區域性,效能幾乎一樣。迴圈換成map後效能有所提升(測試案例在Code中

from multiprocessing.dummy import Pool as ThreadPool, Lock

num = 0  # def global num
lock = Lock()

def test(i):
    print(f"子程序:{i}")
    global num
    global lock
    for i in range(100000):
        with lock:
            num += 1

def main():
    p = ThreadPool()
    p.map_async(test, list(range(5)))
    p.close()
    p.join()

    print(num)

if __name__ == '__main__':
    main()

輸出:

time python3 1.thread.2.py

子程序:0
子程序:1
子程序:3
子程序:2
子程序:4
500000

real    0m2.468s
user    0m1.667s
sys 0m2.644s

本來多執行緒訪問共享資源的時候可以並行,加鎖後就部分串行了(沒獲取到的執行緒就阻塞等了)

專案中可以多次加鎖,每次加鎖只對修改部分加(儘量少的程式碼) 】(以後會說協程和Actor模型

補充:以前都是這麼寫的,現在支援with託管了(有時候還會用到,所以瞭解下):【net是直接lock大括號包起來

#### 以前寫法:
lock.acquire() # 獲取鎖
try:
    num += 1
finally:
    lock.release() # 釋放鎖

#### 等價簡寫
with lock:
    num += 1

擴充套件知識:(GIL在擴充套件篇會詳說)

  1. GIL的作用:多執行緒情況下必須存在資源的競爭,GIL是為了保證在直譯器級別的執行緒唯一使用共享資源(cpu)。
  2. 同步鎖的作用:為了保證直譯器級別下的自己編寫的程式唯一使用共享資源產生了同步鎖
  3. lock.locked():判斷 lock 當前是否上鎖,如果上鎖,返回True,否則返回False【上鎖失敗時候的處理】

2.2.2.執行緒同步~可重入鎖RLock

看個場景:小明欠小張2000,欠小周5000,現在需要同時轉賬給他們:(規定:幾次轉賬加幾次鎖2.RLock.png

小明啥也沒管,直接擼起袖子就寫Code了:(錯誤Code示意

from multiprocessing.dummy import Pool as ThreadPool, Lock

xiaoming = 8000
xiaozhang = 3000
xiaozhou = 5000

def test(lock):
    global xiaoming
    global xiaozhang
    global xiaozhou
    # 小明想一次搞定:
    with lock:
        # 小明轉賬2000給小張
        xiaoming -= 2000
        xiaozhang += 2000
        with lock:
            # 小明轉賬5000給小周
            xiaoming -= 5000
            xiaozhou += 5000

def main():
    print(f"[還錢前]小明{xiaoming},小張{xiaozhang},小周{xiaozhou}")
    lock = Lock()
    p = ThreadPool()
    p.apply_async(test, args=(lock, ))
    p.close()
    p.join()
    print(f"[還錢後]小明{xiaoming},小張{xiaozhang},小周{xiaozhou}")

if __name__ == '__main__':
    main()

小明寫完程式碼就出去了,這可把小周和小張等急了,打了N個電話來催,小明心想啥情況?

一看程式碼楞住了,改了改程式碼,輕輕鬆鬆把錢轉出去了:

from multiprocessing.dummy import Pool as ThreadPool, Lock

xiaoming = 8000
xiaozhang = 3000
xiaozhou = 5000

# 小明轉賬2000給小張
def a_to_b(lock):
    global xiaoming
    global xiaozhang
    with lock:
        xiaoming -= 2000
        xiaozhang += 2000

# 小明轉賬5000給小周
def a_to_c(lock):
    global xiaoming
    global xiaozhou
    with lock:
        xiaoming -= 5000
        xiaozhou += 5000

def main():
    print(f"[還錢前]小明{xiaoming},小張{xiaozhang},小周{xiaozhou}")
    lock = Lock()
    p = ThreadPool()
    p.apply_async(a_to_b, args=(lock, ))
    p.apply_async(a_to_c, args=(lock, ))
    p.close()
    p.join()
    print(f"[還錢後]小明{xiaoming},小張{xiaozhang},小周{xiaozhou}")

if __name__ == '__main__':
    main()

輸出:

[還錢前]小明8000,小張3000,小周5000
[還錢後]小明1000,小張5000,小周10000

就這麼算了嗎?不不不,不符合小明性格,於是小明研究了下,發現~還有個遞迴鎖RLock呢,正好解決他的問題:

from multiprocessing.dummy import Pool as ThreadPool, RLock  # 就把這邊換了下

xiaoming = 8000
xiaozhang = 3000
xiaozhou = 5000

def test(lock):
    global xiaoming
    global xiaozhang
    global xiaozhou
    # 小明想一次搞定:
    with lock:
        # 小明轉賬2000給小張
        xiaoming -= 2000
        xiaozhang += 2000
        with lock:
            # 小明轉賬5000給小周
            xiaoming -= 5000
            xiaozhou += 5000

def main():
    print(f"[還錢前]小明{xiaoming},小張{xiaozhang},小周{xiaozhou}")
    lock = RLock()  # 就把這邊換了下
    p = ThreadPool()
    p.apply_async(test, args=(lock, ))
    p.close()
    p.join()
    print(f"[還錢後]小明{xiaoming},小張{xiaozhang},小周{xiaozhou}")

if __name__ == '__main__':
    main()

RLock內部維護著一個Lock和一個counter變數,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個執行緒所有的acquire都被release,其他的執行緒才能獲得資源

2.2.3.死鎖引入

1.多次獲取導致死鎖

小明想到了之前說的(互斥鎖Lock讀寫都加鎖)就把程式碼拆分研究了下:

print("[開始]小明轉賬2000給小張")
lock.acquire()  # 獲取鎖
xiaoming -= 2000
xiaozhang += 2000

print("[開始]小明轉賬5000給小周")
lock.acquire()  # 獲取鎖(互斥鎖第二次加鎖)
xiaoming -= 5000
xiaozhou += 5000
lock.release()  # 釋放鎖
print("[結束]小明轉賬5000給小周")

lock.release()  # 釋放鎖
print("[開始]小明轉賬2000給小張")

輸出發現:(第二次加鎖的時候,變成阻塞等了【死鎖】)

[還錢前]小明8000,小張3000,小周5000
[開始]小明轉賬2000給小張
[開始]小明轉賬5000給小周

這種方式,Python提供的RLock就可以解決了

2.常見的死鎖

看個場景:小明和小張需要流水帳,經常互刷~小明給小張轉賬1000,小張給小明轉賬1000

一般來說,有幾個共享資源就加幾把鎖(小張、小明就是兩個共享資源,所以需要兩把Lock

先描述下然後再看程式碼:

正常流程 小明給小張轉1000:小明自己先加個鎖==>小明-1000==>獲取小張的鎖==>小張+1000==>轉賬完畢

死鎖情況 小明給小張轉1000:小明自己先加個鎖==>小明-1000==>準備獲取小張的鎖。可是這時候小張準備轉賬給小明,已經把自己的鎖獲取了,在等小明的鎖(兩個人相互等,於是就一直死鎖了)

程式碼模擬一下過程:

from time import sleep
from multiprocessing.dummy import Pool as ThreadPool, Lock

xiaoming = 5000
xiaozhang = 8000
m_lock = Lock() # 小明的鎖
z_lock = Lock() # 小張的鎖

# 小明轉賬1000給小張
def a_to_b():
    global xiaoming
    global xiaozhang
    global m_lock
    global z_lock
    with m_lock:
        xiaoming -= 1000
        sleep(0.01)
        with z_lock:
            xiaozhang += 1000

# 小張轉賬1000給小明
def b_to_a():
    global xiaoming
    global xiaozhang
    global m_lock
    global z_lock
    with z_lock:
        xiaozhang -= 1000
        sleep(0.01)
        with m_lock:
            xiaoming += 1000

def main():
    print(f"[還錢前]小明{xiaoming},小張{xiaozhang}")
    p = ThreadPool()
    p.apply_async(a_to_b)
    p.apply_async(b_to_a)
    p.close()
    p.join()
    print(f"[還錢後]小明{xiaoming},小張{xiaozhang}")

if __name__ == '__main__':
    main()

輸出:(卡在這邊了)

[轉賬前]小明5000,小張8000

專案中像這類的情況,一般都是這幾種解決方法:(還有其他解決方案,後面會繼續說)

  1. 按指定順序去訪問共享資源
  2. trylock的重試機制Lock(False)
  3. 在訪問其他鎖的時候,先把自己鎖解了
  4. 得不到全部鎖就先放棄已經獲取的資源

比如上面的情況,我們如果規定,不管是誰先轉賬,先從小明開始,然後再小張,那麼就沒問題了。或者誰錢多就誰(權重高的優先)

from time import sleep
from multiprocessing.dummy import Pool as ThreadPool, Lock

xiaoming = 5000
xiaozhang = 8000
m_lock = Lock()  # 小明的鎖
z_lock = Lock()  # 小張的鎖

# 小明轉賬1000給小張
def a_to_b():
    global xiaoming
    global xiaozhang
    global m_lock
    global z_lock
    # 以上次程式碼為例,這邊只修改了這塊
    with z_lock:  # 小張權重高,大家都先獲取小張的鎖
        xiaozhang += 1000
        sleep(0.01)
        with m_lock:
            xiaoming -= 1000

# 小張轉賬1000給小明
def b_to_a():
    global xiaoming
    global xiaozhang
    global m_lock
    global z_lock
    with z_lock:
        xiaozhang -= 1000
        sleep(0.01)
        with m_lock:
            xiaoming += 1000

def main():
    print(f"[轉賬前]小明{xiaoming},小張{xiaozhang}")
    p = ThreadPool()
    p.apply_async(a_to_b)
    p.apply_async(b_to_a)
    p.close()
    p.join()
    print(f"[轉賬後]小明{xiaoming},小張{xiaozhang}")

if __name__ == '__main__':
    main()

輸出:

[轉賬前]小明5000,小張8000
[轉賬後]小明5000,小張8000

2.2.4.執行緒同步~條件變數Condition

條件變數一般都不是鎖,能阻塞執行緒,從而減少不必要的競爭,Python內建了RLock(不指定就是RLock)

看看原始碼:

class Condition:
    """
    實現條件變數的類。
    條件變數允許一個或多個執行緒等到另一個執行緒通知它們為止
    如果給出了lock引數而不是None,那必須是Lock或RLock物件作底層鎖。
    否則,一個新的RLock物件被建立並用作底層鎖。
    """
    def __init__(self, lock=None):
        if lock is None:
            lock = RLock()
        self._lock = lock
        # 設定lock的acquire()和release()方法
        self.acquire = lock.acquire
        self.release = lock.release

再看看可不可以進行with託管:(支援)

def __enter__(self):
    return self._lock.__enter__()

def __exit__(self, *args):
    return self._lock.__exit__(*args)

看個生產消費者的簡單例子:(生產完就通知消費者)

from multiprocessing.dummy import Pool as ThreadPool, Condition

s_list = []
con = Condition()

def Shop(i):
    global con
    global s_list
    # 加鎖保護共享資源
    for x in range(5):
        with con:
            s_list.append(x)
            print(f"[生產者{i}]生產商品{x}")
            con.notify_all()  # 通知消費者有貨了

def User(i):
    global con
    global s_list
    while True:
        with con:
            if s_list:
                print(f"列表商品:{s_list}")
                name = s_list.pop()  # 消費商品
                print(f"[消費者{i}]消費商品{name}")
                print(f"列表剩餘:{s_list}")
            else:
                con.wait()

def main():
    p = ThreadPool()
    # 兩個生產者
    p.map_async(Shop, range(2))
    # 五個消費者
    p.map_async(User, range(5))
    p.close()
    p.join()

if __name__ == '__main__':
    main()

輸出:(list之類的雖然可以不加global標示,但是為了後期維護方便,建議加上)

[生產者0]生產商品0
[生產者0]生產商品1
列表商品:[0, 1]
[消費者0]消費商品1
列表剩餘:[0]
列表商品:[0]
[消費者0]消費商品0
列表剩餘:[]
[生產者0]生產商品2
列表商品:[2]
[消費者1]消費商品2
列表剩餘:[]
[生產者0]生產商品3
[生產者1]生產商品0
[生產者0]生產商品4
列表商品:[3, 0, 4]
[消費者1]消費商品4
列表剩餘:[3, 0]
[生產者1]生產商品1
[生產者1]生產商品2
[生產者1]生產商品3
[生產者1]生產商品4
列表商品:[3, 0, 1, 2, 3, 4]
[消費者2]消費商品4
列表剩餘:[3, 0, 1, 2, 3]
列表商品:[3, 0, 1, 2, 3]
[消費者0]消費商品3
列表剩餘:[3, 0, 1, 2]
列表商品:[3, 0, 1, 2]
[消費者1]消費商品2
列表剩餘:[3, 0, 1]
列表商品:[3, 0, 1]
[消費者3]消費商品1
列表剩餘:[3, 0]
列表商品:[3, 0]
[消費者3]消費商品0
列表剩餘:[3]
列表商品:[3]
[消費者3]消費商品3
列表剩餘:[]

通知方法:

  1. notify() :發出資源可用的訊號,喚醒任意一條因 wait()阻塞的程序
  2. notifyAll() :發出資源可用訊號,喚醒所有因wait()阻塞的程序

2.2.5.執行緒同步~訊號量Semaphore(互斥鎖的高階版)

記得當時在分析multiprocessing.Queue原始碼的時候,有提到過(點我回顧

同進程的一樣,semaphore管理一個內建的計數器,每當呼叫acquire()時內建函式-1,每當呼叫release()時內建函式+1

通俗講就是:在互斥鎖的基礎上封裝了下,實現一定程度的並行

舉個例子,以前使用互斥鎖的時候:(廁所就一個坑位,必須等裡面的人出來才能讓另一個人上廁所) 3.互斥鎖.png

使用訊號量之後:廁所坑位增加到5個(自己指定),這樣可以5個人一起上廁所了==>實現了一定程度的併發

舉個例子:(Python在語法這點特別爽,不用你記太多異同,功能差不多基本上程式碼也就差不多)

from time import sleep
from multiprocessing.dummy import Pool as ThreadPool, Semaphore

sem = Semaphore(5) # 限制最大連線數為5

def goto_wc(i):
    global sem
    with sem:
        print(f"[執行緒{i}]上廁所")
        sleep(0.1)

def main():
    p = ThreadPool()
    p.map_async(goto_wc, range(50))
    p.close()
    p.join()

if __name__ == '__main__':
    main()

輸出: 4.semaphore.png

可能看了上節回顧的會疑惑:原始碼裡面明明是BoundedSemaphore,搞啥呢?

其實BoundedSemaphore就比Semaphore多了個在呼叫release()時檢查計數器的值是否超過了計數器的初始值,如果超過了將丟擲一個異常

以上一個案例說事:你換成BoundedSemaphore和上面效果一樣==>sem = BoundedSemaphore(5)

Semaphore補充

之前有人問Semaphore訊號量在專案中有什麼應用?(⊙o⊙)…額,這個其實從概念就推出場景了,控制併發嘛~舉個例子:

  1. 比如說我們呼叫免費API的時候經常看見單位時間內限制併發數在30以內,想高併發==>給錢( ⊙ o ⊙ )捂臉
  2. 再比如我們去爬資料的時候控制一下爬蟲的併發數(避免觸發反爬蟲的一種方式,其他部分後面會逐步引入)

這些虛的說完了,來個控制併發數的案例,然後咱們就繼續併發程式設計的衍生了:

import time
from multiprocessing.dummy import threading, Semaphore

class MyThread(threading.Thread):
    def __init__(self, id, sem):
        super().__init__()
        self.__id = id
        self.__sem = sem

    def run(self):
        self.__sem.acquire()  # 獲取
        self.api_test()

    def api_test(self):
        """模擬api請求"""
        time.sleep(1)
        print(f"id={self.__id}")
        self.__sem.release()  # 釋放

def main():
    sem = Semaphore(10)  # 控制併發數
    t_list = [MyThread(i, sem) for i in range(1000)]
    for t in t_list:
        t.start()
    for t in t_list:
        t.join()

if __name__ == '__main__':
    main()

輸出圖示: 1.sem控制併發.gif

執行分析: 2.sem分析.png

效能全圖: 2.png