1. 程式人生 > >python高階5.多工---執行緒

python高階5.多工---執行緒

5.2.多工的概念

什麼叫“多工”呢?簡單地說,就是作業系統可以同時執行多個任務。打個比方,你一邊在用瀏覽器上網,一邊在聽MP3,一邊在用Word趕作業,這就是多工,至少同時有3個任務正在執行。還有很多工悄悄地在後臺同時執行著,只是桌面上沒有顯示而已。

現在,多核CPU已經非常普及了,但是,即使過去的單核CPU,也可以執行多工。由於CPU執行程式碼都是順序執行的,那麼,單核CPU是怎麼執行多工的呢?

答案就是作業系統輪流讓各個任務交替執行,任務1執行0.01秒,切換到任務2,任務2執行0.01秒,再切換到任務3,執行0.01秒……這樣反覆執行下去。表面上看,每個任務都是交替執行的,但是,由於CPU的執行速度實在是太快了,我們感覺就像所有任務都在同時執行一樣。

真正的並行執行多工只能在多核CPU上實現,但是,由於任務數量遠遠多於CPU的核心數量,所以,作業系統也會自動把很多工輪流排程到每個核心上執行。

注意:

併發:指的是任務數多餘cpu核數,通過作業系統的各種任務排程演算法,實現用多個任務“一起”執行(實際上總有一些任務不在執行,因為切換任務的速度相當快,看上去一起執行而已)。(假的多工)

並行:指的是任務數小於等於cpu核數,即任務真的是一起執行的。(真的多工)

【自己總結】

1.區分併發和並行的概念。

併發:假的多工。就是並非所有的任務一起同時執行,而是總有一些任務不在執行,但是由於切換任務的速度相當快,一般為0.00002s左右,讓我們感覺看上去像是在一起執行而已。

並行:真的多工。指的是任務數小於等於cpu核數,所有任務是一起同時執行的。

cpu是central processing unit 即中央處理器,目前一般使用的是4核或者8核,16核的價格較為昂貴。cpu的核數越多,處理執行速度則越快。

5.3執行緒

python的thread模組是比較底層的模組,python的threading模組是對thread做了一些包裝的,可以更加方便的被使用

1.使用threading模組

單執行緒執行
#coding=utf-8
import time

def saySorry():
print(“親愛的,我錯了,我能吃飯了嗎?”)
time.sleep(1)

if name == “main”:
for i in range(5):
saySorry()

多執行緒執行
#coding=utf-8
import threading
import time

def saySorry():
print(“親愛的,我錯了,我能吃飯了嗎?”)
time.sleep(1)

if name == “main”:
for i in range(5):
t = threading.Thread(target=saySorry)
t.start() #啟動執行緒,即讓執行緒開始執行

說明:
1.可以明顯看出使用了多執行緒併發的操作,花費時間要短很多
2.當呼叫start()時,才會真正的建立執行緒,並且開始執行

2.主執行緒會等待所有的子執行緒結束後才結束

#coding=utf-8
import threading
from time import sleep,ctime

def sing():
for i in range(3):
print(“正在唱歌…%d”%i)
sleep(1)

def dance():
for i in range(3):
print(“正在跳舞…%d”%i)
sleep(1)

if name == ‘main’:
print(’—開始—:%s’%ctime())

t1 = threading.Thread(target=sing)
t2 = threading.Thread(target=dance)

t1.start()
t2.start()

#sleep(5) # 遮蔽此行程式碼,試試看,程式是否會立馬結束?
print('---結束---:%s'%ctime())

【自己總結】:
1.主程式碼逐行執行,到t1.start()的時候建立子執行緒1,然後開始執行sing函式即子執行緒1。同時,主執行緒繼續執行(即主程式碼繼續向下執行),執行t2.start()的時候建立子執行緒2,然後開始執行dance函式即子執行緒2。主執行緒繼續向下執行,而後面沒有程式碼了,等待子執行緒執行完畢後則主執行緒結束即整體程式碼執行結束。(主執行緒和子執行緒同步進行,即邊執行函式,主程式程式碼也是在繼續向下執行的)

2.關於target = dance函式,而不是dance()的問題。dance()是呼叫一個函式,而本文程式碼中是將dance作為值賦給t2

3.檢視執行緒數量

#coding=utf-8
import threading
from time import sleep,ctime

def sing():
for i in range(3):
print(“正在唱歌…%d”%i)
sleep(1)

def dance():
for i in range(3):
print(“正在跳舞…%d”%i)
sleep(1)

if name == ‘main’:
print(’—開始—:%s’%ctime())

