1. 程式人生 > >鐵樂學python_Day42_線程-信號量事件條件

鐵樂學python_Day42_線程-信號量事件條件

oba outer 消費者與生產者 spa init .com end 超出 技術

鐵樂學python_Day42_線程-信號量事件條件

線程中的信號量

同進程的一樣,Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器-1;調用release() 時內置計數器+1;
計數器不能小於0;當計數器為0時,acquire()將阻塞線程直到其他線程調用release()。

實例:(同時只有5個線程可以獲得semaphore,即可以限制最大連接數為5):
from threading import Thread,Semaphore
import threading
import time

def func():
    sm.acquire()
    print
(%s get sm‘ %threading.current_thread().getName()) time.sleep(3) sm.release() if __name__ == ‘__main__‘: sm=Semaphore(5) for i in range(23): t=Thread(target=func) t.start()

與進程池是完全不同的概念,進程池Pool(4),最大只能產生4個進程,
而且從頭到尾都只是這四個進程,不會產生新的,而信號量是產生一堆線程/進程。

線程中的事件

同進程的一樣,線程的一個關鍵特性是每個線程都是獨立運行且狀態不可預測。
如果程序中的其他線程需要通過判斷某個線程的狀態來確定自己下一步的操作,這時線程同步問題就會變得非常棘手。
為了解決這些問題,我們需要使用threading庫中的Event對象。
對象包含一個可由線程設置的信號標誌,它允許線程等待某些事件的發生。
在初始情況下,Event對象中的信號標誌被設置為假。
如果有線程等待一個Event對象, 而這個Event對象的標誌為假,那麽這個線程將會被一直阻塞直至該標誌為真。
一個線程如果將一個Event對象的信號標誌設置為真,它將喚醒所有等待這個Event對象的線程。
如果一個線程等待一個已經被設置為真的Event對象,那麽它將忽略這個事件, 繼續執行。

event.isSet():返回event的狀態值;
event.wait():如果 event.isSet()==False將阻塞線程;
event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度;
event.clear():恢復event的狀態值為False。

技術分享圖片

例如,有多個工作線程嘗試鏈接MySQL,要在鏈接前確保MySQL服務正常才讓那些工作線程去連接MySQL服務器,
如果連接不成功,都會去嘗試重新連接。可以采用threading.Event機制來協調各個工作線程的連接操作。

