1. 程式人生 > >python併發程式設計之協程

python併發程式設計之協程

一.引子?

     首先我們來回顧一下併發的本質:切換+儲存狀態,此次的協程就是基於單執行緒來實現併發(只用一個主執行緒,很明顯可利用的cpu只有一個)

    cpu正在執行一個任務,會在兩種情況下切走去執行其他的任務(切換由作業系統強制控制),一種情況是該任務發生了阻塞,另外一種情況是該任務計算的時間過長或有一個優先順序更高的程式替代了它,下圖為執行緒的3種狀態

    

    1.其中第二種情況並不能提升效率,只是為了讓cpu能夠雨露均沾,實現看起來所有任務都被“同時”執行的效果,如果多個任務都是純計算的,這種切換反而會降低效率。為此我們可以基於yield來驗證。yield本身就是一種在單執行緒下可以儲存任務執行狀態的方法,我們來簡單複習一下:

    #1 yiled可以儲存狀態,yield的狀態儲存與作業系統的儲存執行緒狀態很像,但是yield是程式碼級別控制的,更輕量級

    #2 send可以把一個函式的結果傳給另外一個函式,以此實現單執行緒內程式之間的切換

'''
1、協程:
    單執行緒實現併發
    在應用程式裡控制多個任務的切換+儲存狀態
    優點:
        應用程式級別速度要遠遠高於作業系統的切換
    缺點:
        多個任務一旦有一個阻塞沒有切,整個執行緒都阻塞在原地
        該執行緒內的其他的任務都不能執行了

        一旦引入協程,就需要檢測單執行緒下所有的IO行為,
        實現遇到IO就切換,少一個都不行,以為一旦一個任務阻塞了,整個執行緒就阻塞了,
        其他的任務即便是可以計算,但是也無法運行了

2、協程式的目的:
    想要在單執行緒下實現併發
    併發指的是多個任務看起來是同時執行的
    併發=切換+儲存狀態
'''

#序列執行
import time

def func1():
    for i in range(10000000):
        i+1

def func2():
    for i in range(10000000):
        i+1

start = time.time()
func1()
func2()
stop = time.time()
print(stop - start)


#基於yield併發執行
import time
def func1():
    while True:
        yield

def func2():
    g=func1()
    for i in range(10000000):
        i+1
        next(g)

start=time.time()
func2()
stop=time.time()
print(stop-start)

單純地切換反而會降低執行效率

  2.第一種情況的切換。在任務一遇到io情況下,切到任務二去執行,這樣就可以利用任務一阻塞的時間完成任務二的計算,效率的提升就在於此。

import time
def func1():
    while True:
        print('func1')
        yield

def func2():
    g=func1()
    for i in range(10000000):
        i+1
        next(g)
        time.sleep(3)
        print('func2')
start=time.time()
func2()
stop=time.time()
print(stop-start)

yield不能檢測IO,實現遇到IO自動切換

 對於單執行緒下,我們不可避免程式中出現io操作,但如果我們能在自己的程式中(即使用者程式級別,而非作業系統級別)控制單執行緒下的多個任務能在一個任務遇到io阻塞時就切換到另外一個任務去計算,這樣就保證了該執行緒能夠最大限度地處於就緒態,即隨時都可以被cpu執行的狀態,相當於我們在使用者程式級別將自己的io操作最大限度地隱藏起來,從而可以迷惑作業系統,讓其看到:該執行緒好像是一直在計算,io比較少,從而更多的將cpu的執行許可權分配給我們的執行緒。

協程的本質就是在單執行緒下,由使用者自己控制一個任務遇到io阻塞了就切換另外一個任務去執行,以此來提升效率。為了實現它,我們需要找尋一種可以同時滿足以下條件的解決方案:

#1. 可以控制多個任務之間的切換,切換之前將任務的狀態儲存下來,以便重新執行時,可以基於暫停的位置繼續執行。

#2. 作為1的補充:可以檢測io操作,在遇到io操作的情況下才發生切換

二.協程的介紹

協程:是單執行緒下的併發,又稱微執行緒,纖程。英文名Coroutine。一句話說明什麼是執行緒:協程是一種使用者態的輕量級執行緒,即協程是由使用者程式自己控制排程的。

需要強調的是:

#1. python的執行緒屬於核心級別的,即由作業系統控制排程(如單執行緒遇到io或執行時間過長就會被迫交出cpu執行許可權,切換其他執行緒執行)

#2. 單執行緒內開啟協程,一旦遇到io,就會從應用程式級別(而非作業系統)控制切換,以此來提升效率(!!!非io操作的切換與效率無關)

對比作業系統控制執行緒的切換,使用者在單執行緒內控制協程的切換

優點如下:

 1.協程的切換開銷更小,屬於程式級別的切換,作業系統完全感知不到,因而更加輕量級

 2.單執行緒內就可以實現併發的效果,最大限度地利用cpu

缺點如下:

 1.協程的本質是單執行緒下,無法利用多核,可以是一個程式開啟多個程序,每個程序內開啟多個執行緒,每個執行緒內開啟協程

 2.協程指的是單個執行緒,因而一旦協程出現阻塞,將會阻塞整個執行緒

