1. 程式人生 > >協程、非同步IO

協程、非同步IO

協程

協程,又稱微執行緒,纖程。英文名Coroutine。一句話----協程是一種使用者態的輕量級執行緒。

協程擁有自己的暫存器上下文和棧。協程排程切換時,將暫存器上下文和棧儲存到其他地方,在切換回來的時候,恢復先前儲存的暫存器上下文和棧。因此,協程能保留上一次呼叫的狀態(即所有區域性狀態的一個特定組合),每次過程重入時,就相當於進入上一次呼叫的狀態,換種說法,進入上一次離開時所處邏輯流的位置。

子程式,或者稱為函式,在所有語言中都是層級呼叫,比如A呼叫B,B在執行過程中又呼叫了C,C執行完畢返回,B執行完畢返回,最後A執行完畢。

所以子程式呼叫時通過棧實現的,一個執行緒就是執行一個子程式。子程式呼叫總是一個入口,一次返回,呼叫順序是明確的。而協程的呼叫和子程式不同。

協程看上去也是子程式,但執行過程中,在子程式內部可中斷,然後轉而執行別的子程式,在適當的時候再返回來接著執行。

注意,在一個子程式中中斷,去執行其他子程式,不是函式呼叫,有點類似CPU的中斷。比如子程式A、B:

def a():

    print("1")

    print("2")

    print("3")

def b():

    print("x")

    print("y")

    print("z")

假設由程式執行,在執行A的過程中,可以隨時中斷,去執行B,B也可能在執行過程中中斷再去執行A,結果可能是:

1
2
x
y
3
z

但是在A中是沒有呼叫B的,所以協程的呼叫比函式呼叫理解起來要難一些。看起來A、B的執行有點像多執行緒,但協程的特點在是一個執行緒執行,和多執行緒比協程有何優勢?

  1. 最大的優勢就是協程極高的執行效率。因為子程式切換不是執行緒切換,而是有程式自身控制,因此,沒有執行緒切換的開銷,和多執行緒比,執行緒數量越多,協程的效能優勢就越明顯。
  2. 第二大優勢就是不需要多執行緒的鎖機制,因為只有一個執行緒,也不存在同時寫變數衝突,在協程中控制共享資源不加鎖,只需要判斷狀態就好了,所以執行效率比多執行緒高很多。

因為協程是一個執行緒執行,那麼怎麼利用多核CPU呢?最簡單的方法是多程序加協程,既充分利用多核,有充分發揮協程的高效率,可獲得極高的效能。

 

協程的優點:

無需執行緒上下文切換的開銷。

無需原子操作鎖定及同步的開銷。原子操作(atomic operation)是不需要synchronized,所謂原子操作是指不會被執行緒排程機制打斷的操作;這種操作一旦開始,就一直執行到結束,中間不會有任何context switch(切換到另一個執行緒)。原子操作可以是一個步驟,也可以是多個操作步驟,但是其順序是不可以被打亂,或者切割掉只執行部分。視作整體是原子性的核心。

方便切換控制流,簡化程式設計模型。

高併發+高擴充套件性+低成本。一個CPU支援上萬的協程都不是問題,所以很適合用於高併發處理。

 

協程的缺點:

無法利用多核資源。協程的本質是個單執行緒,它不能同時將單個CPU的多個核用上,協程需要和程序配合才能執行在多CPU上。當然我們日常所編寫的絕大部分應用都沒有這個必要,除非是CPU密集型應用。

進行阻塞(Blocking)操作(如IO時)會阻塞掉整個程式。

使用yield實現協程操作。

import time,queue

def consumer(name):

    print("-->starting eating xoxo")

while True:

    new_xo = yield

    print("%s is eating xoxo %s" %(name,new_xo))

def producer():

    r = con.__next__()

    r = con2.__next__()

    n = 0

while n < 5:

    n += 1

    con.send(n)

    con2.send(n)

    print("\033[32;1mproducer\033[0m is making xoxo %s"%n)

if __name__ == "__main__":

    con = consumer("c1")

    con2 = consumer("c2")

    p = producer()

輸出:

-->starting eating xoxo
c1 is eating xoxo 1
c2 is eating xoxo 1
producer is making xoxo 1
c1 is eating xoxo 2
c2 is eating xoxo 2
producer is making xoxo 2
c1 is eating xoxo 3
c2 is eating xoxo 3
producer is making xoxo 3
c1 is eating xoxo 4
c2 is eating xoxo 4
producer is making xoxo 4
c1 is eating xoxo 5
c2 is eating xoxo 5
producer is making xoxo 5

 

