1. 程式人生 > >Python併發程式設計系列之多執行緒

Python併發程式設計系列之多執行緒

1引言

2 建立執行緒

  2.1 函式的方式建立執行緒

  2.2 類的方式建立執行緒

3 Thread類的常用屬性和方法

  3.1 守護執行緒:Deamon

  3.2 join()方法

4 執行緒間的同步機制

  4.1 互斥鎖:Lock

  4.2 遞迴鎖:RLock

  4.3 Condition

  4.4 訊號量:Semaphore

  4.5 事件:Event

  4.6 定時器:Timer

5 執行緒間的通行

  5.1佇列:Queue

6 執行緒池

7 總結

1 引言

  上一篇博文詳細總結了Python程序的用法,這一篇博文來所以說Python中執行緒的用法。實際上,程式的執行都是以執行緒為基本單位的,每一個程序中都至少有一個執行緒(主執行緒),執行緒又可以建立子執行緒。執行緒間共享資料比程序要容易得多(輕而易舉),程序間的切換也要比程序消耗CPU資源少。

  執行緒管理可以通過thead模組(Python中已棄用)和threading 模組,但目前主要以threading模組為主。因為更加先進,有更好的執行緒支援,且 threading模組的同步原語遠多於thread模組。另外,thread 模組中的一些屬性會和 threading 模組有衝突。故,本文建立執行緒和使用執行緒都通過threading模組進行。

  threading模組提供的類: Thread, Lock, Rlock, Condition, [Bounded]Semaphore, Event, Timer, local。

  threading 模組提供的常用方法:

  threading.currentThread(): 返回當前的執行緒變數。

  threading.enumerate(): 返回一個包含正在執行的執行緒的list。正在執行指執行緒啟動後、結束前,不包括啟動前和終止後的執行緒。

  threading.activeCount(): 返回正在執行的執行緒數量,與len(threading.enumerate())有相同的結果。

  threading 模組提供的常量:

  threading.TIMEOUT_MAX 設定threading全域性超時時間。

2 建立執行緒

  無論是用自定義函式的方法建立執行緒還是用自定義類的方法建立執行緒與建立程序的方法極其相似的,不過,建立執行緒時,可以不在“if __name__==”__main__:”語句下進行”。無論是哪種方式,都必須通過threading模組提供的Thread類進行。Thread類常用屬性和方法如下。

  Thread類屬性:

  name:執行緒名

  ident:執行緒的識別符號

  daemon:布林值,表示這個執行緒是否是守護執行緒

  Thread類方法:

  __init__(group=None,target=None,name=None,args=(),kwargs={},verbose=None,daemon=None):例項化一個執行緒物件,需要一個可呼叫的target物件,以及引數args或者kwargs。還可以傳遞name和group引數。daemon的值將會設定thread.daemon的屬性

  start():開始執行該執行緒

  run():定義執行緒的方法。(通常開發者應該在子類中重寫)

  join(timeout=None):直至啟動的執行緒終止之前一直掛起;除非給出了timeout(單位秒),否則一直被阻塞

  isAlive:布林值,表示這個執行緒是否還存活(駝峰式命名,python2.6版本開始已被取代)

  isDaemon():布林值,表示是否是守護執行緒

  setDaemon(布林值):線上程start()之前呼叫,把執行緒的守護標識設定為指定的布林值

  在下面兩小節我們分別通過程式碼來演示。

2.1 自定義函式的方式建立執行緒

import os

import time

import threading

def fun(n):

    print('子執行緒開始執行……')

    time.sleep(1)

    my_thread_name = threading.current_thread().name#獲取當前執行緒名稱

    my_thread_id = threading.current_thread().ident#獲取當前執行緒id

    print('當前執行緒為:{},執行緒id為:{},所在程序為:{},您輸入的引數為:{}'.format(my_thread_name ,my_thread_id , os.getpid(),n))

    print('子執行緒執行結束……')

 

t = threading.Thread(target=fun , name='執行緒1',args=('引數1',))

t.start()

time.sleep(2)

main_thread_name = threading.current_thread().name#獲取當前執行緒名稱

