1. 程式人生 > >Python學習:非同步IO:協程和asyncio

Python學習:非同步IO:協程和asyncio

所謂協程就是在一個執行緒中切換子程序,相比多執行緒有如下好處:最大的優勢就是協程極高的執行效率。因為子程式切換不是執行緒切換,而是由程式自身控制,因此,沒有執行緒切換的開銷,和多執行緒比,執行緒數量越多,協程的效能優勢就越明顯。第二大優勢就是不需要多執行緒的鎖機制,因為只有一個執行緒,也不存在同時寫變數衝突,在協程中控制共享資源不加鎖,只需要判斷狀態就好了,所以執行效率比多執行緒高很多。

python的協程通過generator支援的.

yield可以接受引數|傳遞引數.

send呼叫協程,第一次要使用send(None)啟動generator.

下面使用協程實現簡單的生產者-效果者模型.

def consumer():
    r = ''
    while True:
        n = yield r
        if not n:return
        print('consuming the event %s'%n)
        r = '200 OK'

def produce(c):
    c.send(None)
    for n in range(1,6):
        print('producing event %s'%n)
        r = c.send(n)
        print('the consumer response %s'
%r) c.close() c = consumer() produce(c)

總結一下,執行緒就是特殊的協程.

但是隻有協程還不夠,還不足以實現非同步IO,我們必須實現訊息迴圈和狀態的控制.這就要引入asynico,它直接內建了對非同步IO的支援.asyncio的程式設計模型就是一個訊息迴圈。我們從asyncio模組中直接獲取一個EventLoop的引用,然後把需要執行的協程扔到EventLoop中執行,就實現了非同步IO。

[0]:先看一個例子:

import asyncio,sys,time
@asyncio.coroutine
def hello():
    time.sleep(3
) print("Hello world!") # 非同步呼叫asyncio.sleep(1): r = yield from asyncio.sleep(2) print("Hello again!") # 獲取EventLoop: loop = asyncio.get_event_loop() # 執行coroutine loop.run_until_complete(asyncio.wait([hello() for i in range(3)])) loop.close()

在這個例子中,可以發現一旦協程阻塞,就會中斷當前的協程處理,然後切換到下一個訊息處理,同時把阻塞的協程加入訊息佇列的後面.

注意,如果是你自己寫的協程,eventloop好像會直接執行yield from後面的協程,不管該協程是否有阻塞的行為(可能不能識別?)具體怎麼處理還沒有學習到,以後再更新

下面是使用協程來實現非同步網路連線:

import asyncio

@asyncio.coroutine
def wget(host):
    print('wget %s...' % host)
    connect = asyncio.open_connection(host, 80)
    reader, writer = yield from connect
    header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
    writer.write(header.encode('utf-8'))
    yield from writer.drain()
    while True:
        line = yield from reader.readline()
        if line == b'\r\n':
            break
        print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
    # Ignore the body, close the socket
    writer.close()

loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

[0]asyncio.open_connection接受host引數和port引數以及一些可選的關鍵字引數.返回一個reader和一個writer,redaer is a StreamReader instance; the writer is a StreamWriter instance.

writer.write就和socket.send差不多…

[1]:對於writer.drain()的用法,它會阻塞如果writer的buffer已經滿了…當然在我們這個例子裡面buffer是充足的,因為我們只發送了幾個GET請求。

When the size of the transport buffer reaches the high-water limit (the protocol is paused), block until the size of the buffer is drained down to the low-water limit and the protocol is resumed. When there is nothing to wait for, the yield-from continues immediately.