1. 程式人生 > >python並行程式設計 - 非同步篇

python並行程式設計 - 非同步篇

目錄1

介紹篇
執行緒篇
程序篇
非同步篇2
GPU篇
分散式篇


介紹

除了線性並行執行模式外,還有非同步模式,它與事件程式設計一樣,十分重要
在併發的非同步模式中,不同的任務在時間線上是相互交錯的,而且一切都是在單一控制流(單執行緒)下進行的


1.asyncio (過時)

基本使用

1.1 使用asyncio實現事件迴圈管理

什麼是事件迴圈?
在計算系統中,能夠產生事件的實體被稱為事件源(event source),而負責協商管理事件的實體被稱為事件處理器(event handler)
它實現了管理計算程式碼中所有事件的功能:在程式執行期間事件迴圈不斷週期反覆,追蹤某個資料內部發生的事件,將其納入佇列,如果主執行緒空閒則呼叫事件處理器一個一個地處理這些事件

:事件迴圈不能使用@asyncio.coroutine標為協程

示例1:
延遲3秒後執行

import asyncio
import time

def A(x):
    print(x)
    time.sleep(1)   # 使用run_forever()不能用ayncio.sleep()延時
    loop.call_soon(B)
    print('c')

def B():
    print('b')
    loop.stop()


loop = asyncio.get_event_loop()
# loop.call_soon(A, 'a')
loop.
call_later(3.0, A, 'a') loop.run_forever() loop.close() print('end')

輸出:
a
c
b
end
在A()中再利用loop呼叫其它函式B()時,A也並不停下來,實現協程效果

1.2使用asyncio實現協程

什麼是協程?
當程式變得冗長複雜時,將其劃分成子例程的方式會使處理變得更加便利,每個子例程完成一個特定的任務
子例程無法獨立執行,只能在主程式的要求下才能執行,主程式負責協調子例程的使用,協程就是子例程的泛化。在協程中,可以暫停執行點,同時保持干預時的本地狀態,便於後續繼續執行
協程相互交錯的控制組件就是事件迴圈,事件迴圈追蹤全部的協程,並安排其執行時間

協程的其它重要特點:

  1. 協程支援多個進入點,可以多次生成(yield)
  2. 協程能夠執行轉移至任何其它協程

生成(yield)這個術語用於描述那些暫停並將控制流傳遞給另一個協程的協程,協程可以同時傳遞控制流和值

示例2:
A()和B()類似並行

import asyncio

@asyncio.coroutine
def A():
    print('a - start')
    yield from asyncio.sleep(1)
    print('a - end')

@asyncio.coroutine
def B(x):
    print('b - start')
    result = yield from C()
    print(x)
    yield from asyncio.sleep(1)
    print(f'b :{result}')

@asyncio.coroutine
def C():
    print('c - start')
    yield from asyncio.sleep(1)
    print('c - end')
    return 'this is C return'

loop = asyncio.get_event_loop()
# loop.run_until_complete(A())   # 只執行一個
# loop.run_until_complete(asyncio.wait([A(), B('d')]))  # 併發執行方法1
tasks = [asyncio.Task(A()), asyncio.Task(B('b - end'))]
loop.run_until_complete(asyncio.wait(tasks))            # 併發執行方法2
loop.close()
print('end')

# 類asyncio.Task(coroutine)用於排程協程的執行
# asyncio.wait(tasks)將等待給定協程執行完畢

輸出:
a - start
b - start
c - start
a - end
c - end
b - end
b :this is C return
end
分析:asyncio.sleep()期間,主執行緒並未等待,而是去執行EventLoop中可執行的coroutine

@asyncio.coroutine把一個generator標記為coroutine型別,再把這個coroutine放到EventLoop中執行(實測,可以不@標記)

相關方法

loop = get_event_loop() : 獲得當前上下文的事件迴圈
如果close()關閉了後,重新開啟需要以下操作:
loop = asyncio.new_event_loop() : 建立新的時間迴圈物件
asyncio.set_event_loop(loop) : 將當前上下文的時間迴圈設定為指定的迴圈

loop.call_soon(callback, *args) : 立即呼叫回撥物件,引數
loop.call_later(delay, callback, *args) : 延時delay秒後,呼叫回撥物件
loop.call_at(when, callback, *args) : 在指定的時間呼叫回撥物件,(when是絕對時間,可以參考loop.time()設定)

loop.run_forever() : 一直執行,直到呼叫stop()
loop.run_until_complete(future) : 執行指定的協程函式(Future3

loop.time() : 獲取事件迴圈的內部時鐘
loop.close() : 關閉事件迴圈
loop.is_running() : 是否執行中
loop.is_close() : 是否關閉

使用示例

示例:
非同步網路並行訪問

import asyncio

@asyncio.coroutine
def wget(host):
    print('wget %s...' % host)
    connect = asyncio.open_connection(host, 80)
    reader, writer = yield from connect
    header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
    writer.write(header.encode('utf-8'))
    yield from writer.drain()
    while True:
        line = yield from reader.readline()
        if line == b'\r\n':
            break
        print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
    # Ignore the body, close the socket
    writer.close()

loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.baidu.com', 'www.aliyun.com', 'www.qq.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

2.async/await

2.1 客戶端使用

為了簡化標識非同步io,python3.5引入新語法asyncawait
只需將2步替換:

  1. asyncio.coroutine -> async
  2. yield from -> await

示例:

import asyncio

@asyncio.coroutine
def A():
    print('a')
    yield from asyncio.sleep(1)
    print('c')

loop = asyncio.get_event_loop()
tasks = [asyncio.Task(A())]
loop.run_until_complete(asyncio.wait(tasks))            # 併發執行方法2
loop.close()
print('end')

替換為:

import asyncio

async def A():
    print('a')
    await asyncio.sleep(1)
    print('c')

loop = asyncio.get_event_loop()
tasks = [asyncio.Task(A())]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
print('end')

2.2 伺服器端使用

asyncio可以實現單執行緒併發io操作,如果僅用於客戶端,效果不大
可以用在伺服器端,由於HTTP連線就是io操作,因此可以使用單執行緒+協程實現多使用者的高併發

asyncio實現了TCP、UDP、SSL等協議,aiohttp則是基於asyncio實現的HTTP框架

示例:
啟動一個web服務,通過瀏覽器訪問localhost:8000

import asyncio

from aiohttp import web

async def index(request):
    await asyncio.sleep(0.5)
    return web.Response(body='<h1>Index</h1>')

async def hello(request):
    await asyncio.sleep(0.5)
    text = '<h1>hello, %s!</h1>' % request.match_info['name']
    return web.Response(body=text)

async def init(loop):
    app = web.Application(loop=loop)
    app.router.add_route('GET', '/', index)
    app.router.add_route('GET', '/hello/{name}', hello)
    srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
    print('Server started at http://127.0.0.1:8000...')
    return srv

loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()


  1. 參考書籍:《Python並行程式設計手冊》 ↩︎

  2. 這篇主要參考:廖雪峰 - asyncio:https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/00143208573480558080fa77514407cb23834c78c6c7309000 ↩︎

  3. Future:是Asyncio的一個類,與concurrent.futures.Futures非常相似,Futures類代表一個還不可用的結果,它是對尚未完成的任務的抽象表示;Python 3.2引入concurrent.futures模組,支援管理併發程式設計任務,如程序池和執行緒池、非確定性執行流、多程序、執行緒同步(這個目前沒看出有什麼特別的,池化管理不是多執行緒和多程序庫自帶嗎?concurrent.futures.ProcessPoolExecutor↩︎