協程的特點:

1、必須在只有一個單執行緒裡實現併發。

2、修改共享資料不需加鎖。

3、使用者程式裡自己保持多個控制流的上下文棧。

4、一個協程遇到IO操作自動切換到其它協程。

剛才yield實現的不能算是合格的協程。

Python對協程的支援是通過generator實現的。在generator中,我們不但可以通過for迴圈來迭代,還可以不斷呼叫next()函式獲取由yield語句返回到下一個值。但是python的yield不但可以返回一個值,它可以接收呼叫者發出的引數。

 

Greenlet

greenlet是一個用C實現的協程模組,相比於Python自帶的yield,它可以在任意函式之間隨意切換,而不需把這個函式宣告為generator。

from greenlet import greenlet

def f1():

    print(11)

    gr2.switch()

    print(22)

def f2():

    print(33)

    gr1.switch()

    print(44)

gr1 = greenlet(f1)

gr2 = greenlet(f2)
11
33
22
44

以上例子還有一個問題沒有解決,就是遇到IO操作自動切換。

 

Gevent

Gevent是一個第三方庫,可以輕鬆提供gevent實現併發同步或非同步程式設計,在gevent中用到的主要模式是Greenlet,它是以C擴充套件模組形式接入Python的輕量級協程。Greenlet全部執行在主程式作業系統程序的內部,但它們被協作式地排程。

import gevent

def foo():

    print("Running in foo")

    gevent.sleep()

    print("Explicit contenxt switch to foo agin")

def bar():

    print("Explicit context to bar")

    gevent.sleep(1)

    print("Implict context switch back to bar")

def func3():

    print("running func3")

    gevent.sleep(0)

    print("running func3 again")

gevent.joinall([

    gevent.spawn(foo),

    gevent.spawn(bar),

    gevent.spawn(func3),

])

Running in foo

Explicit context to bar

running func3

Explicit contenxt switch to foo agin

running func3 again

Implict context switch back to bar

同步與非同步的效能區別

def f1(pid):

gevent.sleep(0.5)

print("F1 %s done"%pid)

for i in range(10):

f1(i)

def f3():

threads = [gevent.spawn(f1,i) for i in range(10)]

gevent.joinall(threads)

print("f2")

f2()

print("f3")

f3()

f2

F1 0 done

F1 1 done

F1 2 done

F1 3 done

F1 4 done

F1 5 done

F1 6 done

F1 7 done

F1 8 done

F1 9 done

f3

上面程式的重要部分是將f1函式封裝到Greenlet內部執行緒的gevent.spawn。初始化的greenlet列表存放在陣列threads中,此陣列被傳給gevent.joinall函式,後者阻塞當前流程,並執行所有給定的greenlet。執行流程只會在所有greenlet執行完後才會繼續向下走。

 

IO阻塞自動切換任務

from urllib import request

import gevent,time

from gevent import monkey

# 把當前程式的所有的id操作給單獨的做上標記

monkey.patch_all()

def f(url):

    print("GET:%s"%url)

    resp = request.urlopen(url)

    data = resp.read()

f = open("load.txt","wb")

f.write(data)

f.close()

print("%d bytes received from %s."%(len(data),url))

urls = ['https://www.python.org/',

'http://www.cnblogs.com/yinshoucheng-golden/',

'https://github.com/']

time_start = time.time()

for url in urls:

    f(url)

    print("同步cost",time.time() - time_start)

    async_time_start = time.time()

gevent.spawn(f,'https://www.python.org/'),

gevent.spawn(f,'http://www.cnblogs.com/yinshoucheng-golden/'),

gevent.spawn(f,'https://github.com/'),

    print("非同步cost",time.time() - async_time_start)

通過gevent實現單執行緒下的多socket併發

server side

import sys,socket,time,gevent

from gevent import socket,monkey

def server(port):

    s = socket.socket()

    s.bind(("0.0.0.0",port))

    s.listen(500)

    cli,addr = s.accept()

gevent.spawn(handle_request,cli)

def handle_request(conn):

    try:

        data = conn.recv(1024)

        print("recv:",data)

        if not data:

            conn.shutdown(socket.SHUT_WR)

            conn.send(data)

    except Exception as ex:

        print(ex)

    finally:

        conn.close()

server(6969)

client side

 