main_thread_id = threading.current_thread().ident#獲取當前執行緒id

print('主執行緒為:{},執行緒id為:{},所在程序為:{}'.format(main_thread_name ,main_thread_id , os.getpid()))

2.2 類的方式建立執行緒

import os

import time

import threading

class MyThread(threading.Thread):

    def __init__(self , n , name=None):

        super().__init__()

        self.name = name

        self.n = n

    def run(self):

        print('子執行緒開始執行……')

        time.sleep(1)

        my_thread_name = threading.current_thread().name#獲取當前執行緒名稱

        my_thread_id = threading.current_thread().ident#獲取當前執行緒id

        print('當前執行緒為:{},執行緒id為:{},所在程序為:{},您輸入的引數為:{}'.format(my_thread_name ,my_thread_id , os.getpid(),self.n))

        print('子執行緒執行結束……')

t = MyThread(name='執行緒1', n=1)

t.start()

time.sleep(2)

main_thread_name = threading.current_thread().name#獲取當前執行緒名稱

main_thread_id = threading.current_thread().ident#獲取當前執行緒id

print('主執行緒為:{},執行緒id為:{},所在程序為:{}'.format(main_thread_name ,main_thread_id , os.getpid()))

 

  輸出結果:

  子執行緒開始執行……

  當前執行緒為:執行緒1,執行緒id為:11312,所在程序為:4532,您輸入的引數為:1

  子執行緒執行結束……

  主執行緒為:MainThread,執行緒id為:10868,所在程序為:4532

  上述兩塊程式碼輸出結果是一樣的(id不一樣),觀察輸出結果可以發現,子執行緒和主執行緒所在的程序都是一樣的,證明是在同一程序中的程序。

3 Thread的常用方法和屬性

3.1 守護執行緒:Deamon

  Thread類有一個名為deamon的屬性,標誌該執行緒是否為守護執行緒,預設值為False,當為設為True是表示設定為守護執行緒。是否是守護執行緒有什麼區別呢?我們先來看看deamon值為False(預設)情況時:

import os

import time

import threading

 

def fun():

    print('子執行緒開始執行……')

    for i in range(6):#執行3秒,每秒輸出一次

        time.sleep(1)

        my_thread_name = threading.current_thread().name

        print('{}已執行{}秒……'.format(my_thread_name ,i+1))

    print('子執行緒執行結束……')

 

print('主執行緒開始執行……')

t = threading.Thread(target=fun , name='執行緒1')

print('daemon的值為:{}'.format(t.daemon))

t.start()

for i in range(3):

    time.sleep(1)

    my_thread_name = threading.current_thread().name

    print('{}已執行{}秒……'.format(my_thread_name, i+1))

print('主執行緒結束執行……')

  輸出結果:

  主執行緒開始執行……

  daemon的值為:False

  子執行緒開始執行……

  MainThread已執行1秒……

  執行緒1已執行1秒……

  MainThread已執行2秒……

  執行緒1已執行2秒……

  MainThread已執行3秒……

  主執行緒結束執行……

  執行緒1已執行3秒……

  執行緒1已執行4秒……

  執行緒1已執行5秒……

  執行緒1已執行6秒……

  子執行緒執行結束……

  程式碼中,主執行緒只需要執行3秒即可結束,但子執行緒需要執行6秒,從執行結果中可以看到,主執行緒程式碼執行結束後,子執行緒還可以繼續執行,這就是非守護執行緒的特徵。

再來看看daemon值為True時:

import time

import threading

 

def fun():

    print('子執行緒開始執行……')

    for i in range(6):#執行3秒,每秒輸出一次

        time.sleep(1)

        my_thread_name = threading.current_thread().name

        print('{}已執行{}秒……'.format(my_thread_name ,i+1))

    print('子執行緒執行結束……')

 

print('主執行緒開始執行……')

t = threading.Thread(target=fun , name='執行緒1')

t.daemon=True #設定為守護執行緒

print('daemon的值為:{}'.format(t.daemon))

t.start()

