1. 程式人生 > >Python基礎15 - 協程、異步IO

Python基礎15 - 協程、異步IO

lee 進程 遇到 lap nbsp gen 機制 等待 io操作

@@@文章內容參照老男孩教育 Alex金角大王,武Sir銀角大王@@@

一、協程

線程和進程的操作是由程序觸發系統接口,最後的執行者是系統;協程的操作則是程序員

協程,又稱微線程,纖程。英文名Coroutine。一句說明什麽是線程:協程是一種用戶態的輕量級線程

協程存在的意義:對於多線程應用,CPU通過切片的方式來切換線程間的執行,線程切換時需要耗時(保存狀態,下次繼續)。協程,則只使用一個線程,在一個線程中規定某個代碼塊執行順序

協程的適用場景:當程序中存在大量不需要CPU的操作時(IO),適用於協程

1、Greenlet

greenlet是一個用C實現的協程模塊,相比與python自帶的yield,它可以使你在任意函數之間隨意切換,而不需把這個函數先聲明為generator

 1 from greenlet import greenlet
 2 
 3 def test1():
 4     print(12)
 5     gr2.switch()
 6     print(34)
 7     gr2.switch()
 8 
 9 def test2():
10     print(56)
11     gr1.switch()
12     print(78)
13 
14 gr1 = greenlet(test1)
15 gr2 = greenlet(test2)
16 gr1.switch()

2、Gevent

Gevent是一個第三方庫,可以輕松通過gevent實現並發同步或異步編程,在gevent中用到的主要模式是Greenlet,它是以C擴展模塊形式接入Python的輕量級協程。Greenlet全部運行在主程序操作系統進程的內部,但它們被協作式地調度

 1 import gevent
 2 
 3 def foo():
 4     print(foo)
 5     gevent.sleep(1)
 6     print(foo again)
 7 
 8 def bar():
 9     print(bar)
10     gevent.sleep(2)
11     print(bar again)
12 
13 
14 gevent.joinall([
15     gevent.spawn(foo),
16     gevent.spawn(bar),
17 ])

遇到IO阻塞時會自動切換任務

from urllib.request import
urlopen import gevent ,time from gevent import monkey monkey.patch_all() # 把當前程序的所有IO操作單獨做上標記 def f(url): print(GET: %s %url) resp = urlopen(url) data = resp.read() print(%d字節 …… %s%(len(data),url)) urls = [https://www.python.org/, https://www.github.com/, http://www.163.com, ] time_start = time.time() for url in urls: f(url) print(同步cost,time.time() - time_start) async_time_start = time.time() gevent.joinall([ gevent.spawn(f,https://www.python.org/), gevent.spawn(f,https://www.github.com/), gevent.spawn(f,http://www.163.com/), ]) print(異步cost,time.time() - async_time_start)

通過gevent實現單線程下的多socket並發

技術分享圖片
 1 # 服務器端
 2 import sys
 3 import socket
 4 import time
 5 import gevent
 6 from gevent import socket,monkey
 7 monkey.patch_all()
 8 
 9 def server(port):
10     s = socket.socket()
11     s.bind((0.0.0.0,port))
12     s.listen(500)
13     while True:
14         conn ,addr = s.accept()
15         gevent.spawn(handle_request,conn)
16 
17 def handle_request(conn):
18     try:
19         while True:
20             data = conn.recv(1024)
21             print(recv:,data.decode())
22             conn.send(data)
23             if not data:
24                 conn.shutdown(socket.SHUT_WR)
25     except Exception as ex:
26         print(ex)
27     finally:
28         conn.close()
29 
30 if __name__ == __main__:
31     server(8001)
Gevent_server
技術分享圖片
 1 import socket
 2 
 3 HOST = localhost
 4 PORT = 8001
 5 s = socket.socket()
 6 s.connect((HOST,PORT))
 7 while True:
 8     msg = input(>>:)
 9     s.send(msg.encode(utf-8))