import socket

HOST = "localhost"

PORT = 6969

s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)

s.connect((HOST,PORT))

msg = bytes(input(">>:"),encoding="utf8")

s.sendall(msg)

data = s.recv(1024)

# print(data)

print("Received",repr(data))

s.close()

socket併發
 

import socket,threading

def sock_conn():

    client = socket.socket()

    client.connect(("localhost",6969))

    count = 0

    client.send(("hello %s"%count).encode("utf-8"))

    data = client.recv(1024)

    print("%s from server:%s"%(threading.get_ident(),data.decode()))

    count += 1

    client.close()

for i in range(100):

    t = threading.Thread(target=sock_conn)

    t.start()

 

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------

 

事件驅動與非同步IO

寫伺服器處理模型的程式時,有以下幾種模型:

(1)每收到一個請求,建立一個新的程序,來處理該請求。

(2)每收到一個請求,建立一個新的執行緒,來處理該請求。

(3)每收到一個請求,放入一個事件列表,讓主程式通過非阻塞I/O方式來處理請求。

上面的幾種方式,各有千秋。

第一種方法,由於建立新的程序,記憶體開銷比較大。所以,會導致伺服器效能比較差,但實現比較簡單。

第二種方法,由於要涉及到執行緒的同步,有可能會面臨死鎖等問題。

第三種方法,在寫應用程式程式碼時,邏輯比前面兩種都複雜。

綜合考慮各方面因素,一般普遍認為第三種方式是大多數網路伺服器採用的方式。

 

在UI程式設計中,常常要對滑鼠點選進行相應響應,首先如何獲得滑鼠點選呢?

方式一:建立一個執行緒,該執行緒一直迴圈檢測是否有滑鼠點選,那麼這個方式有以下幾個缺點。

  1. CPU資源浪費,可能滑鼠點選的頻率非常小,但是掃描執行緒還是會一直迴圈檢測,這會造成很多的CPU資源浪費;如果掃描滑鼠點選的介面是阻塞的呢?
  2. 如果是阻塞的,又會出現下面這樣的問題。如果我們不但要掃描滑鼠點選,還要掃描鍵盤是否按下,由於掃描滑鼠時被阻塞了,那麼可能永遠不會去掃描鍵盤。
  3. 如果一個迴圈需要掃描的裝置非常多,這又會引起響應時間的問題。

所以,這種方式非常不好。

 

方式二:事件驅動模型

目前大部分的UI程式設計都是事件驅動模型。如很多UI平臺都會提供onClick()事件,這個事件就代表滑鼠點選事件。事件驅動模型大體思路如下。

  1. 有一個事件(訊息)佇列。
  2. 滑鼠按下時,往這個佇列中增加一個點選事件(訊息)。
  3. 有一個迴圈,不斷從佇列取出事件。根據不同的事件,調出不同的函式,如onClick()、onKeyDown()等。
  4. 事件(訊息)一般都各自儲存各自的處理函式指標,這樣每個訊息都有獨立的處理函式。

事件驅動程式設計是一種程式設計正規化,這裡程式的執行流由外部事件來決定。它的特點是包含一個事件迴圈,當外部事件發生時使用回撥機制來觸發相應的處理。另外兩個常見的程式設計正規化是同步(單執行緒)以及多執行緒程式設計。

對比單執行緒、多執行緒以及事件驅動程式設計模型。下圖表示隨著時間的推移,這三種模式下程式所做的工作。這個程式有3個任務需要完成,每個任務都在等待I/O操作時阻塞自身。阻塞在I/O操作上所花費的時間用灰色框表示。

單執行緒同步模型:  任務按照順序執行。如果某個任務因為I/O而阻塞,其他所有的任務必須等待,直到它完成之後才能依次執行其他操作。這種明確的執行順序和序列化處理的行為可以看出,如果各任務之間並沒有相互依賴的關係,但各任務執行仍然需要互相等待,就使得程式整體執行速度降低了。

多執行緒版本:,這3個任務分別在獨立的執行緒中執行。這些執行緒由作業系統來管理,在多處理器系統上可以並行處理,或者在單處理器系統上交替執行。這使得當某個執行緒阻塞在某個資源的同時其他執行緒得以繼續執行。多執行緒程式更加難以判斷,因為這類程式不得不通過執行緒同步機制加鎖、可重入函式、執行緒區域性儲存或者其他機制來處理執行緒安全問題,如果實現不當就會導致出現微妙且令人痛不欲生的BUG。

