1. 程式人生 > >異步調用與回調機制,協程

異步調用與回調機制,協程

線程等待 AC 2-2 非阻塞 一次 ont 自帶 finally 流程

1.異步調用與回調機制

  上一篇我們已經了解到了兩組比較容易混淆的概念問題,1.同步與異步調用 2.阻塞與非阻塞狀態。在說到異步調用的時候,說到提交任務後,就直接執行下一行代碼,而不去拿結果,這樣明顯存在缺陷,結果是肯定要拿的,這輩子都肯定是要拿到這個結果的,沒有這個結果後面的活又不會幹,沒辦法,只能去拿結果的,那麽問題是異步調用提交任務後,如何實現既要拿到結果又不需要原地等的理想狀態呢?專門為異步調用配備了一個方法——回調機制

先來想想我們之前是怎麽拿到一個函數的結果,就傳給另外一個函數取執行,直接用一個變量接收第一個函數的返回值,然後將其傳入第二個函數中,這種方式並不能達到異步調用的條件,第二種方式是在一個函數體內的結果產生之後,直接調用另外一個函數,這樣的話就能實現異步調用的特征,但是這樣的方式會將兩個獨立的函數耦合到了一起,沒有實現程序與程序之間的解耦和概念,第三種方式就是借助於回調機制,它能夠在一個函數產生運行結果的瞬間,就將該函數的結果作為參數傳給其他函數取執行,有一種實時監控實時響應的感覺,相當於在一個函數體內植入了一段隱藏功能,這個功能在特點條件下自動觸發~~~

技術分享圖片
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import requests
import os
import time
import random

def get(url):
    print(%s GET %s %(os.getpid(),url))
    response=requests.get(url)
    time.sleep(random.randint(1,3))

    if response.status_code == 200:
        pasrse(response.text) #此方法不好的地方在於沒有實現兩個函數之間的解耦和

def pasrse(res):
    print(
%s 解析結果為:%s %(os.getpid(),len(res))) if __name__ == __main__: urls=[ https://www.baidu.com, https://www.baidu.com, https://www.baidu.com, https://www.baidu.com, https://www.baidu.com, https://www.baidu.com, https://www.baidu.com,
https://www.baidu.com, https://www.python.org, ] pool=ProcessPoolExecutor(4) for url in urls: pool.submit(get,url)
第二種方式(沒有解耦和)
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import requests
import os
import time
import random

def get(url):
    print(%s GET %s %(os.getpid(),url))
    response=requests.get(url)
    time.sleep(random.randint(1,3))

    if response.status_code == 200:
        # 幹解析的活
        return response.text

def pasrse(obj):
    res=obj.result()
    print(%s 解析結果為:%s %(os.getpid(),len(res)))

if __name__ == __main__:
    urls=[
        https://www.baidu.com,
        https://www.baidu.com,
        https://www.baidu.com,
        https://www.baidu.com,
        https://www.baidu.com,
        https://www.baidu.com,
        https://www.baidu.com,
        https://www.baidu.com,
        https://www.python.org,

    ]

    pool=ProcessPoolExecutor(4)
    for url in urls:
        obj=pool.submit(get,url)
        obj.add_done_callback(pasrse) #回調函數,在創建的進程中綁上一個方法,使得綁定的進程一旦運行完畢產生結果就立馬拿到返回的結果去給另外一個函數執行
        #之所以叫回調是因為進程的創建是由主進程申請的,等子進程裏產生結果後,會再次返回給主進程,由主進程來負責執行數據分析的活!
        #如果是線程的話,則是哪個線程空閑就由哪個線程去幹,(線程沒有主次)
    # 問題:
    # 1、任務的返回值不能得到及時的處理,必須等到所有任務都運行完畢才能統一進行處理
    # 2、解析的過程是串行執行的,如果解析一次需要花費2s,解析9次則需要花費18s
    print(主進程,os.getpid())

2.協程

  本節的主題是基於單線程來實現並發,即只用一個主線程(很明顯可利用的cpu只有一個)情況下實現並發,為此我們需要先回顧下並發的本質:切換+保存狀態

也就是說只要是能做到多個任務只要有一個遇到I/O我們就能控制程序立馬跳轉到其他任務去執行,實現一遇到I/O就保存狀態並切換執行,那麽就滿足了並發的條件,而一個線程裏面是有多個任務的,也就是說一個線程裏面只要能達到上述要求也就能實現——一個線程下實現多並發的效果!

cpu正在運行一個任務,會在兩種情況下切走去執行其他的任務(切換由操作系統強制控制),一種情況是該任務發生了阻塞,另外一種情況是該任務計算的時間過長或有一個優先級更高的程序替代了它

技術分享圖片

ps:在介紹進程理論時,提及進程的三種執行狀態,而線程才是執行單位,所以也可以將上圖理解為線程的三種狀態

一:其中第二種情況並不能提升效率,只是為了讓cpu能夠雨露均沾,實現看起來所有任務都被“同時”執行的效果,如果多個任務都是純計算的,這種切換反而會降低效率。

二:第一種情況的切換。在任務一遇到io情況下,切到任務二去執行,這樣就可以利用任務一阻塞的時間完成任務二的計算,效率的提升就在於此。

對於單線程下,我們不可避免程序中出現io操作,但如果我們能在自己的程序中(即用戶程序級別,而非操作系統級別)控制單線程下的多個任務能在一個任務遇到io阻塞時就切換到另外一個任務去計算,這樣就保證了該線程能夠最大限度地處於就緒態,即隨時都可以被cpu執行的狀態,相當於我們在用戶程序級別將自己的io操作最大限度地隱藏起來,從而可以迷惑操作系統,讓其看到:該線程好像是一直在計算,io比較少,從而更多的將cpu的執行權限分配給我們的線程。

協程的本質就是在單線程下,由用戶自己控制一個任務遇到io阻塞了就切換另外一個任務去執行,以此來提升效率。為了實現它,我們需要找尋一種可以同時滿足以下條件的解決方案:

1. 可以控制多個任務之間的切換,切換之前將任務的狀態保存下來,以便重新運行時,可以基於暫停的位置繼續執行。

2. 作為1的補充:可以檢測io操作,在遇到io操作的情況下才發生切換

二 協程介紹

協程:是單線程下的並發,又稱微線程,纖程。英文名Coroutine。一句話說明什麽是線程:協程是一種用戶態的輕量級線程,即協程是由用戶程序自己控制調度的。

需要強調的是:

1. python的線程屬於內核級別的,即由操作系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其他線程運行)
2. 單線程內開啟協程,一旦遇到io,就會從應用程序級別(而非操作系統)控制切換,以此來提升效率(!!!非io操作的切換與效率無關)

