1. 程式人生 > >程序、執行緒、協程和GIL(二)

程序、執行緒、協程和GIL(二)

    上一篇部落格講了程序、執行緒、協程和GIL的基本概念,這篇我們來說說在以下三點:

  1> python中使用threading庫來建立執行緒的兩種方式

  2> 使用Event對消來判斷執行緒是否已啟動

  3> 使用Semaphore和BoundedSemaphore兩個類分別來控制執行緒的併發數以及二者之間的區別。

  如果想要了解基本概念,請移步我的上一篇部落格:https://www.cnblogs.com/ss-py/p/10236125.html

正文:

  利用threading庫來在建立一個執行緒:

from threading import
Thread def run(name): print('我是: %s' % (name)) if __name__ == '__main__': t = Thread(target=run, args=('執行緒1號',)) t.start()

  執行結果:  

  

  首先,建立一個Thread執行緒,並用target=run來指定這個執行緒需要執行那個run方法,然後將run方法所需的引數以args引數的形式傳遞過去,

  請注意,args所傳的引數是一個元組(tuple)型別,因此即使元組中只有一個引數,也要在這個引數後再加一個逗號,如 args=('執行緒1號',)

  

  而且,當我們建立一個執行緒物件後,這個執行緒並不會立即執行,除非呼叫它的star()方法,當呼叫star()方法後,這個執行緒會呼叫你使用target傳給他的函式,

  並將args中的引數傳遞給這個函式。

  Python中的執行緒會在一個單獨的系統級執行緒中執行(如一個POSIX執行緒或一個Windows執行緒),這些執行緒全部由作業系統進行管理,執行緒一旦啟動,

  它將獨立執行直至目標函式執行結束。

  我們可以呼叫is_alive()方法來進行判斷該執行緒是否在執行(is_alive()方法返回True或者False):

  我們知道,程序是依賴於執行緒來執行的,所以當我們的py檔案在執行時,我們可以理解為有一個主執行緒在執行,當我們建立一個子執行緒時,就相當於當前程式一共有兩個執行緒在執行。當子執行緒被建立後,主執行緒和子執行緒就獨立執行,相互並不影響。

  程式碼如下:

 1 import time
 2 from threading import Thread
 3 
 4 def run(name):
 5     time.sleep(2)
 6     print('我是: %s' % (name))
 7 
 8 if __name__ == '__main__':
 9     t = Thread(target=run, args=('子執行緒',))
10     t.start()
11     print('我是主執行緒')

  執行結果:

  

  在程式碼的第9行,建立了一個執行緒t,讓它來執行run()方法,這時,程式中就有了兩個執行緒同時存在、各自獨立執行,預設的,主執行緒是不會等待子執行緒的運算結果的,所以主執行緒繼續向下執行,列印L“我是主執行緒”,而子執行緒在呼叫run()方法時sleep了兩秒鐘,之後才打印出“我是子執行緒”

  當然,我們可以手動的呼叫join()方法來讓主執行緒等待子執行緒執行結束後再向下執行:

 1 import time
 2 from threading import Thread
 3 
 4 def run(name):
 5     time.sleep(2)
 6     print('我是: %s' % (name))
 7 
 8 if __name__ == '__main__':
 9     t = Thread(target=run, args=('子執行緒',))
10     t.start()
11     t.join()
12     print('我是主執行緒')

  

  這時,程式在進行到第十行後,主執行緒就卡住了,它在等待子執行緒執行結束,檔子執行緒執行結束後,主執行緒才會繼續向下執行,直至程式退出。

  但是,但是,無論主執行緒等不等待子執行緒,Python直譯器都會等待所有的執行緒都終止後才會退出。也就是說,無論主執行緒等不等待子執行緒,這個程式最終都

  會執行兩秒多,因為子執行緒sleep了兩秒。

  所以,當遇到需要長時間執行的執行緒或者是需要一直在後臺執行的執行緒時,可以將其設定為後臺執行緒(守護執行緒)daemon=True,如:

t = Thread(target=run, args=('子執行緒',), daemon=True)

  守護執行緒,顧名思義是守護主執行緒的執行緒,他們是依賴於主執行緒而存在的,當主執行緒執行結束後,守護執行緒會被立即登出,無論該執行緒是否執行結束。

  當然,我們也可以利用join()來使主執行緒等待主執行緒。

  使用threading庫來建立執行緒還有一種方式:

from threading import Thread

class CreateThread(Thread):

    def __init__(self):
        super().__init__()

    def run(self):
        print('我是子執行緒!')

