1. 程式人生 > >Coroutine:協程

Coroutine:協程

協程
協程,又叫微執行緒、纖程,可以認為是比執行緒更小的執行單元。協程自帶CPU上下文,這樣只要在合適的時機,我們可以把一個協程切換到另一個協程,只要這個過程中儲存或恢復 CPU上下文那麼程式還是可以執行的。

通俗的理解:在一個執行緒中的某個函式,可以在任何地方儲存當前函式的一些臨時變數等資訊,然後切換到另外一個函式中執行,注意不是通過呼叫函式的方式做到的,並且切換的次數以及什麼時候再切換到原來的函式都由開發者自己確定
協程和執行緒差異
最大的優勢就是協程極高的執行效率,因為子程式切換不是執行緒切換,而是由程式自身控制,因此,沒有執行緒切換的開銷
執行緒切換從系統層面遠不止儲存和恢復 CPU上下文這麼簡單。
作業系統為了程式執行的高效性每個執行緒都有自己快取Cache等等資料,
作業系統還會幫你做這些資料的恢復操作。所以執行緒的切換非常耗效能。
但是協程的切換隻是單純的操作CPU的上下文,所以一秒鐘切換個上百萬次系統都抗的住。

第二大優勢就是不需要多執行緒的鎖機制,因為只有一個執行緒,也不存在同時寫變數衝突,利用多核CPU,最簡單的方法是多程序+協程

Python對協程的支援是通過generator(生成器)實現的。

協程的問題

但是協程有一個問題,就是系統並不感知,所以作業系統不會幫你做切換。
那麼誰來幫你做切換?讓需要執行的協程更多的獲得CPU時間才是問題的關鍵。

先來一個傳統的函式呼叫,讓A,B函式都執行起來

def A():
    while True:
        print("=====A=====")
        time.sleep(1)
def B():
    while True:
        print("=====B=====")
        time.sleep(1)
  if __name__ == '__main__':
    A()
    B()

執行結果:
B一直在等待,並未執行
我想讓A B函式都執行,結果呢,A在執行時,B函式一直處於等待狀態

import threading
def A():
    while True:
        print("=====A=====")
        time.sleep(1)
        B()

def B():
    while True:
        print("=====B=====")
        time.sleep(1)
        A()
if __name__ == '__main__':
    A()
    B()

哪怕我在A函式中呼叫B函式,B函式依舊無法執行,執行結果如上

用執行緒來解決AB函式無法同時呼叫的問題

import threading
# 實現A、B交替列印
def A():
    while True:
        print("=====A=====")
        time.sleep(1)
    # B()
def B():
    while True:
        print("=====B=====")
        time.sleep(1)
    # A()
if __name__ == '__main__':
    a = threading.Thread(target=A)
    b = threading.Thread(target=B)
    a.start()
    b.start()

執行結果:
在這裡插入圖片描述
這樣,AB函式都會執行了
如果想要AB函式交替執行,在A函式中呼叫B,B函式中呼叫A即可

用協程解決AB函式無法同時呼叫的問題

import time
#A函式是一個generator
def A():  
    while True:
        print("----A---")
        yield
        time.sleep(0.5)
def B(a):
    while True:
        print("----B---")
        next(a)
        time.sleep(0.5)
        
if __name__ == '__main__':
    a = A()
    B(a)

執行結果:
先B後A,依次執行
先B後A,依次執行

用協程做生產者-消費者模型

def consumer():  # consumer函式是一個generator
    while True:
        n = yield "ok"   #生成式,獲取一個ok值
        print('[消費者] 消費了 %s...' % n)
      
def produce(c):
    c.send(None)  # 啟動生成器
    n = 0
    while n < 5:
        n = n + 1
        print('[生產者] 生產了 %s...' % n)
        r = c.send(n)    
         #  c.send(),即consumer().send(),輸出的結果為print('[消費者] 消費了 %s...' % n)的值,即r的值
         #  n的值傳進consumer(),不呼叫生成器,print的結果
        print(n)
        print(r)
        print('[生產者] 消費者結果: %s' % r)
    c.close()

if __name__ == '__main__':
    c = consumer()
    produce(c)

執行結果:
在這裡插入圖片描述

使用greenlet + switch實現協程排程

from greenlet import greenlet
import time

def func1():
    print("我愛你")
    time.sleep(1)
    gr2.switch()  # 把CPU執行權交給gr2

    print("分手吧")
    time.sleep(1)
    gr2.switch()
    pass
    
def func2():
    print("愛著你")
    time.sleep(1)
    gr1.switch()
    
    print("不愛了")
    pass