對比操作系統控制線程的切換,用戶在單線程內控制協程的切換

優點如下:

1. 協程的切換開銷更小,屬於程序級別的切換,操作系統完全感知不到,因而更加輕量級
2. 單線程內就可以實現並發的效果,最大限度地利用cpu

缺點如下:

1. 協程的本質是單線程下,無法利用多核,可以是一個程序開啟多個進程,每個進程內開啟多個線程,每個線程內開啟協程
2. 協程指的是單個線程,因而一旦協程出現阻塞,將會阻塞整個線程

總結協程特點:

  1. 必須在只有一個單線程裏實現並發
  2. 修改共享數據不需加鎖
  3. 用戶程序裏自己保存多個控制流的上下文棧
  4. 附加:一個協程遇到IO操作自動切換到其它協程(如何實現檢測IO,yield、greenlet都無法實現,就用到了gevent模塊(select機制)

Gevent介紹

Gevent 是一個第三方庫,可以輕松通過gevent實現並發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet全部運行在主程序操作系統進程的內部,但它們被協作式地調度。

#用法
g1=gevent.spawn(func,1,,2,3,x=4,y=5)創建一個協程對象g1,spawn括號內第一個參數是函數名,如eat,後面可以有多個參數,可以是位置實參或關鍵字實參,都是傳給函數eat的

g2=gevent.spawn(func2)

g1.join() #等待g1結束

g2.join() #等待g2結束

#或者上述兩步合作一步:gevent.joinall([g1,g2])

g1.value#拿到func1的返回值

遇到IO阻塞時會自動切換任務

import gevent
def eat(name):
    print(%s eat 1 %name)
    gevent.sleep(2)
    print(%s eat 2 %name)

def play(name):
    print(%s play 1 %name)
    gevent.sleep(1)
    print(%s play 2 %name)


g1=gevent.spawn(eat,egon)
g2=gevent.spawn(play,name=egon)
g1.join()
g2.join()
#或者gevent.joinall([g1,g2])
print()

上例gevent.sleep(2)模擬的是gevent可以識別的io阻塞,

而time.sleep(2)或其他的阻塞,gevent是不能直接識別的需要用下面一行代碼,打補丁,就可以識別了

from gevent import monkey;monkey.patch_all()必須放到被打補丁者的前面,如time,socket模塊之前

或者我們幹脆記憶成:要用gevent,需要將from gevent import monkey;monkey.patch_all()放到文件的開頭

from gevent import monkey;monkey.patch_all()

import gevent
import time
def eat():
    print(eat food 1)
    time.sleep(2)
    print(eat food 2)

def play():
    print(play 1)
    time.sleep(1)
    print(play 2)

g1=gevent.spawn(eat)
g2=gevent.spawn(play_phone)
gevent.joinall([g1,g2])
print()

我們可以用threading.current_thread().getName()來查看每個g1和g2,查看的結果為DummyThread-n,即假線程

技術分享圖片
from gevent import spawn,joinall,monkey;monkey.patch_all()

import time
def task(pid):
    """
    Some non-deterministic task
    """
    time.sleep(0.5)
    print(Task %s done % pid)


def synchronous():
    for i in range(10):
        task(i)

def asynchronous():
    g_l=[spawn(task,i) for i in range(10)]
    joinall(g_l)

if __name__ == __main__:
    print(Synchronous:)
    synchronous()

    print(Asynchronous:)
    asynchronous()
#上面程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn。 初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall 函數,後者阻塞當前流程,並執行所有給定的greenlet。執行流程只會在 所有greenlet執行完後才會繼續向下走。
View Code

Gevent之應用舉例一

技術分享圖片
from gevent import monkey;monkey.patch_all()
import gevent
import requests
import time

def get_page(url):
    print(GET: %s %url)
    response=requests.get(url)
    if response.status_code == 200:
        print(%d bytes received from %s %(len(response.text),url))


start_time=time.time()
gevent.joinall([
    gevent.spawn(get_page,https://www.python.org/),
    gevent.spawn(get_page,https://www.yahoo.com/),
    gevent.spawn(get_page,https://github.com/),
])
stop_time=time.time()
print(run time is %s %(stop_time-start_time))
協程應用:爬蟲

Gevent之應用舉例二

通過gevent實現單線程下的socket並發(from gevent import monkey;monkey.patch_all()一定要放到導入socket模塊之前,否則gevent無法識別socket的阻塞)

技術分享圖片
from gevent import monkey;monkey.patch_all()
from socket import *
import gevent

#如果不想用money.patch_all()打補丁,可以用gevent自帶的socket
# from gevent import socket
# s=socket.socket()

def server(server_ip,port):
    s=socket(AF_INET,SOCK_STREAM)
    s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    s.bind((server_ip,port))
    s.listen(5)
    while True:
        conn,addr=s.accept()
        gevent.spawn(talk,conn,addr)

def talk(conn,addr):
    try:
        while True:
            res=conn.recv(1024)
            print(client %s:%s msg: %s %(addr[0],addr[1],res))
            conn.send(res.upper())
    except Exception as e:
        print(e)
    finally:
        conn.close()

if __name__ == __main__:
    server(127.0.0.1,8080)

服務端
服務端 技術分享圖片
#_*_coding:utf-8_*_
__author__ = Linhaifeng

from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect((127.0.0.1,8080))


while True:
    msg=input(>>: ).strip()
    if not msg:continue

    client.send(msg.encode(utf-8))
    msg=client.recv(1024)
    print(msg.decode(utf-8))
客戶端 技術分享圖片
from threading import Thread
from socket import *
import threading

def client(server_ip,port):
    c=socket(AF_INET,SOCK_STREAM) #套接字對象一定要加到函數內,即局部名稱空間內,放在函數外則被所有線程共享,則大家公用一個套接字對象,那麽客戶端端口永遠一樣了
    c.connect((server_ip,port))

    count=0
    while True:
        c.send((%s say hello %s %(threading.current_thread().getName(),count)).encode(utf-8))
        msg=c.recv(1024)
        print(msg.decode(utf-8))
        count+=1
if __name__ == __main__:
    for i in range(500):
        t=Thread(target=client,args=(127.0.0.1,8080))
        t.start()
多線程並發多個客戶端

EVENT事件

同進程的一樣

線程的一個關鍵特性是每個線程都是獨立運行且狀態不可預測。如果程序中的其 他線程需要通過判斷某個線程的狀態來確定自己下一步的操作,這時線程同步問題就會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它允許線程等待某些事件的發生。在 初始情況下,Event對象中的信號標誌被設置為假。如果有線程等待一個Event對象, 而這個Event對象的標誌為假,那麽這個線程將會被一直阻塞直至該標誌為真。一個線程如果將一個Event對象的信號標誌設置為真,它將喚醒所有等待這個Event對象的線程。如果一個線程等待一個已經被設置為真的Event對象,那麽它將忽略這個事件, 繼續執行

event.isSet():返回event的狀態值;

event.wait():如果 event.isSet()==False將阻塞線程;

event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度;

event.clear():恢復event的狀態值為False。

技術分享圖片

例如,有多個工作線程嘗試鏈接MySQL,我們想要在鏈接前確保MySQL服務正常才讓那些工作線程去連接MySQL服務器,如果連接不成功,都會去嘗試重新連接。那麽我們就可以采用threading.Event機制來協調各個工作線程的連接操作

技術分享圖片
from threading import Thread,Event
import threading
import time,random
def conn_mysql():
    count=1
    while not event.is_set():
        if count > 3:
            raise TimeoutError(鏈接超時)
        print(<%s>第%s次嘗試鏈接 % (threading.current_thread().getName(), count))
        event.wait(0.5)
        count+=1
    print(<%s>鏈接成功 %threading.current_thread().getName())


def check_mysql():
    print(\033[45m[%s]正在檢查mysql\033[0m % threading.current_thread().getName())
    time.sleep(random.randint(2,4))
    event.set()
if __name__ == __main__:
    event=Event()
    conn1=Thread(target=conn_mysql)
    conn2=Thread(target=conn_mysql)
    check=Thread(target=check_mysql)

    conn1.start()
    conn2.start()
    check.start()
View Code

異步調用與回調機制,協程