事件驅動版本: 3個任務交錯執行,但仍然在一個單獨的執行緒控制中。當處理I/O或其他等待操作時,註冊一個回撥到事件迴圈中,然後當I/O操作完成時繼續執行。回撥描述了該如何處理某個事件。事件迴圈輪詢所有的事件,當事件到來時將它們分配給等待處理事件的回撥函式。這種方式讓程式儘可能的得以執行而不需要用到額外的執行緒。事件驅動型程式比多執行緒程式更容易推斷出行為,因為程式設計師不需要關心執行緒安全問題。

 

I/O多路複用

同步I/O和非同步I/O,阻塞I/O和非阻塞I/O分別是什麼,到底有什麼區別?  本文討論的背景是Linux環境下的network I/O。

概念說明

使用者空間與核心空間

現在作業系統都是採用虛擬儲存器,那麼對32位操作系統而言,它的定址空間(虛擬儲存空間)為4G(2的32次方)。作業系統的核心是核心,獨立於普通的應用程式,可以訪問受保護的記憶體空間,也有訪問底層硬體裝置的所有許可權。為了保證使用者程序不能直接操作核心(kernel),保證核心的安全,作業系統將虛擬空間劃分為兩部分,一部分為核心空間,一部分為使用者空間。針對Linux作業系統而言,將最高的1G位元組(從虛擬地址0xC0000000到0xFFFFFFFF),供核心使用,稱為核心空間,而將較低的3G位元組(從虛擬地址0x00000000到0xBFFFFFFF),供各個程序使用,稱為使用者空間。

程序切換

為了控制程序的執行,核心必須有能力掛起正在CPU上執行的程序,並恢復以前掛起的某個程序的執行。這種行為被稱為程序切換。因此可以說,任何程序都是在作業系統核心的支援下執行的,是與核心緊密相關的。

從一個程序的執行轉到另一個程序上執行,這個過程中經過下面過程:

  1. 儲存處理機上下文,包括程式計數器和其他暫存器。
  2. 更新PCB資訊。
  3. 把程序的PCB移入相應的佇列,如就緒、在某事件阻塞等佇列。
  4. 選擇另一個程序執行,並更新其PCB。
  5. 更新記憶體管理的資料結構。
  6. 恢復處理機上下文。

程序控制塊(Processing Control Block),是作業系統核心中一種資料結構,主要表示程序狀態。其作用是使一個在多道程式環境下不能獨立執行的程式(含資料),成為一個能獨立執行的基本單位或與其它程序併發執行的程序。或者說,作業系統OS是根據PCB來對併發執行的程序進行控制和管理的。PCB通常是系統記憶體佔用區中的一個連續存放區,它存放著作業系統用於描述程序情況及控制程序執行所需的全部資訊。

程序的阻塞

正在執行的程序,由於期待的某些事件未發生,如請求系統資源失敗、等待某種操作的完成、新資料尚未到達或無新任務執行等,則由系統自動執行阻塞(Block),使自己由執行狀態變為阻塞狀態。可見,程序的阻塞是程序自身的一種主動行為,也因此只有處於執行狀態的程序(獲得CPU),才能將其轉為阻塞狀態。當程序進入阻塞狀態,是不佔用CPU資源的。

檔案描述符fd

檔案描述符(File descriptor)是電腦科學中的一個術語,是一個用於表述指向檔案的引用的抽象化概念。

檔案描述符在形式上是一個非負整數。實際上,它是一個索引值,指向核心為每一個程序所維護的該程序開啟檔案的記錄表。當程式開啟一個現有檔案或者建立一個新檔案時,核心向程序返回一個檔案描述符。在程式設計中,一些設計底層的程式編寫往往會圍繞著檔案描述符展開。但是檔案描述符這一概念往往只適用於UNIX、Linux這樣的作業系統。

快取I/O

快取I/O又被稱作標準I/O,大多數檔案系統的預設I/O操作都是快取I/O。在Linux的快取I/O機制中,作業系統會將I/O的資料快取在檔案系統的頁快取(page cache)中,也就是說,資料會先被拷貝到作業系統核心的緩衝區中,然後才會從作業系統核心的緩衝區拷貝到應用程式的地址空間。

快取I/O的缺點:

資料在傳輸過程中需要在應用程式地址空間和核心進行多次資料拷貝操作,這些資料拷貝操作所帶來的CPU以及記憶體開銷是非常大的。

