python的多執行緒程式設計
1,python中一個執行緒對應於c語言中的一個執行緒
gil使得同一個時刻只有一個執行緒在一個cpu上執行位元組碼, 無法將多個執行緒對映到多個cpu上執行
gil會根據執行的位元組碼行數以及時間片釋放gil,gil在遇到io的操作時候主動釋放
total = 0 def add(): #1. dosomething1 #2. io操作 # 1. dosomething3 global total for i in range(1000000): total += 1 def desc(): global total for i inrange(1000000): total -= 1 import threading thread1 = threading.Thread(target=add) thread2 = threading.Thread(target=desc) thread1.start() thread2.start() thread1.join() thread2.join() print(total)
每一次執行的結果都會不一樣,所以有GIL的python執行緒也不是安全的,但是python遇到io操作的話,會等到io操作時候主動釋放GIL,
2,多執行緒程式設計
①對於io操作來說,多執行緒和多程序效能差別不大
----------------------------------------------------
方式1:
通過Thread類來例項化
import time import threading def get_detail_html(url): print("get detail html started") time.sleep(2) print("get detail html end") def get_detail_url(url): print("get detail url started") time.sleep(4) print("get detail url end") if __name__ =="__main__": thread1 = threading.Thread(target=get_detail_html, args=("",)) thread2 = threading.Thread(target=get_detail_url, args=("",)) start_time = time.time() thread1.start() thread2.start() print("last time {}".format(time.time()-start_time))
get detail html started get detail url started last time 0.0010006427764892578 get detail html end get detail url end
執行時間居然是0,兩個執行緒並行時間不應該是2秒嗎?其實實際上這是有3個執行緒,可以通過pycharm的IDE中進行debug
可以看得到其實是三個執行緒的
那就意味著三個執行緒並行,2個執行緒睡2秒,但第三個執行緒依舊可以繼續向下進行,因為他們是並行的,因此,時間才會接近於0,
但是此時雖然主執行緒結束了,但是並沒有退出!子執行緒依舊可以執行,如何設定主執行緒退出之後立即kill掉子執行緒呢?
thread1 = threading.Thread(target=get_detail_html, args=("",)) thread2 = threading.Thread(target=get_detail_url, args=("",)) start_time = time.time() thread1.setDaemon(True) # setDaemon 設定為True是將其設定為守護執行緒 thread2.setDaemon(True) thread1.start() thread2.start()
但是如何讓這個主執行緒等待其餘2個子執行緒結束之後再去執行呢?
thread1 = threading.Thread(target=get_detail_html, args=("",)) thread2 = threading.Thread(target=get_detail_url, args=("",)) start_time = time.time() thread1.start() thread2.start() thread1.join() thread2.join()
join()就是設定主執行緒必須等待子執行緒結束之後才能夠退出,注意:必須在start()之後寫
那如何簡化多執行緒程式設計呢?(繼承Thread類)
②通過繼承Thread來實現多執行緒
class GetDetailHtml(threading.Thread): def __init__(self, name): super().__init__(name=name) def run(self): 過載run方法 print("get detail html started") time.sleep(2) print("get detail html end") class GetDetailUrl(threading.Thread): def __init__(self, name): super().__init__(name=name) def run(self): print("get detail url started") time.sleep(4) print("get detail url end")
if __name__ == "__main__": thread1 = GetDetailHtml("get_detail_html") thread2 = GetDetailUrl("get_detail_url") start_time = time.time() thread1.start() thread2.start() thread1.join() thread2.join() # 當主執行緒退出的時候, 子執行緒kill掉 print("last time: {}".format(time.time() - start_time))
那歸根到底就能夠自定義很多複雜的邏輯了
---------------------------------------------------------
執行緒間的通訊和共享變數
從第一個例子中我們就公用了同一個total變數
但是共享變數會導致變數被反覆修改
# 通過queue的方式進行執行緒間同步 from queue import Queue import time import threading def get_detail_html(queue): # 爬取文章詳情頁 while True: url = queue.get() # queue是一個阻塞方法,佇列中沒有值得時候他會一直阻塞 # for url in detail_url_list: print("get detail html started") time.sleep(2) print("get detail html end") def get_detail_url(queue): # 爬取文章列表頁 while True: print("get detail url started") time.sleep(4) for i in range(20): queue.put("http://projectsedu.com/{id}".format(id=i)) # 佇列滿了也會阻塞住 print("get detail url end") # 1. 執行緒通訊方式- 共享變數 if __name__ == "__main__": detail_url_queue = Queue(maxsize=1000) # 宣告最大值的訊息佇列,執行緒是安全的 thread_detail_url = threading.Thread(target=get_detail_url, args=(detail_url_queue,)) for i in range(10): html_thread = threading.Thread(target=get_detail_html, args=(detail_url_queue,)) html_thread.start() # # thread2 = GetDetailUrl("get_detail_url") start_time = time.time() # thread_detail_url.start() # thread_detail_url1.start() # # thread1.join() # thread2.join() detail_url_queue.task_done() # 必須呼叫 detail_url_queue.join() # 和執行緒一致 # 當主執行緒退出的時候, 子執行緒kill掉 print("last time: {}".format(time.time() - start_time))
因此,當涉及到共享變數的時候,首先推薦採用queue來完成
1,執行緒安全
2,對於可以採用task_done 隨時停止
-----------------------------------------------------------------------------------------
4,執行緒同步:(鎖機制)
# -*- coding:UTF-8 -*- __autor__ = 'zhouli' __date__ = '2018/12/18 21:44' from threading import Lock total = 0 lock = RLock() def add(): # 1. dosomething1 # 2. io操作 # 1. dosomething3 global lock global total for i in range(1000000): lock.acquire() total += 1 lock.release() def desc(): global total global lock for i in range(1000000): lock.acquire() total -= 1 lock.release() import threading thread1 = threading.Thread(target=add) thread2 = threading.Thread(target=desc) thread1.start() thread2.start() # thread1.join() thread2.join() print(total) # 1. 用鎖會影響效能 # 2. 鎖會引起死鎖 # 死鎖的情況 A(a,b)
加鎖一定要釋放!!否則死鎖!!
因為使用鎖的情況下會很繞,所以python給我們重新定義了一個Rlock(可重入的鎖)
# 在同一個執行緒裡面,可以連續呼叫多次acquire, 一定要注意acquire的次數要和release的次數相等
程式碼修改如下:
from threading import Lock, RLock, Condition # 可重入的鎖 # 在同一個執行緒裡面,可以連續呼叫多次acquire, 一定要注意acquire的次數要和release的次數相等 total = 0 lock = RLock() def add(): # 1. dosomething1 # 2. io操作 # 1. dosomething3 global lock global total for i in range(1000000): lock.acquire() lock.acquire() total += 1 lock.release() lock.release() def desc(): global total global lock for i in range(1000000): lock.acquire() total -= 1 lock.release() import threading thread1 = threading.Thread(target=add) thread2 = threading.Thread(target=desc) thread1.start() thread2.start() # thread1.join() thread2.join() print(total) # 1. 用鎖會影響效能 # 2. 鎖會引起死鎖 # 死鎖的情況 A(a,b) """ A(a、b) acquire (a) acquire (b) B(a、b) acquire (a) acquire (b) """
在同一個執行緒裡面才是如此,不同執行緒之間還是一個互相競爭的關係!
多執行緒的難點:condition(條件變數)
他是多執行緒中用於複雜的多執行緒通訊中的鎖,條件變數
通過原始碼可知其中的wait和notify方法
其中wait()方法是等待執行緒的的啟動,notify去通知另一個執行緒的啟動
import threading # 條件變數, 用於複雜的執行緒間同步 # class XiaoAi(threading.Thread): # def __init__(self, lock): # super().__init__(name="小愛") # self.lock = lock # # def run(self): # self.lock.acquire() # print("{} : 在 ".format(self.name)) # self.lock.release() # # self.lock.acquire() # print("{} : 好啊 ".format(self.name)) # self.lock.release() # # # class TianMao(threading.Thread): # def __init__(self, lock): # super().__init__(name="天貓精靈") # self.lock = lock # # def run(self): # self.lock.acquire() # print("{} : 小愛同學 ".format(self.name)) # self.lock.release() # # self.lock.acquire() # print("{} : 我們來對古詩吧 ".format(self.name)) # self.lock.release() # 通過condition完成協同讀詩 class XiaoAi(threading.Thread): def __init__(self, cond): super().__init__(name="小愛") self.cond = cond def run(self): with self.cond: # 一定要使用with語句 self.cond.wait() # 後說話使用先要等待 print("{} : 在 ".format(self.name)) self.cond.notify() # 去通知 self.cond.wait() print("{} : 好啊 ".format(self.name)) self.cond.notify() self.cond.wait() print("{} : 君住長江尾 ".format(self.name)) self.cond.notify() self.cond.wait() print("{} : 共飲長江水 ".format(self.name)) self.cond.notify() self.cond.wait() print("{} : 此恨何時已 ".format(self.name)) self.cond.notify() self.cond.wait() print("{} : 定不負相思意 ".format(self.name)) self.cond.notify() class TianMao(threading.Thread): def __init__(self, cond): super().__init__(name="天貓精靈") self.cond = cond def run(self): with self.cond: print("{} : 小愛同學 ".format(self.name)) self.cond.notify() # 先去通知 self.cond.wait() # 等待 print("{} : 我們來對古詩吧 ".format(self.name)) self.cond.notify() self.cond.wait() print("{} : 我住長江頭 ".format(self.name)) self.cond.notify() self.cond.wait() print("{} : 日日思君不見君 ".format(self.name)) self.cond.notify() self.cond.wait() print("{} : 此水幾時休 ".format(self.name)) self.cond.notify() self.cond.wait() print("{} : 只願君心似我心 ".format(self.name)) self.cond.notify() self.cond.wait() if __name__ == "__main__": from concurrent import futures cond = threading.Condition() xiaoai = XiaoAi(cond) tianmao = TianMao(cond) # 啟動順序很重要 # 在呼叫with cond之後才能呼叫wait或者notify方法 # condition有兩層鎖, 一把底層鎖會線上程呼叫了wait方法的時候釋放, 上面的鎖會在每次呼叫wait的時候分配一把並放入到cond的等待佇列中,等到notify方法的喚醒 xiaoai.start() tianmao.start()
5,Semaphore的使用
# Semaphore 是用於控制進入數量的鎖 # 檔案, 讀、寫, 寫一般只是用於一個執行緒寫,讀可以允許有多個 # 做爬蟲 import threading import time class HtmlSpider(threading.Thread): def __init__(self, url, sem): super().__init__() self.url = url self.sem = sem def run(self): time.sleep(2) print("got html text success") self.sem.release() # 一定要注意鎖的釋放的位置,一旦鎖被釋放sem就會增加1 class UrlProducer(threading.Thread): def __init__(self, sem): super().__init__() self.sem = sem def run(self): for i in range(20): self.sem.acquire() html_thread = HtmlSpider("https://baidu.com/{}".format(i), self.sem) html_thread.start() if __name__ == "__main__": sem = threading.Semaphore(3) url_producer = UrlProducer(sem) url_producer.start()
6,執行緒池
from concurrent.futures import ThreadPoolExecutor
為什麼要執行緒池?
主執行緒中可以獲取某一個執行緒的狀態或者某一個任務的狀態,以及返回值
當一個執行緒完成的時候我們主執行緒能立即知道
futures可以讓多執行緒和多程序編碼介面一致
from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED # 未來物件,task的返回容器 # 執行緒池, 為什麼要執行緒池 # 主執行緒中可以獲取某一個執行緒的狀態或者某一個任務的狀態,以及返回值 # 當一個執行緒完成的時候我們主執行緒能立即知道 # futures可以讓多執行緒和多程序編碼介面一致 import time def get_html(times): time.sleep(times) print("get page {} success".format(times)) return times executor = ThreadPoolExecutor(max_workers=2) # 通過submit函式提交執行的函式到執行緒池中, submit 是立即返回 task1 = executor.submit(get_html, (3,)) # 第一個引數是函式名稱,第二個引數是引數 task2 = executor.submit(get_html, (2,)) # submit的返回時是非常重要,用於判斷是否執行成功等 print(task1.done) # 判斷任務是否完成
結果
當然task1.result()方法也是可以的,檢視task的結果
實際上我們也可以將某一個任務關閉掉,但是要注意,任務在執行中或者是執行完成時是無法取消的,只有未開始執行才會被cancel()掉
# 要獲取已經成功的task的返回 urls = [3, 2, 4] all_task = [executor.submit(get_html, (url,)) for url in urls] for future in as_completed(all_task): # as_completed 實際上是一個生成器,將已經完成的返回 data = future.result() print("get {} page".format(data))
這個執行結果順序是誰先完成任務誰先出來
或者
# 要獲取已經成功的task的返回 urls = [3, 2, 4] all_task = [executor.submit(get_html, (url,)) for url in urls] # for future in as_completed(all_task): # as_completed 實際上是一個生成器,將已經完成的返回 # data = future.result() # print("get {} page".format(data)) # 通過executor的map獲取已經完成的task的值 for data in executor.map(get_html, urls): # map方法更加簡單 print("get {} page".format(data))
但是這樣和上面的不一樣的是,這邊直接返回的就是結果了,也就是data = future.result()這一步被省略了
而且map方法返回的順序是列表的順序
wait 方法:(讓主執行緒進行阻塞)
# 要獲取已經成功的task的返回 urls = [3, 2, 4] all_task = [executor.submit(get_html, (url,)) for url in urls] wait(all_task, return_when=FIRST_COMPLETED) # 讓主執行緒阻塞,如果沒有return_when引數 預設是等待全部任務結束放行 print("main") # for future in as_completed(all_task): # as_completed 實際上是一個生成器,將已經完成的返回 # data = future.result() # print("get {} page".format(data)) # 通過executor的map獲取已經完成的task的值 for data in executor.map(get_html, urls): # map方法更加簡單 print("get {} page".format(data))
放上完整版
from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED from concurrent.futures import Future from multiprocessing import Pool # 未來物件,task的返回容器 # 執行緒池, 為什麼要執行緒池 # 主執行緒中可以獲取某一個執行緒的狀態或者某一個任務的狀態,以及返回值 # 當一個執行緒完成的時候我們主執行緒能立即知道 # futures可以讓多執行緒和多程序編碼介面一致 import time def get_html(times): time.sleep(times) print("get page {} success".format(times)) return times executor = ThreadPoolExecutor(max_workers=2) # 通過submit函式提交執行的函式到執行緒池中, submit 是立即返回 task1 = executor.submit(get_html, (3,)) # 第一個引數是函式名稱,第二個引數是引數 task2 = executor.submit(get_html, (2,)) # submit的返回時是非常重要,用於判斷是否執行成功等 # 要獲取已經成功的task的返回 urls = [3, 2, 4] all_task = [executor.submit(get_html, (url,)) for url in urls] wait(all_task, return_when=FIRST_COMPLETED) # 讓主執行緒阻塞,如果沒有return_when引數 預設是等待全部任務結束放行 print("main") # for future in as_completed(all_task): # as_completed 實際上是一個生成器,將已經完成的返回 # data = future.result() # print("get {} page".format(data)) # 通過executor的map獲取已經完成的task的值 for data in executor.map(get_html, urls): # map方法更加簡單 print("get {} page".format(data)) # #done方法用於判定某個任務是否完成 # print(task1.done()) # print(task2.cancel()) # time.sleep(3) # print(task1.done()) # # #result方法可以獲取task的執行結果 # print(task1.result())