for i in range(3):

    time.sleep(1)

    my_thread_name = threading.current_thread().name

    print('{}已執行{}秒……'.format(my_thread_name, i+1))

print('主執行緒結束執行……')

  輸出結果:

  主執行緒開始執行……

  daemon的值為:True

  子執行緒開始執行……

  MainThread已執行1秒……

  執行緒1已執行1秒……

  MainThread已執行2秒……

  執行緒1已執行2秒……

  MainThread已執行3秒……

  主執行緒結束執行……

  從執行結果中可以看出,當deamon值為True,即設為守護執行緒後,只要主執行緒結束了,無論子執行緒程式碼是否結束,都得跟著結束,這就是守護執行緒的特徵。另外,修改deamon的值必須線上程start()方法呼叫之前,否則會報錯。

3.2 join()方法

  join()方法的作用是在呼叫join()方法處,讓所線上程(主執行緒)同步的等待被join的執行緒(下面的p執行緒),只有p執行緒結束。我們嘗試在不同的位置呼叫join方法,對比執行結果。首先在p執行緒一開始的位置進行join:

import time

import threading

 

def fun():

    print('子執行緒開始執行……')

    for i in range(6):#執行3秒,每秒輸出一次

        time.sleep(1)

        my_thread_name = threading.current_thread().name

        print('{}已執行{}秒……'.format(my_thread_name ,i+1))

    print('子執行緒執行結束……')

 

print('主執行緒開始執行……')

t = threading.Thread(target=fun , name='執行緒1')

t.daemon=True #設定為守護執行緒

t.start()

t.join() #此處進行join

for i in range(3):

    time.sleep(1)

    my_thread_name = threading.current_thread().name

    print('{}已執行{}秒……'.format(my_thread_name, i+1))

print('主執行緒結束執行……')

  輸出結果:

  主執行緒開始執行……

  子執行緒開始執行……

  執行緒1已執行1秒……

  執行緒1已執行2秒……

  執行緒1已執行3秒……

  執行緒1已執行4秒……

  執行緒1已執行5秒……

  執行緒1已執行6秒……

  子執行緒執行結束……

  MainThread已執行1秒……

  MainThread已執行2秒……

  MainThread已執行3秒……

  主執行緒結束執行……

  可以看出,等子執行緒執行完之後,主執行緒才繼續join下面的程式碼。然後在主執行緒即將結束時進行join:

import time

import threading

def fun():

    print('子執行緒開始執行……')

    for i in range(6):#執行3秒,每秒輸出一次

        time.sleep(1)

        my_thread_name = threading.current_thread().name

        print('{}已執行{}秒……'.format(my_thread_name ,i+1))

    print('子執行緒執行結束……')

 

print('主執行緒開始執行……')

t = threading.Thread(target=fun , name='執行緒1')

t.daemon=True #設定為守護執行緒

t.start()

for i in range(3):

    time.sleep(1)

    my_thread_name = threading.current_thread().name

    print('{}已執行{}秒……'.format(my_thread_name, i+1))

t.join()

print('主執行緒結束執行……')

  輸出結果:

  主執行緒開始執行……

  子執行緒開始執行……

  MainThread已執行1秒……

  執行緒1已執行1秒……

  MainThread已執行2秒……

  執行緒1已執行2秒……

  MainThread已執行3秒……

  執行緒1已執行3秒……

  執行緒1已執行4秒……

  執行緒1已執行5秒……

  執行緒1已執行6秒……

  子執行緒執行結束……

  主執行緒結束執行……

  上面程式碼中,子執行緒是設定為守護執行緒的,如果沒有呼叫join()方法主執行緒3秒結束,子執行緒也會跟著結束,但是從執行結果中我們可以看出,主執行緒3秒後,陷入等待,等子執行緒執行完之後,才會繼續下面的程式碼。

