1. 程式人生 > >Python3之多執行緒GIL、同步鎖、訊號量、死鎖與遞迴鎖、執行緒Queue、Event、定時器

Python3之多執行緒GIL、同步鎖、訊號量、死鎖與遞迴鎖、執行緒Queue、Event、定時器

GIL與互斥鎖再理解

GIL

執行緒一要把python程式碼交給直譯器去執行,而此時垃圾回收執行緒和執行緒二也需要將自己的任務交給python直譯器去執行,為了防止各個執行緒之間的資料產生衝突,誰拿到GIL鎖的許可權誰才能執行自己的任務,這就避免了不同任務之間的資料不會產生衝突,這是在同一個程序中加GIL鎖會保證資料的安全,不同的資料要加不同的鎖

死鎖與遞迴鎖

死鎖

程式碼演示

from threading import Thread, Lock
import time
mutexA = Lock()
mutexB = Lock()

class MyThread(Thread)
:
def run(self): self.f1() self.f2() def f1(self): mutexA.acquire() print('%s 拿到A鎖' % self.name) mutexB.acquire() print('%s 拿到B鎖' % self.name) mutexB.release() mutexA.release() def f2(self): mutexB.acquire() time.sleep(1
) print('%s 拿到B鎖' % self.name) mutexA.acquire() print('%s 拿到A鎖' % self.name) mutexA.release() mutexB.release() if __name__ == '__main__': for i in range(10): t = MyThread() t.start()

該種情況出現死鎖:
死鎖

程式碼講解

由於Thread-1建立的比較快,所以Thread-1先搶到A鎖,繼而順利成章的拿到B鎖,當Thread-1釋放掉A鎖時,另外9個執行緒搶A鎖,於此同時,Thread-1搶到B鎖,而此時Thread-2搶到A鎖,這樣Thread-1、Thread-2就等待彼此把鎖釋放掉,這樣程式就卡住了,解決這個問題就用到了遞迴鎖。

遞迴鎖

程式碼演示

from threading import Thread, Lock, RLock
import time

# mutexA = Lock()
# mutexB = Lock()
mutexA = mutexB = RLock()


class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print('%s 拿到A鎖' % self.name)
        mutexB.acquire()
        print('%s 拿到B鎖' % self.name)
        mutexB.release()
        mutexA.release()

    def f2(self):
        mutexB.acquire()
        time.sleep(1)
        print('%s 拿到B鎖' % self.name)
        mutexA.acquire()
        print('%s 拿到A鎖' % self.name)
        mutexA.release()
        mutexB.release()


if __name__ == '__main__':
    for i in range(10):
        t = MyThread()
        t.start()

程式碼講解

遞迴鎖時通過計數完成對鎖的控制的,當acquire一次,count+=1,release一次,count-=1,當count=0,所有的執行緒都可以對鎖進行搶奪。從而避免了死鎖的產生。

訊號量Semaphore

程式碼演示

from threading import Thread, Semaphore, currentThread
import time
smph = Semaphore(5)


def do_task():
    smph.acquire()
    print('\033[45m%s\033[0m 獲得了許可權' % currentThread().name)
    time.sleep(2)
    print('\033[46m%s\033[0m 放棄了許可權' % currentThread().name)
    smph.release()


if __name__ == '__main__':
    for i in range(10):
        t = Thread(target=do_task, )
        t.start()

程式碼效果

效果

程式碼講解

訊號量Semaphore本質也是一把鎖,但是這把鎖可以限定允許多個任務同時執行任務,但是不能超出規定的限制,下面的程式碼引數5就代表可以執行5個任務,如果第6個任務要執行,必須等5個任務中的一個結束,然後第六個才能進入執行。

smph = Semaphore(5)

這有點像程序池,只不過程序池規定了程序數量,多個任務進入程序池只能有數量一定的程序進行處理。,但是Semaphore可以產生多個執行緒。

執行緒Queue

佇列Queue

程式碼演示

import queue

q = queue.Queue()
q.put('1')
q.put(1)
q.put({'a': 1})

print(q.get())
print(q.get())
print(q.get())

程式碼講解

  1. 先進先出
  2. 可以存放任意型別資料

堆疊Queue

程式碼演示

import queue
q = queue.LifoQueue()
q.put(1)
q.put('1')
q.put({'a': 1})

print(q.get())
print(q.get())
print(q.get())

程式碼講解

  1. 可以存放任意資料型別
  2. Lifo代表後進先出

優先順序Queue

程式碼演示

import queue

q = queue.PriorityQueue()
q.put((10, 'Q'))
q.put((30, 'Z'))
q.put((20, 'A'))

print(q.get())
print(q.get())
print(q.get())

程式碼講解

  1. 存放的資料是元組型別,帶有優先順序數字越小優先順序越高。
  2. 資料優先順序高的優先被取出。
  3. 用於VIP使用者資料優先被取出場景,因為上面兩種都要挨個取出。

Event

程式碼演示

from threading import Thread, Event, currentThread
import time

e = Event()


def traffic_lights():
    time.sleep(5)
    e.set()


def cars():
    print('\033[45m%s\033[0m is waiting' % currentThread().name)
    e.wait()
    print('\033[45m%s\033[0m is running' % currentThread().name)


if __name__ == '__main__':
    for i in range(10):
        t = Thread(target=cars, )
        t.start()
    traffic_lights = Thread(target=traffic_lights, )
    traffic_lights.start()

程式碼講解

首先建立10個執行緒代表10輛車正在等訊號燈,建立1個執行緒代表訊號燈,當10輛汽車被建立後就等著訊號燈發訊號起跑,當遇到e.wait()時程式被掛起,等待訊號燈變綠,而e.set()就是來改變這個狀態讓訊號燈變綠,當e.set被設定後cars等到了訊號,就可以繼續往後跑了,程式碼可以繼續執行了。e.set()預設False,e.set()呼叫後值變為True,e.wait()接收到後程序由掛起變為可執行。

應用場景

程式碼演示

from threading import Thread, Event, currentThread
import time

e = Event()


def check_sql():
    print('%s is checking mySQL' % currentThread().name)
    time.sleep(5)
    e.set()


def link_sql():
    count = 1
    while not e.is_set():#e.isSet是一個繫結方法,自帶布林值為True,e.is_set()預設值為False
        e.wait(timeout=1)
        print('%s is trying %s' % (currentThread().name, count))
        if count > 3:
            raise ConnectionError('連線超時')
        count += 1
    print('%s is connecting' % currentThread().name)


if __name__ == '__main__':
    t_check = Thread(target=check_sql, )
    t_check.start()
    for i in range(3):
        t_link = Thread(target=link_sql, )
        t_link.start()

程式碼講解

  1. 資料庫遠端連線
  2. e.isSet是一個繫結方法,自帶布林值為True,e.is_set()預設值為False

定時器

程式碼演示

from threading import Timer


def deal_task(n):
    print('%s 我被執行了~' % n)


t = Timer(3, deal_task, args=(10,))
t.start()

程式碼講解

注意傳參時必須是元組形式