t = CreateThread()
t.start()

  在開始t = Thread(target=run, args=('子執行緒',))這種方式呼叫方法時子執行緒呼叫的run方法的這個方法名是我隨便起的,實際上叫什麼都行,

  但是以繼承Thread類方式實現執行緒時,執行緒呼叫的方法名必須是run() 這個是程式寫死的。

 

   使用Event物件判斷執行緒是否已經啟動

   threading庫中的Event物件包含一個可由執行緒來設定的訊號標誌,它允許執行緒等待某些事件的發生。

  初始狀態時,event物件中的訊號標誌被設定為假,如果有一個執行緒等待event物件,且這個event物件的標誌為假,那麼這個執行緒就會一直阻塞,直到該標誌為真。如果將一個event物件的標誌設定為真,他將喚醒所有等待這個標誌的執行緒,如果一個執行緒等待一個被設定為真得Event物件,那麼它將忽略這個事件,繼續向下執行。  

  Event (事件) 定義了一個全域性的標誌Flag,如果Flag為False,當程式執行event.wait()時就會阻塞,當Flag為True時,程式執行event.wait()時便不會阻塞:

  event.set():  將標誌Flag設定為True, 並通知所有因等待該標誌而處於阻塞狀態的執行緒恢復執行。

  event.clear(): 將標誌Flag設定為False

  event.wait():  判斷當前標誌狀態,如果是True則立即返回,否則執行緒繼續阻塞。

  event.isSet(): 獲取標誌Flag狀態: 返回True或者False

 1 from threading import Thread, Event
 2 
 3 def run(num, start_evt):
 4     if int(num) >10:
 5         start_evt.set()
 6     else:
 7         start_evt.clear()
 8 start_evt = Event()
 9 
10 if __name__ == '__main__':
11     num = input("請輸入數字>>>")
12     t = Thread(target=run, args=(num, start_evt,))
13     t.start()
14     start_evt.wait()  # 主執行緒獲取Event物件的標誌狀態,若為True,則主執行緒繼續執行,否則,主執行緒阻塞
15     print("主執行緒繼續執行!")

  上邊這段程式碼:當輸入的數字大於10時,將標誌設定為True,主程式繼續執行,當小於或者等於10時,將標誌設為False(預設為False),主執行緒阻塞。

     

  值得注意的是:當Event物件的標誌被設定為True時,他會喚醒所有等待他的執行緒,如果只想喚醒某一個執行緒,最好使用訊號量。

 

  訊號量

  訊號量,說白了就是一個計數器,用來控制執行緒的併發數,每次有執行緒獲得訊號量的時候(即acquire())計數器-1,釋放訊號量時候(release())計數器+1,計數器為0的時候其它執行緒就被阻塞無法獲得訊號量

  acquire()   # 設定一個訊號量

  release()   # 釋放一個訊號量

  python中有兩個類實現了訊號量:(Semaphore和BoundedSemaphore)

  Semaphore和BoundedSemaphore的相同之處

    通過: threading.Semaphore(3) 或者 threading.BoundedSemaphore(3) 來設定初始值為3的計數器

    執行acquire() 計數器-1,執行release() 計數器+1,當計數器為0時,其他執行緒均無法再獲得訊號量從而阻塞

import threading

se = threading.BoundedSemaphore(3)

for i in range(5):
    se.acquire()
    print('訊號量被設定')

for j in range(10):
    se.release()
    print('訊號量被釋放了')

  執行結果:   

import threading

se = threading.Semaphore(3)

for i in range(5):
    se.acquire()
    print('訊號量被設定')

for j in range(10):
    se.release()
    print('訊號量被釋放了')

  執行結果:

  可以看到,無論我們用那個類建立訊號量,當計數器被減為0時,其他執行緒均會阻塞。

  這個功能經常被用來控制執行緒的併發數

  沒有設定訊號量:

import time
import threading

num = 3

def run():
    time.sleep(2)
    print(time.time())

if __name__ == '__main__':
    t_list = []
    for i in range(20):
        t = threading.Thread(target=run)
        t_list.append(t)
    for i in t_list:
        i.start()

  執行結果:20個執行緒幾乎在同時執行,,如果主機在執行IO密集型任務時執行這種程式時,主機有可能會宕機,

  但是在設定了訊號量時,我們可以來控制同一時間同時執行的執行緒數:

import time
import threading

num = 3

def run():
    se.acquire()  # 新增訊號量
    time.sleep(2)
    print(time.time())
    se.release()  #  釋放一個訊號量


if __name__ == '__main__':
    t_list = []
    se = threading.Semaphore(5)  # 設定一個大小為5的計數器(同一時間,最多允許5個執行緒在執行)
   # se = threading.BoundedSemaphore(5) 
    for i in range(20):
        t = threading.Thread(target=run)
        t_list.append(t)
    for i in t_list:
        i.start()

  這時,我們給程式加上訊號量,控制它在同一時間內,最多隻有5個執行緒在執行。

  

  兩者之間的差異性:

    當計數器達到設定好的上線時,BoundedSemaphore就無法進行release()操作了,Semaphore沒有這個限制,它會丟擲異常。

  

import threading


se = threading.Semaphore(3)

for i in range(3):  # 將計數器值減為0
    se.acquire(3)

for j in range(5):  # 將計數器值加至5
    se.release()
    print('訊號量被釋放了')

  執行結果:

  

import threading


se = threading.BoundedSemaphore(3)

for i in range(3):  # 將計數器值減為0
    se.acquire(3)

for j in range(5):  # 將計數器值加至5
    se.release()
    print('訊號量被釋放了')

  執行結果:

  拋異常了:訊號量被釋放太多次。。。

   好了,這篇文章的就先寫到這裡,下一篇文章我會講解關於執行緒間通訊、執行緒加鎖等問題

  想了解更多關於Python、爬蟲的資訊,歡迎關注我的個人微信公眾號: