1. 程式人生 > >Python學習--day35-非同步回撥 協程

Python學習--day35-非同步回撥 協程

day36

非同步回撥與協程

一、非同步回撥

1、什麼是回撥:

非同步回撥指的是:在發起一個非同步任務的同時指定一個函式,在非同步任務完成時會自動的呼叫這個函式。

2、為什麼需要回調函式

需要獲取非同步任務的執行結果,但是又不應該讓其阻塞(降低效率),即想要高效的獲取任務的執行結果。

之前在使用執行緒池或程序池提交任務時,如果想要處理任務的執行結果則必須呼叫result函式或是shutdown函式,而它們都是是阻塞的,會等到任務執行完畢後才能繼續執行,這樣一來在這個等待過程中就無法執行其他任務,降低了效率,所以需要一種方案,即保證解析結果的執行緒不用等待,又能保證資料能夠及時被解析,該方案就是非同步回撥。

3、如何使用非同步回撥

通常情況下,非同步都會和回撥函式一起使用,使用方法即是add_done_callback(),給Future物件繫結一個回撥函式。

注意:在多程序中回撥函式 是交給主程序來執行 而在多執行緒中 回撥函式是誰有空誰執行(不是主執行緒)

import requests,re,os,random,time
from concurrent.futures import ProcessPoolExecutor
​
def get_data(url):
    print("%s 正在請求%s" % (os.getpid(),url))
    time.sleep(random.randint(
1,2)) response = requests.get(url) print(os.getpid(),"請求成功 資料長度",len(response.content)) #parser(response) # 3.直接呼叫解析方法 哪個程序請求完成就那個程序解析資料 強行使兩個操作耦合到一起了 return response ​ def parser(obj): data = obj.result() htm = data.content.decode("utf-8") ls = re.findall("href=.*?com"
,htm) print(os.getpid(),"解析成功",len(ls),"個連結") ​ if __name__ == '__main__': pool = ProcessPoolExecutor(3) urls = ["https://www.baidu.com", "https://www.sina.com", "https://www.python.org", "https://www.tmall.com", "https://www.mysql.com", "https://www.apple.com.cn"] # objs = [] for url in urls: # res = pool.submit(get_data,url).result() # 1.同步的方式獲取結果 將導致所有請求任務不能併發 # parser(res) ​ obj = pool.submit(get_data,url) # obj.add_done_callback(parser) # 4.使用非同步回撥,保證了資料可以被及時處理,並且請求和解析解開了耦合 # objs.append(obj) # pool.shutdown() # 2.等待所有任務執行結束在統一的解析 # for obj in objs: # res = obj.result() # parser(res) # 1.請求任務可以併發 但是結果不能被及時解析 必須等所有請求完成才能解析 # 2.解析任務變成了序列,
View Code

 

總結:非同步回撥使用方法就是在提交任務後得到一個Futures物件,呼叫物件的add_done_callback來指定一個回撥函式。

如果把任務比喻為燒水,沒有回撥時就只能守著水壺等待水開,有了回撥相當於換了一個會響的水壺,燒水期間可用作其他的事情,等待水開了水壺會自動發出聲音,這時候再回來處理。水壺自動發出聲音就是回撥。

注意:

  1. 使用程序池時,回撥函式都是主程序中執行執行;

  2. 使用執行緒池時,回撥函式的執行執行緒是不確定的,哪個執行緒空閒就交給哪個執行緒;

  3. 回撥函式預設接收一個引數就是這個任務物件自己,再通過物件的result函式來獲取任務的處理結果。

二、執行緒中的佇列

引入執行緒佇列 : import queue  

  執行緒佇列方法 :

    q = queue.Queue()  #例項化對列,先進先出

    q = queue.LifoQueue()  #例項化佇列,後進先出  ( Last in, first out )

    q = queue.PriorityQueue()  #例項化佇列,優先順序佇列

      優先順序佇列,put() 方法接收的是一個元組,第一個元素是優先順序,第二個元素是資料;

      優先順序可以是數字或字元,只要能夠進行大小比較即可(即優先順序必須要是能夠比較大小的);

      如果優先順序是字串或特殊字元,按照字串或特殊字元的ASCII碼比較,如果ASCII碼相同,按照先進先出原則取出。

from queue import Queue,LifoQueue,PriorityQueue
​
# 1. 先進先出佇列
# q = Queue(1)
# q.put("a")
# q.put("b",timeout=1)
#
# print(q.get())
# print(q.get(timeout=2))
# 2.last in first out 後進先出佇列(堆疊)
# lq = LifoQueue()
# lq.put("a")
# lq.put("b")
# lq.put("c")
#
# print(lq.get())
# print(lq.get())
# print(lq.get())
# 3.優先順序佇列  (取出順序是 由小到大  優先順序可以使數字或字元 只要能夠比較大小即可)
pq = PriorityQueue()
# pq.put((2,"b"))
# pq.put((3,"c"))
# pq.put((1,"a"))
#
# print(pq.get())
# print(pq.get())
# print(pq.get())
​
pq.put((["a"],"bdslkfjdsfjd"))
pq.put((["b"],"csdlkjfksdjkfds"))
pq.put((["c"],"asd;kjfksdjfkdsf"))
​
print(pq.get())
print(pq.get())
print(pq.get())
View Code

 

三、事件

1、什麼是事件

執行緒的一個關鍵特性是每個執行緒都是獨立執行且狀態不可預測。如果程式中的其 他執行緒需要通過判斷某個執行緒的狀態來確定自己下一步的操作,這時執行緒同步問題就會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event物件。

2、Event簡述

Event物件包含一個可由執行緒設定的訊號標誌,它允許執行緒等待某些事件的發生。在 初始情況下,Event物件中的訊號標誌被設定為假。如果有執行緒等待一個Event物件, 而這個Event物件的標誌為假,那麼這個執行緒將會被一直阻塞直至該標誌為真。一個執行緒如果將一個Event物件的訊號標誌設定為真,它將喚醒所有等待這個Event物件的執行緒。如果一個執行緒等待一個已經被設定為真的Event物件,那麼它將忽略這個事件, 繼續執行。

## event的常用方法
event.isSet():返回event的狀態值;
event.wait():如果 event.isSet()==False將阻塞執行緒;
event.set(): 設定event的狀態值為True,所有阻塞池的執行緒啟用進入就緒狀態, 等待作業系統排程;
event.clear():恢復event的狀態值為False。

 

event程式碼示例:

使用變數類完成多執行緒協作

 

import time
from threading import Thread
from threading import Event
​
# 建立一個事件(使用非同步修改後)
e = Event() #預設False
def start():
​
    print("正在啟動伺服器......")
    time.sleep(5)
    print("伺服器啟動成功!")
    e.set() # 就是把事件的值設定為True
def connect():
    for i in range(3):
        print("等待伺服器啟動....")
        e.wait(1) # 會阻塞 直到對方把事件設定為True
        if e.isSet():
            print("連線成功!")
            break
        else:
            print("連線失敗")
    else: #如果3次都沒成功 就列印這個訊息
        print("伺服器沒有啟動")
​
Thread(target=start).start()
Thread(target=connect).start()
使用Event

 四、協程

1、引言

上一節中我們知道GIL鎖將導致CPython無法利用多核CPU的優勢,只能使用單核併發的執行。很明顯效率不高,那有什麼辦法能夠提高效率呢?

效率要高只有一個方法就是讓這個當前執行緒儘可能多的佔用CPU時間,如何做到?

任務型別可以分為兩種 IO密集型 和 計算密集型

對於計算密集型任務而言 ,無需任何操作就能一直佔用CPU直到超時為止,沒有任何辦法能夠提高計算密集任務的效率,除非把GIL鎖拿掉,讓多核CPU並行執行。

對於IO密集型任務任務,一旦執行緒遇到了IO操作CPU就會立馬切換到其他執行緒,而至於切換到哪個執行緒,應用程式是無法控制的,這樣就導致了效率降低。

如何能提升效率呢?想一想如果可以監測到執行緒的IO操作時,應用程式自發的切換到其他的計算任務,是不是就可以留住CPU?的確如此

2、單執行緒實現併發

單執行緒實現併發這句話乍一聽好像在瞎說

首先需要明確併發的定義

併發:指的是多個任務同時發生,看起來好像是同時都在進行

並行:指的是多個任務真正的同時進行

早期的計算機只有一個CPU,既然CPU可以切換執行緒來實現併發,那麼為何不能再執行緒中切換任務來併發呢?

上面的引子中提到,如果一個執行緒能夠檢測IO操作並且將其設定為非阻塞,並自動切換到其他任務就可以提高CPU的利用率,指的就是在單執行緒下實現併發。

