多程序、協程、事件驅動及select poll epoll
多執行緒的使用場景
IO操作不佔用CPU
計算佔用cpu
python多執行緒不適合cpu密集型操作的任務,適合IO操作密集型的任務
多程序
簡單的一個多程序例子:(用於理解對多執行緒方法的使用)
和執行緒的方法類似,下面是一個簡單的多程序程式碼
import time, multiprocessing def run(name): time.sleep(2) print("hello", name) if __name__ == "__main__": for i in range(6): p = multiprocessing.Process(target=run, args=("dean",)) p.start()
和之前學習的多執行緒結合在一起使用,程式碼如下:
import time,threading import multiprocessing def thread_run(): print(threading.get_ident()) #這裡表示獲取執行緒id def run(name): time.sleep(2) print("hello", name) t = threading.Thread(target=thread_run) t.start() if __name__ == "__main__": for i in range(6): p = multiprocessing.Process(target=run, args=("dean",)) p.start()
執行結果如下:
接著我們檢視下面程式碼:
from multiprocessing import Process import os def info(title): print(title) print('module name:', __name__) print('parent process:', os.getppid()) print('process id:', os.getpid()) print("\n\n") def f(name): info('\033[31;1mcalled from child process function f\033[0m') print('hello', name) if __name__ == '__main__': info('\033[32;1mmain process line\033[0m')
執行結果如下:
我們這裡可以看到父程序id:12204,並且會發現無論程式執行多少次都是這個,然後我們在windows工作管理員檢視發現這個是pycharm的程序id.這裡要記住:每一個子程序都是由父程序啟動的
我們將上面程式碼中if __name__=”__main__”進行修改,如下:
if __name__ == '__main__':
info('\033[32;1mmain process line\033[0m')
p = Process(target=f, args=('bob',))
p.start()
執行結果如下:
程序間資料的互動,實現方法
通過Queues和Pipe可以實現程序間資料的傳遞,但是不能實現資料的共享
不同程序間記憶體不是共享的,要想實現兩個程序間的資料交換,有一下方法:
Queues
使用方法和threading裡的queue使用差不多
先回憶一下執行緒之間的資料共享,通過下面程式碼理解:
import threading
import queue
def func():
q.put([22,"dean",'hello'])
if __name__=="__main__":
q = queue.Queue()
t = threading.Thread(target=func)
t.start()
print(q.get(q))
執行結果:
從上述程式碼可以看出執行緒之間的資料是共享的:父執行緒可以訪問子執行緒放入的資料,如果是多程序之間呢?將程式碼進行修改如下,讓子程序呼叫父程序資料:
from multiprocessing import Process
import queue
def f():
q.put([11,None,"hello"])
if __name__=="__main__":
q = queue.Queue()
p = Process(target=f)
p.start()
print(q.get())
執行結果如下:
從這裡我們也可以看出子程序是訪問不到父程序的資料
我們再次將程式碼進行修改,寫f方法的時候直接將q給執行緒傳入,也就是,只有啟動執行緒,就自動傳入執行緒q,程式碼如下:
from multiprocessing import Process
import queue
def f():
q.put([11,None,"hello"])
if __name__=="__main__":
q = queue.Queue()
p = Process(target=f)
p.start()
print(q.get())
執行結果如下:
從這裡我們也可以看出子程序是訪問不到父程序的資料,我們再次將程式碼進行修改,寫f方法的時候直接將q給執行緒傳入,也就是,只有啟動執行緒,就自動傳入執行緒q,程式碼如下:
from multiprocessing import Process
import queue
def f(data):
data.put([11,None,"hello"])
if __name__=="__main__":
q = queue.Queue() #切記這裡是執行緒q
p = Process(target=f,args=(q,))
p.start()
print(q.get())
執行結果如下:
這裡我們需要知道:程序不能訪問執行緒q,所以我們需要改成程序,程式碼如下:
from multiprocessing import Process,Queue
def f(data):
data.put([11,None,"hello"])
if __name__=="__main__":
q = Queue() #這裡的q是程序q
p = Process(target=f,args=(q,))
p.start()
print(q.get())
執行結果:
這次我們就發現在父程序裡就可以呼叫到子程序放入的資料
這裡我們需要明白:這裡的q其實是被克隆了一個q,然後將子執行緒序列化的內容傳入的克隆q,然後再反序列化給q,從而實現了程序之間資料的傳遞
Pipe
實現程式碼例子:
from multiprocessing import Process,Pipe
def f(conn):
conn.send([22,None,"hello from child"])
conn.send([22,None,"hello from child2"])
print(conn.recv())
conn.close()
if __name__=="__main__":
left_conn,right_conn = Pipe()
p = Process(target=f,args=(right_conn,))
p.start()
print(left_conn.recv())
print(left_conn.recv())
left_conn.send("我是left_conn")
執行結果如下:
對上面程式碼分析:pip()會生成兩個值,上面的left_conn和right_conn,這就如同一條網線的兩頭,兩頭都可以傳送和接收資料
通過Manager可以不同程序間實現資料的共享
通過下面程式碼進行理解:
from multiprocessing import Manager,Process
import os
def f(d,l):
d[1]="1"
d["2"] = 2
d[0.25] = None
l.append(os.getpid())
print(l)
if __name__ == "__main__":
with Manager() as manager: #這種方式和直接manager=Manager()一樣
d = manager.dict() #生成一個字典,可以在多個程序間共享
l = manager.list(range(5)) #生成一個列表,可以在多個程序間共享
p_list = []
for i in range(10):
p = Process(target=f,args=(d,l))
p.start()
p_list.append(p)
for res in p_list:
res.join()
print(d)
print(l)
執行結果如下:
通過結果可以看出已經實現了不同程序間資料的共享
程序同步,即程序鎖
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
print('hello world', i)
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
列印結果如下:
可能會覺得這個加鎖沒有上面作用,其實是這樣的,當在螢幕上列印這些內容的時候,不同程序之間是共享這個螢幕的,鎖的作用在於當一個程序開始列印的時候,其他執行緒不能列印,從而防止列印亂內容
在windows上可能看不到效果,當不同程序列印的東西比較多的時候,就可以看到列印資料出現亂的情況
程序池
程序池內部維護一個程序序列,當使用時,則去程序池中獲取一個程序,如果程序池序列中沒有可供使用的進程序,那麼程式就會等待,直到程序池中有可用程序為止。程序池中有兩個方法:apply和apply_async(這個就表示非同步),從下面程式碼一點一點分析
from multiprocessing import Process, Pool
import time
import os
def Foo(i):
time.sleep(2)
print("in the process",os.getpid())
return i + 100
def Bar(arg):
print('-->exec done:', arg)
if __name__ == "__main__":
pool = Pool(5)
for i in range(10):
pool.apply(func=Foo, args=(i,))
print('end')
pool.close()
pool.join() # 程序池中程序執行完畢後再關閉,如果註釋,那麼程式直接關閉。
這樣執行結果發現,程式變成了串行了。
將上述程式碼中的:
pool.apply(func=Foo, args=(i,))
替換為:
pool.apply_async(func=Foo,args=(i,))
之後就解決了之前的的問題,這個時候我們再次將
pool.apply_async(func=Foo,args=(i,))
替換為,這裡的callback叫做回撥函式
pool.apply_async(func=Foo, args=(i,), callback=Bar)
執行結果如下:
下面將程式碼進行修改,確定回撥函式是由子程序還是主程序呼叫
from multiprocessing import Process, Pool
import time
import os
def Foo(i):
time.sleep(2)
print("in the process",os.getpid())
return i + 100
def Bar(arg):
print('-->exec done:', arg,os.getpid())
if __name__ == "__main__":
pool = Pool(5)
print(os.getpid())
for i in range(5):
pool.apply_async(func=Foo, args=(i,), callback=Bar)
#pool.apply(func=Foo, args=(i,))
#pool.apply_async(func=Foo,args=(i,))
print('end')
pool.close()
pool.join() # 程序池中程序執行完畢後再關閉,如果註釋,那麼程式直接關閉。
執行結果如下,可以看出回撥函式的pid和主程序是一樣的
協程
協程,又稱微執行緒,纖程。英文名Coroutine。一句話說明什麼是執行緒:協程是一種使用者態的輕量級執行緒。
協程擁有自己的暫存器上下文和棧。協程排程切換時,將暫存器上下文和棧儲存到其他地方,在切回來的時候,恢復先前儲存的暫存器上下文和棧。因此:協程能保留上一次呼叫時的狀態(即所有區域性狀態的一個特定組合),每次過程重入時,就相當於進入上一次呼叫的狀態,換種說法:進入上一次離開時所處邏輯流的位置。
協程的好處:
無需執行緒上下文切換的開銷
無需原子操作鎖定及同步的開銷
方便切換控制流,簡化程式設計模型
高併發+高擴充套件性+低成本:一個CPU支援上萬的協程都不是問題。所以很適合用於高併發處理。
缺點:
無法利用多核資源:協程的本質是個單執行緒,它不能同時將 單個CPU 的多個核用上,協程需要和程序配合才能執行在多CPU上.當然我們日常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。
進行阻塞(Blocking)操作(如IO時)會阻塞掉整個程式
先用yield實現簡單的協程
import time
import queue
def consumer(name):
print("--->starting eating baozi...")
while True:
new_baozi = yield
print("[%s] is eating baozi %s" % (name, new_baozi))
time.sleep(1)
def producer():
r = con.__next__()
r = con2.__next__()
n = 0
while n < 5:
n += 1
con.send(n)
con2.send(n)
print("\033[32;1m[producer]\033[0m is making baozi %s" % n)
if __name__ == '__main__':
con = consumer("c1")
con2 = consumer("c2")
p = producer()
執行結果如下:
Greenlet
from greenlet import greenlet
def test1():
print(10)
gr2.switch()
print(11)
gr2.switch()
def test2():
print(12)
gr1.switch()
print(13)
gr1 = greenlet(test1) #啟動一個協程
gr2 = greenlet(test2)
gr1.switch()
這裡的gr1.switch()是手動切換
Gevent
Gevent 是一個第三方庫,可以輕鬆通過gevent實現併發同步或非同步程式設計,在gevent中用到的主要模式是Greenlet, 它是以C擴充套件模組形式接入Python的輕量級協程。 Greenlet全部執行在主程式作業系統程序的內部,但它們被協作式地排程.
通過下面程式碼進行理解:
import gevent
def foo():
print('Running in foo1')
gevent.sleep(2)
print('Running in foo2')
def bar():
print('Running in bar1')
gevent.sleep(1)
print('Running in bar2')
def func3():
print("running in func1")
gevent.sleep(0)
print("running in func2")
gevent.joinall([
gevent.spawn(foo),
gevent.spawn(bar),
gevent.spawn(func3),
])
執行結果如下:
從執行結果可以看出,通過gevent.sleep()模擬執行IO操作,從而實現自動切換,程式最終花費的時間還是2秒
用協程gevent寫一個簡單併發爬網頁
from urllib import request
import gevent,time
def f(url):
print("get:%s" %url)
resp = request.urlopen(url)
data = resp.read()
print("%d bytes received from %s" %(len(data),url))
urls = ["http://sina.com.cn",
"http://www.cnblogs.com/",
"https://news.cnblogs.com/"
]
time_start = time.time()
for url in urls:
f(url)
print("同步序列cost:",time.time()-time_start)
async_time = time.time()
gevent.joinall([
gevent.spawn(f,"http://sina.com.cn"),
gevent.spawn(f,"http://www.cnblogs.com/"),
gevent.spawn(f,"https://news.cnblogs.com/")
])
print("非同步cost:",time.time()-async_time)
這樣的執行結果:
這裡可以看出非同步的時候和序列執行的時間基本一樣,其實這裡的非同步並沒有起作用,因為這裡的gevent並不能識別出urllib執行時的IO操作,想要是gevent實現非同步的方法是匯入模組:from gevent import monkey
將程式碼進行修改如下:
from urllib import request
import gevent,time
from gevent import monkey
monkey.patch_all()
def f(url):
print("get:%s" %url)
resp = request.urlopen(url)
data = resp.read()
print("%d bytes received from %s" %(len(data),url))
urls = ["http://sina.com.cn",
"http://www.cnblogs.com/",
"https://news.cnblogs.com/"
]
time_start = time.time()
for url in urls:
f(url)
print("同步序列cost:",time.time()-time_start)
async_time = time.time()
gevent.joinall([
gevent.spawn(f,"http://sina.com.cn"),
gevent.spawn(f,"http://www.cnblogs.com/"),
gevent.spawn(f,"https://news.cnblogs.com/")
])
print("非同步cost:",time.time()-async_time)
然後執行,結果如下:
事件驅動
通常,我們寫伺服器處理模型的程式時,有以下幾種模型:
(1)每收到一個請求,建立一個新的程序,來處理該請求;
(2)每收到一個請求,建立一個新的執行緒,來處理該請求;
(3)每收到一個請求,放入一個事件列表,讓主程序通過非阻塞I/O方式來處理請求
上面的幾種方式,各有千秋,
第(1)中方法,由於建立新的程序的開銷比較大,所以,會導致伺服器效能比較差,但實現比較簡單。
第(2)種方式,由於要涉及到執行緒的同步,有可能會面臨死鎖等問題。
第(3)種方式,在寫應用程式程式碼時,邏輯比前面兩種都複雜。
綜合考慮各方面因素,一般普遍認為第(3)種方式是大多數網路伺服器採用的方式
目前大部分的UI程式設計都是事件驅動模型,如很多UI平臺都會提供onClick()事件,這個事件就代表滑鼠按下事件。事件驅動模型大體思路如下:
1. 有一個事件(訊息)佇列;
2. 滑鼠按下時,往這個佇列中增加一個點選事件(訊息);
3. 有個迴圈,不斷從佇列取出事件,根據不同的事件,呼叫不同的函式,如onClick()、onKeyDown()等;
4. 事件(訊息)一般都各自儲存各自的處理函式指標,這樣,每個訊息都有獨立的處理函式
事件驅動程式設計是一種程式設計正規化,這裡程式的執行流由外部事件來決定。它的特點是包含一個事件迴圈,當外部事件發生時使用回撥機制來觸發相應的處理。另外兩種常見的程式設計正規化是(單執行緒)同步以及多執行緒程式設計。
讓我們用例子來比較和對比一下單執行緒、多執行緒以及事件驅動程式設計模型。下圖展示了隨著時間的推移,這三種模式下程式所做的工作。這個程式有3個任務需要完成,每個任務都在等待I/O操作時阻塞自身。阻塞在I/O操作上所花費的時間已經用灰色框標示出來了。
在單執行緒同步模型中,任務按照順序執行。如果某個任務因為I/O而阻塞,其他所有的任務都必須等待,直到它完成之後它們才能依次執行。這種明確的執行順序和序列化處理的行為是很容易推斷得出的。如果任務之間並沒有互相依賴的關係,但仍然需要互相等待的話這就使得程式不必要的降低了執行速度。
在多執行緒版本中,這3個任務分別在獨立的執行緒中執行。這些執行緒由作業系統來管理,在多處理器系統上可以並行處理,或者在單處理器系統上交錯執行。這使得當某個執行緒阻塞在某個資源的同時其他執行緒得以繼續執行。與完成類似功能的同步程式相比,這種方式更有效率,但程式設計師必須寫程式碼來保護共享資源,防止其被多個執行緒同時訪問。多執行緒程式更加難以推斷,因為這類程式不得不通過執行緒同步機制如鎖、可重入函式、執行緒區域性儲存或者其他機制來處理執行緒安全問題,如果實現不當就會導致出現微妙且令人痛不欲生的bug。
在事件驅動版本的程式中,3個任務交錯執行,但仍然在一個單獨的執行緒控制中。當處理I/O或者其他昂貴的操作時,註冊一個回撥到事件迴圈中,然後當I/O操作完成時繼續執行。回撥描述了該如何處理某個事件。事件迴圈輪詢所有的事件,當事件到來時將它們分配給等待處理事件的回撥函式。這種方式讓程式儘可能的得以執行而不需要用到額外的執行緒。事件驅動型程式比多執行緒程式更容易推斷出行為,因為程式設計師不需要關心執行緒安全問題。
當我們面對如下的環境時,事件驅動模型通常是一個好的選擇:
(1)程式中有許多工
(2)任務之間高度獨立(因此它們不需要互相通訊,或者等待彼此)
(3)在等待事件到來時,某些任務會阻塞。
當應用程式需要在任務間共享可變的資料時,這也是一個不錯的選擇,因為這裡不需要採用同步處理。
網路應用程式通常都有上述這些特點,這使得它們能夠很好的契合事件驅動程式設計模型。
IO多路複用
使用者空間和核心空間
作業系統都是採用虛擬儲存器,對於32位作業系統,它的定址空間(虛擬儲存空間)為4G。作業系統的核心是核心,獨立於普通的應用程式,可以訪問受保護記憶體空間,也有訪問底層硬體裝置的所有許可權,為了保證使用者程序不能直接操作核心,保證核心的安全,作業系統將虛擬空間分為兩部分:一部分為核心空間,一部分是使用者空間,針對linux系統而言,將最高的1G位元組給核心使用,稱為核心空間,將3G位元組的供各個程序使用,稱為使用者空間
檔案描述符fd
檔案描述符是一個用於表述指向檔案的引用的抽象化概念
檔案描述符在形式上是一個非負整數,實際上,它是一個索引值,指核心為每一個程序所維護的程序開啟檔案的記錄的記錄表,當程式開啟一個現有檔案或者建立一個新檔案時,核心向程序返回一個檔案描述符。
快取IO
快取IO,也被稱為標準IO,大多數檔案系統預設IO操作都是快取IO,在Linux的快取IO機制中,作業系統會將IO的資料快取在檔案系統的頁快取(page cache)中,也就是說,資料會先被拷貝到作業系統核心的緩衝區中,然後才會從作業系統核心的緩衝區拷貝到應用程式的地址空間
快取IO的缺點:
資料在傳輸過程中需要在應用程式地址空間和核心進行多次資料拷貝操作,這些資料拷貝操作所帶來的CPU以及記憶體開銷是非常大的
IO模式
對於一次IO訪問(以read為例子),資料會先拷貝到作業系統核心的緩衝區中,然後會從作業系統核心的緩衝區拷貝到應用程式的地址空間,也就是說當一個read操作發生時,它會經歷兩個階段:
1. 等待資料準備
2. 經資料從核心拷貝到程序
正是因為這兩個階段,linux系統產生了五種網路模式的方案
1. 阻塞I/O(blocking IO)
2. 非阻塞I/O(nonblocking IO)
3. I/O多路複用(IO multiplexing)
4. 訊號驅動I/O(signal driven IO)
5. 非同步I/O(asynchromous IO)
注意:訊號驅動I/O(signal driven IO)在實際中不常用
阻塞I/O(blocking IO)
在linux中,預設情況下所有的socket都是blocking,一個典型的讀操作流程大概是這樣:
當用戶程序呼叫了recvfrom這個系統呼叫,kernel就開始了IO的第一個階段:準備資料(對於網路IO來說,很多時候資料在一開始還沒有到達。比如,還沒有收到一個完整的UDP包。這個時候kernel就要等待足夠的資料到來)。這個過程需要等待,也就是說資料被拷貝到作業系統核心的緩衝區中是需要一個過程的。而在使用者程序這邊,整個程序會被阻塞(當然,是程序自己選擇的阻塞)。當kernel一直等到資料準備好了,它就會將資料從kernel中拷貝到使用者記憶體,然後kernel返回結果,使用者程序才解除block的狀態,重新執行起來。
所以,blocking IO的特點就是在IO執行的兩個階段都被block了
非阻塞I/O
linux下,可以通過設定socket使其變為non-blocking。當對一個non-blocking socket執行讀操作時,流程是這個樣子:
當用戶程序發出read操作時,如果kernel中的資料還沒有準備好,那麼它並不會block使用者程序,而是立刻返回一個error。從使用者程序角度講 ,它發起一個read操作後,並不需要等待,而是馬上就得到了一個結果。使用者程序判斷結果是一個error時,它就知道資料還沒有準備好,於是它可以再次傳送read操作。一旦kernel中的資料準備好了,並且又再次收到了使用者程序的system call,那麼它馬上就將資料拷貝到了使用者記憶體,然後返回。
所以,nonblocking IO的特點是使用者程序需要不斷的主動詢問kernel資料好了沒有。
I/O多路複用(IO multiplexing)
IO multiplexing就是我們說的select,poll,epoll,有些地方也稱這種IO方式為event driven IO。select/epoll的好處就在於單個process就可以同時處理多個網路連線的IO。它的基本原理就是select,poll,epoll這個function會不斷的輪詢所負責的所有socket,當某個socket有資料到達了,就通知使用者程序。
當用戶程序呼叫了select,那麼整個程序會被block,而同時,kernel會“監視”所有select負責的socket,當任何一個socket中的資料準備好了,select就會返回。這個時候使用者程序再呼叫read操作,將資料從kernel拷貝到使用者程序。
所以,I/O 多路複用的特點是通過一種機制一個程序能同時等待多個檔案描述符,而這些檔案描述符(套接字描述符)其中的任意一個進入讀就緒狀態,select()函式就可以返回。
這個圖和blocking IO的圖其實並沒有太大的不同,事實上,還更差一些。因為這裡需要使用兩個system call (select 和 recvfrom),而blocking IO只調用了一個system call (recvfrom)。但是,用select的優勢在於它可以同時處理多個connection。
所以,如果處理的連線數不是很高的話,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server效能更好,可能延遲還更大。select/epoll的優勢並不是對於單個連線能處理得更快,而是在於能處理更多的連線。)
在IO multiplexing Model中,實際中,對於每一個socket,一般都設定成為non-blocking,但是,如上圖所示,整個使用者的process其實是一直被block的。只不過process是被select這個函式block,而不是被socket IO給block。
非同步I/O(asynchronous IO)
Linux下的asynchronous IO其實用得很少。先看一下它的流程:
使用者程序發起read操作之後,立刻就可以開始去做其它的事。而另一方面,從kernel的角度,當它受到一個asynchronous read之後,首先它會立刻返回,所以不會對使用者程序產生任何block。然後,kernel會等待資料準備完成,然後將資料拷貝到使用者記憶體,當這一切都完成之後,kernel會給使用者程序傳送一個signal,告訴它read操作完成了。
關於select poll epoll
select
sekect是通過一個select()系統呼叫來監視多個檔案描述符,當select()返回後,該陣列中就緒的檔案描述符便會被該核心修改標誌位,使得程序可以獲得這些檔案描述符從而進行後續的讀寫操作
select的優點就是支援跨平臺
缺點在於單個程序能夠監視的檔案描述符的數量存在最大限制
另外select()所維護的儲存大量檔案描述符的資料結構,隨著檔案描述符數量的增大,其複製的開銷也線性增長。同時,由於網路響應時間的延遲使得大量TCP連線處於非活躍狀態,但呼叫select()會對所有socket進行一次線性掃描,所以這也浪費了一定的開銷。
poll
和select在本質上沒有多大差別,但是poll沒有最大檔案描述符數量的限制
poll和select同樣存在一個缺點就是,包含大量檔案描述符的陣列被整體複製於使用者態和核心的地址空間之間,而不論這些檔案描述符是否就緒,它的開銷隨著檔案描述符數量的增加而線性增大。
另外,select()和poll()將就緒的檔案描述符告訴程序後,如果程序沒有對其進行IO操作,那麼下次呼叫select()和poll()的時候將再次報告這些檔案描述符,所以它們一般不會丟失就緒的訊息,這種方式稱為水平觸發(Level Triggered)。
epoll
epoll可以同時支援水平觸發和邊緣觸發(Edge Triggered,只告訴程序哪些檔案描述符剛剛變為就緒狀態,它只說一遍,如果我們沒有采取行動,那麼它將不會再次告知,這種方式稱為邊緣觸發),理論上邊緣觸發的效能要更高一些,但是程式碼實現相當複雜。
epoll同樣只告知那些就緒的檔案描述符,而且當我們呼叫epoll_wait()獲得就緒檔案描述符時,返回的不是實際的描述符,而是一個代表就緒描述符數量的值,你只需要去epoll指定的一個數組中依次取得相應數量的檔案描述符即可,這裡也使用了記憶體對映(mmap)技術,這樣便徹底省掉了這些檔案描述符在系統呼叫時複製的開銷。
另一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,程序只有在呼叫一定的方法後,核心才對所有監視的檔案描述符進行掃描,而epoll事先通過epoll_ctl()來註冊一個檔案描述符,一旦基於某個檔案描述符就緒時,核心會採用類似callback的回撥機制,迅速啟用這個檔案描述符,當程序呼叫epoll_wait()時便得到通知
以select方法為例子進行理解
Python的select()方法直接呼叫作業系統的IO介面,它監控sockets,open files, and pipes(所有帶fileno()方法的檔案控制代碼)何時變成readable 和writeable, 或者通訊錯誤,select()使得同時監控多個連線變的簡單,並且這比寫一個長迴圈來等待和監控多客戶端連線要高效,因為select直接通過作業系統提供的C的網路介面進行操作,而不是通過Python的直譯器。
接下來通過echo server例子要以瞭解select 是如何通過單程序實現同時處理多個非阻塞的socket連線的
程式碼如下:
import select
import socket
import queue
server = socket.socket()
server.bind(('127.0.0.1',9999))
server.listen()
server.setblocking(False)#不阻塞
msg_dict = {}
inputs=[server,]
outputs=[]
while True:
readable, writeable, exceptional = select.select(inputs, outputs, inputs)
print(readable, writeable, exceptional)
for r in readable:
if r is server: #代表來了一個新連線
conn,addr = server.accept()
print("來了一個新連線:",addr)
inputs.append(conn) #是因為這個新建立的連線還沒發資料過來,現在就接收的話程式就報錯了
#所以要想要實現這個客戶端發資料來時server端能知道,就需要讓select再監測這個conn
msg_dict[conn] = queue.Queue() #初始化一個佇列,後面需要返回給這個客戶端的資料
else:
data = r.recv(1024)
print("收到資料:",data)
msg_dict[r].put(data)
outputs.append(r) #放入返回的連線佇列裡
for w in writeable: #要返回給客戶端的連線列表
data_to_client = msg_dict[w].get()
w.send(data_to_client) #返回給客戶端源資料
outputs.remove(w) #確保下次迴圈的時候writeable,不能返回這個已經處理完的連線了
for e in exceptional:
if e in outputs:
outputs.remove(e)
inputs.remove(e)
del msg_dict[e]
其實上述的程式碼相對來說是比較麻煩,python已經封裝了selectors模組,並且這個模組中包含了select和epoll,會根據系統自動識別(windows只支援select,linux是二者都支援),預設用epoll
如果將上述程式碼用selectors模組的方式寫,程式碼如下:
import selectors
import socket
sel = selectors.DefaultSelector()
def accept(server,mask):
conn,addr = server.accept()
print("一個新的連線",addr)
print(conn)
conn.setblocking(False)
sel.register(conn,selectors.EVENT_READ,read) #新連線註冊read回撥函式
print("done")
def read(conn,mask):
print("ccc")
print("mask:",mask)
data = conn.recv(1024)
if data:
print(data)
conn.send(data)
else:
print("客戶端斷開連線")
sel.unregister(conn)
conn.close()
server = socket.socket()
server.bind(('127.0.0.1',9999))
server.listen()
server.setblocking(False)
sel.register(server,selectors.EVENT_READ,accept)
while True:
print("cccccccsssssss")
events = sel.select() #預設阻塞,有活動連線,有活動連線就返回活動的連線列表
print(events)
for key,mask in events:
print("key:%s mask:%s"%(key,mask))
callback = key.data #這裡就是回撥函式及上述的accept
print("key.data:",key.data)
print("key.fileobj:",key.fileobj)
callback(key.fileobj,mask) #key.fileobj
我們用客戶端模擬同時併發一萬去連線服務端
客戶端程式碼如下:
import socket
import sys
messages = [ b'This is the message. ',
b'It will be sent ',
b'in parts.',
]
server_address = ('127.0.0.1', 9999)
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(100)]
print('connecting to %s port %s' % server_address)
for s in socks:
s.connect(server_address)
for message in messages:
for s in socks:
print('%s: sending "%s"' % (s.getsockname(), message) )
s.send(message)
for s in socks:
data = s.recv(1024)
print( '%s: received "%s"' % (s.getsockname(), data) )
if not data:
print(sys.stderr, 'closing socket', s.getsockname() )