1. 程式人生 > >Day038--Python--Gevent , IO多路複用

Day038--Python--Gevent , IO多路複用

1. 協程: 

  gevent  (遇到IO自動切換)

import gevent
import time
from gevent import monkey; monkey.patch_all()  # ;相當於換行

def eat(name):
    print('%s eat 1' % name)
    # gevent.sleep(1)
    time.sleep(2)    # gevent 不能識別time.sleep, from gevent import monkey; monkey.patch_all()可解決這個問題, 之後就可以使用time.sleep()了
print('%s eat 2' % name) def play(name): print('%s play 1' % name) # gevent.sleep(1) time.sleep(2) print('%s play 2' % name) g1 = gevent.spawn(eat, 'alex') g2 = gevent.spawn(play, name='sylar') # g1.join() # g2.join() gevent.joinall([g1, g2]) print('')

 

  gevent 之同步與非同步

from gevent import spawn,joinall,monkey;monkey.patch_all()

import time
def task(pid):
    """
    Some non-deterministic task
    """
    time.sleep(0.5)
    print('Task %s done' % pid)


def synchronous():
    for i in range(10):
        task(i)

def asynchronous():
    g_l=[spawn(task,i) for
i in range(10)] joinall(g_l) if __name__ == '__main__': print('Synchronous:') synchronous() print('Asynchronous:') asynchronous()
View Code

 

 

  gevent 應用列舉:

from gevent import monkey;monkey.patch_all()
import gevent
import requests
import time

def get_page(url):
    print('GET: %s' %url)
    response=requests.get(url)
    if response.status_code == 200:
        print('%d bytes received from %s' %(len(response.text),url))


start_time=time.time()
gevent.joinall([
    gevent.spawn(get_page,'https://www.python.org/'),
    gevent.spawn(get_page,'https://www.yahoo.com/'),
    gevent.spawn(get_page,'https://github.com/'),
])
stop_time=time.time()
print('協程時間>>> %s' %(stop_time-start_time))

# 協程應用:爬蟲
print('--------------------------------')
s = time.time()
requests.get('https://www.python.org/')
requests.get('https://www.yahoo.com/')
requests.get('https://github.com/')
t = time.time()
print('序列時間>>',t-s)
View Code 協程應用: 爬蟲

  

 

參考: https://www.cnblogs.com/clschao/articles/9712056.html#_label4

 

 

 

 

阻塞 IO

 

 

 

import socket
import time

server = socket.socket()
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 8083))
server.listen(5)
print('你看看卡在哪')
while 1:
    conn, addr = server.accept()    # 等待客戶端連線, 阻塞住
    print('來自%s的連結請求' % addr)
    time.sleep(0.1)
View Code 阻塞IO

 

非阻塞 IO模型 

 

import socket
import time

server=socket.socket()
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
server.bind(('127.0.0.1',8083))
server.listen(5)
print('你看看卡在哪')
server.setblocking(False)   # 不再阻塞等待
while 1:
    try:
        conn, addr = server.accept()
        print('來自%s的連結請求'%addr)
    except BlockingIOError:
        print('去買點藥')
    time.sleep(0.1)

 

 

 

import socket
import time

server=socket.socket()
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
server.bind(('127.0.0.1',8083))
server.listen(5)
print('你看看卡在哪')
server.setblocking(False)
rlist = []
rl = []
while 1:
    try:
        conn, addr = server.accept()
        print(addr)
        rlist.append(conn)
        print('來自%s:%s的連結請求'%(addr[0],addr[1]))
    except BlockingIOError:
        print('去買點藥')

    time.sleep(0.1)   # 防止死迴圈一直高度佔用CPU
    print('rlist',rlist,len(rlist))
    for con in rlist:
        try:
            from_client_msg = con.recv(1024)
        except BlockingIOError:
            continue
        except ConnectionResetError:
            con.close()
            rl.append(con)
    print('>>>>',rl)
    for remove_con in rl:
        rlist.remove(remove_con)
    rl.clear()
View Code 阻塞IO的socket服務端
import socket
import time

ip_port = ('127.0.0.1',8083)

client = socket.socket()

client.connect(ip_port)

while 1:

    client.send(b'dayangge henweisuo ')
    time.sleep(0.1)
View Code 阻塞IO的socket客戶端

 

 

# 服務端
import socket
import time

server=socket.socket()
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
server.bind(('127.0.0.1',8083))
server.listen(5)

server.setblocking(False) #設定不阻塞
r_list=[]  #用來儲存所有來請求server端的conn連線
w_list={}  #用來儲存所有已經有了請求資料的conn的請求資料

