Python學習--day35-非同步回撥 協程
day36
非同步回撥與協程
一、非同步回撥
1、什麼是回撥:
非同步回撥指的是:在發起一個非同步任務的同時指定一個函式,在非同步任務完成時會自動的呼叫這個函式。
2、為什麼需要回調函式
需要獲取非同步任務的執行結果,但是又不應該讓其阻塞(降低效率),即想要高效的獲取任務的執行結果。
之前在使用執行緒池或程序池提交任務時,如果想要處理任務的執行結果則必須呼叫result函式或是shutdown函式,而它們都是是阻塞的,會等到任務執行完畢後才能繼續執行,這樣一來在這個等待過程中就無法執行其他任務,降低了效率,所以需要一種方案,即保證解析結果的執行緒不用等待,又能保證資料能夠及時被解析,該方案就是非同步回撥。
3、如何使用非同步回撥
通常情況下,非同步都會和回撥函式一起使用,使用方法即是add_done_callback(),給Future物件繫結一個回撥函式。
注意:在多程序中回撥函式 是交給主程序來執行 而在多執行緒中 回撥函式是誰有空誰執行(不是主執行緒)
import requests,re,os,random,time from concurrent.futures import ProcessPoolExecutor def get_data(url): print("%s 正在請求%s" % (os.getpid(),url)) time.sleep(random.randint(View Code1,2)) response = requests.get(url) print(os.getpid(),"請求成功 資料長度",len(response.content)) #parser(response) # 3.直接呼叫解析方法 哪個程序請求完成就那個程序解析資料 強行使兩個操作耦合到一起了 return response def parser(obj): data = obj.result() htm = data.content.decode("utf-8") ls = re.findall("href=.*?com",htm) print(os.getpid(),"解析成功",len(ls),"個連結") if __name__ == '__main__': pool = ProcessPoolExecutor(3) urls = ["https://www.baidu.com", "https://www.sina.com", "https://www.python.org", "https://www.tmall.com", "https://www.mysql.com", "https://www.apple.com.cn"] # objs = [] for url in urls: # res = pool.submit(get_data,url).result() # 1.同步的方式獲取結果 將導致所有請求任務不能併發 # parser(res) obj = pool.submit(get_data,url) # obj.add_done_callback(parser) # 4.使用非同步回撥,保證了資料可以被及時處理,並且請求和解析解開了耦合 # objs.append(obj) # pool.shutdown() # 2.等待所有任務執行結束在統一的解析 # for obj in objs: # res = obj.result() # parser(res) # 1.請求任務可以併發 但是結果不能被及時解析 必須等所有請求完成才能解析 # 2.解析任務變成了序列,
總結:非同步回撥使用方法就是在提交任務後得到一個Futures物件,呼叫物件的add_done_callback來指定一個回撥函式。
如果把任務比喻為燒水,沒有回撥時就只能守著水壺等待水開,有了回撥相當於換了一個會響的水壺,燒水期間可用作其他的事情,等待水開了水壺會自動發出聲音,這時候再回來處理。水壺自動發出聲音就是回撥。
注意:
-
使用程序池時,回撥函式都是主程序中執行執行;
-
使用執行緒池時,回撥函式的執行執行緒是不確定的,哪個執行緒空閒就交給哪個執行緒;
-
回撥函式預設接收一個引數就是這個任務物件自己,再通過物件的result函式來獲取任務的處理結果。
二、執行緒中的佇列
引入執行緒佇列 : import queue
執行緒佇列方法 :
q = queue.Queue() #例項化對列,先進先出
q = queue.LifoQueue() #例項化佇列,後進先出 ( Last in, first out )
q = queue.PriorityQueue() #例項化佇列,優先順序佇列
優先順序佇列,put() 方法接收的是一個元組,第一個元素是優先順序,第二個元素是資料;
優先順序可以是數字或字元,只要能夠進行大小比較即可(即優先順序必須要是能夠比較大小的);
如果優先順序是字串或特殊字元,按照字串或特殊字元的ASCII碼比較,如果ASCII碼相同,按照先進先出原則取出。
from queue import Queue,LifoQueue,PriorityQueue # 1. 先進先出佇列 # q = Queue(1) # q.put("a") # q.put("b",timeout=1) # # print(q.get()) # print(q.get(timeout=2)) # 2.last in first out 後進先出佇列(堆疊) # lq = LifoQueue() # lq.put("a") # lq.put("b") # lq.put("c") # # print(lq.get()) # print(lq.get()) # print(lq.get()) # 3.優先順序佇列 (取出順序是 由小到大 優先順序可以使數字或字元 只要能夠比較大小即可) pq = PriorityQueue() # pq.put((2,"b")) # pq.put((3,"c")) # pq.put((1,"a")) # # print(pq.get()) # print(pq.get()) # print(pq.get()) pq.put((["a"],"bdslkfjdsfjd")) pq.put((["b"],"csdlkjfksdjkfds")) pq.put((["c"],"asd;kjfksdjfkdsf")) print(pq.get()) print(pq.get()) print(pq.get())View Code
三、事件
1、什麼是事件
執行緒的一個關鍵特性是每個執行緒都是獨立執行且狀態不可預測。如果程式中的其 他執行緒需要通過判斷某個執行緒的狀態來確定自己下一步的操作,這時執行緒同步問題就會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event物件。
2、Event簡述
Event物件包含一個可由執行緒設定的訊號標誌,它允許執行緒等待某些事件的發生。在 初始情況下,Event物件中的訊號標誌被設定為假。如果有執行緒等待一個Event物件, 而這個Event物件的標誌為假,那麼這個執行緒將會被一直阻塞直至該標誌為真。一個執行緒如果將一個Event物件的訊號標誌設定為真,它將喚醒所有等待這個Event物件的執行緒。如果一個執行緒等待一個已經被設定為真的Event物件,那麼它將忽略這個事件, 繼續執行。
## event的常用方法 event.isSet():返回event的狀態值; event.wait():如果 event.isSet()==False將阻塞執行緒; event.set(): 設定event的狀態值為True,所有阻塞池的執行緒啟用進入就緒狀態, 等待作業系統排程; event.clear():恢復event的狀態值為False。
event程式碼示例:
使用變數類完成多執行緒協作
import time from threading import Thread from threading import Event # 建立一個事件(使用非同步修改後) e = Event() #預設False def start(): print("正在啟動伺服器......") time.sleep(5) print("伺服器啟動成功!") e.set() # 就是把事件的值設定為True def connect(): for i in range(3): print("等待伺服器啟動....") e.wait(1) # 會阻塞 直到對方把事件設定為True if e.isSet(): print("連線成功!") break else: print("連線失敗") else: #如果3次都沒成功 就列印這個訊息 print("伺服器沒有啟動") Thread(target=start).start() Thread(target=connect).start()使用Event
四、協程
1、引言
上一節中我們知道GIL鎖將導致CPython無法利用多核CPU的優勢,只能使用單核併發的執行。很明顯效率不高,那有什麼辦法能夠提高效率呢?
效率要高只有一個方法就是讓這個當前執行緒儘可能多的佔用CPU時間,如何做到?
任務型別可以分為兩種 IO密集型 和 計算密集型
對於計算密集型任務而言 ,無需任何操作就能一直佔用CPU直到超時為止,沒有任何辦法能夠提高計算密集任務的效率,除非把GIL鎖拿掉,讓多核CPU並行執行。
對於IO密集型任務任務,一旦執行緒遇到了IO操作CPU就會立馬切換到其他執行緒,而至於切換到哪個執行緒,應用程式是無法控制的,這樣就導致了效率降低。
如何能提升效率呢?想一想如果可以監測到執行緒的IO操作時,應用程式自發的切換到其他的計算任務,是不是就可以留住CPU?的確如此
2、單執行緒實現併發
單執行緒實現併發這句話乍一聽好像在瞎說
首先需要明確併發的定義
併發:指的是多個任務同時發生,看起來好像是同時都在進行
並行:指的是多個任務真正的同時進行
早期的計算機只有一個CPU,既然CPU可以切換執行緒來實現併發,那麼為何不能再執行緒中切換任務來併發呢?
上面的引子中提到,如果一個執行緒能夠檢測IO操作並且將其設定為非阻塞,並自動切換到其他任務就可以提高CPU的利用率,指的就是在單執行緒下實現併發。
3、如何能夠實現併發呢
併發 = 切換任務+儲存狀態,只要找到一種方案,能夠在兩個任務之間切換執行並且儲存狀態,那就可以實現單執行緒併發
python中的生成器就具備這樣一個特點,每次呼叫next都會回到生成器函式中執行程式碼,這意味著任務之間可以切換,並且是基於上一次執行的結果,這意味著生成器會自動儲存執行狀態!
於是乎我們可以利用生成器來實現併發執行:
def task1(): while True: yield print("task1 run") def task2(): g = task1() while True: next(g) print("task2 run") task2()yield實現併發
併發雖然實現了,單這對效率的影響是好是壞呢?來測試一下
yield實現併發的程式碼效能測試
可以看到對於純計算任務而言,單執行緒併發反而使執行效率下降了一半左右,所以這樣的方案對於純計算任務而言是沒有必要的
我們暫且不考慮這樣的併發對程式的好處是什麼,在上述程式碼中,使用yield來切換是的程式碼結構非常混亂,如果十個任務需要切換呢,不敢想象!因此就有人專門對yield進行了封裝,這便有了greenlet模組
4、greenlet模組實現併發
def task1(name): print("%s task1 run1" % name) g2.switch(name) # 切換至任務2 print("task1 run2") g2.switch() # 切換至任務2 def task2(name): print("%s task2 run1" % name) g1.switch() # 切換至任務1 print("task2 run2") g1 = greenlet.greenlet(task1) g2 = greenlet.greenlet(task2) g1.switch("jerry") # 為任務傳引數View Code
該模組簡化了yield複雜的程式碼結構,實現了單執行緒下多工併發,但是無論直接使用yield還是greenlet都不能檢測IO操作,遇到IO時同樣進入阻塞狀態,所以此時的併發是沒有任何意義的。
現在我們需要一種方案 即可檢測IO 又能夠實現單執行緒併發,於是gevent閃亮登場
協程:是單執行緒下的併發,又稱微執行緒,纖程。英文名Coroutine。一句話說明什麼是執行緒:協程是一種使用者態的輕量級執行緒,即協程是由使用者程式自己控制排程的。
需要強調的是:
#1. python的執行緒屬於核心級別的,即由作業系統控制排程(如單執行緒遇到io或執行時間過長就會被迫交出cpu執行許可權,切換其他執行緒執行) #2. 單執行緒內開啟協程,一旦遇到io,就會從應用程式級別(而非作業系統)控制切換,以此來提升效率(!!!非io操作的切換與效率無關)
對比作業系統控制執行緒的切換,使用者在單執行緒內控制協程的切換
優點如下:
#1. 協程的切換開銷更小,屬於程式級別的切換,作業系統完全感知不到,因而更加輕量級 #2. 單執行緒內就可以實現併發的效果,最大限度地利用cpu
缺點如下:
#1. 協程的本質是單執行緒下,無法利用多核,可以是一個程式開啟多個程序,每個程序內開啟多個執行緒,每個執行緒內開啟協程來儘可能提高效率 #2. 協程指的是單個執行緒,因而一旦協程出現阻塞,將會阻塞整個執行緒
6、gevent協程的使用
import gevent,sys from gevent import monkey # 匯入monkey補丁 monkey.patch_all() # 打補丁 import time print(sys.path) def task1(): print("task1 run") # gevent.sleep(3) time.sleep(3) print("task1 over") def task2(): print("task2 run") # gevent.sleep(1) time.sleep(1) print("task2 over") g1 = gevent.spawn(task1) g2 = gevent.spawn(task2) gevent.joinall([g1,g2])View Code
需要注意:
1.協程執行時要想使任務執行則必須對協程物件呼叫join函式
2.有多個任務時,隨便呼叫哪一個的join都會併發的執行所有任務,但是需要注意如果一個存在io的任務沒有被join該任務將無法正常執行完畢
3.monkey補丁的原理是把原始的阻塞模組替換為修改後的非阻塞模組,即偷樑換柱,來實現IO自定切換,所以打補丁的位置一定要放到匯入阻塞模組之前