總結協程特點:

  1. 必須在只有一個單執行緒裡實現併發
  2. 修改共享資料不需加鎖
  3. 使用者程式裡自己儲存多個控制流的上下文棧
  4. 附加:一個協程遇到IO操作自動切換到其它協程(如何實現檢測IO,yield、greenlet都無法實現,就用到了gevent模組(select機制)

三.Greenlet

如果我們在單個執行緒內有20個任務,要想實現在多個任務之間切換,使用yield生成器的方式過於麻煩(需要先得到初始化一次的生成器,然後再呼叫send。。。非常麻煩),而使用greenlet模組可以非常簡單地實現這20個任務直接的切換

安裝  pip3 insytall greenlet

#順序執行
import time
def f1():
    res=1
    for i in range(100000000):
        res+=i

def f2():
    res=1
    for i in range(100000000):
        res*=i

start=time.time()
f1()
f2()
stop=time.time()
print('run time is %s' %(stop-start)) #10.985628366470337

#切換
from greenlet import greenlet
import time
def f1():
    res=1
    for i in range(100000000):
        res+=i
        g2.switch()

def f2():
    res=1
    for i in range(100000000):
        res*=i
        g1.switch()

start=time.time()
g1=greenlet(f1)
g2=greenlet(f2)
g1.switch()
stop=time.time()
print('run time is %s' %(stop-start)) # 52.763017892837524

單純的切換(在沒有io的情況下或者沒有重複開闢記憶體空間的操作),反而會降低程式的執行速度

greenlet只是提供了一種比generator更加便捷的切換方式,當切到一個任務執行時如果遇到io,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提升效率的問題。

單執行緒裡的這20個任務的程式碼通常會既有計算操作又有阻塞操作,我們完全可以在執行任務1時遇到阻塞,就利用阻塞的時間去執行任務2。。。。如此,才能提高效率,這就用到了Gevent模組

四.Gevent介紹

    安裝 pip3 install gevent

Gevent 是一個第三方庫,可以輕鬆通過gevent實現併發同步或非同步程式設計,在gevent中用到的主要模式是Greenlet, 它是以C擴充套件模組形式接入Python的輕量級協程。 Greenlet全部執行在主程式作業系統程序的內部,但它們被協作式地排程。

#用法
g1=gevent.spawn(func,1,,2,3,x=4,y=5)建立一個協程物件g1,spawn括號內第一個引數是函式名,如eat,後面可以有多個引數,可以是位置實參或關鍵字實參,都是傳給函式eat的

g2=gevent.spawn(func2)

g1.join() #等待g1結束

g2.join() #等待g2結束

#或者上述兩步合作一步:gevent.joinall([g1,g2])

g1.value#拿到func1的返回值

遇到IO阻塞時會自動切換任務

from gevent import monkey;monkey.patch_all()
import time
import  gevent
from threading import current_thread

def eat(name):
    print("%s eat 1" %current_thread().getName())
    time.sleep(2)
    print("%s eat 2" %current_thread().getName())


def play(name):
    print("%s play 1" %current_thread().getName())
    time.sleep(1)
    print("%s play 2" %current_thread().getName())


g1=gevent.spawn(eat,"egon")
g2=gevent.spawn(play,"zs")
g1.join()
g2.join()
print("main")

上例gevent.sleep(2)模擬的是gevent可以識別的io阻塞,

而time.sleep(2)或其他的阻塞,gevent是不能直接識別的需要用下面一行程式碼,打補丁,就可以識別了

from gevent import monkey;monkey.patch_all()必須放到被打補丁者的前面,如time,socket模組之前

或者我們乾脆記憶成:要用gevent,需要將from gevent import monkey;monkey.patch_all()放到檔案的開頭

 五.Gevent之應用舉例

#客戶端
from socket import *
from threading import Thread


def task():
    client = socket(AF_INET, SOCK_STREAM)
    client.connect(("127.0.0.1", 8989))

    while True:
        
        client.send("hello".encode("utf-8"))
        res=client.recv(1024)
        print(res.decode('utf-8'))


if __name__ == '__main__':
    for i in range(20):
        t=Thread(target=task)
        t.start()

#服務端
from gevent import monkey;monkey.patch_all()
from socket import *
import gevent
from threading import Thread,current_thread


def link(ip,port):
    server=socket(AF_INET,SOCK_STREAM)
    server.bind((ip,port))
    server.listen()
    
    while True:
        conn,client_addr=server.accept()
        print(conn)
        gevent.spawn(communite,conn,client_addr)


def communite(conn,client_addr):
    while True:
         try:
             msg=conn.recv(1024)
             if not msg:break
             print('%s msg:%s'%(current_thread().getName(),msg))
             conn.send(msg.upper())
         except ConnectionResetError:
             conn.close()
             break
                         
if __name__ == '__main__':
    g1=gevent.spawn(link,"127.0.0.1",8989)
    g1.join()