IO模式

一次IO訪問(以read為例),資料會先被拷貝到作業系統核心的緩衝區,然後從系統核心緩衝區拷貝到應用程式的地址空間。

當一個read操作發生時,會經歷兩個階段:

  1. 等待資料準備(waiting for the data to be ready)。
  2. 將資料從核心拷貝到程序中(Copying the data from the kernel to the process)。

正是因為這兩個階段,Linux系統產生了下面五種網路模式的方案。

  1. 阻塞I/O(blocking IO)。
  2. 非阻塞I/O(nonblocking IO)
  3. I/O多路複用(IO multiplexing)
  4. 訊號驅動I/O(signal driven IO)
  5. 非同步I/O(asynchronous IO)

由於訊號驅動I/O(signal driven IO)在實際中並不常用,所以只剩下四種IO模式。

 

阻塞I/O(blocking IO)

在Linux中,預設情況下所有的Socket都是blocking,一個典型的讀操作流程如下:

當用戶程序呼叫了recvfrom,kernel就開始了IO的第一個階段,準備資料。對於網路IO來說,很多時候資料在一開始還沒有到達。比如還沒有收到一個完整的UDP包,這個時候kernel就要等待足夠的資料到來。這個過程需要等待,也就是說資料被拷貝到作業系統核心的緩衝區中是需要一個過程的。而在使用者程序這邊,整個程序會被阻塞。當kernel一直等到資料準備好了,它就會將資料從kernel中拷貝到使用者記憶體,然後kernel返回結果,使用者程序才解除block的狀態,重新執行起來。

所以,blocking IO的特點就是在IO執行的兩個階段都被block了。

Linux下,可以通過設定Socket使其變為non-blocking。當對一個non-blocking socket執行讀操作時,流程如下:

當用戶程序發出read操作時,如果kernel中的資料還沒有準備好,那麼它並不會block使用者程序,而是立刻返回一個error。從使用者程序角度講,它發起一個read操作後,並不需要等待,而是馬上就得到了一個結果。使用者程序判斷結果是一個error時,它就知道資料還沒有準備好,於是它可以再次傳送read操作。一旦kernel中的資料準備好了,並且又再次收到了使用者程序的system call,那麼它馬上將資料拷貝到了使用者記憶體,然後返回。

所以,nonblocking IO的特點是使用者程序需要不斷的主動詢問kernel資料好了沒有。

 

IO multiplexing(IO多路複用 也叫事件驅動IO)

就是平時所說的select、poll、epoll

select/epoll的好處就在於單個process就可以同時處理多個網路連線的IO

基本原理: select、poll、epoll這個function會不斷的輪詢所負責的所有socket,當某個socket有資料到達了,就通知使用者程序。

當用戶程序呼叫了select,那麼整個程序會被block。而同時kernel會"監視"所有select負責的socket,當任何一個socket中的資料準備好了,select就會返回。這個時候使用者程序再呼叫read操作,將資料從kernel拷貝到使用者程序。

所以,I/O多了複用的特點是通過一種機制一個程序能同時等待多個檔案描述符,而這些檔案描述符(套接字描述符)其中的任意一個進入讀就緒狀態,select()函式就可以返回。

這個圖和blocking IO的圖其實並沒有太大的不同。事實上還更差一些,因為這裡需要使用兩個system call(select和recvfrom),而blocking IO只調用了一個system call(recvfrom)。但是用select的優勢在於它可以同時處理多個connection。

實際在IO multiplexing Model中,對於每一個socket一般都設定成為non-blocking。但是如上圖所示整個使用者的process其實是一直被block的。只不過process是被select這個函式block,而不是被socket IO給block。

Linux下的asynchronous IO其實用得很少。

使用者程序發起read操作之後,離開就可以開始去做其它的事。而另一個方面,從kernel的角度,當它受到一個asynchronous read之後,首先它會立刻返回,所以不會對使用者程序產生任何block。然後kernel會等待資料準備完成,然後將資料拷貝到使用者記憶體,當這一切都完成之後,kernel會給使用者程序傳送一個signal,告訴它read操作完成了。

 

總結

blocking和non-blocking的區別

呼叫blocking IO會一直block,直到對應的程序操作完成。而non-blocking IO在kernel還在準備資料的情況下就會立刻返回。

synchronous IO和asynchronous IO的區別

