1. 程式人生 > >Python---執行緒的鎖

Python---執行緒的鎖

1、同步鎖

為了防止讀取到髒資料,對臨界資源進行加鎖,將並行被迫改為序列。通過threading.Lock()方法建立一把鎖。

acquire() 方法:只有一個執行緒能成功的獲取鎖,按先後順序 其他執行緒只能等待。release() 方法:執行緒釋放。這把鎖不允許在同一執行緒中被多次acquire()。

import threading
import time

def check():
    global number
    lock.acquire()  # 對臨界資源進行加鎖
    temp = number
    time.sleep(0.01)
    number = temp-1
    lock.release()

l = []
number = 100
# 例項一把鎖
lock = threading.Lock()
for i in range(100):
    t = threading.Thread(target=check)
    t.start()
    l.append(t)

for t in l:
    t.join()
print(number)

2、遞迴鎖(解決死鎖問題)

遞迴鎖:  RLock允許在同一執行緒中被多次acquire()  加鎖,其內部存在著一個計數器,只要計數器大於0就沒人能拿到這把鎖。
acquire()方法和release()方法必須成對出現,即呼叫了n次acquire,必須呼叫n次的release才能真正釋放所佔用的鎖。
通過  rlock = threading.Rlock()   方法獲得一把遞迴鎖。

死鎖:當一個或多個程序等待系統資源,而資源又被程序本身或其它程序佔用時,就形成了死鎖。

import time
import threading

class MyThread(threading.Thread):
    def run(self):
        self.actionA()
        self.actionB()

    def actionA(self):
        LockA.acquire()
        print("got Alock"+str(threading.current_thread()))
        time.sleep(1)
        LockB.acquire()
        print("got Block" + str(threading.current_thread()))
        time.sleep(0.)
        LockB.release()
        LockA.release()

    def actionB(self):
        LockB.acquire()
        print("got Block"+str(threading.current_thread()))
        time.sleep(1)
        LockA.acquire()
        print("got Alock" + str(threading.current_thread()))
        time.sleep(0.1)
        LockA.release()
        LockB.release()

if __name__ == '__main__':
    LockA = threading.Lock()
    LockB = threading.Lock()
    L = []
    for i in range(5):
        t = MyThread()
        t.start()
        L.append(t)
    for t in L:
        t.join()
    print('ending')



當一個執行緒執行完actionA方法以後,會繼續執行actionB方法,此時並不會產生死鎖。當第二個執行緒開始執行的時候,可以獲得A鎖,當獲取B鎖的時候,返現B鎖已經被第一個執行緒佔用,而第一個執行緒也在請求A鎖。兩個執行緒僵持不下所以發生死鎖。當加上遞迴鎖以後就解決此問題。

加上遞迴鎖:

import time
import threading

class MyThread(threading.Thread):
    def run(self):
        self.actionA()
        self.actionB()

    def actionA(self):
        LockA.acquire()
        print("got Alock"+str(threading.current_thread()))
        time.sleep(1)
        LockA.acquire()
        print("got Block" + str(threading.current_thread()))
        time.sleep(0.)
        LockA.release()
        LockA.release()

    def actionB(self):
        LockA.acquire()
        print("got Block"+str(threading.current_thread()))
        time.sleep(1)
        LockA.acquire()
        print("got Alock" + str(threading.current_thread()))
        time.sleep(0.1)
        LockA.release()
        LockA.release()

if __name__ == '__main__':
#   獲得一把鎖
    LockA = threading.RLock()
    L = []
#   例項執行緒物件
    for i in range(5):
        t = MyThread()
        t.start()
        L.append(t)
    for t in L:
        t.join()
    print('ending')

當第一個執行緒執行完actionA方法以後再執行actionB方法會再次請求一把相同的鎖,和其他執行緒一同競爭獲得該鎖。從而解決了此問題。

3、訊號量

保證兩個或多個關鍵程式碼段不被併發呼叫,也是一種鎖。threading.Semaphore(size) 建立一個訊號量。也有acquire()、release()方法和上面相同。

import threading
import time

def check():
    # 當進來一個變數,就會減一,一共有5把鎖
    if semaphore.acquire():
        print(threading.current_thread())
        time.sleep(3)
        # 每釋放一把鎖就會加一
        semaphore.release()
#         最後顯示會每三秒出5個結果