4 執行緒間的同步機制

  在預設情況在,多個執行緒之間是併發執行的,這就可能給資料程式碼不安全性,例如有一個全域性變數num=10,執行緒1、執行緒2每次讀取該變數後在原有值基礎上減1。但,如果執行緒1讀取num的值(num=10)後,還沒來得及減1,CPU就切換去執行執行緒2,執行緒2也去讀取num,這時候讀取到的值也還是num=10,然後讓num=9,這是CPU有切換回執行緒1,因為執行緒1讀取到的值是原來的num=10,所以做減1運算後,也做出num=9的結果。兩個執行緒都執行了該任務,但最後的值可不是8。如下程式碼所示:

import time

import threading

def fun():

    global num

    temp = num

    time.sleep(0.2)

    temp -= 1

    num = temp

print('主執行緒開始執行……')

t_lst = []

num =10 # 全域性變數

for i in range(10):

    t = threading.Thread(target=fun)

    t_lst.append(t)

    t.start()

[t.join() for t in t_lst]

print('num最後的值為:{}'.format(num))

print('主執行緒結束執行……')

  輸出結果:

  主執行緒開始執行……

  num最後的值為:9

  主執行緒結束執行……

  最後結果為9,不是0。這就造成了資料混亂。所以,就有了執行緒同步機制。

4.1 互斥鎖:Lock

  執行緒同步能夠保證多個執行緒安全訪問競爭資源,最簡單的同步機制是引入互斥鎖。互斥鎖為資源設定一個狀態:鎖定和非鎖定。某個執行緒要更改共享資料時,先將其鎖定,此時資源的狀態為“鎖定”,其他執行緒不能更改;直到該執行緒釋放資源,將資源的狀態變成“非鎖定”,其他的執行緒才能再次鎖定該資源。互斥鎖保證了每次只有一個執行緒進行寫入操作,從而保證了多執行緒情況下資料的正確性。

import time

import threading

def fun(lock):

    lock.acquire()

    global num

    temp = num

    time.sleep(0.2)

    temp -= 1

    num = temp

    lock.release()

print('主執行緒開始執行……')

t_lst = []

num =10 # 全域性變數

lock = threading.Lock()

for i in range(10):

    t = threading.Thread(target=fun , args=(lock,))

    t_lst.append(t)

    t.start()

[t.join() for t in t_lst]

print('num最後的值為:{}'.format(num))

print('主執行緒結束執行……')

  輸出結果:

  主執行緒開始執行……

  num最後的值為:0

  主執行緒結束執行……

  可以看到,最後輸出結果為0,值正確。當然,如果你運行了上述兩塊程式碼,你就會發現,使用了鎖之後,程式碼執行速度明顯降低,這是因為執行緒由原來的併發執行變成了序列,不過資料安全性得到保證。

  使用Lock的時候必須注意是否會陷入死鎖,所謂死鎖是指兩個或兩個以上的程序或執行緒在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的程序稱為死鎖程序。關於死鎖一個著名的模型是“科學家吃麵”模型:

import time

from threading import Thread

from threading import Lock

def eatNoodles_1(noodle_lock, fork_lock, scientist):

    noodle_lock.acquire()

    print('{} 拿到了面'.format(scientist))

    fork_lock.acquire()

    print('{} 拿到了叉子'.format(scientist))

    time.sleep(1)

    print('{} 吃到了面'.format(scientist))

    fork_lock.release()

    noodle_lock.release()

    print('{} 放下了面'.format(scientist))

    print('{} 放下了叉子'.format(scientist))

def eatNoodles_2(noodle_lock, fork_lock, scientist):

    fork_lock.acquire()

    print('{} 拿到了叉子'.format(scientist))

    noodle_lock.acquire()

    print('{} 拿到了面'.format(scientist))

    print('{} 吃到了面'.format(scientist))

    noodle_lock.release()

    print('{} 放下了面'.format(scientist))

    fork_lock.release()

    print('{} 放下了叉子'.format(scientist))

 

scientist_list1 = ['霍金','居里夫人']

scientist_list2 = ['愛因斯坦','富蘭克林']

noodle_lock  = Lock()

fork_lock = Lock()

for i in scientist_list1:

    t = Thread(target=eatNoodles_1, args=(noodle_lock, fork_lock, i))

    t.start()

for i in scientist_list2:

    t = Thread(target=eatNoodles_2, args=(noodle_lock, fork_lock, i))

