1. 程式人生 > >異步IO(協程,消息循環隊列)

異步IO(協程,消息循環隊列)

http框架 for 例如 高並發 bsp 標識 類型 tex 請求

同步是CPU自己主動查看IO操作是否完成,異步是IO操作完成後發出信號通知CPU(CPU是被通知的)

阻塞與非阻塞的區別在於發起IO操作之後,CPU是等待IO操作完成再進行下一步操作,還是不等待去做其他的事直到IO操作完

成了再回來進行。

消息模型:當遇到IO操作時,代碼只負責發出IO請求,不等待IO結果,然後直接結束本輪消息處理,進入下一輪

消息處理過程。當IO操作完成後,將收到一條“IO完成”的消息,處理該消息時就可以直接獲取IO操作結果。

子程序調用總是一個入口,一次返回,調用順序是明確的,而協程的調用和子程序不同(有多個入口,多次返回)。

#協程看上去也是子程序,但執行過程中,在子程序內部可中斷,然後轉而執行別的子程序,在適當的時候再返回來接著執行

協程子程序切換不是線程切換(系統自動執行),而是有程序自身控制。

#Python對協程的支持是通過generator實現的。

#在generator中,我們不但可以通過for循環來叠代,還可以不斷調用next()函數獲取有yield語句返回的下一個值。

#但是Python的yield不但可以返回一個值,還可以接收調用者發出的參數。

def consumer():

r=‘‘

while True:

n=yield r

if not n:

return

print(‘[CONSUMER]Consuming %s...‘ % n)

r=‘200 OK‘

def produce(c):

c.send(None) #調用c.send(None)啟動生成器,程序跳轉到consumer,運行至yield 處結束返回produce.

n=0

while n<5:

n=n+1

print(‘[PRODUCER]Producing %s...‘ %n)

r=c.send(n) #通過c.send(n)切換到consumer執行

print(‘[PRODUCER]cONSUMER RETURN: %s‘ %r)

c.close()

c=consumer()

produce(c)

‘‘‘

1.首先調用c.send(None)啟動生成器

2.然後,一旦產生了東西,通過c.send(n)切換到consumer執行

3.consumer通過yield拿到消息,處理,又通過yield把結果傳回

4.produce拿到consumer處理的結果,繼續生產下一條消息

5.produce決定不生產了,通過c.close()關閉consumer,整個過程結束。

整個流程無鎖,由一個線程執行,produce和consumer協作完成任務,所以稱為“協程”而非線程的搶占式多任務。

send()和yield相互為 發送和接收 參數 r=c.send(n),將參數n 傳遞給c函數,由c函數中的yield 及其後的變

量 接收,c中的程序再次運行到yield時,yield將其後的變量作為返回值,也就是c.send(n)的值

‘‘‘

‘‘‘

[PRODUCER]Producing 1...

[CONSUMER]Consuming 1...

[PRODUCER]cONSUMER RETURN: 200 OK

[PRODUCER]Producing 2...

[CONSUMER]Consuming 2...

[PRODUCER]cONSUMER RETURN: 200 OK

[PRODUCER]Producing 3...

[CONSUMER]Consuming 3...

[PRODUCER]cONSUMER RETURN: 200 OK

[PRODUCER]Producing 4...

[CONSUMER]Consuming 4...

[PRODUCER]cONSUMER RETURN: 200 OK

[PRODUCER]Producing 5...

[CONSUMER]Consuming 5...

[PRODUCER]cONSUMER RETURN: 200 OK

‘‘‘

#asyncio是Python3.4版本引入的標準庫,直接內置了對異步IO的支持

#asyncio的編程模型就是一個消息循環。我們從asyncio模塊中直接獲取一個Eventloop的引用,然後把需要執行的協程扔到Eventloop中執行,就實現了異步IO。

#用asyncio實現Hello world 代碼如下:

‘‘‘

import threading

import asyncio