if __name__ == '__main__':
    gr1 = greenlet(func1)
    gr2 = greenlet(func2)
    gr1.switch() 

執行結果:
在這裡插入圖片描述
執行過程:
先用gr1,呼叫func1(), 執行print(“我愛你”) ,然後
-------- 交給gr2 ----------
用gr2 ,呼叫func2(), 執行print(“愛著你”) ,然後
-------- 交給gr1 ----------
用gr1 ,呼叫func1(),執行print(“分手吧”) ,然後
-------- 交給gr2 ----------
用gr2 ,呼叫func2(),執行print(“不愛了”) ,結束

使用gevent + sleep自動將CPU執行權分配給當前未睡眠的協程

import gevent
from gevent import Greenlet

def func1():
    gevent.sleep(5)
    print("離離原上草")

    gevent.sleep(10)
    print("草真多")
    pass
    
def func2():
    gevent.sleep(1)
    print("一歲一枯榮")

    gevent.sleep(5)
    print("燒沒了")
    pass
    
def simpleGevent():
    gr1 = gevent.spawn(func1)
    gr2 = gevent.spawn(func2)
    gevent.joinall([
        gr1, gr2
    ])
if __name__ == '__main__':
    simpleGevent()

執行結果:
在這裡插入圖片描述
執行過程:
按照先gr1,後gr2的順序開始計時
1, print(“一歲一枯榮”) sleep時間1秒, print(“離離原上草”) sleep時間5秒,故執行 一歲一枯榮
2,在第一次print後重新計時
以print(“一歲一枯榮”)的時間開始計時,print(“燒沒了”) sleep時間5秒,即列印時間為第1+5秒
以print(“離離原上草”)的時間開始計時,print(“草真多”) sleep時間10秒,即列印時間為第5+10秒
按照先gr1,後gr2的次序,以時間為順序列印
同一函式內的列印順序不變,自上而下一次執行

一個很有意思的小玩意,可以幫你更好的理解協程

import gevent

answers = {
    "我們是什麼": "瀏覽器",
    "我們要什麼": "速度",
    "什麼時候要": "現在",
    "還有問題嗎": None,
}

def firefox(question):
    gevent.sleep(1)
    print("firefox:", answers[question])
    pass
def opera(question):
    gevent.sleep(1)
    print("opera:", answers[question])
    pass
def safari(question):
    gevent.sleep(1)
    print("safari:", answers[question])
    pass
def ie(question):
    gevent.sleep(10)
    print("ie:", answers[question])
    pass

if __name__ == '__main__':
    q1 = "我們是什麼"
    q2 = "我們要什麼"
    q3 = "什麼時候要"
    q4 = "還有問題嗎"

    print(q1)
    gevent.joinall([
        gevent.spawn(ie, q1),
        gevent.spawn(firefox, q1),
        gevent.spawn(opera, q1),
        gevent.spawn(safari, q1)
    ], timeout=2)
    print("----------\n")

    print(q2)
    gevent.joinall([
        gevent.spawn(ie, q2),
        gevent.spawn(firefox, q2),
        gevent.spawn(opera, q2),
        gevent.spawn(safari, q2)
    ], timeout=2)
    print("----------\n")

    print(q3)
    gevent.joinall([
        gevent.spawn(ie, q3),
        gevent.spawn(firefox, q3),
        gevent.spawn(opera, q3),
        gevent.spawn(safari, q3)
    ], timeout=2)
    print("----------\n")

    print(q4)
    gevent.joinall([
        gevent.spawn(ie, q4),
        gevent.spawn(firefox, q4),
        gevent.spawn(opera, q4),
        gevent.spawn(safari, q4)
    ], timeout=5)
    print("----------\n")
    pass

使用gevent + monkey.patch_all()自動排程網路IO協程

import gevent
import requests
import time
from gevent import monkey

def getPageText(url, order=0):
    print("No%d:%s請求開始..." % (order, url))
    resp = requests.get(url)  # 發起網路請求,返回需要時間——阻塞IO
    html = resp.text
    print("No%d:%s成功返回:長度為%d" % (order, url, len(html)))
    pass
# 將【標準庫-阻塞IO實現】替換為【gevent-非阻塞IO實現】
monkey.patch_all()
if __name__ == '__main__':
    start = time.time()
    # time.clock()
    gevent.joinall([
        gevent.spawn(getPageText, "http://www.sina.com", order=1),
        gevent.spawn(getPageText, "http://www.baidu.com", order=2),
    ])
    end = time.time()   
    print("over,耗時%d秒"%(end - start))    非阻塞實現,耗時0秒哦
    # print(time.clock())
    pass