if __name__ == '__main__':
    L = []
    # 建立5個訊號量,訊號量也是一種鎖
    semaphore = threading.Semaphore(5)
    for i in range(100):
        t = threading.Thread(target=check)
        t.start()
        L.append(t)
    for t in L:
        t.join()
    print('end')

4、同步物件Event

threading.event()建立一個同步物件,類似於訊號量機制,只不過訊號量為一。
wait()方法:沒有設定標誌位就會卡住,一旦被設定等同於pass。set()方法:設定標誌位。clear()方法:清楚標誌位。is_set()方法:是否設定。

import time
import threading

class Boss(threading.Thread):

    def run(self):
        print("加班加班")
        #  檢驗是否設定標誌位
        print(event.is_set())
        #  設定標誌位
        event.set()
        time.sleep(3)
        # 清空標誌位
        event.clear()
        print('加班結束')
        event.set()

class Worker(threading.Thread):

    def run(self):
#       等待設定標誌位,如果設定,此語句相當於pass
        event.wait()
        print("命苦啊")
        time.sleep(1)
        event.clear()
        event.wait()
        print("OH yeah")

if __name__ == '__main__':
    L = []
#   建立一個同步物件
    event = threading.Event()
    for i in range(5):
        L.append(Worker())
    L.append(Boss())
    for t in L:
        t.start()
    for t in L:
        t.join()
    print("end")

5、生產者消費者模型

(1):佇列:執行緒佇列一種資料結構,是多執行緒的、安全的。列表是執行緒不安全的。

            佇列的模式:1、先進先出   2、先進後出   3、按優先順序。

import queue

# 建立一個佇列先進先出   
q = queue.Queue(3)

# q = queue.LifoQueue()   建立先進後出佇列。(棧)later in first out
# q = queue.PriorityQueue()  建立一個按優先順序順序的佇列

# 新增資料  當超過佇列大小時,就會卡住put。後面如果加上false引數就會在滿的時候報錯。
q.put(1)
q.put("csdn")
q.put({"name":"2333"})
q.put("baba")

# print(q.qsize())   不是最大值,而是當前佇列有多少個元素。 
# print(q.maxsize)   佇列的最長度
# print(q.full())   隊滿
# print(q.empty())   隊空

while 1:
    print("+++++++++")
#   取資料,沒資料的時候會一直卡住,一直等到有新增值。(另外的一個執行緒可以繼續新增值)。後面如果加上false引數就會在滿的時候報錯
    print(q.get())
    print("---------")

當前程式會卡在put()方法。

當使用優先順序佇列時在put()資料時一同寫上優先順序,數值越小優先順序越高。

q.put([1,1])

(2)Producer-consumer problem:

task_done()方法:完成一項工作後向另一個執行緒發訊號。join()方法:等到佇列為空時,在執行別的操作。成對出現,否則沒意義。 

import time, threading, queue, random

def producer(name):
    count = 0
    while count < 10:
        print("making....")
        # 隨機數範圍1--2
        time.sleep(random.randrange(3))
        q.put(count)
        print("{}做好了第{}個包子".format(name,count))
        count += 1
        # q.task_done()
        q.join()

def consumer(name):
    count = 0
    while count < 10:
        time.sleep(random.randrange(4))
        print('waiting')
        # q.join()     沒有訊號會卡住,有人給訊號就往下執行
        data = q.get()
        q.task_done()
        print("{}正在吃第{}個包子".format(name,data))
        count += 1

if __name__ == '__main__':
    q = queue.Queue()
    t1 = threading.Thread(target=producer,args=('A君',))
    t2 = threading.Thread(target=consumer,args=('B君',))
    t3 = threading.Thread(target=consumer,args=('C君',))
    t4 = threading.Thread(target=consumer,args=('D君',))
    t1.start()
    t2.start()
    t3.start()
    t4.start()

task_done : 用於 get 的後續呼叫.告訴佇列任務處理完成.當然你也可以不呼叫get。

join:阻塞操作,直到佇列所有的任務都處理,換句話說,當佇列裡面沒有東西的時候才會向下執行,否則會阻塞。也就是說:往裡 put 幾次,就要呼叫task_done幾次。然而task_done()並不會使佇列長度-1,而是會向佇列傳送訊號,真正使佇列長度減一的是get()操作。

put佇列完成的時候千萬不能用task_done(),否則會報錯,因為該方法僅僅表示get成功後,執行的一個標記。