IO多路復用/基於IO多路復用+socket實現並發請求/協程
http://www.cnblogs.com/alex3714/articles/5876749.html
http://www.cnblogs.com/Eva-J/articles/8324837.html
http://www.cnblogs.com/linhaifeng/articles/6817679.html
一.IO多路復用
1.IO多路復用作用:檢測多個socket是否已經發生變化(是否已經連接成功/是否已經獲取數據)(可讀/可寫)
2.select,poll,epoll都是I/O多路復用的具體的實現,之所以有這三個鬼東西的存在,其實是他們出現是有先後順序的
(1).I/O多路復用這個概念被提出來以後,select是第一個實現的
select會修改傳入的參數數組,只能監聽1024個鏈接
select不是線程安全的
(2).14年以後,一幫人又實現了poll,poll修復了select的很多問題
poll去掉了1024個鏈接的限制,於是要多少鏈接吶,你開心就好
poll從設計上來說,不再修改傳入的數據,不過這個要看你的平臺,所以行走江湖,小心為妙
(3).2002年,大設備部Davide Libenzi實現了epoll
epoll可以說是I/O多路復用最新的實現,epoll修復了poll和select絕大部分問題
epoll現在是線程安全的
epoll現在不僅告訴你sock組裏有數據,還會告訴你哪個sock組有數據
比較坑爹的是epoll只用Linux提供支持,默認集成到了Linux內核中
Windows Python:
提供:select
Mac Python:
提供:select
Linux Python:
提供:select,poll,epoll
對於select操作:
句柄列表11, 句柄列表22, 句柄列表33 = select.select(句柄序列1, 句柄序列2, 句柄序列3, 超時時間) 參數: 可接受四個參數(前三個必須) 返回值:三個列表 select方法用來監視文件句柄,如果句柄發生變化,則獲取該句柄。1、當 參數1 序列中的句柄發生可讀時(accetp和read),則獲取發生變化的句柄並添加到 返回值1 序列中 2、當 參數2 序列中含有句柄時,則將該序列中所有的句柄添加到 返回值2 序列中 3、當 參數3 序列中的句柄發生錯誤時,則將該發生錯誤的句柄添加到 返回值3 序列中 4、當 超時時間 未設置,則select會一直阻塞,直到監聽的句柄發生變化 當 超時時間 = 1時,那麽如果監聽的句柄均無任何變化,則select會阻塞 1 秒,之後返回三個空列表,如果監聽的句柄有變化,則直接執行。
2.基於I/O多路復用+socket實現並發請求(一個線程100個請求)
應用功能:I/O多路復用;socket非阻塞
(1).以前所使用的socket發送請求:
import socket import requests # 方式一 ret = requests.get(‘https://www.baidu.com/s?wd=alex‘) # 方式二 client = socket.socket() # 百度創建連接: 阻塞 client.connect((‘www.baidu.com‘,80)) # 問百度我要什麽? client.sendall(b‘GET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n‘) # http協議 # 我等著接收百度給我的回復 chunk_list = [] while True: chunk = client.recv(8096) if not chunk: break chunk_list.append(chunk) body = b‘‘.join(chunk_list) print(body.decode(‘utf-8‘))View Code
import socket import requests ##### 解決並發:單線程 ##### # 方式一 key_list = [‘alex‘,‘db‘,‘sb‘] for item in key_list: ret = requests.get(‘https://www.baidu.com/s?wd=%s‘ %item) # 方式二 def get_data(key): # 方式二 client = socket.socket() # 百度創建連接: 阻塞 client.connect((‘www.baidu.com‘,80)) # 問百度我要什麽? client.sendall(b‘GET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n‘) # 我等著接收百度給我的回復 chunk_list = [] while True: chunk = client.recv(8096) if not chunk: break chunk_list.append(chunk) body = b‘‘.join(chunk_list) print(body.decode(‘utf-8‘)) key_list = [‘alex‘,‘db‘,‘sb‘] for item in key_list: get_data(item) ##### 解決並發:多線程 ##### import threading key_list = [‘alex‘,‘db‘,‘sb‘] for item in key_list: t = threading.Thread(target=get_data,args=(item,)) t.start() # #################### 解決並發:單線程+IO不等待 #################### # IO請求? # 數據回來了?向百度發送請求搜索三個關鍵字
(2).使用I/O多路復用+socket實現
import socket client = socket.socket() client.setblocking(False) # 將原來阻塞的位置變成非阻塞(報錯) # 百度創建連接: 阻塞 try: client.connect((‘www.baidu.com‘,80)) # 執行了但報錯了 except BlockingIOError as e: pass # 檢測到已經連接成功 # 問百度我要什麽? client.sendall(b‘GET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n‘) # 我等著接收百度給我的回復 chunk_list = [] while True: chunk = client.recv(8096) # 將原來阻塞的位置變成非阻塞(報錯) if not chunk: break chunk_list.append(chunk) body = b‘‘.join(chunk_list) print(body.decode(‘utf-8‘))前戲(會報錯)
import socket import select client1 = socket.socket() client1.setblocking(False) # 百度創建連接: 非阻塞 try: client1.connect((‘www.baidu.com‘,80)) except BlockingIOError as e: pass client2 = socket.socket() client2.setblocking(False) # 搜狗創建連接: 非阻塞 try: client2.connect((‘www.sogou.com‘,80)) except BlockingIOError as e: pass client3 = socket.socket() client3.setblocking(False) # 淘寶創建連接: 非阻塞 try: client3.connect((‘www.taobao.com‘,80)) except BlockingIOError as e: pass socket_list = [client1,client2,client3] conn_list = [client1,client2,client3] while True: rlist,wlist,elist = select.select(socket_list,conn_list,[],0.005) # wlist中表示已經連接成功的socket對象 for sk in wlist: if sk == client1: sk.sendall(b‘GET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n‘) elif sk==client2: sk.sendall(b‘GET /web?query=fdf HTTP/1.0\r\nhost:www.sogou.com\r\n\r\n‘) else: sk.sendall(b‘GET /s?wd=alex HTTP/1.0\r\nhost:www.taobao.com\r\n\r\n‘) conn_list.remove(sk) for sk in rlist: chunk_list = [] while True: try: chunk = sk.recv(8096) if not chunk: break chunk_list.append(chunk) except BlockingIOError as e: break body = b‘‘.join(chunk_list) # print(body.decode(‘utf-8‘)) print(‘------------>‘,body) sk.close() socket_list.remove(sk) if not socket_list: break完美(單線程的並發)
import socket import select class Req(object): def __init__(self,sk,func): self.sock = sk self.func = func def fileno(self): return self.sock.fileno() class Nb(object): def __init__(self): self.conn_list = [] self.socket_list = [] def add(self,url,func): client = socket.socket() client.setblocking(False) # 非阻塞 try: client.connect((url, 80)) except BlockingIOError as e: pass obj = Req(client,func) self.conn_list.append(obj) self.socket_list.append(obj) def run(self): while True: rlist,wlist,elist = select.select(self.socket_list,self.conn_list,[],0.005) # wlist中表示已經連接成功的req對象 for sk in wlist: # 發生變換的req對象 sk.sock.sendall(b‘GET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n‘) self.conn_list.remove(sk) for sk in rlist: chunk_list = [] while True: try: chunk = sk.sock.recv(8096) if not chunk: break chunk_list.append(chunk) except BlockingIOError as e: break body = b‘‘.join(chunk_list) # print(body.decode(‘utf-8‘)) sk.func(body) sk.sock.close() self.socket_list.remove(sk) if not self.socket_list: break def baidu_repsonse(body): print(‘百度下載結果:‘,body) def sogou_repsonse(body): print(‘搜狗下載結果:‘, body) def cnblogs_repsonse(body): print(‘博客園下載結果:‘, body) t1 = Nb() t1.add(‘www.baidu.com‘,baidu_repsonse) t1.add(‘www.sogou.com‘,sogou_repsonse) t1.add(‘www.cnblogs.com‘,cnblogs_repsonse) t1.run()高級版(線程的並發)
(3).基於事件循環實現的異步非阻塞框架:yfz
11111
IO多路復用/基於IO多路復用+socket實現並發請求/協程