1. 程式人生 > >IO多路復用/基於IO多路復用+socket實現並發請求/協程

IO多路復用/基於IO多路復用+socket實現並發請求/協程

所有 remove 告訴 安全 pso rgs 一個 epo 新的

http://www.cnblogs.com/alex3714/articles/5876749.html

http://www.cnblogs.com/Eva-J/articles/8324837.html

http://www.cnblogs.com/linhaifeng/articles/6817679.html

一.IO多路復用

1.IO多路復用作用:檢測多個socket是否已經發生變化(是否已經連接成功/是否已經獲取數據)(可讀/可寫)

2.select,poll,epoll都是I/O多路復用的具體的實現,之所以有這三個鬼東西的存在,其實是他們出現是有先後順序的

(1).I/O多路復用這個概念被提出來以後,select是第一個實現的

  select會修改傳入的參數數組,只能監聽1024個鏈接

   select不是線程安全的

(2).14年以後,一幫人又實現了poll,poll修復了select的很多問題

  poll去掉了1024個鏈接的限制,於是要多少鏈接吶,你開心就好

  poll從設計上來說,不再修改傳入的數據,不過這個要看你的平臺,所以行走江湖,小心為妙

(3).2002年,大設備部Davide Libenzi實現了epoll

  epoll可以說是I/O多路復用最新的實現,epoll修復了poll和select絕大部分問題

  epoll現在是線程安全的

  epoll現在不僅告訴你sock組裏有數據,還會告訴你哪個sock組有數據

比較坑爹的是epoll只用Linux提供支持,默認集成到了Linux內核中

Windows Python:
    提供:select

Mac Python:
    提供:select

Linux Python:
    提供:select,poll,epoll

對於select操作:

句柄列表11, 句柄列表22, 句柄列表33 = select.select(句柄序列1, 句柄序列2, 句柄序列3, 超時時間)

參數: 可接受四個參數(前三個必須)
返回值:三個列表

select方法用來監視文件句柄,如果句柄發生變化,則獲取該句柄。
1、當 參數1 序列中的句柄發生可讀時(accetp和read),則獲取發生變化的句柄並添加到 返回值1 序列中 2、當 參數2 序列中含有句柄時,則將該序列中所有的句柄添加到 返回值2 序列中 3、當 參數3 序列中的句柄發生錯誤時,則將該發生錯誤的句柄添加到 返回值3 序列中 4、當 超時時間 未設置,則select會一直阻塞,直到監聽的句柄發生變化 當 超時時間 = 1時,那麽如果監聽的句柄均無任何變化,則select會阻塞 1 秒,之後返回三個空列表,如果監聽的句柄有變化,則直接執行。

2.基於I/O多路復用+socket實現並發請求(一個線程100個請求)

應用功能:I/O多路復用;socket非阻塞

(1).以前所使用的socket發送請求:

技術分享圖片
import socket
import requests