t.start()

  輸出結果:

  霍金 拿到了面

  霍金 拿到了叉子

  霍金 吃到了面

  霍金 放下了面

  霍金 放下了叉子

  愛因斯坦 拿到了叉子

  居里夫人 拿到了面

  霍金吃完後,愛因斯坦拿到了叉子,把叉子鎖住了;居里夫人拿到了面,把面鎖住了。愛因斯坦就想:居里夫人不給我面,我就吃不了面,所以我不給叉子。居里夫人就想:愛因斯坦不給我叉子我也吃不了面,我就不給叉子。所以就陷入了死迴圈。

  為了解決Lock死鎖的情況,就有了遞迴鎖:RLock。

4.2 遞迴鎖:RLock

  所謂的遞迴鎖也被稱為“鎖中鎖”,指一個執行緒可以多次申請同一把鎖,但是不會造成死鎖,這就可以用來解決上面的死鎖問題。

import time

from threading import Thread

from threading import RLock

def eatNoodles_1(noodle_lock, fork_lock, scientist):

    noodle_lock.acquire()

    print('{} 拿到了面'.format(scientist))

    fork_lock.acquire()

    print('{} 拿到了叉子'.format(scientist))

    time.sleep(1)

    print('{} 吃到了面'.format(scientist))

    fork_lock.release()

    noodle_lock.release()

    print('{} 放下了面'.format(scientist))

    print('{} 放下了叉子'.format(scientist))

def eatNoodles_2(noodle_lock, fork_lock, scientist):

    fork_lock.acquire()

    print('{} 拿到了叉子'.format(scientist))

    noodle_lock.acquire()

    print('{} 拿到了面'.format(scientist))

    print('{} 吃到了面'.format(scientist))

    noodle_lock.release()

    print('{} 放下了面'.format(scientist))

    fork_lock.release()

    print('{} 放下了叉子'.format(scientist))

 

scientist_list1 = ['霍金','居里夫人']

scientist_list2 = ['愛因斯坦','富蘭克林']

noodle_lock=fork_lock = RLock()

for i in scientist_list1:

    t = Thread(target=eatNoodles_1, args=(noodle_lock, fork_lock, i))

    t.start()

for i in scientist_list2:

    t = Thread(target=eatNoodles_2, args=(noodle_lock, fork_lock, i))

t.start()

  上面程式碼可以正常執行到所有科學家吃完麵條。

  RLock內部維護著一個Lock和一個counter變數,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個執行緒所有的acquire都被release,其他的執行緒才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖,二者的區別是:遞迴鎖可以連續acquire多次,而互斥鎖只能acquire一次

4.3 Condition

  Condition可以認為是一把比Lock和RLOK更加高階的鎖,其在內部維護一個瑣物件(預設是RLock),可以在建立Condigtion物件的時候把瑣物件作為引數傳入。Condition也提供了acquire, release方法,其含義與瑣的acquire, release方法一致,其實它只是簡單的呼叫內部瑣物件的對應的方法而已。Condition內部常用方法如下:

  1)acquire(): 上執行緒鎖

  2)release(): 釋放鎖

  3)wait(timeout): 執行緒掛起,直到收到一個notify通知或者超時(可選的,浮點數,單位是秒s)才會被喚醒繼續執行。wait()必須在已獲得Lock前提下才能呼叫,否則會觸發RuntimeError。

  4)notify(n=1): 通知其他執行緒,那些掛起的執行緒接到這個通知之後會開始執行,預設是通知一個正等待該condition的執行緒,最多則喚醒n個等待的執行緒。notify()必須在已獲得Lock前提下才能呼叫,否則會觸發RuntimeError。notify()不會主動釋放Lock。

  5)notifyAll(): 如果wait狀態執行緒比較多,notifyAll的作用就是通知所有執行緒

  需要注意的是,notify()方法、notifyAll()方法只有在佔用瑣(acquire)之後才能呼叫,否則將會產生RuntimeError異常。

  用Condition來實現生產者消費者模型:

import threading

import time

 

# 生產者

