python 程序、執行緒 (一)
一、python程式的執行原理
許多時候,在執行一個python檔案的時候,會發現在同一目錄下會出現一個__pyc__資料夾(python3)或者.pyc字尾(python2)的檔案
Python在執行時,首先會將.py檔案中的原始碼編譯成Python的byte code(位元組碼),然後再由Python Virtual Machine(Python虛擬機器)來執行這些編譯好的byte code。
1、執行流程
原始碼.py ——(編譯處理)——>位元組碼.pyc ————>python虛擬機器——(編譯)——>程式
2、編譯
執行 python demo.py 後,將會啟動 Python 的直譯器,然後將 demo.py 編譯成一個位元組碼物件 PyCodeObject。
在 Python 的世界中,一切都是物件,函式也是物件,型別也是物件,類也是物件(類屬於自定義的型別,在 Python 2.2 之前,int, dict 這些內建型別與類是存在不同的,在之後才統一起來,全部繼承自 object),甚至連編譯出來的位元組碼也是物件,.pyc 檔案是位元組碼物件(PyCodeObject)在硬碟上的表現形式。
在執行期間,編譯結果也就是 PyCodeObject 物件,只會存在於記憶體中,而當這個模組的 Python 程式碼執行完後,就會將編譯結果儲存到了 pyc 檔案中,這樣下次就不用編譯,直接載入到記憶體中。pyc 檔案只是 PyCodeObject 物件在硬碟上的表現形式。
這個 PyCodeObject 物件包含了 Python 原始碼中的字串,常量值,以及通過語法解析後編譯生成的位元組碼指令。PyCodeObject 物件還會儲存這些位元組碼指令與原始程式碼行號的對應關係,這樣當出現異常時,就能指明位於哪一行的程式碼。
3、pyc檔案
一個 pyc 檔案包含了三部分資訊:Python 的 magic number、pyc 檔案建立的時間資訊,以及 PyCodeObject 物件。
magic number 是 Python 定義的一個整數值。一般來說,不同版本的 Python 實現都會定義不同的 magic number,這個值是用來保證 Python 相容性的。比如要限制由低版本編譯的 pyc 檔案不能讓高版本的 Python 程式來執行,只需要檢查 magic number 不同就可以了。由於不同版本的 Python 定義的位元組碼指令可能會不同,如果不做檢查,執行的時候就可能出錯。
4、位元組碼指令
為什麼 pyc 檔案也稱作位元組碼檔案?因為這些檔案儲存的都是一些二進位制的位元組資料,而不是能讓人直觀檢視的文字資料。
Python 標準庫提供了用來生成程式碼對應位元組碼的工具 dis。dis 提供一個名為 dis 的方法,這個方法接收一個 code 物件,然後會輸出 code 物件裡的位元組碼指令資訊。
# test1.py
import dis
def add(a):
a = a+1
return a
print(dis.dis(add))
# 輸出
10 0 LOAD_FAST 0 (a)
3 LOAD_CONST 1 (1)
6 BINARY_ADD
7 STORE_FAST 0 (a)
11 10 LOAD_FAST 0 (a)
13 RETURN_VALUE
5、python虛擬機器
demo.py 被編譯後,接下來的工作就交由 Python 虛擬機器來執行位元組碼指令了。Python 虛擬機器會從編譯得到的 PyCodeObject 物件中依次讀入每一條位元組碼指令,並在當前的上下文環境中執行這條位元組碼指令。我們的程式就是通過這樣迴圈往復的過程才得以執行。
二、程序執行緒
1、程序
程式僅僅只是一堆程式碼而已,而程序指的是程式的執行過程。需要強調的是:同一個程式執行兩次,那也是兩個程序。
程序:資源管理單位(容器)。
執行緒:最小執行單位,管理執行緒的是程序。
程序就是一個程式在一個數據集上的一次動態執行過程。程序一般由程式、資料集、程序控制塊三部分組成。我們編寫的程式用來描述程序要完成哪些功能以及如何完成;資料集則是程式在執行過程中所需要使用的資源;程序控制塊用來記錄程序的外部特徵,描述程序的執行變化過程,系統可以利用它來控制和管理程序,它是系統感知程序存在的唯一標誌。
2、執行緒
執行緒的出現是為了降低上下文切換的消耗,提高系統的併發性,並突破一個程序只能幹一樣事的缺陷,使到程序內併發成為可能。
執行緒也叫輕量級程序,它是一個基本的CPU執行單元,也是程式執行過程中的最小單元,由執行緒ID、程式計數器、暫存器集合和堆疊共同組成。執行緒的引入減小了程式併發執行時的開銷,提高了作業系統的併發效能。執行緒沒有自己的系統資源。
3、執行緒與程序關係
在傳統作業系統中,每個程序有一個地址空間,而且預設就有一個控制執行緒。
多執行緒(即多個控制執行緒)的概念是,在一個程序中存在多個控制執行緒,控制該程序的地址空間。
程序只是用來把資源集中到一起(程序只是一個資源單位,或者說資源集合),而執行緒才是cpu上的執行單位。
程序和執行緒的關係:
(1)一個執行緒只能屬於一個程序,而一個程序可以有多個執行緒,但至少有一個執行緒。
(2)資源分配給程序,同一程序的所有執行緒共享該程序的所有資源。
(3)CPU分給執行緒,即真正在CPU上執行的是執行緒。
4、序列,並行與併發
比較重要的就是,無論是並行還是併發,在使用者看來都是'同時'執行的,而一個cpu同一時刻只能執行一個任務。
並行:同時執行,只有具備多個cpu才能實現並行。
併發:是偽並行,即看起來是同時執行,單個cpu+多道技術。
多道技術:記憶體中同時存入多道(多個)程式,cpu從一個程序快速切換到另外一個,並且切換時間十分短暫,所以給人的感覺是我可以邊打遊戲邊聽歌。多個程式並行執行,其實是偽並行即併發。
5 同步與非同步
同步就是指一個程序在執行某個請求的時候,若該請求需要一段時間才能返回資訊,那麼這個程序將會一直等待下去,直到收到返回資訊才繼續執行下去;非同步是指程序不需要一直等下去,而是繼續執行下面的操作,不管其他程序的狀態。當有訊息返回時系統會通知程序進行處理,這樣可以提高執行的效率。
打電話時就是同步通訊,發短息時就是非同步通訊。
二、全域性直譯器鎖
全域性直譯器鎖(Global Interpreter Lock):簡稱GIL,多程序(mutilprocess) 和 多執行緒(threading)的目的是用來被多顆CPU進行訪問, 提高程式的執行效率。 但是多執行緒之間資料完整性和狀態同步是一個很大的問題,所以在python內部存在一種機制(GIL),在多執行緒 時同一時刻只允許一個執行緒來訪問CPU,也就是不同執行緒對共享資源的互斥。 在一個執行緒擁有了直譯器的訪問權之後,其他的所有執行緒都必須等待它釋放直譯器的訪問權,即使這些執行緒的下一條指令並不會互相影響。GIL 並不是Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。因為CPython是大部分環境下預設的Python執行環境。所以在很多人的概念裡CPython就是Python,也就想當然的把 GIL 歸結為Python語言的缺陷。GIL並不是Python的特性,Python完全可以不依賴於GIL。例如Jython(java編寫的python直譯器)就不會存在GIL。
- python中一個執行緒對應於c語言中的一個執行緒
- GIL使得同一個時刻只有一個執行緒在一個cpu上執行位元組碼, 無法將多個執行緒對映到多個cpu上執行,因此python是無法利用多核CPU實現多執行緒的
- 大量的第三方包都是基於CPython編寫的,所以短期內想把GIL去掉不太可能
1、GIL優缺點
缺點:多處理器退化為單處理器;
優點:避免大量的加鎖解鎖操作
2、GIL釋放
由於CPython自帶的GIL,要實現python的多執行緒就需要藉助標準庫threading
# test2.py
import threading
total = 0
def add():
# 連續執行total的加操作
global total
for i in range(1000000):
total += 1
def reduce():
# 連續執行total的減操作
global total
for i in range(1000000):
total -= 1
# 建立兩個執行緒
thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
# 執行緒開始
thread1.start()
thread2.start()
# 執行緒結束
thread1.join()
thread2.join()
print(total)
使用total作為標誌,通過total的值判斷執行緒的實現。
如果實現GIL沒有釋放的的話,那麼兩個執行緒先後完成,列印結果應該是0,而實際列印結果卻不是0,並且每次列印結果也都不一致,說明實現了GIL主動釋放掉了。
total變數是一個全域性變數,其實在add與reduce內部的賦值語句total+=1與total-=1時,高階語言每一條語句在CPU上執行的時候又被對應成許多語句,比如total+=1對應成x1=total+1,total=x1,而total-=1被對應成x2=toal-1,total=x2,每一個x都是函式內部的區域性變數。
注:可以對應位元組碼指令來理解,可以參照上面GIL中的例項使用dis模組獲取位元組碼檢視,PVM(python虛擬機器)其實執行的也就是位元組碼指令。。
正常執行:
初始total=0
add:
x1 = total +1 # x1 = 1
total = x1
total = 1
reduce:
x2 = total-1 # x2 = 0
total = x2
total = 0
最終迴圈一次結果0
正常應該是無論多少次迴圈結果total都是0
多執行緒共享變數,兩個執行緒交替佔用cpu,:
total=0
add:
x1 = total + 1 # x1 = 1
reduce:
x2 = total - 1 # x2 = -1
total = x2 # total = -1
add:
total = x1
total =1
最終迴圈結果為1
只要進行足夠多的迴圈,total的值就會出現不可預計的結果
所以,在修改total值的時候,需要多條語句。但是執行一個執行緒的語句時,可能會被別的執行緒打斷,從而一個變數被多個執行緒給修改亂了。因此,執行緒之間共享資料最大的危險在於多個執行緒同時改一個變數。所以在進行python多執行緒變成的時候,一般會進行細粒度的自定義加鎖,以保證安全性。
問題:GIL至於什麼時候會釋放?
- 執行的位元組碼行數到達一定閾值
- 通過時間片劃分,到達一定時間閾值
- 在遇到IO操作時,主動釋放
三、python多執行緒
對於I/O操作的時候,程序與執行緒的效能差別不大,甚至由於執行緒更輕量級,效能更高。這裡的I/O包括網路I/O和檔案I/O
1、例項
假如利用socket傳送http請求,也就是網路I/O。爬取列表網頁中的寫href連結,然後獲取href連結之後,在爬去連結的網頁詳情。
如果不適用多執行緒的話,程式序列的執行,結果就是要先等待列表網頁獲取所有的href的連結之後,才可以逐個的爬去href連結所指的網頁詳情,這就使得等待時間很長。
如果使用多執行緒程式設計,執行緒A執行第一個列表網頁程式,遇到I/O操作,GIL釋放,當獲取到第一個href連結之後,執行緒B就自動的去獲取href連結所指的網頁詳情。
2、多執行緒實現
# test3.py
import time
import threading
def get_detail_html(url):
print("get detail html started")
time.sleep(2)
print("get detail html end")
def get_detail_url(url):
print("get detail url started")
time.sleep(4)
print("get detail url end")
if __name__ == "__main__":
# 函式方法 arg 為函式引數
thread1 = threading.Thread(target=get_detail_html, args=("",))
thread2 = threading.Thread(target=get_detail_url, args=("",))
start_time = time.time()
# 子執行緒1,2開始
thread1.start()
thread2.start()
print ("last time: {}".format(time.time()-start_time))
# 執行結果
get detail html started
get detail url started
last time: 0.0019958019256591797 # 忽略為0
get detail html end
get detail url end
按照上面執行緒並行執行的邏輯應該是列印時間為2秒,但是結果卻為0。
任何程序預設就會啟動一個執行緒,該執行緒稱為主執行緒,主執行緒又可以啟動新的執行緒。上面的thread1與thread2就是主執行緒啟動的兩個新的執行緒,那麼在兩個子執行緒啟動之後,主執行緒中其餘的程式段print函式也在並行執行,所以時間為0。當兩個子執行緒執行完畢之後,主執行緒退出,程序關閉,程式執行結束。才會打印出get detail html end,get detail url end。
3、守護執行緒
那麼如何使得主執行緒退出的時候子執行緒也退出。或者說,主執行緒推出的時候kill掉子執行緒?
<1>、將子執行緒設定成守護執行緒
# test4.py
import time
import threading
def get_detail_html(url):
print("get detail html started")
time.sleep(2)
print("get detail html end")
def get_detail_url(url):
print("get detail url started")
time.sleep(4)
print("get detail url end")
if __name__ == "__main__":
# 函式方法 arg 為函式引數
thread1 = threading.Thread(target=get_detail_html, args=("",))
thread2 = threading.Thread(target=get_detail_url, args=("",))
thread1.setDaemon(True)
thread2.setDaemon(True)
# 將兩個執行緒設定為守護執行緒,即主執行緒退出,這兩個子執行緒也退出,kill
start_time = time.time()
# 子程開始
thread1.start()
thread2.start()
#當主執行緒退出的時候, 子執行緒kill掉
print ("last time: {}".format(time.time()-start_time))
# 輸出
get detail html started
get detail url started
last time: 0.0
將兩個執行緒設定為守護執行緒,即主執行緒退出,這兩個守護執行緒也退出。列印結果中執行到print之後直接程式結束。
由於兩個執行緒的時間不相同,那麼兩者有什麼區別呢
<2>、先將thread1設定為守護執行緒
# test5.py
import time
import threading
def get_detail_html(url):
print("get detail html started")
time.sleep(2)
print("get detail html end")
def get_detail_url(url):
print("get detail url started")
time.sleep(4)
print("get detail url end")
if __name__ == "__main__":
# 函式方法 arg 為函式引數
thread1 = threading.Thread(target=get_detail_html, args=("",))
thread2 = threading.Thread(target=get_detail_url, args=("",))
thread1.setDaemon(True) # 只將thread設定為守護執行緒
# thread2.setDaemon(True)
start_time = time.time()
thread1.start()
thread2.start()
#當主執行緒退出的時候, 子執行緒kill掉
print ("last time: {}".format(time.time()-start_time))
# 結果
get detail html started
get detail url started
last time: 0.000997781753540039
get detail html end
get detail url end
只將thread1設定為守護執行緒之後,由於thread2的sleep時間為4秒,所以主執行緒仍會等待thread2執行結束之後才退出,而thread1由於時間為2秒,所以也會列印。
<3>、先將thread2設定為守護執行緒
# test6.py
import time
import threading
def get_detail_html(url):
print("get detail html started")
time.sleep(2)
print("get detail html end")
def get_detail_url(url):
print("get detail url started")
time.sleep(4)
print("get detail url end")
if __name__ == "__main__":
# 函式方法 arg 為函式引數
thread1 = threading.Thread(target=get_detail_html, args=("",))
thread2 = threading.Thread(target=get_detail_url, args=("",))
# thread1.setDaemon(True)
thread2.setDaemon(True)
start_time = time.time()
thread1.start()
thread2.start()
print ("last time: {}".format(time.time()-start_time))
# 輸出
get detail html started
get detail url started
last time: 0.0029969215393066406
get detail html end
由於只將thread2設定為守護執行緒,print函式執行結束的時候會首先kill掉thread2執行緒。但是由於thread1執行緒還未結束,程式仍會等待兩秒輸出get detail html end才結束。
4、執行緒阻塞
上面說了如何在主執行緒結束的時候,直接kill掉子執行緒。那麼如何使子執行緒執行結束才執行主執行緒,就是阻塞主程序。
<1>、結束兩個子執行緒
# test7.py
import time
import threading
def get_detail_html(url):
print("get detail html started")
time.sleep(2)
print("get detail html end")
def get_detail_url(url):
print("get detail url started")
time.sleep(4)
print("get detail url end")
if __name__ == "__main__":
# 函式方法 arg 為函式引數
thread1 = threading.Thread(target=get_detail_html, args=("",))
thread2 = threading.Thread(target=get_detail_url, args=("",))
start_time = time.time()
# 子執行緒開始
thread1.start()
thread2.start()
# 子執行緒程結束
thread1.join()
thread2.join()
#當主執行緒退出的時候, 子執行緒kill掉
print ("last time: {}".format(time.time()-start_time))
#輸出
get detail html started
get detail url started
get detail html end
get detail url end
last time: 4.001712799072266
由於呼叫了兩個thread的join方法,主執行緒阻塞,當子執行緒結束之後,print函式執行後主執行緒退出,程式結束。
<2>、結束thread1執行緒
# test8.py
import time
import threading
def get_detail_html(url):
print("get detail html started")
time.sleep(2)
print("get detail html end")
def get_detail_url(url):
print("get detail url started")
time.sleep(4)
print("get detail url end")
if __name__ == "__main__":
# 函式方法 arg 為函式引數
thread1 = threading.Thread(target=get_detail_html, args=("",))
thread2 = threading.Thread(target=get_detail_url, args=("",))
start_time = time.time()
# 子執行緒開始
thread1.start()
thread2.start()
# 1執行緒程結束
thread1.join()
# thread2.join()
#當主執行緒退出的時候, 子執行緒kill掉
print ("last time: {}".format(time.time()-start_time))
# 輸出
get detail html started
get detail url started
get detail html end
last time: 2.001251220703125
get detail url end
由於呼叫了thread1的join方法,阻塞主執行緒,thread1直接結束之後print列印時間,但是對另一個執行緒沒有影響。所以在列印last time: 2.001251220703125時間,等待兩秒列印get detail url end,主執行緒才會退出。
<3>、結束thread2執行緒
# test9.py
import time
import threading
def get_detail_html(url):
print("get detail html started")
time.sleep(2)
print("get detail html end")
def get_detail_url(url):
print("get detail url started")
time.sleep(4)
print("get detail url end")
if __name__ == "__main__":
# 函式方法 arg 為函式引數
thread1 = threading.Thread(target=get_detail_html, args=("",))
thread2 = threading.Thread(target=get_detail_url, args=("",))
start_time = time.time()
# 子執行緒開始
thread1.start()
thread2.start()
# 2執行緒程結束
# thread1.join()
thread2.join()
#當主執行緒退出的時候, 子執行緒kill掉
print ("last time: {}".format(time.time()-start_time))
# 輸出
get detail html started
get detail url started
get detail html end
get detail url end
last time: 4.002287864685059
由於thread2執行緒的sleep的時間為4秒,期間thread1已經執行完畢,所以列印時間為4秒。
5、Thread類繼承式建立
同樣的也可以使用類繼承的方法建立執行緒例項,效果一樣的
# test10.py
import time
import threading
class GetDetailHtml(threading.Thread):
def __init__(self, name):
super().__init__(name=name)
def run(self):
print("get detail html started")
time.sleep(2)
print("get detail html end")
class GetDetailUrl(threading.Thread):
def __init__(self, name):
super().__init__(name=name)
def run(self):
print("get detail url started")
time.sleep(4)
print("get detail url end")
if __name__ == "__main__":
# 類繼承方法
thread1 = GetDetailHtml("get_detail_html")
thread2 = GetDetailUrl("get_detail_url")
start_time = time.time()
# 子執行緒開始
thread1.start()
thread2.start()
# 子執行緒程結束
thread1.join()
thread2.join()
#當主執行緒退出的時候, 子執行緒kill掉
print ("last time: {}".format(time.time()-start_time))
6、執行緒通訊
<1>、共享變數通訊
共享變數通訊,是執行緒間通訊最簡單的方式,但也是最容易出問題的方式。以上面爬去頁面和網頁連結的例項進行擴充套件。在上面的例項中,因為要解決請求列表頁面的時候網路時延問題,引入了多執行緒並行,邊爬去列表頁獲取href,再爬取href指向的想起那個頁面,下面將爬去的頁面存入列表實現。
# test11.py
import threading
import time
detail_url_list = [] # 儲存著爬取下來的href連結
def get_detail_html(detail_url_list): # 引數這裡作為對全域性變數的引用
while True:
# 使用while語句使得執行緒持續爬去
if len(detail_url_list):
url = detail_url_list.pop()
print('get detail html start')
time.sleep(2)
print('get detail html end')
def get_detail_url(detail_url_list):
while True:
# 使用while語句使得執行緒持續爬取
print('get detail url start')
time.sleep(4)
for i in range(20):
detail_url_list.append('http://www.xxxx.com/{}.html'.format(i))
print('get detail end')
if __name__ == "__main__":
thread_detail_url = threading.Thread(target=get_detail_url, args=(detail_url_list,))
for i in range(10):
# 為了模擬多個執行緒併發,這裡建立了十個子執行緒
html_thread = threading.Thread(target=get_detail_html, args=(detail_url_list,))
html_thread.start()
start_time = time.time()
print("last time: {}".format(time.time() - start_time))
但是上面問題也會很明顯,在GIL的示例中,total變數由於變數共享的緣故,沒有按照預期的執行。而在上面的爬蟲例項中,detail_url_list作為全域性共享變數,pop操作,append操作,多個執行緒共用資源,都不是執行緒安全的操作,會出現問題。所以就必須給變數加上鎖,保持安全性。為了擺脫這種問題,使用訊息佇列通訊
<2>、訊息佇列通訊
訊息佇列通訊也就是使用Queue這個類來表示變數,從而達到執行緒安全,由於Queue這個類內部封裝了deque,也就是python中的雙端佇列。雙端對列本身就是安全界別很高的一種型別,實現執行緒間的安全操作。
# test12.py
#通過queue的方式進行執行緒間同步
from queue import Queue
import time
import threading
def get_detail_html(queue):
#爬取文章詳情頁
while True:
url = queue.get()
print("get detail html started")
time.sleep(2)
print("get detail html end")
def get_detail_url(queue):
# 爬取文章列表頁
while True:
print("get detail url started")
time.sleep(4)
for i in range(20):
queue.put("http://projectsedu.com/{id}".format(id=i))
print("get detail url end")
if __name__ == "__main__":
detail_url_queue = Queue(maxsize=1000)
thread_detail_url = threading.Thread(target=get_detail_url, args=(detail_url_queue,))
for i in range(10):
html_thread = threading.Thread(target=get_detail_html, args=(detail_url_queue,))
html_thread.start()
start_time = time.time()
# detail_url_queue.task_done()
detail_url_queue.join()
#當主執行緒退出的時候, 子執行緒kill掉
print ("last time: {}".format(time.time()-start_time))
使用了訊息佇列替代共享變數
- Queue([maxsize]):建立共享的程序佇列,Queue是多程序安全的佇列,可以使用Queue實現多程序之間的資料傳遞。
- q.put方法用以插入資料到佇列中,put方法還有兩個可選引數:blocked和timeout。如果blocked為True(預設值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該佇列有剩餘的空間。如果超時,會丟擲Queue.Full異常。如果blocked為False,但該Queue已滿,會立即丟擲Queue.Full異常。沒有引數時,q.put的個數大於佇列數時,會一直阻塞住。
- q.get方法可以從佇列讀取並且刪除一個元素。同樣,get方法有兩個可選引數:blocked和timeout。如果blocked為True(預設值),並且timeout為正值,那麼在等待時間內沒有取到任何元素,會丟擲Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果佇列為空,則立即丟擲Queue.Empty異常。沒有引數時,q.get的個數大於佇列數時,會一直阻塞住。
- q.put_nowait()等價於q.put(block=False)佇列滿時再存也會拋異常
- q.get_nowait()等價於q.get(block=False)佇列為空取不出時會拋異常
- q.task_done():使用者使用此方法發出訊號,表示q.get()的返回專案已經被處理。如果呼叫此方法的次數大於從佇列中刪除專案的數量,將引發ValueError異常
- q.join():生產者呼叫此方法進行阻塞,直到佇列中所有的專案均被處理。阻塞將持續到佇列中的每個專案均呼叫q.task_done()方法為止
7、加鎖
在上面的第一個GIL示例中,由於GIL釋放的緣故,多個執行緒共享變數,導致total的值不像預期那樣為0的問題發生,也就是如何執行緒同步。最簡單的方式就是加鎖。加鎖使得一個執行緒在佔用資源的時候,別的執行緒都必須等待,只有當這個執行緒主動釋放資源的時候,其他執行緒才能使用資源。這樣就可要保證共享變數的安全性。
# test13.py
from threading import Lock
#在同一個執行緒裡面,可以連續呼叫多次acquire, 一定要注意acquire的次數要和release的次數相等
total = 0
lock = Lock()
def add():
global lock
global total
for i in range(1000000):
lock.acquire() # 加鎖
total += 1
lock.release() # 釋放鎖
def desc():
global total
global lock
for i in range(1000000):
lock.acquire() # 加鎖
total -= 1
lock.release() # 釋放鎖
import threading
thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(total)
# 輸出
在等待了一段時間後輸出0
0 # total的列印結果為0
加鎖的時候要保證加上鎖執行完成之後,就要釋放掉,不然會一直佔用資源。
加鎖的結果使得在執行total-=1或者total+=1的賦值語句的時候,該賦值語句對應的多條位元組碼指令執行完之後,才會其他程序執行修改total值。該執行緒佔用了鎖,所以其他執行緒不能修改total值,只有當該釋放了鎖,其他執行緒才能修改total值,不會造成修改共享變數的衝突。這是加鎖的好處,那麼代價也十分明顯
加鎖缺點:
- 加鎖效能
- 死鎖風險
補充:另外自己加的鎖使使用者級別的與GIL不同。
<1>、效能問題
本來的多執行緒,由於加鎖的緣故,首先是阻止了多執行緒併發執行,包含鎖的某段程式碼實際上只能以單執行緒模式執行。並且由於來回切換執行緒的緣故,程式效能變得低下
將test2.py改成如下
# test14.py
total = 0
def add():
global total
for i in range(1000000):
total += 1
def desc():
global total
for i in range(1000000):
total -= 1
import threading
import time
start_time = time.time()
add()
desc()
print(total)
print("last time: {}".format(time.time() - start_time))
# 輸出
0
last time: 0.314816951751709
這是簡單的單執行緒程式,持續時間為0.3秒。沒有使用thread多執行緒
下面使用threading多執行緒,並且加鎖
# test15.py
from threading import Lock
total = 0
lock = Lock()
def add():
global lock
global total
for i in range(1000000):
lock.acquire()
total += 1
lock.release()
def desc():
global total
global lock
for i in range(1000000):
lock.acquire()
total -= 1
lock.release()
import threading
import time
start_time = time.time()
thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(total)
print("last time: {}".format(time.time() - start_time))
# 輸出
0
last time: 5.062084674835205
使用了多執行緒,為了保證共享變數的安全性操作,執行緒同步,加鎖導致類似單執行緒,程式的執行時間達到了5秒鐘。可見執行緒之間的切換十分浪費時間。所以說,CPython的GIL本意是用來保護所有全域性的直譯器和環境狀態變數的,如果去掉GIL,就需要更多的更細粒度的鎖對直譯器的眾多全域性狀態進行保護。做過測試將GIL去掉,加入更細粒度的鎖。但是實踐檢測對單執行緒來說,效能更低。
<2>、死鎖風險
來看下面例子
這裡為了在一個執行緒中多次呼叫lock,使用可重入的鎖Rlock物件
Lock與Rlock區別:
RLock允許在同一執行緒中被多次acquire。而Lock卻不允許這種情況。注意:如果使用RLock,那麼acquire和release必須成對出現,即呼叫了n次acquire,必須呼叫n次的release才能真正釋放所佔用的瑣。
# test15.py
from threading import RLock # 可重入的鎖
total = 0
lock = RLock()
def add():
global lock
global total
for i in range(1000000):
lock.acquire()
lock.acquire() # 這裡加了兩次鎖
total += 1
lock.release()
def desc():
global total
global lock
for i in range(1000000):
lock.acquire()
total -= 1
lock.release()
import threading
import time
start_time = time.time()
thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(total)
print("last time: {}".format(time.time() - start_time))
由於在add函式中加了兩次鎖lock.acquire(),結果就是執行緒永遠都不獲釋放掉共享變數。一直佔用資源,其他的執行緒請求資源沒有結果,多個執行緒掛起,既不能執行,也無法結束,一直處於等待狀態,造成死鎖,只能靠作業系統強制終止。最終程式也沒有任何結果輸出。
所以在同一個執行緒裡面,可以連續呼叫多次acquire, 一定要注意acquire的次數要和release的次數相等
還有就是,執行緒的相互等待,假如記憶體中又兩中資源a和b,而執行緒A(a,b)和執行緒B(a,b)都申請資源。
第一步
執行緒A先申請a資源,執行緒B先申請b資源,因此沒有問題
第二步
由於a,b均已被A,B佔用,並且A申請b,B申請b,在位獲得新的資源的時候兩者都不會退出對現有資源的佔用,這就造成了兩個執行緒相互等待,並且這種等待會一直持續下去,造成死鎖。
8、執行緒複雜通訊
在上面看到執行緒進行通訊的時候需要加鎖,如果如何使用鎖進行執行緒的對話功能,例如
- 執行緒A:hello,你好啊
- 執行緒B:你好
- 執行緒A:吃飯了嗎
- 執行緒B:吃過了,你呢
- 執行緒A:我也吃過了,咱們去搞PVM吧
- 執行緒B:ok,走吧
<1>、簡單鎖
像上面,就是簡單的執行緒通訊,如果使用簡單的Rlock鎖
import threading
class ThreadA(threading.Thread):
def __init__(self, lock):
super().__init__(name="執行緒A")
self.lock = lock
def run(self):
self.lock.acquire()
print("{} : hello, 你好 ".format(self.name))
self.lock.release()
self.lock.acquire()
print("{} : 吃過飯了嗎 ".format(self.name))
self.lock.release()
self.lock.acquire()
print("{} : 我也吃過了,咱們去找PVM吧".format(self.name))
self.lock.release()
class ThreadB(threading.Thread):
def __init__(self, lock):
super().__init__(name="執行緒B")
self.lock = lock
def run(self):
self.lock.acquire()
print("{} : 你好 ".format(self.name))
self.lock.release()
self.lock.acquire()
print("{} : 吃過了,你呢".format(self.name))
self.lock.release()
self.lock.acquire()
print("{} : ok,走吧 ".format(self.name))
self.lock.release()
if __name__ == "__main__":
lock = threading.RLock()
a_thread = ThreadA(lock)
b_thread = ThreadB(lock)
a_thread.start()
b_thread.start()
# 輸出
執行緒A : hello, 你好
執行緒A : 吃過飯了嗎
執行緒A : 我也吃過了,咱們去找PVM吧
執行緒B : 你好
執行緒B : 吃過了,你呢
執行緒B : ok,走吧
顯然沒有完成執行緒通訊的基本功能。
<2>、threading.Condition()
解決方案:線上程複雜通訊時使用threading.Condition(),可以把Condiftion理解為一把高階的瑣,它提供了比Lock, RLock更高階的功能,允許我們能夠控制複雜的執行緒同步問題。threadiong.Condition在內部維護一個瑣物件(預設是RLock),可以在建立Condigtion物件的時候把瑣物件作為引數傳入。Condition也提供了acquire, release方法,其含義與瑣的acquire, release方法一致,其實它只是簡單的呼叫內部瑣物件的對應的方法而已。Condition還提供wait方法、notify方法、notifyAll方法。這些方法只有在佔用瑣(acquire)之後才能呼叫,否則將會報RuntimeError異常。
方法介紹:
- acquire()/release():獲得/釋放 Lock
- wait([timeout]):執行緒掛起,直到收到一個notify通知或者超時(可選的,浮點數,單位是秒s)才會被喚醒繼續執行。wait()必須在已獲得Lock前提下才能呼叫,否則會觸發RuntimeError。呼叫wait()會釋放Lock,直至該執行緒被Notify()、NotifyAll()或者超時執行緒又重新獲得Lock.
- notify(n=1):通知其他執行緒,那些掛起的執行緒接到這個通知之後會開始執行,預設是通知一個正等待該condition的執行緒,最多則喚醒n個等待的執行緒。notify()必須在已獲得Lock前提下才能呼叫,否則會觸發RuntimeError。notify()不會主動釋放Lock。
- notifyAll(): 如果wait狀態執行緒比較多,notifyAll的作用就是通知所有執行緒
原始碼分析:
# 部分原始碼
_PyRLock = _RLock
class Condition:
def __init__(self, lock=None):
if lock is None:
lock = RLock()
self._lock = lock
# Export the lock's acquire() and release() methods
self.acquire = lock.acquire
self.release = lock.release
def __enter__(self):
return self._lock.__enter__()
def __exit__(self, *args):
return self._lock.__exit__(*args)
進入Condition這個類中檢視原始碼發現,在預設的情況下,Condition是封裝的鎖物件是Rlock,另外Condition類實現了__enter__,__exit__兩個特殊方法,由鴨子型別可知,說明可以像上下文管理器一樣使用它。
而在__enter__與__exit__兩個特殊方法中分別呼叫了self.acquire()與self.release()兩個方法,所以說不使用with上下文管理器的話也可以直接使用acquire()與release()兩個方法進行加鎖釋放鎖。
解決例項:
class ThreadA(threading.Thread):
def __init__(self, cond):
super().__init__(name="執行緒A")
self.cond = cond
def run(self):
with self.cond:
print("{} : hello, 你好 ".format(self.name)) # 4
self.cond.notify() # 5
self.cond.wait() # 6
print("{} : 吃過飯了嗎 ".format(self.name))
self.cond.notify()
self.cond.wait()
print("{} : 我也吃過了,咱們去找PVM吧".format(self.name))
self.cond.notify()
self.cond.wait()
class ThreadB(threading.Thread):
def __init__(self, cond):
super().__init__(name="執行緒B")
self.cond = cond
def run(self):
with self.cond:
self.cond.wait() # 2
print("{} : 你好 ".format(self.name)) # 7
self.cond.notify()
self.cond.wait()
print("{} : 吃過了,你呢".format(self.name))
self.cond.notify()
self.cond.wait()
print("{} : ok,走吧 ".format(self.name))
self.cond.notify()
if __name__ == "__main__":
cond = threading.Condition()
b_thread = ThreadB(cond)
a_thread = ThreadA(cond)
b_thread.start() # 1
a_thread.start() # 3
# 輸出結果
執行緒A : hello, 你好
執行緒B : 你好
執行緒A : 吃過飯了嗎
執行緒B : 吃過了,你呢
執行緒A : 我也吃過了,咱們去找PVM吧
執行緒B : ok,走吧
完成執行緒之間的複雜通訊。
這裡需要注意的是:兩個執行緒之間的開啟先後順序。b執行緒需要先於a執行緒開啟。原因:
1 先開啟b執行緒
2 wait方法會首先上一把鎖,執行緒處於阻塞態
3 開啟a執行緒
4 列印 執行緒A:hello,你好啊
5 這個時候cond物件呼叫notify方法,會釋放掉之前上的鎖
6 呼叫wait方法,為自己又上了一把鎖
7 由於notify方法已經打開了鎖,或繼續執行,列印 執行緒B:你好
其實wait方法會維持一個鎖,而這個鎖只有notify方法才能開啟。如果a執行緒先開啟,則是呼叫了wait方法維持了一把鎖,並沒有其他的執行緒會呼叫notify方法釋放這把鎖。則最終只會輸出 執行緒A : hello, 你好 ,而執行緒一直處於死鎖狀態。
補充:Condition物件會維持兩層鎖,而不是兩個鎖,更不是簡單的一個鎖。在開啟或者關閉上下文管理器物件的時候__enter__,__exit__方法會開啟釋放掉底層鎖(直接使用acquire()與release()兩個方法也行),這一層鎖是一個。而在持續連續呼叫的wait和notify方法則是對第二層鎖進行操作,而這一層所在Condition物件內部是封裝到一個雙端佇列中,在每次呼叫wait的時候分配一把鎖並放入到cond的等待佇列中,等到notify方法的喚醒。可以進入Condition原始碼檢視
9、Semaphore(訊號量)
同時只有n個執行緒可以獲得semaphore,即可以限制最大連線數為n),也就是執行緒最大併發量的控制。
Semaphore管理一個內建的計數器,每當呼叫acquire()時內建計數器-1;呼叫release() 時內建計數器+1;計數器不能小於0;當計數器為0時,acquire()將阻塞執行緒直到其他執行緒呼叫release()。
訊號量使得一個程式中有很多個執行緒,但是隻有n多個執行緒獲得訊號量,處於執行態
class HtmlSpider(threading.Thread):
def __init__(self, url, sem):
super().__init__()
self.url = url
self.sem = sem
def run(self):
time.sleep(2)
print("got html text success, time is {}".format(time.ctime()))
self.sem.release()
class UrlProducer(threading.Thread):
def __init__(self, sem):
super().__init__()
self.sem = sem
def run(self):
for i in range(20):
self.sem.acquire()
html_thread = HtmlSpider("https://baidu.com/{}".format(i), self.sem)
html_thread.start()
if __name__ == "__main__":
sem = threading.Semaphore(4) # 每次只有4個執行緒獲取訊號量
url_producer = UrlProducer(sem)
url_producer.start()
在上面示例中,模擬爬蟲,建立20個子執行緒爬取html頁面,如果不是用訊號量,二十條資料一次返回。使用訊號量,使得每次只有4個執行緒執行。
# 輸出結果
got html text success, time is Tue Nov 20 17:17:55 2018
got html text success, time is Tue Nov 20 17:17:55 2018
got html text success, time is Tue Nov 20 17:17:55 2018
got html text success, time is Tue Nov 20 17:17:55 2018
got html text success, time is Tue Nov 20 17:17:57 2018
got html text success, time is Tue Nov 20 17:17:57 2018
got html text success, time is Tue Nov 20 17:17:57 2018
got html text success, time is Tue Nov 20 17:17:57 2018
got html text success, time is Tue Nov 20 17:17:59 2018
got html text success, time is Tue Nov 20 17:17:59 2018
got html text success, time is Tue Nov 20 17:17:59 2018
got html text success, time is Tue Nov 20 17:17:59 2018
got html text success, time is Tue Nov 20 17:18:01 2018
got html text success, time is Tue Nov 20 17:18:01 2018
got html text success, time is Tue Nov 20 17:18:01 2018
got html text success, time is Tue Nov 20 17:18:01 2018
got html text success, time is Tue Nov 20 17:18:03 2018
got html text success, time is Tue Nov 20 17:18:03 2018
got html text success, time is Tue Nov 20 17:18:03 2018
got html text success, time is Tue Nov 20 17:18:03 2018
每個兩秒列印一次結果,一次四條資料。總共二十個。
10、執行緒池
Python標準庫為我們提供了threading和multiprocessing模組編寫相應的多執行緒/多程序程式碼,但是當專案達到一定的規模,頻繁建立/銷燬程序或者執行緒是非常消耗資源的,這個時候我們就要編寫自己的執行緒池/程序池,以空間換時間。但從Python3.2開始,標準庫為我們提供了concurrent.futures模組,它提供了ThreadPoolExecutor和ProcessPoolExecutor兩個類,實現了對threading和multiprocessing的進一步抽象,對編寫執行緒池/程序池提供了直接的支援。
concurrent.futures模組的基礎是Exectuor,Executor是一個抽象類,它不能被直接使用。但是它提供的兩個子類ThreadPoolExecutor和ProcessPoolExecutor卻是非常有用,顧名思義兩者分別被用來建立執行緒池和程序池的程式碼。我們可以將相應的tasks直接放入執行緒池/程序池,不需要維護Queue來操心死鎖的問題,執行緒池/程序池會自動幫我們排程。
Future你可以把它理解為一個在未來完成的操作,這是非同步程式設計的基礎,傳統程式設計模式下比如我們操作queue.get的時候,在等待返回結果之前會產生阻塞,cpu不能讓出來做其他事情,而Future的引入幫助我們在等待的這段時間可以完成其他的操作。
<1>、使用submit來操作執行緒池/程序池:
from concurrent.futures import ThreadPoolExecutor
import urllib.request
URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']
def load_url(url):
with urllib.request.urlopen(url, timeout=60) as conn:
print('%r page is %d bytes' % (url, len(conn.read())))
executor = ThreadPoolExecutor(max_workers=3)
for url in URLS:
future = executor.submit(load_url,url)
print(future.done())
print('主執行緒')
# 執行結果:
False
False
False
主執行緒
'https://www.baidu.com/' page is 227 bytes
'https://github.com/' page is 75633 bytes
'http://www.163.com' page is 703974 bytes
根據執行結果,使用submit方法來往執行緒池中加入一個task,submit返回一個Future物件,對於Future物件可以簡單地理解為一個在未來完成的操作。由於執行緒池非同步提交了任務,主執行緒並不會等待執行緒池裡建立的執行緒執行完畢,所以執行了print('主執行緒'),相應的執行緒池中建立的執行緒並沒有執行完畢,故future.done()返回結果為False。
<2>、 用map來操作執行緒池/程序池:
from concurrent.futures import ThreadPoolExecutor
import urllib.request
URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']
def load_url(url):
with urllib.request.urlopen(url, timeout=60) as conn:
print('%r page is %d bytes' % (url, len(conn.read())))
executor = ThreadPoolExecutor(max_workers=3)
executor.map(load_url,URLS)
print('主執行緒')
# 結果
主執行緒
'https://www.baidu.com/' page is 227 bytes
'https://github.com/' page is 75633 bytes
'http://www.163.com' page is 703974 bytes
從執行結果可以看出,map是按照URLS列表元素的順序返回的,並且寫出的程式碼更加簡潔直觀,可以根據具體的需求任選一種。
<3>、wait
wait方法接會返回一個tuple(元組),tuple中包含兩個set(集合),一個是completed(已完成的)另外一個是uncompleted(未完成的)。使用wait方法的一個優勢就是獲得更大的自由度,它接收三個引數FIRST_COMPLETED, FIRST_EXCEPTION 和ALL_COMPLETE,預設設定為ALL_COMPLETED
如果採用預設的ALL_COMPLETED,程式會阻塞直到執行緒池裡面的所有任務都完成,再執行主執行緒:
from concurrent.futures import ThreadPoolExecutor,wait,as_completed
import urllib.request
URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']
def load_url(url):
with urllib.request.urlopen(url, timeout=60) as conn:
print('%r page is %d bytes' % (url, len(conn.read())))
executor = ThreadPoolExecutor(max_workers=3)
f_list = []
for url in URLS:
future = executor.submit(load_url,url)
f_list.append(future)
print(wait(f_list))
print('主執行緒')
# 輸出
'https://www.baidu.com/' page is 227 bytes
'https://github.com/' page is 75627 bytes
'http://www.163.com' page is 703988 bytes
DoneAndNotDoneFutures(done={<Future at 0x2ab6ea89d30 state=finished returned NoneType>, <Future at 0x2ab6ea89240 state=finished returned NoneType>, <Future at 0x2ab6e93f7b8 state=finished returned NoneType>}, not_done=set())
主執行緒
如果採用FIRST_COMPLETED引數,程式並不會等到執行緒池裡面所有的任務都完成:
from concurrent.futures import ThreadPoolExecutor,wait,as_completed
import urllib.request
URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']
def load_url(url):
with urllib.request.urlopen(url, timeout=60) as conn:
print('%r page is %d bytes' % (url, len(conn.read())))
executor = ThreadPoolExecutor(max_workers=3)
f_list = []
for url in URLS:
future = executor.submit(load_url,url)
f_list.append(future)
print(wait(f_list,return_when='FIRST_COMPLETED'))
print('主執行緒')
# 輸出
'https://www.baidu.com/' page is 227 bytes
DoneAndNotDoneFutures(done={<Future at 0x2cd5581a240 state=finished returned NoneType>}, not_done={<Future at 0x2cd5581ad30 state=running>, <Future at 0x2cd556cf7f0 state=running>})
主執行緒
'http://www.163.com' page is 703991 bytes
'https://github.com/' page is 75625 bytes
<4>、回撥函式
import requests
import time
from concurrent.futures import ThreadPoolExecutor
def get(url):
print('GET {}'.format(url))
response = requests.get(url)
time.sleep(2)
if response.status_code == 200: # 200代表狀態:下載成功了
return {'url': url, 'content': response.text}
def parse(res):
print('%s parse res is %s' % (res['url'], len(res['content'])))
return '%s parse res is %s' % (res['url'], len(res['content']))
def save(res):
print('save', res)
def task(res):
res = res.result()
par_res = parse(res)
save(par_res)
if __name__ == '__main__':
urls = [
'http://www.cnblogs.com',
'https://www.python.org',
'https://www.openstack.org',
]
pool = ThreadPoolExecutor(2)
for i in urls:
pool.submit(get, i).add_done_callback(task)
'''
這裡的回撥函式拿到的是一個物件。得
先把返回的res得到一個結果。即在前面加上一個res.result()
誰好了誰去掉回撥函式
回撥函式也是一種程式設計思想。不僅線上程池用,在程序池也用
'''
pool.shutdown() # 相當於程序池裡的close和join
# 輸出
GET http://www.cnblogs.com
GET https://www.python.org
https://www.python.org parse res is 50114
save https://www.python.org parse res is 50114
GET https://www.openstack.org
https://www.openstack.org parse res is 63253
save https://www.openstack.org parse res is 63253
http://www.cnblogs.com parse res is 40382
save http://www.cnblogs.com parse res is 40382