t1 = threading.Thread(target=sing)
t2 = threading.Thread(target=dance)

t1.start()
t2.start()

while True:
    length = len(threading.enumerate())
    print('當前執行的執行緒數為:%d'%length)
    if length<=1:
        break

    sleep(0.5)

【自我總結】:
1.關於程式碼中的enumerate()函式,將返回結果變的不再一定是一個單獨的元素。
eg: ipyhton3中執行

names = [“aa”,“bb”,“cc”]
for temp in names:
print(temp)
…aa
…bb
…cc

for temp in enumerate(names):
print(temp)
…(0,‘aa’)
…(1,‘bb’)
…(2,‘cc’)

for (i,name) in enumerate(names):
print(i,name)

5.4執行緒-注意點

  1. 執行緒執行程式碼的封裝
    通過上一小節,能夠看出,通過使用threading模組能完成多工的程式開發,為了讓每個執行緒的封裝性更完美,所以使用threading模組時,往往會定義一個新的子類class,只要繼承threading.Thread就可以了,然後重寫run方法

示例如下:
#coding=utf-8
import threading
import time

class MyThread(threading.Thread):
def run(self):
for i in range(3):
time.sleep(1)
msg = "I’m "+self.name+’ @ '+str(i) #name屬性中儲存的是當前執行緒的名字
print(msg)

if name == ‘main’:
t = MyThread()
t.start()

說明
python的threading.Thread類有一個run方法,用於定義執行緒的功能函式,可以在自己的執行緒類中覆蓋該方法。而建立自己的執行緒例項後,通過Thread類的start方法,可以啟動該執行緒,交給python虛擬機器進行排程,當該執行緒獲得執行的機會時,就會呼叫run方法執行執行緒。

【自己總結】:
之前的變數建立都是針對函式,如t=threading.Thread(target=函式名)。但有時後面需要定義一個新的子類,這個類就必須要繼承threading.Thread。在此基礎上重新定義run方法,必須有run方法。關於run方法的相關知識點見上面的說明

2.執行緒的執行順序

#coding=utf-8
import threading
import time

class MyThread(threading.Thread):
def run(self):
for i in range(3):
time.sleep(1)
msg = "I’m "+self.name+’ @ '+str(i)
print(msg)
def test():
for i in range(5):
t = MyThread()
t.start()
if name == ‘main’:
test()

說明

從程式碼和執行結果我們可以看出,多執行緒程式的執行順序是不確定的。當執行到sleep語句時,執行緒將被阻塞(Blocked),到sleep結束後,執行緒進入就緒(Runnable)狀態,等待排程。而執行緒排程將自行選擇一個執行緒執行。上面的程式碼中只能保證每個執行緒都執行完整個run函式,但是執行緒的啟動順序、run函式中每次迴圈的執行順序都不能確定。

[自己總結]在這裡插入圖片描述
【自己總結:】
一個執行緒同一時刻只能執行一個函式。當start執行後,會自動呼叫run方法。那麼如何呼叫MyThread類裡面的login和register方法呢?在run方法裡面寫入self.login()和self.register()來呼叫login和register方法。

3.總結

1.每個執行緒預設有一個名字,儘管上面的例子中沒有指定執行緒物件的name,但是python會自動為執行緒指定一個名字。
2.當執行緒的run()方法結束時該執行緒完成。
3.無法控制執行緒排程程式,但可以通過別的方式來影響執行緒排程的方式。

5.5多執行緒-共享全域性變數
from threading import Thread
import time

g_num = 100

def work1():
global g_num
for i in range(3):
g_num += 1

print("----in work1, g_num is %d---"%g_num)

def work2():
global g_num
print("----in work2, g_num is %d—"%g_num)

print("—執行緒建立之前g_num is %d—"%g_num)

t1 = Thread(target=work1)
t1.start()

#延時一會,保證t1執行緒中的事情做完
time.sleep(1)

t2 = Thread(target=work2)
t2.start()

列表當做實參傳遞到執行緒中

from threading import Thread
import time

def work1(nums):
nums.append(44)
print("----in work1—",nums)

def work2(nums):
#延時一會,保證t1執行緒中的事情做完
time.sleep(1)
print("----in work2—",nums)

g_nums = [11,22,33]

t1 = Thread(target=work1, args=(g_nums,))
t1.start()

t2 = Thread(target=work2, args=(g_nums,))
t2.start()

總結:

在一個程序內的所有執行緒共享全域性變數,很方便在多個執行緒間共享資料

缺點就是,執行緒是對全域性變數隨意遂改可能造成多執行緒之間對全域性變數的混亂(即執行緒非安全)

