
Python Concurrency and Asynchronous Programming by Example

Concepts like concurrency, parallelism, synchronous blocking, asynchronous non-blocking, threads, processes, and coroutines are hard to understand deeply from prose alone, so this article implements concurrent and asynchronous programming step by step in code and compares the approaches. As for the interpreter, Python 3 is used throughout: Python 3 is Python's future, and implementing coroutines with its native libraries is now very convenient.
1. Preparation
The packages required by all of the test code are listed below:

#! python3
# coding:utf-8

import socket
from concurrent import futures
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
import asyncio
import aiohttp
import time
from time import ctime

To compare the different implementations, the scenario mimics crawler development: issue a series of HTTP requests to a target site and judge each approach by its total elapsed time. Concretely, one "request" means creating a socket, fetching the Sina homepage, and returning its source. First, a decorator that measures a function's execution time:

def tsfunc(func):
    def wrappedFunc(*args, **kargs):
        start = time.perf_counter()  # time.clock() is deprecated and was removed in Python 3.8
        action = func(*args, **kargs)
        time_delta = time.perf_counter() - start
        print('[{0}] {1}() called, time delta: {2}'.format(ctime(), func.__name__, time_delta))
        return action
    return wrappedFunc

The output format is: the current time, the function called, and its execution time.
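
For instance, timing a trivial function (a minimal sanity-check sketch; the sleep duration is arbitrary):

@tsfunc
def demo():
    time.sleep(0.1)

demo()
# prints something like:
# [<current time>] demo() called, time delta: 0.10...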
2. Blocking/Non-blocking and Synchronous/Asynchronous
These two pairs of concepts are easy to mix up. By definition:
Blocking: during socket communication, a thread issues a request; if no result is available yet, the thread goes to sleep and can do nothing else until a result arrives or the call times out (if a timeout is set).
Non-blocking: similar, except that the thread is not suspended while waiting; if the result cannot be obtained immediately, the call returns right away and the thread carries on with other work.
Synchronous: similar to blocking, but not the same concept. Synchrony is about the logical ordering of events: one task must complete before the next begins, and so on.
Asynchronous: the opposite of synchronous, and closely related to non-blocking. When an asynchronous call is issued, the caller does not get the result immediately; the component that actually handles the call reports back afterwards via state, notifications, or callbacks. Colloquially, asynchrony means "I'll tell you later."
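
To make the distinction concrete, here is a minimal, self-contained sketch using a local socket pair (not part of the original tests):

import socket

a, b = socket.socketpair()

# blocking mode: recv() would suspend the thread until data arrives,
# so we send first to guarantee it returns
b.sendall(b'hello')
print(a.recv(4096))              # b'hello'

# non-blocking mode: recv() raises immediately when no data is ready
a.setblocking(False)
try:
    a.recv(4096)
except BlockingIOError:
    print('no data yet; the thread is free to do other work')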
1) The blocking approach
Back to the code. First, the blocking request function:

def blocking_way():
    sock = socket.socket()
    sock.connect(('www.sina.com', 80))
    request = 'GET / HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'
    sock.send(request.encode('ascii'))
    response = b''
    # HTTP/1.0 closes the connection after the response,
    # so recv() returns b'' once the page has been read in full
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        chunk = sock.recv(4096)
    return response
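
blocking_way() returns the raw response bytes (status line, headers, then body). A quick usage check (the exact status line depends on the server):

resp = blocking_way()
print(resp.split(b'\r\n', 1)[0])    # e.g. b'HTTP/1.1 200 OK'
print(len(resp), 'bytes in total')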

Now test it sequentially (single-threaded), with multiple processes, and with multiple threads:

# blocking, no concurrency
@tsfunc
def sync_way():
    res = []
    for i in range(10):
        res.append(blocking_way())
    return len(res)

# blocking, multi-process
@tsfunc
def process_way():
    worker = 10
    with futures.ProcessPoolExecutor(worker) as executor:
        futs = {executor.submit(blocking_way) for i in range(10)}
    return len([fut.result() for fut in futs])

# blocking, multi-threaded
@tsfunc
def thread_way():
    worker = 10
    with futures.ThreadPoolExecutor(worker) as executor:
        futs = {executor.submit(blocking_way) for i in range(10)}
    return len([fut.result() for fut in futs])

The results:

[Wed Dec 13 16:52:25 2017] sync_way() called, time delta: 0.06371647809425328
[Wed Dec 13 16:52:28 2017] process_way() called, time delta: 2.31437644946734
[Wed Dec 13 16:52:28 2017] thread_way() called, time delta: 0.010172946070299727

Compared with the non-concurrent version, launching 10 processes for the 10 requests actually took the longest: processes really do carry substantial system overhead. Multithreading fared far better, with 10 concurrent threads finishing roughly six times faster than the sequential version.
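
To isolate the pool startup cost mentioned above, a rough micro-benchmark sketch (noop and the pool size of 10 are illustrative, not part of the original tests):

from concurrent import futures
import time

def noop(_):
    return None

if __name__ == '__main__':
    for pool_cls in (futures.ProcessPoolExecutor, futures.ThreadPoolExecutor):
        start = time.perf_counter()
        with pool_cls(10) as executor:
            list(executor.map(noop, range(10)))  # trivial work: time is mostly pool startup
        print(pool_cls.__name__, time.perf_counter() - start)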
2) The non-blocking approach
Now the non-blocking request code. It differs from the blocking version in that waiting does not suspend the thread; the call returns immediately instead. To make sure the message is read correctly, the most primitive approach is to retry in a loop until the read completes:

def nonblocking_way():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setblocking(False)
    try:
        sock.connect(('www.sina.com', 80))
    except BlockingIOError:
        # a non-blocking connect() raises immediately; the connection
        # keeps being established in the background
        pass
    request = 'GET / HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'
    data = request.encode('ascii')
    while True:
        try:
            sock.send(data)
            break
        except OSError:
            # not writable yet: busy-wait until the connect completes
            pass

    response = b''
    while True:
        try:
            chunk = sock.recv(4096)
            while chunk:
                response += chunk
                chunk = sock.recv(4096)
            break
        except OSError:
            # not readable yet: busy-wait until data arrives
            pass

    return response

Testing the single-threaded asynchronous non-blocking version:

@tsfunc
def async_way():
    res = []
    for i in range(10):
        res.append(nonblocking_way())
    return len(res)

The results, compared with the single-threaded blocking version:

[Wed Dec 13 17:18:30 2017] sync_way() called, time delta: 0.07342884475822574
[Wed Dec 13 17:18:30 2017] async_way() called, time delta: 0.06509009095694886

The non-blocking version helps somewhat, but not by much. The reason is clear: although the thread is never suspended, the time saved is squandered busy-looping over the reads, so most of the time is still wasted and the power of asynchronous programming never materializes. The fix is the event-driven approach discussed below.
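
As a preview of the event-driven idea: instead of spinning on recv, register interest in the socket and let select() sleep until it is actually readable. A minimal, self-contained sketch (again using a local socket pair rather than the original Sina request):

import socket
from selectors import DefaultSelector, EVENT_READ

sel = DefaultSelector()
a, b = socket.socketpair()
a.setblocking(False)
sel.register(a, EVENT_READ)

b.sendall(b'ping')                       # make the socket readable
for key, mask in sel.select(timeout=1):  # sleeps efficiently until readiness
    print(key.fileobj.recv(4096))        # b'ping'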
3. Callbacks, Generators, and Coroutines
a. Callbacks

class Crawler():
    def __init__(self,url):
        self.url = url
        self.sock = None
        self.response = b''

    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('www.sina.com',80))
        except BlockingIOError:
            pass
        selector.register(self.sock.fileno(),EVENT_WRITE,self.connected)

    def connected(self,key,mask):
        selector.unregister(key.fd)
        get = 'GET {0} HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'.format(self.url)
        self.sock.send(get.encode('ascii'))
        selector.register(key.fd,EVENT_READ,self.read_response)

    def read_response(self, key, mask):
        # one recv per readability event; the event loop calls us again
        # while data keeps arriving, and recv returns b'' at EOF
        global stopped
        chunk = self.sock.recv(4096)
        if chunk:
            self.response += chunk
        else:
            selector.unregister(key.fd)
            urls_todo.remove(self.url)
            if not urls_todo:
                stopped = True

def loop():
    while not stopped:
        events = selector.select()
        for event_key,event_mask in events:
            callback = event_key.data
            callback(event_key,event_mask)
@tsfunc
def callback_way():
    for url in urls_todo:
        crawler = Crawler(url)
        crawler.fetch()
    loop()  # Crawler's callbacks take (key, mask), so use loop(), not loop1()

This is asynchronous programming implemented with traditional callbacks; the result:
[Tue Mar 27 17:52:49 2018] callback_way() called, time delta: 0.054735804048789374
b. Generators

class Crawler2:
    def __init__(self, url):
        self.url = url
        self.response = b''

    def fetch(self):
        global stopped
        sock = socket.socket()
        yield from connect(sock, ('www.sina.com', 80))
        get = 'GET {0} HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'.format(self.url)
        sock.send(get.encode('ascii'))
        self.response = yield from read_all(sock)
        urls_todo.remove(self.url)
        if not urls_todo:
            stopped = True

class Task:
    def __init__(self, coro):
        self.coro = coro
        f = Future1()
        f.set_result(None)
        self.step(f)

    def step(self, future):
        try:
            # send() resumes the coroutine (fetch) and runs it until the next yield
            # next_future is the object handed back by that yield
            next_future = self.coro.send(future.result)
        except StopIteration:
            return
        next_future.add_done_callback(self.step)

def loop1():
    while not stopped:
        events = selector.select()
        for event_key,event_mask in events:
            callback = event_key.data
            callback()
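
Crawler2 relies on the Future1, connect, and read_all helpers defined in the full listing below. The tasks are created and driven by generate_way():

@tsfunc
def generate_way():
    for url in urls_todo:
        crawler = Crawler2(url)
        Task(crawler.fetch())
    loop1()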

The result:
[Tue Mar 27 17:54:27 2018] generate_way() called, time delta: 0.2914336347673473

c. Coroutines with asyncio

With asyncio and aiohttp, the request itself becomes a native coroutine:

host = 'http://www.sina.com'
loop = asyncio.get_event_loop()

async def fetch(url):
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.get(url) as response:
            response = await response.read()
            return response
@tsfunc
def asyncio_way():
    tasks = [fetch(host + url) for url in urls_todo]
    loop.run_until_complete(asyncio.gather(*tasks))
    return len(tasks)

The result:
[Tue Mar 27 17:56:17 2018] asyncio_way() called, time delta: 0.43688060698484166
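
For reference, on Python 3.7+ the explicit event-loop plumbing can be replaced by asyncio.run, and aiohttp recommends reusing a single ClientSession across requests. A hypothetical modern variant (the names fetch_modern and main are illustrative, not from the original code):

import asyncio
import aiohttp

async def fetch_modern(session, url):
    async with session.get(url) as response:
        return await response.read()

async def main(urls):
    # one shared session amortizes connection setup across requests
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_modern(session, 'http://www.sina.com' + u) for u in urls]
        return await asyncio.gather(*tasks)

# asyncio.run(main(['/', '/1', '/2']))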

That completes the concurrency and asynchronous programming examples. The full code is listed below for readers to test themselves; as the workload grows, the gaps between the approaches should widen considerably.

#! python3
# coding:utf-8

import socket
from concurrent import futures
from selectors import DefaultSelector,EVENT_WRITE,EVENT_READ
import asyncio
import aiohttp
import time
from time import ctime

def tsfunc(func):
    def wrappedFunc(*args, **kargs):
        start = time.perf_counter()  # time.clock() is deprecated and was removed in Python 3.8
        action = func(*args, **kargs)
        time_delta = time.perf_counter() - start
        print('[{0}] {1}() called, time delta: {2}'.format(ctime(), func.__name__, time_delta))
        return action
    return wrappedFunc

def blocking_way():
    sock = socket.socket()
    sock.connect(('www.sina.com',80))
    request = 'GET / HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        chunk = sock.recv(4096)
    return response

def nonblocking_way():
    sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    sock.setblocking(False)
    try:
        sock.connect(('www.sina.com', 80))
    except BlockingIOError:
        pass
    request = 'GET / HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'
    data = request.encode('ascii')
    while True:
        try:
            sock.send(data)
            break
        except OSError:
            pass

    response = b''
    while True:
        try:
            chunk = sock.recv(4096)
            while chunk:
                response += chunk
                chunk = sock.recv(4096)
            break
        except OSError:
            pass

    return response


selector = DefaultSelector()
stopped = False
urls_todo = ['/','/1','/2','/3','/4','/5','/6','/7','/8','/9']


class Crawler():
    def __init__(self,url):
        self.url = url
        self.sock = None
        self.response = b''

    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('www.sina.com',80))
        except BlockingIOError:
            pass
        selector.register(self.sock.fileno(),EVENT_WRITE,self.connected)

    def connected(self,key,mask):
        selector.unregister(key.fd)
        get = 'GET {0} HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'.format(self.url)
        self.sock.send(get.encode('ascii'))
        selector.register(key.fd,EVENT_READ,self.read_response)

    def read_response(self, key, mask):
        # one recv per readability event; the event loop calls us again
        # while data keeps arriving, and recv returns b'' at EOF
        global stopped
        chunk = self.sock.recv(4096)
        if chunk:
            self.response += chunk
        else:
            selector.unregister(key.fd)
            urls_todo.remove(self.url)
            if not urls_todo:
                stopped = True

def loop():
    while not stopped:
        events = selector.select()
        for event_key,event_mask in events:
            callback = event_key.data
            callback(event_key,event_mask)


# generator-based coroutine
class Future:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self,fn):
        self._callbacks.append(fn)

    def set_result(self,result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

class Crawler1():
    def __init__(self,url):
        self.url = url
        self.response = b''

    def fetch(self):
        sock = socket.socket()
        sock.setblocking(False)
        try:
            sock.connect(('www.sina.com',80))
        except BlockingIOError:
            pass

        f = Future()
        def on_connected():
            f.set_result(None)

        selector.register(sock.fileno(),EVENT_WRITE,on_connected)
        yield f
        selector.unregister(sock.fileno())
        get = 'GET {0} HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'.format(self.url)
        sock.send(get.encode('ascii'))

        global stopped
        while True:
            f = Future()
            def on_readable():
                f.set_result(sock.recv(4096))
            selector.register(sock.fileno(),EVENT_READ,on_readable)
            chunk = yield f
            selector.unregister(sock.fileno())
            if chunk:
                self.response += chunk
            else:
                urls_todo.remove(self.url)
                if not urls_todo:
                    stopped = True
                break


# generator coroutine improved with yield from
class Future1:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self,fn):
        self._callbacks.append(fn)

    def set_result(self,result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

    def __iter__(self):
        yield self
        return self.result

def connect(sock, address):
    f = Future1()
    sock.setblocking(False)
    try:
        sock.connect(address)
    except BlockingIOError:
        pass

    def on_connected():
        f.set_result(None)

    selector.register(sock.fileno(), EVENT_WRITE, on_connected)
    yield from f
    selector.unregister(sock.fileno())

def read(sock):
    f = Future1()

    def on_readable():
        f.set_result(sock.recv(4096))

    selector.register(sock.fileno(), EVENT_READ, on_readable)
    chunk = yield from f
    selector.unregister(sock.fileno())
    return chunk

def read_all(sock):
    response = []
    chunk = yield from read(sock)
    while chunk:
        response.append(chunk)
        chunk = yield from read(sock)
    return b''.join(response)

class Crawler2:
    def __init__(self, url):
        self.url = url
        self.response = b''

    def fetch(self):
        global stopped
        sock = socket.socket()
        yield from connect(sock, ('www.sina.com', 80))
        get = 'GET {0} HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'.format(self.url)
        sock.send(get.encode('ascii'))
        self.response = yield from read_all(sock)
        urls_todo.remove(self.url)
        if not urls_todo:
            stopped = True


class Task:
    def __init__(self, coro):
        self.coro = coro
        f = Future1()
        f.set_result(None)
        self.step(f)

    def step(self, future):
        try:
            # send() resumes the coroutine (fetch) and runs it until the next yield
            # next_future is the object handed back by that yield
            next_future = self.coro.send(future.result)
        except StopIteration:
            return
        next_future.add_done_callback(self.step)

def loop1():
    while not stopped:
        events = selector.select()
        for event_key,event_mask in events:
            callback = event_key.data
            callback()


# asyncio coroutine
host = 'http://www.sina.com'
loop = asyncio.get_event_loop()

async def fetch(url):
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.get(url) as response:
            response = await response.read()
            return response

@tsfunc
def asyncio_way():
    tasks = [fetch(host + url) for url in urls_todo]
    loop.run_until_complete(asyncio.gather(*tasks))
    return len(tasks)

@tsfunc
def sync_way():
    res = []
    for i in range(10):
        res.append(blocking_way())
    return len(res)

@tsfunc
def process_way():
    worker = 10
    with futures.ProcessPoolExecutor(worker) as executor:
        futs = {executor.submit(blocking_way) for i in range(10)}
    return len([fut.result() for fut in futs])

@tsfunc
def thread_way():
    worker = 10
    with futures.ThreadPoolExecutor(worker) as executor:
        futs = {executor.submit(blocking_way) for i in range(10)}
    return len([fut.result() for fut in futs])

@tsfunc
def async_way():
    res = []
    for i in range(10):
        res.append(nonblocking_way())
    return len(res)

@tsfunc
def callback_way():
    for url in urls_todo:
        crawler = Crawler(url)
        crawler.fetch()
    loop()  # Crawler's callbacks take (key, mask), so use loop(), not loop1()

@tsfunc
def generate_way():
    for url in urls_todo:
        crawler = Crawler2(url)
        Task(crawler.fetch())
    loop1()

if __name__ == '__main__':

    # note: callback_way and generate_way consume urls_todo as they finish,
    # so run only one of them per execution
    #sync_way()
    #process_way()
    #thread_way()
    #async_way()
    #callback_way()
    #generate_way()
    asyncio_way()