def produce(con):

    # 鎖定執行緒

    global num

    con.acquire()

    print("工廠開始生產……")

    while True:

        num += 1

        print("已生產商品數量:{}".format(num))

        time.sleep(1)

        if num >= 5:

            print("商品數量達到5件,倉庫飽滿,停止生產……")

            con.notify()  # 喚醒消費者

            con.wait()# 生產者自身陷入沉睡

    # 釋放鎖

    con.release()
# 消費者 def consumer(con): con.acquire() global num print("消費者開始消費……") while True: num -= 1 print("剩餘商品數量:{}".format(num)) time.sleep(2) if num <= 0: print("庫存為0,通知工廠開始生產……") con.notify() # 喚醒生產者執行緒 con.wait() # 消費者自身陷入沉睡 con.release() con = threading.Condition() num = 0 p = threading.Thread(target=produce , args=(con ,)) c = threading.Thread(target=consumer , args=(con ,)) p.start() c.start()

  輸出結果:

  工廠開始生產……

  已生產商品數量:1

  已生產商品數量:2

  已生產商品數量:3

  已生產商品數量:4

  已生產商品數量:5

  商品數量達到5件,倉庫飽滿,停止生產……

  消費者開始消費……

  剩餘商品數量:4

  剩餘商品數量:3

  剩餘商品數量:2

  剩餘商品數量:1

  剩餘商品數量:0

  庫存為0,通知工廠開始生產……

  已生產商品數量:1

  已生產商品數量:2

  已生產商品數量:3

  已生產商品數量:4

4.4 訊號量:Semaphore

  鎖同時只允許一個執行緒更改資料,而訊號量是同時允許一定數量的程序更改資料 。繼續用上篇博文中用的的吃飯例子,加入有一下應用場景:有10個人吃飯,但只有一張餐桌,只允許做3個人,沒上桌的人不允許吃飯,已上桌吃完飯離座之後,下面的人才能搶佔桌子繼續吃飯,如果不用訊號量,肯定是10人一窩蜂一起吃飯:

from threading import Thread

import time

import random

def fun(i):

    print('{}號顧客上座,開始吃飯'.format(i))

    time.sleep(random.random())

    print('{}號顧客吃完飯了,離座'.format(i))


if __name__=='__main__':

    for i in range(20):

        p = Thread(target=fun, args=(i,))

        p.start()

  輸出結果:

  0號顧客上座,開始吃飯

  1號顧客上座,開始吃飯

  2號顧客上座,開始吃飯

  3號顧客上座,開始吃飯

  4號顧客上座,開始吃飯

  5號顧客上座,開始吃飯

  6號顧客上座,開始吃飯

  7號顧客上座,開始吃飯

  8號顧客上座,開始吃飯

  9號顧客上座,開始吃飯

  3號顧客吃完飯了,離座

  4號顧客吃完飯了,離座

  2號顧客吃完飯了,離座

  0號顧客吃完飯了,離座

  8號顧客吃完飯了,離座

  5號顧客吃完飯了,離座

  1號顧客吃完飯了,離座

  6號顧客吃完飯了,離座

  9號顧客吃完飯了,離座

  7號顧客吃完飯了,離座

  使用訊號量之後:

from threading import Thread

import time

import random

from threading import Semaphore

def fun(i , sem):

    sem.acquire()

    print('{}號顧客上座,開始吃飯'.format(i))

    time.sleep(random.random())

    print('{}號顧客吃完飯了,離座'.format(i))

    sem.release()

if __name__=='__main__':

    sem = Semaphore(3)

    for i in range(10):

        p = Thread(target=fun, args=(i,sem))

        p.start()

  輸出結果:

  0號顧客上座,開始吃飯

  1號顧客上座,開始吃飯

  2號顧客上座,開始吃飯

  2號顧客吃完飯了,離座

  3號顧客上座,開始吃飯

  0號顧客吃完飯了,離座

  4號顧客上座,開始吃飯

  1號顧客吃完飯了,離座

  5號顧客上座,開始吃飯

  3號顧客吃完飯了,離座

  6號顧客上座,開始吃飯

  5號顧客吃完飯了,離座

  7號顧客上座,開始吃飯

  4號顧客吃完飯了,離座

  8號顧客上座,開始吃飯

  8號顧客吃完飯了,離座

  9號顧客上座,開始吃飯

  9號顧客吃完飯了,離座

  6號顧客吃完飯了,離座

  7號顧客吃完飯了,離座