5.6多執行緒-共享全域性變數-問題
在這裡插入圖片描述
【自己總結全域性變數知識點】
在一個函式中對全域性變數進行修改的時候,到底是否需要使用global進行說明:要看 是否對全域性變數的執行指向進行了修改。如果修改了執行指向,即讓全域性變數指向了一個新的地方,那麼必須使用global。如果,僅僅是修改了指向的空間中的資料,此時不用必須使用global。(字串,整型資料,元組不可修改,其他型別可以修改)

多執行緒開發可能遇到的問題

假設兩個執行緒t1和t2都要對全域性變數g_num(預設是0)進行加1運算,t1和t2都各對g_num加10次,g_num的最終的結果應該為20。

但是由於是多執行緒同時操作,有可能出現下面情況:

在g_num=0時,t1取得g_num=0。此時系統把t1排程為”sleeping”狀態,把t2轉換為”running”狀態,t2也獲得g_num=0
然後t2對得到的值進行加1並賦給g_num,使得g_num=1
然後系統又把t2排程為”sleeping”,把t1轉為”running”。執行緒t1又把它之前得到的0加1後賦值給g_num。
這樣導致雖然t1和t2都對g_num加1,但結果仍然是g_num=1

測試1

import threading
import time

g_num = 0

def work1(num):
global g_num
for i in range(num):
g_num += 1
print("----in work1, g_num is %d—"%g_num)

def work2(num):
global g_num
for i in range(num):
g_num += 1
print("----in work2, g_num is %d—"%g_num)

print("—執行緒建立之前g_num is %d—"%g_num)

t1 = threading.Thread(target=work1, args=(100,))
t1.start()

t2 = threading.Thread(target=work2, args=(100,))
t2.start()

while len(threading.enumerate()) != 1:
time.sleep(1)

print(“2個執行緒對同一個全域性變數操作之後的最終結果是:%s” % g_num)

測試2

import threading
import time

g_num = 0

def work1(num):
global g_num
for i in range(num):
g_num += 1
print("----in work1, g_num is %d—"%g_num)

def work2(num):
global g_num
for i in range(num):
g_num += 1
print("----in work2, g_num is %d—"%g_num)

print("—執行緒建立之前g_num is %d—"%g_num)

t1 = threading.Thread(target=work1, args=(1000000,))
t1.start()

t2 = threading.Thread(target=work2, args=(1000000,))
t2.start()

while len(threading.enumerate()) != 1:
time.sleep(1)

print(“2個執行緒對同一個全域性變數操作之後的最終結果是:%s” % g_num)

結論
在這裡插入圖片描述
如果多個執行緒同時對同一個全域性變數操作,會出現資源競爭問題,從而資料結果會不正確。(由於資源競爭,可能執行緒1的整個操作過程還沒有結束,沒有儲存+1後的結果,就被執行緒2或者其他執行緒擠出去了,多次執行時就會導致結果出錯。當args的引數number越大時,這種資源競爭導致的結果不正確問題就被放大的越明顯)

5.7同步

同步的概念

同步就是協同步調,按預定的先後次序進行執行。如:你說完,我再說。

"同"字從字面上容易理解為一起動作

其實不是,"同"字應是指協同、協助、互相配合。

如程序、執行緒同步,可理解為程序或執行緒A和B一塊配合,A執行到一定程度時要依靠B的某個結果,於是停下來,示意B執行;B執行,再將結果給A;A再繼續操作。

解決執行緒同時修改全域性變數的方式

對於上一小節提出的那個計算錯誤的問題,可以通過執行緒同步來進行解決

思路,如下:

系統呼叫t1,然後獲取到g_num的值為0,此時上一把鎖,即不允許其他執行緒操作g_num
t1對g_num的值進行+1
t1解鎖,此時g_num的值為1,其他的執行緒就可以使用g_num了,而且是g_num的值不是0而是1
同理其他執行緒在對g_num進行修改時,都要先上鎖,處理完後再解鎖,在上鎖的整個過程中不允許其他執行緒訪問,就保證了資料的正確性

5.8互斥鎖

當多個執行緒幾乎同時修改某一個共享資料的時候,需要進行同步控制

執行緒同步能夠保證多個執行緒安全訪問競爭資源,最簡單的同步機制是引入互斥鎖。

互斥鎖為資源引入一個狀態:鎖定/非鎖定

某個執行緒要更改共享資料時,先將其鎖定,此時資源的狀態為“鎖定”,其他執行緒不能更改;直到該執行緒釋放資源,將資源的狀態變成“非鎖定”,其他的執行緒才能再次鎖定該資源。互斥鎖保證了每次只有一個執行緒進行寫入操作,從而保證了多執行緒情況下資料的正確性。