在說明synchronous IO和asynchronous IO的區別之前,需要先給出兩者的定義。POSIX的定義:

  • synchronous IO會導致請求程序被阻塞,直到該輸I/O操作完成。
  • asynchronous IO不會導致請求程序被阻塞。

兩者的區別就在於synchronous IO做"IO operation"的時候會將process阻塞。按照這個定義之前所述的blocking IO、non-blocking IO、IO multiplexing都屬於synchronous IO。

有人認為non-blocking IO並沒有被block,這裡是非常容易誤解的地方。定義中所指的"IO operation"是指真實的IO操作,就是例子中的recvfrom這個system call。non-blocking IO在執行recvfrom這個system call的時候,如果kernel的資料沒有準備好,這時候不會block程序。但是當kernel中資料準備好的時候,recvfrom會將資料從kernel拷貝到使用者記憶體中,這個時候程序是被block了,這段時間內程序是被block的。

而asynchronous IO則不一樣,當程序發起IO操作之後,就直接返回再也不理睬了,直到kernel傳送一個訊號,告訴程序說IO完成。在這整個過程中程序完全沒有被block。

各個IO model的比較如下圖:

通過上面的圖片可以發現non-blocking IO和asynchronous IO的區別還是很明顯的。在non-blocking IO中,雖然程序大部分時間都不會被block,但是它仍然要求程序主動的check,並且當資料準備完成之後,也需要程序主動的再次呼叫recvfrom來講資料拷貝到使用者記憶體。而asynchronous IO則完全不同,它就像是使用者程序將整個IO操作交給了他人(kernel)完成,然後kernel做完後發訊號通知。在此期間使用者程序不需要去檢查IO操作的狀態,也不需要主動的去拷貝資料。

 

 

I/O多路複用select、poll、epoll詳解

select、poll、epoll都是IO多路複用的機制。I/O多路複用就是通過一種機制,一個程序可以監視多個描述符,一旦某個描述符就緒(一般是讀就緒或者寫就緒),能夠通知程式進行相應的讀寫操作。但select、poll、epoll本質上都是同步I/O,因為他們都需要在讀寫事件就緒後自己負責進行讀寫,也就是說這個讀寫過程是阻塞的,而非同步I/O則無需自己負責進行讀寫,非同步I/O的實現會負責把資料從核心拷貝到使用者空間。

select

select(rlist,wlist,xlist,timeout=None)

select函式監視的檔案描述符分3類,分別是writefds、readfds和execptfds。呼叫後select函式會阻塞,直到有描述符就緒(有資料可讀、可寫或有except)或者超時(timeout指定等待時間,如果立即返回設為null即可)函式返回。當select函式返回後,可以通過遍歷fdset,來找到就緒的描述符。

select目前幾乎在所有的平臺上支援,其良好跨平臺支援也是它的一個優點。select的一個缺點在於單個程序能夠監視的檔案描述符的數量存在最大限制,在Linux上一般為1024,可以通過修改巨集定義甚至重新編譯核心的方式提升這一限制,但是這樣也會造成效率的降低。

poll

int poll(struct pollfd *fds,unsigned,int nfds,int timeout)

select使用了三個點陣圖來表示三個fdset的方式,poll使用一個pollfd的指標實現。

struct pollfd{

int fd; # 檔案描述符

short events; # 請求

short revents; # 響應

}

pollfd結構包含了要監視的event和發生的event,不再使用select"引數-值"傳遞的方式。同時pollfd並沒有最大數量限制(但是數量過多後效能也是會下降)。和select函式一樣,poll返回後,需要輪詢pollfd來獲取就緒的描述符。

從上面可以看出,select和poll都需要在返回後通過遍歷檔案描述符來獲取已經就緒的socket。事實上,同時連線的大量客戶端在一時刻可能只有很少的處於就緒狀態,因此隨著監視的描述符數量的增長,其效率也會線性下降。

epoll

epoll是在2.6核心中提出的,是之前的select和poll的增強版本。相對於select和poll來說,epoll更加靈活,沒有描述符限制。epoll使用一個檔案描述符管理多個描述符,將使用者關係的檔案描述符的事件存放到核心的一個事件表中,這樣在使用者空間和核心空間的copy只需一次。

epoll操作過程需要三個介面。

int epoll_create(int size); # 建立一個epoll的控制代碼,size用來告訴核心監聽的數量

int epoll_ctl(int epfd,int op,int fd,struct epoll_event *event);

int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout);

int epoll_create(int size);