while 1:
    try:
        conn,addr=server.accept() #不阻塞,會報錯
        r_list.append(conn)  #為了將連線儲存起來,不然下次迴圈的時候,上一次的連線就沒有了
    except BlockingIOError:
        # 強調強調強調:!!!非阻塞IO的精髓在於完全沒有阻塞!!!
        # time.sleep(0.5) # 開啟該行註釋純屬為了方便檢視效果
        print('在做其他的事情')
        print('rlist: ',len(r_list))
        print('wlist: ',len(w_list))


        # 遍歷讀列表,依次取出套接字讀取內容
        del_rlist=[] #用來儲存刪除的conn連線
        for conn in r_list:
            try:
                data=conn.recv(1024) #不阻塞,會報錯
                if not data: #當一個客戶端暴力關閉的時候,會一直接收b'',別忘了判斷一下資料
                    conn.close()
                    del_rlist.append(conn)
                    continue
                w_list[conn]=data.upper()
            except BlockingIOError: # 沒有收成功,則繼續檢索下一個套接字的接收
                continue
            except ConnectionResetError: # 當前套接字出異常,則關閉,然後加入刪除列表,等待被清除
                conn.close()
                del_rlist.append(conn)


        # 遍歷寫列表,依次取出套接字傳送內容
        print('wlist: ', len(w_list))
        del_wlist=[]
        for conn,data in w_list.items():
            try:
                conn.send(data)
                del_wlist.append(conn)
            except BlockingIOError:
                continue


        # 清理無用的套接字,無需再監聽它們的IO操作
        for conn in del_rlist:
            r_list.remove(conn)
        #del_rlist.clear() #清空列表中儲存的已經刪除的內容
        for conn in del_wlist:
            w_list.pop(conn)
        #del_wlist.clear()
View Code 完整版的非阻塞IO服務端
import socket
import os
import time
import threading
client=socket.socket()
client.connect(('127.0.0.1',8083))

while 1:
    res=('%s hello' %os.getpid()).encode('utf-8')
    client.send(res)
    data=client.recv(1024)

    print(data.decode('utf-8'))


##多執行緒的客戶端請求版本
# def func():
#     sk = socket.socket()
#     sk.connect(('127.0.0.1',9000))
#     sk.send(b'hello')
#     time.sleep(1)
#     print(sk.recv(1024))
#     sk.close()
#
# for i in range(20):
#     threading.Thread(target=func).start()
View Code 完整版的非阻塞IO客戶端

 

 

 

select IO多路複用 (重點)

當用戶程序呼叫了select,那麼整個程序會被block,而同時,kernel會“監視”所有select負責的socket,當任何一個socket中的資料準備好了,select就會返回。這個時候使用者程序再呼叫read操作,將資料從kernel拷貝到使用者程序。  這個圖和blockingIO的圖其實並沒有太大的不同,事實上還更差一些。因為它不僅阻塞了還多需要使用兩個系統呼叫(select和recvfrom),而blockingIO只調用了一個系統呼叫(recvfrom),當只有一個連線請求的時候,這個模型還不如阻塞IO效率高。但是,用select的優勢在於它可以同時處理多個connection,而阻塞IO那裡不能,我不管阻塞不阻塞,你所有的連線包括recv等操作,我都幫你監聽著(以什麼形式監聽的呢?先不要考慮,下面會講的~~),其中任何一個有變動(有連結,有資料),我就告訴你使用者,那麼你就可以去呼叫這個資料了,這就是他的NB之處。這個IO多路複用模型機制是作業系統幫我們提供的,在windows上有這麼個機制叫做select,那麼如果我們想通過自己寫程式碼來控制這個機制或者自己寫這麼個機制,我們可以使用python中的select模組來完成上面這一系列代理的行為。在一切皆檔案的unix下,這些可以接收資料的物件或者連線,都叫做檔案描述符fd
IO多路複用
import select

fd_r_list, fd_w_list, fd_e_list = select.select(rlist, wlist, xlist, [timeout])

引數: 可接受四個引數(前三個必須)
    rlist: wait until ready for reading  #等待讀的物件,你需要監聽的需要獲取資料的物件列表
    wlist: wait until ready for writing  #等待寫的物件,你需要寫一些內容的時候,input等等,也就是說我會迴圈他看看是否有需要傳送的訊息,如果有我取出這個物件的訊息併發送出去,一般用不到,這裡我們也給一個[]。
    xlist: wait for an “exceptional condition”  #等待異常的物件,一些額外的情況,一般用不到,但是必須傳,那麼我們就給他一個[]。
    timeout: 超時時間
    當超時時間 = n(正整數)時,那麼如果監聽的控制代碼均無任何變化,則select會阻塞n秒,之後返回三個空列表,如果監聽的控制代碼有變化,則直接執行。