threading模組中定義了Lock類,可以方便的處理鎖定:
#建立鎖
mutex = threading.Lock()

#鎖定
mutex.acquire()

#釋放
mutex.release()

注意:

如果這個鎖之前是沒有上鎖的,那麼acquire不會堵塞
如果在呼叫acquire對這個鎖上鎖之前 它已經被 其他執行緒上了鎖,那麼此時acquire會堵塞,直到這個鎖被解鎖為止

使用互斥鎖完成2個執行緒對同一個全域性變數各加100萬次的操作

import threading
import time

g_num = 0

def test1(num):
global g_num
for i in range(num):
mutex.acquire() # 上鎖
g_num += 1
mutex.release() # 解鎖

print("---test1---g_num=%d"%g_num)

def test2(num):
global g_num
for i in range(num):
mutex.acquire() # 上鎖
g_num += 1
mutex.release() # 解鎖

print("---test2---g_num=%d"%g_num)

#建立一個互斥鎖
#預設是未上鎖的狀態
mutex = threading.Lock()

#建立2個執行緒,讓他們各自對g_num加1000000次
p1 = threading.Thread(target=test1, args=(1000000,))
p1.start()

p2 = threading.Thread(target=test2, args=(1000000,))
p2.start()

#等待計算完成
while len(threading.enumerate()) != 1:
time.sleep(1)

print(“2個執行緒對同一個全域性變數操作之後的最終結果是:%s” % g_num)

執行結果:
—test1—g_num=1909909
—test2—g_num=2000000
2個執行緒對同一個全域性變數操作之後的最終結果是:2000000

可以看到最後的結果,加入互斥鎖後,其結果與預期相符。

上鎖解鎖過程

當一個執行緒呼叫鎖的acquire()方法獲得鎖時,鎖就進入“locked”狀態。

每次只有一個執行緒可以獲得鎖。如果此時另一個執行緒試圖獲得這個鎖,該執行緒就會變為“blocked”狀態,稱為“阻塞”,直到擁有鎖的執行緒呼叫鎖的release()方法釋放鎖之後,鎖進入“unlocked”狀態。

執行緒排程程式從處於同步阻塞狀態的執行緒中選擇一個來獲得鎖,並使得該執行緒進入執行(running)狀態。

【自我總結】互斥鎖的概念類似於,衛生間裡面的那個鎖。每個執行緒完成自己任務的期間,cpu被上鎖,無法切換,被其他執行緒佔用。當該執行緒完成任務時,自動解鎖,可以被另外一個佔用,然後另外一個也會重複相同的操作。(就好比每個人上衛生間的時候,都會上鎖,外面的人只能排隊等著,如此迴圈有序的進行)

總結

鎖的好處:

確保了某段關鍵程式碼只能由一個執行緒從頭到尾完整地執行

鎖的壞處:

阻止了多執行緒併發執行,包含鎖的某段程式碼實際上只能以單執行緒模式執行,效率就大大地下降了
由於可以存在多個鎖,不同的執行緒持有不同的鎖,並試圖獲取對方持有的鎖時,可能會造成死鎖

5.9死鎖
現實社會中,男女雙方都在等待對方先道歉

如果雙方都這樣固執的等待對方先開口,弄不好,就分手了

1.死鎖

線上程間共享多個資源的時候,如果兩個執行緒分別佔有一部分資源並且同時等待對方的資源,就會造成死鎖。

儘管死鎖很少發生,但一旦發生就會造成應用的停止響應。下面看一個死鎖的例子。

#coding=utf-8
import threading
import time

class MyThread1(threading.Thread):
def run(self):
# 對mutexA上鎖
mutexA.acquire()

    # mutexA上鎖後,延時1秒,等待另外那個執行緒 把mutexB上鎖
    print(self.name+'----do1---up----')
    time.sleep(1)

    # 此時會堵塞,因為這個mutexB已經被另外的執行緒搶先上鎖了
    mutexB.acquire()
    print(self.name+'----do1---down----')
    mutexB.release()

    # 對mutexA解鎖
    mutexA.release()

class MyThread2(threading.Thread):
def run(self):
# 對mutexB上鎖
mutexB.acquire()

    # mutexB上鎖後,延時1秒,等待另外那個執行緒 把mutexA上鎖
    print(self.name+'----do2---up----')
    time.sleep(1)

    # 此時會堵塞,因為這個mutexA已經被另外的執行緒搶先上鎖了
    mutexA.acquire()
    print(self.name+'----do2---down----')
    mutexA.release()

    # 對mutexB解鎖
    mutexB.release()