建立一個epoll的控制代碼,size用來告訴核心監聽的數量,這個引數不同於select()中的第一個引數,給出最大監聽的fd+1的值,引數size並不是限制了epoll所能監聽的描述符最大個數,只是對核心初始分配內部資料結構的一個建議。

當建立好epoll控制代碼後,它就會佔用一個fd值,在linux下如果檢視/proc/程序id/fd/,是能夠看到這個fd的,所以在使用完epoll後,必須呼叫close()關閉,否則可能導致fd被耗盡。

函式是對指定描述符fd執行op操作。

epfd:epoll_create()的返回值。

op:op操作,用三個巨集來表示,新增EPOLL_CTL_ADD,刪除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。分別新增、刪除和修改對fd的監聽事件。

fd:需要監聽的fd(檔案描述符)。

epoll_event:核心需要監聽的目標。

等待epfd上的io事件,最多返回maxevents個事件。

引數events用來從核心得到事件的集合,maxevents告之核心這個events有多大,這個maxevents的值不能大於建立epoll_create()時的size,引數timeout是超時時間(毫秒,0會立即返回,-1將不確定)。該函式返回需要處理的事件數目,如返回0表示已超時。

 

select、poll、epoll三者的區別

select

select最早於1983年出現在4.2BSD中,它通過一個select()系統呼叫來監視多個檔案描述符的陣列,當select()返回後,該陣列中就緒的檔案描述符便會被核心修改標誌位,使得程序可以獲得這些檔案描述符從而進行後續的讀寫操作。

select目前幾乎在所有的平臺上支援,其良好跨平臺支援是它的一個優點,從現在看來,這也是它所剩不多的優點之一。

select的一個缺點在於單個程序能夠監視的檔案描述符的數量存在最大限制,在Linux上一般為1024,不過可以通過修改巨集定義甚至重新編譯核心方式提升這一限制。

另外,select()所維護的儲存大量檔案描述符的資料結構,隨著檔案描述符數量的增大,其複製的開銷也線性增大。同時,由於網路響應時間的延遲使得大量TCP連線處於非活躍狀態,但呼叫select()會對所有socket進行一次線性掃描,所以這也浪費了一定的開銷。

poll

poll在1986年誕生於System V Release 3,它和select在本質上沒有多大差別,但是poll沒有最大檔案描述符數量的限制

poll和select同樣存在一個缺點就是,包含大量檔案描述符的陣列被整體複製與使用者態和核心的地址空間之間,而不論這些檔案描述符是否就緒,它的開銷隨著檔案描述符數量的增加而線性增大。

另外,select()和poll()將就緒的檔案描述符告訴程序後,如果程序沒有對其進行IO操作,那麼下次呼叫select()和poll()的時候將再次報告這些檔案描述符,所以它們一般不會丟失就緒的訊息,這種方式稱為水平觸發(Level Triggered)。

epoll

直到Linux 2.6才出現了由核心直接支援的實現方法,那就是epoll,它幾乎具備了之前所說的一切優點,被公認為Linux 2.6下效能最好的多路I/O就緒通知方法。

epoll可以同時支援水平觸發和邊緣觸發(Edge Triggered,只告訴程序哪些檔案描述符剛剛變為就緒狀態,它只說一遍,如果我們沒有采取行動,那麼它就不會再次告知,這種方式稱為邊緣觸發),理論上邊緣觸發的效能要更高一些,但程式碼實現相當複雜。

epoll同樣只告知那些就緒的檔案描述符,而且當我們呼叫epoll_wait()獲得就緒檔案描述符時,返回的不是實際的描述符,而是一個代表就緒描述符數量的值,你只需要去epoll指定的一個數組中依次取得相應數量的檔案描述符即可,這裡也使用了記憶體對映(mmap)技術,這樣便徹底省掉了這些檔案描述符在系統呼叫時複製的開銷。

另一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,程序只有在呼叫一定的方法後,核心才對所有監視的檔案描述符進行描述,而epoll事先通過epoll_ctl()來註冊一個檔案描述符,一旦基於某個檔案描述符就緒時,核心會採用類似callback的回撥機制,迅速啟用這個檔案描述符,當程序呼叫epoll_wait()時便得到通知。

 

Python select

Python的select()方法直接呼叫作業系統的IO介面,它監控sockets、open files、pipes(所有帶fileno()方法的檔案控制代碼)何時變成readable和writeable或者通訊錯誤,select()使得同時監控多個連線變得簡單,並且這比寫一個長迴圈來等待和監控多客戶端連線要高效,因為select直接通過作業系統提供的C的網路介面進行操作,而不是通過Python的直譯器。