# 方式一
ret = requests.get(https://www.baidu.com/s?wd=alex)

# 方式二
client = socket.socket()

# 百度創建連接: 阻塞
client.connect((www.baidu.com,80))

# 問百度我要什麽?
client.sendall(bGET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n)    # http協議

# 我等著接收百度給我的回復
chunk_list = []
while True:
    chunk = client.recv(8096)
    if not chunk:
        break
    chunk_list.append(chunk)

body = b‘‘.join(chunk_list)
print(body.decode(utf-8))
View Code 技術分享圖片
import socket
import requests
##### 解決並發:單線程 #####
# 方式一
key_list = [alex,db,sb]
for item in key_list:
    ret = requests.get(https://www.baidu.com/s?wd=%s %item)

# 方式二
def get_data(key):
    # 方式二
    client = socket.socket()

    # 百度創建連接: 阻塞
    client.connect((www.baidu.com,80))

    # 問百度我要什麽?
    client.sendall(bGET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n)

    # 我等著接收百度給我的回復
    chunk_list = []
    while True:
        chunk = client.recv(8096)
        if not chunk:
            break
        chunk_list.append(chunk)

    body = b‘‘.join(chunk_list)
    print(body.decode(utf-8))

key_list = [alex,db,sb]
for item in key_list:
    get_data(item)


##### 解決並發:多線程 #####
import threading

key_list = [alex,db,sb]
for item in key_list:
    t = threading.Thread(target=get_data,args=(item,))
    t.start()

# #################### 解決並發:單線程+IO不等待 ####################
# IO請求?
# 數據回來了?
向百度發送請求搜索三個關鍵字

(2).使用I/O多路復用+socket實現

技術分享圖片
import socket

client = socket.socket()
client.setblocking(False) # 將原來阻塞的位置變成非阻塞(報錯)
# 百度創建連接: 阻塞

try:
    client.connect((www.baidu.com,80)) # 執行了但報錯了
except BlockingIOError as e:
    pass

# 檢測到已經連接成功

# 問百度我要什麽?
client.sendall(bGET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n)

# 我等著接收百度給我的回復
chunk_list = []
while True:
    chunk = client.recv(8096) # 將原來阻塞的位置變成非阻塞(報錯)
    if not chunk:
        break
    chunk_list.append(chunk)

body = b‘‘.join(chunk_list)
print(body.decode(utf-8))
前戲(會報錯) 技術分享圖片
import socket
import select

client1 = socket.socket()
client1.setblocking(False) # 百度創建連接: 非阻塞

try:
    client1.connect((www.baidu.com,80))
except BlockingIOError as e:
    pass


client2 = socket.socket()
client2.setblocking(False) # 搜狗創建連接: 非阻塞
try:
    client2.connect((www.sogou.com,80))
except BlockingIOError as e:
    pass


client3 = socket.socket()
client3.setblocking(False) # 淘寶創建連接: 非阻塞
try:
    client3.connect((www.taobao.com,80))
except BlockingIOError as e:
    pass

socket_list = [client1,client2,client3]
conn_list = [client1,client2,client3]

while True:
    rlist,wlist,elist = select.select(socket_list,conn_list,[],0.005)
    # wlist中表示已經連接成功的socket對象
    for sk in wlist:
        if sk == client1:
            sk.sendall(bGET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n)
        elif sk==client2:
            sk.sendall(bGET /web?query=fdf HTTP/1.0\r\nhost:www.sogou.com\r\n\r\n)
        else:
            sk.sendall(bGET /s?wd=alex HTTP/1.0\r\nhost:www.taobao.com\r\n\r\n)
        conn_list.remove(sk)
    for sk in rlist:
        chunk_list = []
        while True:
            try:
                chunk = sk.recv(8096)
                if not chunk:
                    break
                chunk_list.append(chunk)
            except BlockingIOError as e:
                break
        body = b‘‘.join(chunk_list)
        # print(body.decode(‘utf-8‘))
        print(------------>,body)
        sk.close()
        socket_list.remove(sk)
    if not socket_list:
        break
完美(單線程的並發) 技術分享圖片
import socket
import select

class Req(object):
    def __init__(self,sk,func):
        self.sock = sk
        self.func = func

    def fileno(self):
        return self.sock.fileno()


class Nb(object):

    def __init__(self):
        self.conn_list = []
        self.socket_list = []

    def add(self,url,func):
        client = socket.socket()
        client.setblocking(False)  # 非阻塞
        try:
            client.connect((url, 80))
        except BlockingIOError as e:
            pass
        obj = Req(client,func)
        self.conn_list.append(obj)
        self.socket_list.append(obj)

    def run(self):

        while True:
            rlist,wlist,elist = select.select(self.socket_list,self.conn_list,[],0.005)
            # wlist中表示已經連接成功的req對象
            for sk in wlist:
                # 發生變換的req對象
                sk.sock.sendall(bGET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n)
                self.conn_list.remove(sk)
            for sk in rlist:
                chunk_list = []
                while True:
                    try:
                        chunk = sk.sock.recv(8096)
                        if not chunk:
                            break
                        chunk_list.append(chunk)
                    except BlockingIOError as e:
                        break
                body = b‘‘.join(chunk_list)
                # print(body.decode(‘utf-8‘))
                sk.func(body)
                sk.sock.close()
                self.socket_list.remove(sk)
            if not self.socket_list:
                break


def baidu_repsonse(body):
    print(百度下載結果:,body)

def sogou_repsonse(body):
    print(搜狗下載結果:, body)

def cnblogs_repsonse(body):
    print(博客園下載結果:, body)


t1 = Nb()
t1.add(www.baidu.com,baidu_repsonse)
t1.add(www.sogou.com,sogou_repsonse)
t1.add(www.cnblogs.com,cnblogs_repsonse)
t1.run()
高級版(線程的並發)

(3).基於事件循環實現的異步非阻塞框架:yfz

11111

IO多路復用/基於IO多路復用+socket實現並發請求/協程