4.5 事件:Event

  如果程式中的其他執行緒需要通過判斷某個執行緒的狀態來確定自己下一步的操作,這時候就可以用threading為我們提供的Event物件,Event物件主要有一下幾個方法:

  isSet():返回event的狀態值;

  wait():如果 isSet()==False將阻塞執行緒;

  set(): 設定event的狀態值為True,所有阻塞池的執行緒啟用進入就緒狀態, 等待作業系統排程;

  clear():恢復event的狀態值為False。

  有如下需求:獲取當前時間的秒數的個位數,如果小於5,設定子執行緒阻塞,如果大於5則設定子程序非阻塞。程式碼如下:

from threading import Event, Thread

import time

from datetime import datetime

def func(e):

    print('子執行緒:開始執行……')

    while True:

        print('子執行緒:現在事件秒數是{}'.format(datetime.now().second))

        e.wait()  # 阻塞等待訊號  這裡插入了一個Flag  預設為 False

        time.sleep(1)

e = Event()

p = Thread(target=func, args=(e,))

p.daemon=True

p.start()

for i in range(10):

    s = int(str(datetime.now().second)[-1])#獲取當前秒數的個位數

    if s < 5:

        print('子執行緒線入阻塞狀態')

        e.clear()  # 使插入的flag為False 執行緒線入阻塞狀態

    else:

        print('子執行緒取消阻塞狀態')

        e.set()  # 執行緒線入非阻塞狀態

    time.sleep(1)

e.set()

print("主執行緒執行結束……")

  輸出結果:

  子執行緒:開始執行……

  子執行緒:現在事件秒數是43

  子執行緒線入阻塞狀態

  子執行緒線入阻塞狀態

  子執行緒取消阻塞狀態

  子執行緒取消阻塞狀態

  子執行緒:現在事件秒數是46

  子執行緒取消阻塞狀態

  子執行緒:現在事件秒數是47

  子執行緒取消阻塞狀態

  子執行緒:現在事件秒數是48

  子執行緒取消阻塞狀態

  子執行緒:現在事件秒數是49

  子執行緒線入阻塞狀態

  子執行緒:現在事件秒數是50

  子執行緒線入阻塞狀態

  子執行緒線入阻塞狀態

  主執行緒執行結束……

4.6 定時器:Timer

  如果想要實現每隔一段時間就呼叫一個函式的話,就要在Timer呼叫的函式中,再次設定Timer。Timer是Thread類的一個子類。

  如果是多長時間後只執行一次:

import threading

import time

def sayTime(name):

    print('你好,{}為您報時,現在時間是:{}'.format(name , time.ctime()))


if __name__ == "__main__":

     timer = threading.Timer(2.0, sayTime, ["Jane"])

     timer.start()

輸出結果:

你好,Jane為您報時,現在時間是:Thu Dec  6 15:03:41 2018

如果要每個多長時間執行一次:

import threading

import time

def sayTime(name):

    print('你好,{}為您報時,現在時間是:{}'.format(name , time.ctime()))

    global timer

    timer = threading.Timer(3.0, sayTime, [name])

    timer.start()

if __name__ == "__main__":

     timer = threading.Timer(2.0, sayTime, ["Jane"])

     timer.start()

  輸出結果:

  你好,Jane為您報時,現在時間是:Thu Dec  6 15:04:30 2018

  你好,Jane為您報時,現在時間是:Thu Dec  6 15:04:33 2018

  你好,Jane為您報時,現在時間是:Thu Dec  6 15:04:36 2018

  你好,Jane為您報時,現在時間是:Thu Dec  6 15:04:39 2018

  ……

5 程序間的通行