注意:Using Python's file objects with select() works for Unix, but is not supported under Windows.

select_socket_server

__author__ = 'Golden'

#!/usr/bin/env python3

# -*- coding:utf-8 -*-

import select,socket,sys,queue

server = socket.socket()

server.setblocking(0)

server_addr = ('localhost',6969)

print('starting up on %s port %s'%server_addr)

server.bind(server_addr)

server.listen(5)

# 監測自己,因為server本身也是個fd

inputs = [server,]

outputs = []

message_queues = {}

print('waiting for next event...')

# 如果沒有任何fd就緒,程式會一直阻塞在這裡

readable,writeable,exeptional = select.select(inputs,outputs,inputs)

# 每個s就是一個socket

for s in readable:

# 上面server自己也當做一個fd放在了inputs列表裡,傳給了select,如果s是server代表server這個fd就緒了,即新的連線進來

if s is server:

# 接收這個連線

conn,client_addr = s.accept()

print('new connection from',client_addr)

conn.setblocking(0)

"""

為了不阻塞整個程式,不會立刻在這裡開始接收客戶端發來的資料,把它放到inputs裡,下一次loop時,

這個新連線就會被交給select去監聽,如果這個連線的客戶端發來了資料,那麼這個連線的fd在server端就會變成就緒的,

select就會把這個資料返回到readable列表裡,然後就可以loop readable列表,取出這個連線,開始接收資料

inputs.append(conn)

# 接收到客戶端的資料後,不立刻返回,暫存在佇列裡,以後傳送

message_queues[conn] = queue.Queue()

# s不是server那就只會是一個與客戶端建立的連線的fd

else:

# 接收客戶端的資料

if data:

print('收到來自【%s】的資料:'%s.getpeername()[0],data)

# 收到的資料先放入queue裡,一會返回給客戶端

message_queues[s].put(data)

if s not in outputs:

# 為了不影響處理與其它客戶端的連線,這裡不立刻返回資料給客戶端

outputs.append(s)

# 如果收不到data,代表客戶端已斷開

print('客戶端已斷開...',s)

if s in outputs:

# 清理已斷開的連線

outputs.remove(s)

inputs.remove(s)

del message_queues[s]

for s in writeable:

next_msg = message_queues[s].get_nowait()

except queue.Empty:

print('client [%s]'%s.getpeername()[0],'queue is empty...')

print('sending msg to [%s]'%s.getpeername()[0],next_msg)

s.send(next_msg.upper())

for s in exeptional:

print('handling exception for',s.getpeername())

select_socket_client

import socket,sys

messages = [b'This is the message.',

b'It will be sent',

b'in parts.',

]

server_address = ('localhost',6969)

# 建立一個TCP/IP連線

socks = [socket.socket(socket.AF_INET,socket.SOCK_STREAM),

socket.socket(socket.AF_INET,socket.SOCK_STREAM),

socket.socket(socket.AF_INET,socket.SOCK_STREAM),]

print('connecting to %s port %s'%server_address)

for s in socks:

s.connect(server_address)

for message in messages:

# 傳送資料

print('%s:sending "%s"'%(s.getsockname(),message))

s.send(message)

# 接收資料

print('%s:received "%s"'%(s.getsockname(),data))

print(sys.stderr,'closing socket',s.getsockname())

selectors

selectors模組可以實現IO多路複用,它具有根據平臺選出最佳的IO多路機制,例如在windows上預設是select模式,而在linux上預設是epoll。常分為三種模式select、poll和epoll。

selector_socket_server:

import selectors,socket

sel = selectors.DefaultSelector()

def accept(sock,mask):

conn,addr = sock.accept()

print('accrpted',conn,'form',addr)

sel.register(conn,selectors.EVENT_READ,read)

def read(conn,mask):

print('echoing',repr(data),'to',conn)

print('closing',conn)

sel.unregister(conn)

sock = socket.socket()

sock.bind(('localhost',6969))

sock.listen(100)

sock.setblocking(0)

sel.register(sock,selectors.EVENT_READ,accept)

events = sel.select()

for key,mask in events:

callback = key.data

callback(key.fileobj,mask)

程式碼方面還有些許粗糙。大家多多見諒,要改進的地方,歡迎大家在下方留言。