10     data = s.recv(1024)
11     print(recv:,data.decode())
12 s.close()
Gevent_client 技術分享圖片
 1 import socket
 2 import threading
 3 
 4 def sock_conn():
 5     client = socket.socket()
 6     client.connect((localhost,8001))
 7     count = 0
 8     while True:
 9         client.send((hello %s%count).encode(utf-8))
10         data = client.recv(1024)
11         print([%s] recv from server:%threading.get_ident(),data.decode())
12         count += 1
13     client.close()
14 
15 for i in range(100):
16     t = threading.Thread(target=sock_conn)
17     t.start()
並發100個sock連接

二、事件驅動與異步IO

通常,我們寫服務器處理模型的程序時,有以下幾種模型:

(1)每收到一個請求,創建一個新的進程,來處理該請求;

(2)每收到一個請求,創建一個新的線程,來處理該請求;

(3)每收到一個請求,放入一個件列表,讓主進程通過非阻塞I/O方式來處理請求

上面的幾種方式,各有千秋,

第(1)種方式,由於創建新的進程的開銷比較大,所以,會導致服務器性能比較差,但實現比較簡單

第(2)種方式,由於要涉及到線程的同步,有可能會面臨死鎖等問題

第(3)種方式,在寫應用程序代碼時,邏輯比前面兩種都復雜。

綜合考慮各方面因素,一般普遍認為第(3)種方式是大多數網絡服務器采用的方式

事件驅動編程是一種編程範式,這裏程序的執行流由外部事件來決定。它的特點是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。簡而言之,事件驅動分為二個部分:第一,註冊事件;第二,觸發事件。

另外兩種常見的編程範式是(單線程)同步以及多線程編程。

Select\Poll\Epoll異步IO

番外篇 http://www.cnblogs.com/alex3714/articles/5876749.html

Select
select最早於1983年出現在4.2BSD中,它通過一個select()系統調用來監視多個文件描述符的數組,當select()返回後,該數組中就緒的文件描述符便會被內核修改標誌位,使得進程可以獲得這些文件描述符從而進行後續的讀寫操作

select目前幾乎在所有的平臺上支持,其良好跨平臺支持也是它的一個優點,事實上從現在看來,這也是它所剩不多的優點之一

select的一個缺點在於單個進程能夠監視的文件描述符的數量存在最大限制,在Linux上一般為1024,不過可以通過修改宏定義甚至重新編譯內核的方式提升這一限制

另外,select()所維護的存儲大量文件描述符的數據結構,隨著文件描述符數量的增大,其復制的開銷也線性增長。同時,由於網絡響應時間的延遲使得大量TCP連接處於非活躍狀態,但調用select()會對所有socket進行一次線性掃描,所以這也浪費了一定的開銷

Poll
poll在1986年誕生於System V Release 3,它和select在本質上沒有多大差別,但是poll沒有最大文件描述符數量的限制

poll和select同樣存在一個缺點就是,包含大量文件描述符的數組被整體復制於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨著文件描述符數量的增加而線性增大。

另外,select()和poll()將就緒的文件描述符告訴進程後,如果進程沒有對其進行IO操作,那麽下次調用select()和poll()的時候將再次報告這些文件描述符,所以它們一般不會丟失就緒的消息,這種方式稱為水平觸發(Level Triggered)

Epoll
直到Linux2.6才出現了由內核直接支持的實現方法,那就是epoll,它幾乎具備了之前所說的一切優點,被公認為Linux2.6下性能最好的多路I/O就緒通知方法

epoll可以同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變為就緒狀態,它只說一遍,如果我們沒有采取行動,那麽它將不會再次告知,這種方式稱為邊緣觸發),理論上邊緣觸發的性能要更高一些,但是代碼實現相當復雜

epoll同樣只告知那些就緒的文件描述符,而且當我們調用epoll_wait()獲得就緒文件描述符時,返回的不是實際的描述符,而是一個代表就緒描述符數量的值,你只需要去epoll指定的一個數組中依次取得相應數量的文件描述符即可,這裏也使用了內存映射(mmap)技術,這樣便徹底省掉了這些文件描述符在系統調用時復制的開銷