mutexA = threading.Lock()
mutexB = threading.Lock()

if name == ‘main’:
t1 = MyThread1()
t2 = MyThread2()
t1.start()
t2.start()

此時已經進入到了死鎖狀態,可以使用ctrl - c
退出

2.避免死鎖

程式設計時要儘量避免(銀行家演算法)
新增超時時間等

附錄-銀行家演算法
[背景知識]

一個銀行家如何將一定數目的資金安全地借給若干個客戶,使這些客戶既能借到錢完成要乾的事,同時銀行家又能收回全部資金而不至於破產,這就是銀行家問題。這個問題同作業系統中資源分配問題十分相似:銀行家就像一個作業系統,客戶就像執行的程序,銀行家的資金就是系統的資源。

[問題的描述]

一個銀行家擁有一定數量的資金,有若干個客戶要貸款。每個客戶須在一開始就宣告他所需貸款的總額。若該客戶貸款總額不超過銀行家的資金總數,銀行家可以接收客戶的要求。客戶貸款是以每次一個資金單位(如1萬RMB等)的方式進行的,客戶在借滿所需的全部單位款額之前可能會等待,但銀行家須保證這種等待是有限的,可完成的。

例如:有三個客戶C1,C2,C3,向銀行家借款,該銀行家的資金總額為10個資金單位,其中C1客戶要借9各資金單位,C2客戶要借3個資金單位,C3客戶要借8個資金單位,總計20個資金單位。某一時刻的狀態如圖所示。
在這裡插入圖片描述
對於a圖的狀態,按照安全序列的要求,我們選的第一個客戶應滿足該客戶所需的貸款小於等於銀行家當前所剩餘的錢款,可以看出只有C2客戶能被滿足:C2客戶需1個資金單位,小銀行家手中的2個資金單位,於是銀行家把1個資金單位借給C2客戶,使之完成工作並歸還所借的3個資金單位的錢,進入b圖。同理,銀行家把4個資金單位借給C3客戶,使其完成工作,在c圖中,只剩一個客戶C1,它需7個資金單位,這時銀行家有8個資金單位,所以C1也能順利借到錢並完成工作。最後(見圖d)銀行家收回全部10個資金單位,保證不賠本。那麼客戶序列{C1,C2,C3}就是個安全序列,按照這個序列貸款,銀行家才是安全的。否則的話,若在圖b狀態時,銀行家把手中的4個資金單位借給了C1,則出現不安全狀態:這時C1,C3均不能完成工作,而銀行家手中又沒有錢了,系統陷入僵持局面,銀行家也不能收回投資。

綜上所述,銀行家演算法是從當前狀態出發,逐個按安全序列檢查各客戶誰能完成其工作,然後假定其完成工作且歸還全部貸款,再進而檢查下一個能完成工作的客戶,…。如果所有客戶都能完成工作,則找到一個安全序列,銀行家才是安全的。

5.10 案例:多工udp聊天器
在這裡插入圖片描述
說明:
編寫一個有2個執行緒的程式
執行緒1用來接收資料然後顯示
執行緒2用來檢測鍵盤資料然後通過udp傳送資料

要求
實現上述要求
總結多工程式的特點

參考程式碼
import socket
import threading

def send_msg(udp_socket):
“”“獲取鍵盤資料,並將其傳送給對方”""
while True:
# 1. 從鍵盤輸入資料
msg = input("\n請輸入要傳送的資料:")
# 2. 輸入對方的ip地址
dest_ip = input("\n請輸入對方的ip地址:")
# 3. 輸入對方的port
dest_port = int(input("\n請輸入對方的port:"))
# 4. 傳送資料
udp_socket.sendto(msg.encode(“utf-8”), (dest_ip, dest_port))

def recv_msg(udp_socket):
“”“接收資料並顯示”""
while True:
# 1. 接收資料
recv_msg = udp_socket.recvfrom(1024)
# 2. 解碼
recv_ip = recv_msg[1]
recv_msg = recv_msg[0].decode(“utf-8”)
# 3. 顯示接收到的資料
print(">>>%s:%s" % (str(recv_ip), recv_msg))

def main():
# 1. 建立套接字
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 2. 繫結本地資訊
udp_socket.bind(("", 7890))

# 3. 建立一個子執行緒用來接收資料
t = threading.Thread(target=recv_msg, args=(udp_socket,))
t.start()
# 4. 讓主執行緒用來檢測鍵盤資料並且傳送
send_msg(udp_socket)

if name == “main”:
main()