返回值:三個列表與上面的三個引數列表是對應的
  select方法用來監視檔案描述符(當檔案描述符條件不滿足時,select會阻塞),當某個檔案描述符狀態改變後,會返回三個列表
    1、當引數1 序列中的fd滿足“可讀”條件時,則獲取發生變化的fd並新增到fd_r_list中
    2、當引數2 序列中含有fd時,則將該序列中所有的fd新增到 fd_w_list中
    3、當引數3 序列中的fd發生錯誤時,則將該發生錯誤的fd新增到 fd_e_list中
    4、當超時時間為空,則select會一直阻塞,直到監聽的控制代碼發生變化
View Code介紹
#服務端
from socket import *
import select
server = socket(AF_INET, SOCK_STREAM)
server.bind(('127.0.0.1',8093))
server.listen(5)
# 設定為非阻塞
server.setblocking(False)

# 初始化將服務端socket物件加入監聽列表,後面還要動態新增一些conn連線物件,當accept的時候sk就有感應,當recv的時候conn就有動靜
rlist=[server,]
rdata = {}  #存放客戶端傳送過來的訊息

wlist=[]  #等待寫物件
wdata={}  #存放要返回給客戶端的訊息

print('預備!監聽!!!')
count = 0 #寫著計數用的,為了看實驗效果用的,沒用
while True:
    # 開始 select 監聽,對rlist中的服務端server進行監聽,select函式阻塞程序,直到rlist中的套接字被觸發(在此例中,套接字接收到客戶端發來的握手訊號,從而變得可讀,滿足select函式的“可讀”條件),被觸發的(有動靜的)套接字(伺服器套接字)返回給了rl這個返回值裡面;
    rl,wl,xl=select.select(rlist,wlist,[],0.5)
    print('%s 次數>>'%(count),wl)
    count = count + 1
    # 對rl進行迴圈判斷是否有客戶端連線進來,當有客戶端連線進來時select將觸發
    for sock in rl:
        # 判斷當前觸發的是不是socket物件, 當觸發的物件是socket物件時,說明有新客戶端accept連線進來了
        if sock == server:
            # 接收客戶端的連線, 獲取客戶端物件和客戶端地址資訊
            conn,addr=sock.accept()
            #把新的客戶端連線加入到監聽列表中,當客戶端的連線有接收訊息的時候,select將被觸發,會知道這個連線有動靜,有訊息,那麼返回給rl這個返回值列表裡面。
            rlist.append(conn)
        else:
            # 由於客戶端連線進來時socket接收客戶端連線請求,將客戶端連線加入到了監聽列表中(rlist),客戶端傳送訊息的時候這個連線將觸發
            # 所以判斷是否是客戶端連線物件觸發
            try:
                data=sock.recv(1024)
                #沒有資料的時候,我們將這個連線關閉掉,並從監聽列表中移除
                if not data:
                    sock.close()
                    rlist.remove(sock)
                    continue
                print("received {0} from client {1}".format(data.decode(), sock))
                #將接受到的客戶端的訊息儲存下來
                rdata[sock] = data.decode()

                #將客戶端連線物件和這個物件接收到的訊息加工成返回訊息,並新增到wdata這個字典裡面
                wdata[sock]=data.upper()
                #需要給這個客戶端回覆訊息的時候,我們將這個連線新增到wlist寫監聽列表中
                wlist.append(sock)
            #如果這個連接出錯了,客戶端暴力斷開了(注意,我還沒有接收他的訊息,或者接收他的訊息的過程中出錯了)
            except Exception:
                #關閉這個連線
                sock.close()
                #在監聽列表中將他移除,因為不管什麼原因,它畢竟是斷開了,沒必要再監聽它了
                rlist.remove(sock)
    # 如果現在沒有客戶端請求連線,也沒有客戶端傳送訊息時,開始對傳送訊息列表進行處理,是否需要傳送訊息
    for sock in wl:
        sock.send(wdata[sock])
        wlist.remove(sock)
        wdata.pop(sock)

    # #將一次select監聽列表中有接收資料的conn物件所接收到的訊息列印一下
    # for k,v in rdata.items():
    #     print(k,'發來的訊息是:',v)
    # #清空接收到的訊息
    # rdata.clear()

---------------------------------------
#客戶端
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8093))


while True:
    msg=input('>>: ').strip()
    if not msg:continue
    client.send(msg.encode('utf-8'))
    data=client.recv(1024)
    print(data.decode('utf-8'))

client.close()

select網路IO模型的示例程式碼
select網路IO模型示例程式碼

 

 

 select IO多路複用 非同步

非同步IO  asyncio (很厲害, 高併發)