另一個本質的改進在於epoll采用基於事件的就緒通知方式。在select/poll中,進程只有在調用一定的方法後,內核才對所有監視的文件描述符進行掃描,而epoll事先通過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會采用類似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便得到通知

Python select

Python的select()方法直接調用操作系統的IO接口,它監控sockets,open files, and pipes(所有帶fileno()方法的文件句柄)何時變成readable 和writeable, 或者通信錯誤,select()使得同時監控多個連接變的簡單,並且這比寫一個長循環來等待和監控多客戶端連接要高效,因為select直接通過操作系統提供的C的網絡接口進行操作,而不是通過Python的解釋器

技術分享圖片
 1 import select
 2 import socket
 3 import queue
 4 
 5 server = socket.socket()
 6 server.bind((localhost,8001))
 7 server.listen(1000)
 8 
 9 server.setblocking(False) # 不阻塞
10 
11 msg_dic = {}
12 inputs = [server,]
13 outputs = []
14 while True:
15     readable, writeable, exceptional = select.select(inputs, outputs, inputs)
16     for r in readable:
17         if r is server: # 代表來了一個新連接
18             conn,addr = r.accept()
19             print(來了個新連接,addr)
20             inputs.append(conn)
21             msg_dic[conn] = queue.Queue() # 初始化一個隊列,後面存要返回給這個客戶端的數據
22         else:
23             try:
24                 data = r.recv(1024)
25                 print(收到數據,data)
26                 msg_dic[r].put(data)
27                 outputs.append(r) # 放入返回的連接隊列裏
28             except:break
29 
30     for w in writeable: # 要返回給客戶端的連接列表
31         data_to_client = msg_dic[w].get()
32         w.send(data_to_client) # 返回給客戶端數據
33         outputs.remove(w) # 確保下次循環的時候,不返回這個已經處理完的連接
34 
35     for e in exceptional:
36         if e in outputs:
37             outputs.remove(e)
38         inputs.remove(e)
39         del msg_dic[e]
select_socket_server 技術分享圖片
 1 import socket
 2 import sys
 3 
 4 messages = [bThis is the message. ,
 5             bIt will be sent ,
 6             bin parts.,
 7             ]
 8 server_address = (127.0.0.1, 8001)
 9 
10 socks = [socket.socket() for i in range(100)]
11 
12 print (sys.stderr, connecting to %s port %s % server_address)
13 for s in socks:
14     s.connect(server_address)
15 
16 for message in messages:
17 
18     for s in socks:
19         # print >> sys.stderr, ‘%s: sending "%s"‘ % (s.getsockname(), message)
20         s.send(message)
21 
22     for s in socks:
23         data = s.recv(1024)
24         # print >> sys.stderr, ‘%s: received "%s"‘ % (s.getsockname(), data)
25         if not data:
26             print (sys.stderr, closing socket, s.getsockname())
27             s.close()
select_socket_client

selectors模塊

該模塊基於select模塊原語構建的高級別和高效的/輸出多路復用。推薦用戶使用這個模塊,除非他們希望對os級別原語進行精確控制

 1 import selectors
 2 import socket
 3 
 4 sel = selectors.DefaultSelector()
 5 
 6 def accept(sock, mask):
 7     conn, addr = sock.accept()
 8     print(addr)
 9     conn.setblocking(False)
10     sel.register(conn, selectors.EVENT_READ, read)
11 
12 def read(conn, mask):
13     data = conn.recv(1024)
14     if data:
15         print(recv:,data)
16         conn.send(data)
17     else:
18         print(closing,conn)
19         sel.unregister(conn)
20         conn.close()
21 
22 sock = socket.socket()
23 sock.bind((localhost,8001))
24 sock.listen(100)
25 sock.setblocking(False)
26 sel.register(sock, selectors.EVENT_READ ,accept)
27 
28 while True:
29     events = sel.select()
30     for key, mask in events:
31         callback = key.data
32         callback(key.fileobj, mask)

Python基礎15 - 協程、異步IO