@asyncio.coroutine

def hello():

print(‘Hello world! (%s)‘ %threading.currentThread())

yield from asyncio.sleep(1)

print(‘Hello again!(%s)‘ % threading.currentThread())

loop=asyncio.get_event_loop()

tasks=[hello(),hello()]

loop.run_until_complete(asyncio.wait(tasks))

loop.close()

‘‘‘

‘‘‘

Hello world! (<_MainThread(MainThread, started 5524)>)

Hello world! (<_MainThread(MainThread, started 5524)>)

(暫停約1秒)

Hello again!(<_MainThread(MainThread, started 5524)>)

Hello again!(<_MainThread(MainThread, started 5524)>)

‘‘‘

# @asyncio.coroutine把一個generator標記為corroutine類型,然後,我們就把這個coroutine扔到Eventloop

中執行。

#hello()會首先打印出Hello world!,然後,yield from 語法可以讓我們方便地調用另一個generator.由於

asyncio.sleep()也是一個coroutine,所以線程不會等待

#asyncio.sleep(),而是直接中斷並執行下一個消息循環。當asyncio.sleep()返回時,線程就可以從yield from

拿到返回值(此處是None),然後接著執行下一行語句。

#把asyncio.sleep(1)看成是一個耗時1秒的IO操作,在此期間,主線程並未等待,而是去執行Eventloop中其他可以執

行的coroutine了,因此可以實現並發執行。

多個coroutine可以封裝成一組task然後並發執行。

asyncio的編程模型就是一個消息循環。我們從asyncio模塊中直接獲取一個Eventloop的引用,然後把需要執行的協程扔

到Eventloop中執,就實現了異步IO。

為了簡化並更好地標識異步IO,從Python3.5開始引入新的語法async和await,可以讓coroutine的代碼更簡潔易讀。

‘‘‘

1. 把@asyncio.coroutine替換為async;

2. 把yield from 替換為await

‘‘‘

#例

‘‘‘

@asyncio.coroutine

def hello():

print("hello world!")

r = yield from asyncio.sleep(1)

print("hello again!")

‘‘‘

#可寫成

‘‘‘

async def hello():

print("hello world!")

r=await asyncio.sleep(1)

print("hello again!")

‘‘‘

#剩下的代碼保持不變

#aiohttp

#asyncio可以實現單線程並發IO操作。如果僅用在客戶端,發揮的威力不大。如果把asyncio用在服務器端,例如web服務器,由於http連接就是IO操作,因此可以用

#單線程+coroutine實現多用戶的高並發支持。

#asyncio實現了TCP UDP SSL等協議,aiohttp則是基於asyncio實現的http框架。

#編寫一個http服務器 ,分別處理一下URL:

‘‘‘

* / -首頁返回b‘<h1>Index</h1>‘;

* /hello/{name} -根據URL參數返回文本 hello,%s!.

‘‘‘

import asyncio

from aiohttp import web

async def index(request):

await asyncio.sleep(0.5)

return web.Response(body=b‘<h1>Index</h1>‘,content_type=‘text/html‘)

async def hello(request):

await asyncio.sleep(0.5)

text=‘<h1>hello, %s!<h1>‘ %request.match_info[‘name‘]

return web.Response(body=text.encode(‘utf-8‘),content_type=‘text/html‘)

async def init(loop):

app=web.Application(loop=loop)

app.router.add_route(‘GET‘,‘/‘,index)

app.router.add_route(‘GET‘,‘/hello/{name}‘,hello)

srv=await loop.create_server(app.make_handler(),‘127.0.0.1‘,8000)

print(‘Server started at http://127.0.0.1:8000...‘)

return srv

loop=asyncio.get_event_loop()

loop.run_until_complete(init(loop))

loop.run_forever()

#aiohttp的初始化函數init()也是一個coroutine,loop.create_server()則利用asyncio創建TCP服務

異步IO(協程,消息循環隊列)