3、如何能夠實現併發呢

併發 = 切換任務+儲存狀態,只要找到一種方案,能夠在兩個任務之間切換執行並且儲存狀態,那就可以實現單執行緒併發

python中的生成器就具備這樣一個特點,每次呼叫next都會回到生成器函式中執行程式碼,這意味著任務之間可以切換,並且是基於上一次執行的結果,這意味著生成器會自動儲存執行狀態!

於是乎我們可以利用生成器來實現併發執行:

def task1():
    while True:
        yield
        print("task1 run")
​
def task2():
    g = task1()
    while True:
        next(g)
        print("task2 run")
task2()
yield實現併發

 

併發雖然實現了,單這對效率的影響是好是壞呢?來測試一下

yield實現併發的程式碼效能測試

 

可以看到對於純計算任務而言,單執行緒併發反而使執行效率下降了一半左右,所以這樣的方案對於純計算任務而言是沒有必要的

我們暫且不考慮這樣的併發對程式的好處是什麼,在上述程式碼中,使用yield來切換是的程式碼結構非常混亂,如果十個任務需要切換呢,不敢想象!因此就有人專門對yield進行了封裝,這便有了greenlet模組

4、greenlet模組實現併發

def task1(name):
    print("%s task1 run1" % name)
    g2.switch(name) # 切換至任務2
    print("task1 run2") 
    g2.switch() # 切換至任務2
def task2(name):
    print("%s task2 run1" % name)
    g1.switch() # 切換至任務1
    print("task2 run2")
​
g1 = greenlet.greenlet(task1)
g2 = greenlet.greenlet(task2)
g1.switch("jerry") # 為任務傳引數
View Code

 

該模組簡化了yield複雜的程式碼結構,實現了單執行緒下多工併發,但是無論直接使用yield還是greenlet都不能檢測IO操作,遇到IO時同樣進入阻塞狀態,所以此時的併發是沒有任何意義的。

現在我們需要一種方案 即可檢測IO 又能夠實現單執行緒併發,於是gevent閃亮登場

5、協程概述

協程:是單執行緒下的併發,又稱微執行緒,纖程。英文名Coroutine。一句話說明什麼是執行緒:協程是一種使用者態的輕量級執行緒,即協程是由使用者程式自己控制排程的。

需要強調的是:

#1. python的執行緒屬於核心級別的,即由作業系統控制排程(如單執行緒遇到io或執行時間過長就會被迫交出cpu執行許可權,切換其他執行緒執行)
#2. 單執行緒內開啟協程,一旦遇到io,就會從應用程式級別(而非作業系統)控制切換,以此來提升效率(!!!非io操作的切換與效率無關)

 

對比作業系統控制執行緒的切換,使用者在單執行緒內控制協程的切換

優點如下:

#1. 協程的切換開銷更小,屬於程式級別的切換,作業系統完全感知不到,因而更加輕量級
#2. 單執行緒內就可以實現併發的效果,最大限度地利用cpu

 

缺點如下:

#1. 協程的本質是單執行緒下,無法利用多核,可以是一個程式開啟多個程序,每個程序內開啟多個執行緒,每個執行緒內開啟協程來儘可能提高效率
#2. 協程指的是單個執行緒,因而一旦協程出現阻塞,將會阻塞整個執行緒

 

6、gevent協程的使用

import gevent,sys
from gevent import monkey # 匯入monkey補丁
monkey.patch_all() # 打補丁 
import time
​
print(sys.path)
​
def task1():
    print("task1 run")
    # gevent.sleep(3)
    time.sleep(3)
    print("task1 over")
​
def task2():
    print("task2 run")
    # gevent.sleep(1)
    time.sleep(1)
    print("task2 over")
​
g1 = gevent.spawn(task1)
g2 = gevent.spawn(task2)
gevent.joinall([g1,g2])
View Code

 

需要注意:

1.協程執行時要想使任務執行則必須對協程物件呼叫join函式

2.有多個任務時,隨便呼叫哪一個的join都會併發的執行所有任務,但是需要注意如果一個存在io的任務沒有被join該任務將無法正常執行完畢

3.monkey補丁的原理是把原始的阻塞模組替換為修改後的非阻塞模組,即偷樑換柱,來實現IO自定切換,所以打補丁的位置一定要放到匯入阻塞模組之前