5.1佇列:Queue

  python中Queue模組提供了佇列都實現了鎖原語,是執行緒安全的,能夠在多執行緒中直接使用。Queue中的佇列包括以下三種:

  1)FIFO(先進先出)佇列, 第一加入佇列的任務, 被第一個取出;

  2)LIFO(後進先出)佇列,最後加入佇列的任務, 被第一個取出;

  3)PriorityQueue(優先順序)佇列, 保持佇列資料有序, 最小值被先取出。

  Queue模組中的常用方法如下:

  qsize() 返回佇列的規模

  empty() 如果佇列為空,返回True,否則False

  full() 如果佇列滿了,返回True,否則False

  get([block[, timeout]])獲取佇列,timeout等待時間

  get_nowait() 相當get(False)

  put(item) 寫入佇列,timeout等待時間,如果佇列已滿再呼叫該方法會阻塞執行緒

  put_nowait(item) 相當put(item, False)

  task_done() 在完成一項工作之後,task_done()函式向任務已經完成的佇列傳送一個訊號

  join() 實際上意味著等到佇列為空,再執行別的操作。

import queue

import threading

def fun():

    while True:

        try:

            data = q.get(block = True, timeout = 1) #不設定阻塞的話會一直去嘗試獲取資源

        except queue.Empty:

            print(' {}結束……'.format(threading.current_thread().name))

            break

        print(' {}取得資料:{}'.format(threading.current_thread().name , data))

        q.task_done()

        print(' {}結束……'.format(threading.current_thread().name))

print("主執行緒開始執行……")

q = queue.Queue(5)

#往佇列裡面放5個數

for i in range(5):

    q.put(i)

for i in range(0, 3):

    t = threading.Thread(target=fun , name='執行緒'+str(i))

    t.start()

q.join() #等待所有的佇列資源都用完

print("主執行緒結束執行……")

  輸出結果:

  主執行緒開始執行……

  執行緒0取得資料:0

  執行緒0結束……

  執行緒0取得資料:1

  執行緒0結束……

  執行緒1取得資料:2

  執行緒0取得資料:3

  執行緒0結束……

  執行緒0取得資料:4

  執行緒0結束……

  執行緒1結束……

  主執行緒結束執行……

  執行緒1結束……

  執行緒2結束……

  執行緒0結束……

6 執行緒池

  在我們上面執行多個任務時,使用的執行緒方案都是“即時建立, 即時銷燬”的策略。儘管與建立程序相比,建立執行緒的時間已經大大的縮短,但是如果提交給執行緒的任務是執行時間較短,而且執行次數極其頻繁,那麼伺服器將處於不停的建立執行緒,銷燬執行緒的狀態。一個執行緒的執行時間可以分為3部分:執行緒的啟動時間、執行緒體的執行時間和執行緒的銷燬時間。在多執行緒處理的情景中,如果執行緒不能被重用,就意味著每次建立都需要經過啟動、銷燬和執行3個過程。這必然會增加系統相應的時間,降低了效率。所以就有了執行緒池的誕生,

  由於執行緒預先被建立並放入執行緒池中,同時處理完當前任務之後並不銷燬而是被安排處理下一個任務,因此能夠避免多次建立執行緒,從而節省執行緒建立和銷燬的開銷,能帶來更好的效能和系統穩定性。

from concurrent.futures import ThreadPoolExecutor

import time

 

def func(n):

    time.sleep(2)

    print(n)

    return n*n

t = ThreadPoolExecutor(max_workers=5) # 最好不要超過CPU核數的5倍

t_lst = []

for i in range(20):

    r = t.submit(func , 1)#執行任務,傳遞引數

    t_lst.append(r.result())#獲取任務返回結果

t.shutdown()#相當於close() + join()

print(t_lst)

print('主執行緒執行結束……')

7 總結

  關於Python併發程式設計中的多執行緒部分就介紹完了,其中諸多描述略帶倉儲,後續博文中再來補充。

參考資料:

  https://www.cnblogs.com/linshuhui/p/9704128.html

       https://www.cnblogs.com/yoyoketang/p/8337118.html

  https://www.cnblogs.com/chengd/articles/7770898.html