例:模擬連接mysql數據庫
import threading
import
time,random from threading import Thread,Event def conn_mysql(): count=1 while not event.is_set(): if count > 3: raise TimeoutError(‘鏈接超時‘) print(‘<%s>第%s次嘗試鏈接‘ % (threading.current_thread().getName(), count)) event.wait(0.5) count+=1 print(‘<%s>鏈接成功‘ %threading.current_thread().getName()) def check_mysql(): print(\033[45m[%s]正在檢查mysql\033[0m‘ % threading.current_thread().getName()) time.sleep(random.randint(2,4)) event.set() if __name__ == ‘__main__‘: event=Event() conn1=Thread(target=conn_mysql) conn2=Thread(target=conn_mysql) check=Thread(target=check_mysql) conn1.start() conn2.start() check.start() 例:模擬連接數據庫2 import time import random from threading import Event, Thread # 模擬連接數據庫 def connect_db(e): count = 1 while count < 4: print(‘嘗試第%s次檢測連接‘ % count) e.wait(0.5) # 如果不傳參數會一直等到事件為True為止 # 如果傳參數 傳一個時間參數 count += 1 if e.is_set(): print(‘連接成功‘) break else: print(‘連接失敗‘) def check_conn(e): ‘‘‘檢測數據庫是否可以連接‘‘‘ time.sleep(random.randint(1, 2)) e.set() e = Event() Thread(target=check_conn, args=(e,)).start() Thread(target=connect_db, args=(e,)).start() # 當你要做一件事情是有前提(前置條件)的時候 # 你就先去處理前提(前置條件)的問題 —— 前提處理好了,把狀態設置成True # 來控制即將要做的事情可以開始了。 運行效果如下: 嘗試第1次檢測連接 嘗試第2次檢測連接 連接成功

條件(Condition)

使線程等待,當滿足某條件時,才釋放n個線程。

Python提供的Condition對象提供了對復雜線程同步問題的支持。
Condition被稱為條件變量,除了提供與Lock類似的acquire和release方法外,還提供了wait和notify方法。
線程首先acquire一個條件變量,然後判斷一些條件。
如果條件不滿足則wait;
如果條件滿足,進行一些處理改變條件後,通過notify方法通知其他線程,
其他處於wait狀態的線程接到通知後會重新判斷條件。
不斷的重復這一過程,從而解決復雜的同步問題。

例:線程設置條件
import threading

def run(n):
    con.acquire()
    con.wait()
    print("run the thread: %s" % n)
    con.release()

if __name__ == ‘__main__‘:

    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()

    while True:
        inp = input(‘>>>‘)
        if inp == ‘q‘:
            break
        con.acquire()
        con.notify(int(inp))
        con.release()
        print(‘****‘)

運行如下:
>>>1
****
>>>run the thread: 0
2
****
>>>run the thread: 1
run the thread: 2
q

例2:通過條件變量控制線程分批執行
from threading import Condition, Thread

def func(i, con):
    con.acquire()
    con.wait()
    print(i * ‘*‘)
    con.release()

con = Condition()
# 定義了範圍從1到9
for i in range(1, 10):
    Thread(target=func, args=(i, con)).start()
while True:
    # 分批控制線程執行
    n = int(input(‘>>>‘))
    if n == 0:
        break
    con.acquire()
    con.notify(n)
    con.release()

運行效果如下:
>>>1
>>>*
2
>>>**
***
3
>>>******
****
*****
4
>>>*******
********
*********
0

以上,輸入1則運行一次任務,1+2+3+4 10次己超出任務次數,完成後不會重復執行

例:那朵花躲迷藏
#!/usr/bin/env python
# _*_ coding: utf-8 _*_
# ____ Condition 條件變量
# ____ 模擬那朵花中的面碼捉迷藏對話

import threading, time

# 將躲迷藏中搜尋的角色這個類創造出來,且繼承了線程Thread這個類
class Seeker(threading.Thread):

    def __init__(self, cond, name):
        super(Seeker, self).__init__()
        self.cond = cond
        self.name = name

    def run(self):
        # 睡一秒,讓躲貓貓的面碼也運行起來,不然會阻塞住
        time.sleep(1)
        self.cond.acquire()
        print(self.name + ‘: (藏)好了嗎?‘)
        self.cond.notify()
        self.cond.wait()
        print(self.name + ‘: (藏)好了嗎?~~‘)
        self.cond.notify()
        self.cond.wait()
        print(self.name + ‘: 看到你了!面碼!‘)
        self.cond.notify()
        self.cond.wait()
        self.cond.release()
        print(self.name + ": 謝謝你,面碼~ ")

# 再來是躲迷藏的面碼
class Hider(threading.Thread):

    def __init__(self, cond, name):
        super(Hider, self).__init__()
        self.cond = cond
        self.name = name

    def run(self):
        self.cond.acquire()
        self.cond.wait()
        # 釋放對鎖的占用,同時線程掛起來在這裏,直到被notify
        print(self.name + ": 還沒好哦~")
        self.cond.notify()
        self.cond.wait()
        print(self.name + ": (藏)好了哦~")
        self.cond.notify()
        self.cond.wait()
        self.cond.notify()
        self.cond.release()
        print(self.name + ": 阿,被看到了~")

cond = threading.Condition()
seeker = Seeker(cond, "仁太")
hider = Hider(cond, "面碼")
seeker.start()
hider.start()

運行效果:交替進行的對話

仁太: (藏)好了嗎?
面碼: 還沒好哦~
仁太: (藏)好了嗎?~~
面碼: (藏)好了哦~
仁太: 看到你了!面碼!
面碼: 阿,被看到了~
仁太: 謝謝你,面碼~ 

無限循環的例子

經典的生產者與消費者問題:假設有一群生產者(Producer)和一群消費者(Consumer)通過一個市場來交互產品。
生產者的”策略“是如果市場上剩余的產品少於1000個,那麽就生產100個產品放到市場上;
而消費者的”策略“是如果市場上剩余產品的數量多余100個,那麽就消費3個產品。
用Condition解決生產者與消費者問題的代碼如下:

#!/usr/bin/env python
# _*_ coding: utf-8 _*_
# ____消費者與生產者模型,多線程,條件變量,無限循環

import threading
import time

# 生產者
class Producer(threading.Thread):

    def run(self):
        global count
        while True:
            if con.acquire():
                # 當產品在市場超過1000個時開始等候
                if count > 1000:
                    con.wait()
                # 少於1000個則開始生產100個產品投入市場
                else:
                    count = count + 100
                    msg = self.name + ‘ produce 100, count=‘ + str(count)
                    print(msg)
                    con.notify()
                con.release()
                time.sleep(1)


# 消費者
class Consumer(threading.Thread):

    def run(self):
        global count
        while True:
            # 當市場少於200個產品時,消費者等候
            if con.acquire():
                if count < 200:
                    con.wait()
                # 否則消費
                else:
                    count = count - 30
                    msg = self.name + ‘ consume 30, count=‘ + str(count)
                    print(msg)
                    con.notify()
                con.release()
                time.sleep(1)

# 初始產品為100個
count = 100
con = threading.Condition()


def main():
    # 兩個生產者
    for i in range(2):
        p = Producer()
        p.start()
    # 五個消費者
    for i in range(5):
        c = Consumer()
        c.start()


if __name__ == ‘__main__‘:
    main()

無限循環的部分運行效果:
Thread-1 produce 100, count=200
Thread-2 produce 100, count=300
Thread-3 consume 30, count=270
Thread-4 consume 30, count=240
Thread-5 consume 30, count=210
Thread-6 consume 30, count=180
Thread-1 produce 100, count=280
Thread-3 consume 30, count=250
Thread-5 consume 30, count=220
Thread-2 produce 100, count=320
Thread-4 consume 30, count=290
Thread-6 consume 30, count=260
Thread-7 consume 30, count=230
Thread-1 produce 100, count=330
Thread-4 consume 30, count=300
Thread-2 produce 100, count=400
Thread-5 consume 30, count=370
Thread-3 consume 30, count=340
Thread-6 consume 30, count=310
Thread-7 consume 30, count=280
Thread-1 produce 100, count=380
Thread-5 consume 30, count=350
Thread-3 consume 30, count=320
Thread-4 consume 30, count=290
Thread-2 produce 100, count=390
Thread-6 consume 30, count=360
Thread-7 consume 30, count=330
Thread-1 produce 100, count=430
Thread-3 consume 30, count=400
Thread-2 produce 100, count=500

淺析區別

  • 信號量 semaphore 允許統一時刻n個線程執行這段代碼
  • 事件 event 有一個內部的事件來控制wait的行為且控制的是所有的線程
  • 條件 condition 有一個內部的條件來控制wait的行為,可以逐個或者分批次的控制線程的走向

end
參考:http://www.cnblogs.com/Eva-J/articles/8306047.html

鐵樂學python_Day42_線程-信號量事件條件