網路IO-阻塞、非阻塞、IO複用、非同步
網路socket輸入操作分為兩個階段:等待網路資料到達和將到達核心的資料複製到應用程序緩衝區。對這兩個階段不同的處理方式將網路IO分為不同的模型:IO阻塞模型、非阻塞模型、多路複用和非同步IO。本文可執行程式碼連結:https://github.com/killianxu/network_example
一 阻塞模型
阻塞模型原理如下圖1.1,當進行系統呼叫recvfrom時,應用程序進入核心態,核心判斷是否已收到資料報,若沒有則阻塞直到資料報準備好,接著複製資料到應用程序緩衝區,然後函式返回。
圖1.1 阻塞IO模型
阻塞模型缺點:若資料報未準備好,則執行緒阻塞,不能進行其它操作和網路連線請求。
利用執行緒池或連線池,可以減少資源消耗。執行緒池利用已有執行緒,減少執行緒頻繁建立和銷燬,執行緒維持在一定數量,當有新的連線請求時,重用已有執行緒。連線池儘量重用已有連線,減少連線的建立和關閉。執行緒池和連線池一定程度上緩解頻繁IO的資源消耗,但執行緒池和連線池都有一定規模,當連線請求數遠超過池上線,池系統構成的響應並不比多執行緒方案好多少。[1]
阻塞模型python例項demo如下:
def start_blocking(self): """同步阻塞server""" self.ssock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.ssock.bind(('', 8080)) self.ssock.listen(5) count = 0 while True: conn, addr = self.ssock.accept() count += 1 print 'Connected by', addr print 'Accepted clinet count:%d' % count data = conn.recv(1024) #若無資料則阻塞 if data: conn.sendall(data) conn.close()
阻塞模型client
def start_blocking(self): self.host = '123.207.123.108' self.port = 8080 self.csock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.csock.connect((self.host, self.port)) data = self.csock.recv(1024) print data
執行server端,並執行兩個client例項去連線服務端,執行結果如下圖1.2,可以看到雖然有兩個客戶端去連線,但卻只有一個連線上,服務端的socket conn為阻塞套接字,conn.recv(1024)未收到客戶端傳送的資料,處於阻塞狀態,服務端無法再響應另一個客戶端的連線。
圖1.2 阻塞IO服務端執行結果
二 非阻塞模型
由於阻塞IO無法滿足大規模請求的缺點,因此出現了非阻塞模型。非阻塞IO模型如下圖1.3所示,當資料報未準備好,recvfrom立即返回一個EWOULDBLOCK錯誤,可以利用輪詢不停呼叫recvfrom,當資料報準備好,核心則將資料複製到應用程序緩衝區。
圖1.3 非阻塞IO模型
非阻塞IO模型需要利用輪詢不斷呼叫recvfrom,浪費大量CPU時間,且當核心接收到資料時,需要等到下一次輪詢才能複製到應用程序緩衝區,資料得不到立刻處理。
非阻塞模型python demo如下:
非阻塞服務端
def start_noblocking(self): """ 同步非阻塞 """ self.ssock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.ssock.bind(('', 8080)) self.ssock.listen(5) count = 0 while True: conn, addr = self.ssock.accept() conn.setblocking(0) #設定為非阻塞socket count += 1 print 'Connected by', addr print 'Accepted clinet count:%d' % count try: data = conn.recv(1024) #非阻塞,沒有資料會立刻返回 if data: conn.sendall(data) except Exception as e: pass finally: conn.close()
執行非阻塞服務端和兩個客戶端例項,結果如下圖1.4所示,服務端接收兩個連線請求。由於conn被設定為非阻塞socket,即使客戶端並沒有向服務端傳送資料,conn.recv(1024)也會立即返回,不會阻塞,從而程序可以接收新的連線請求。
圖1.4 非阻塞服務端執行結果
三 IO複用
IO複用在linux中包括select、poll、epoll模型三種,這三個IO複用模型有各自的API實現,以select模型為例,呼叫select函式,程序進入阻塞, 同時監控多個套接字描述符的狀態 ,當有資料報變為可讀或阻塞超時才返回,接著程序可呼叫recvfrom接收資料報到應用程序緩衝區。
圖3.1 IO複用模型
使用IO複用的優點是可以等待多個描述符就緒。[2]
3.1 select模型
select模型api如下:
int select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset,struct timeval *timeout);
timout表示核心等待任一描述符就緒可等待的時間,有三種情況:
1) 空指標,表示可以一直等下去,直到有描述符就緒。
2) timeout時間為0,不等待檢查描述符狀態立即返回。
3) 時間不為0,表示等待一定時間,在有描述符準備好但不超過timeval結構所指定的秒數和微秒數。
readset、writeset、exceptset指定需要核心測試讀、寫和異常條件的描述符。fd_set表示描述符集,在select中用整數陣列表示,整數的每一位表示一個描述符, readset、writeset、exceptset這三個引數是值-結果型別。
可以用以下幾個巨集設定和測試fd_set。在呼叫select函式前,用1、2、3設定需要監控的描述符,迴圈呼叫4測試呼叫select函式後的描述符,看是否準備好。
1) int FD_ZERO(int fd, fd_set *fdset);
2) int FD_CLR(int fd, fd_set *fdset);
3) int FD_SET(int fd, fd_set *fd_set);
4) int FD_ISSET(int fd, fd_set *fdset);
導致select返回某個套接字就緒的條件如下:
圖3.2 就緒條件
maxfd1表示指定待測試描述符個數,值為待測試描述符最大值加1,用這個引數可告訴核心最大隻遍歷到maxfd1-1的描述符。maxfd1最大不能超過常量FD_SETSIZE(值預設為1024,更改該值需重新編譯核心)。
select函式的返回值為整數,表示跨所有描述符集已就緒的總位數。如果超時則返回0。返回-1表示出錯,比如被中斷[3]。
select實現原理:從使用者空間拷貝fd_set到核心空間,遍歷所有fd,將當前程序掛到各個裝置的等待佇列中,掛到佇列的同時會返回是否就緒的掩碼,當所有fd返回的掩碼均未就緒,則當前程序睡眠。當fd對應裝置驅動發現可讀寫時,則會喚醒處於睡眠態的程序。如果超過一定時間還未喚醒, 則呼叫select的程序會重新被喚醒獲得CPU,進而重新遍歷fd,判斷有沒有就緒的fd,將fd_set從核心空間拷到使用者空間[4]。
select實現的缺點:
1) 每次都需要將fd_set拷貝到核心空間,當fd_set較大時開銷很大
2) 每次都需要在核心中遍歷fd加入到等待佇列,fd較多開銷較大
3) Select支援的檔案描述符太小,預設為1024。
select模型python demo如下:
select模型服務端
def start(self): # create a socket server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setblocking(False) # set option reused server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_address = ('', 8080) server.bind(server_address) server.listen(10) # sockets from which we except to read inputs = [server] # sockets from which we expect to write outputs = [] # Outgoing message queues (socket:Queue) message_queues = {} # A optional parameter for select is TIMEOUT timeout = 20 while inputs: print "waiting for next event" # 每次呼叫select函式,需要將所有socket重新傳一次 readable, writable, exceptional = select.select( inputs, outputs, inputs, timeout) # When timeout reached , select return three empty lists if not (readable or writable or exceptional): print "Time out ! " break for s in readable: if s is server: # 監聽套接字 # A "readable" socket is ready to accept a connection connection, client_address = s.accept() print " connection from ", client_address connection.setblocking(0) inputs.append(connection) message_queues[connection] = Queue.Queue() else: data = s.recv(1024) # 接收到資料 if data: print " received ", data, "from ", s.getpeername() message_queues[s].put(data) # Add output channel for response if s not in outputs: outputs.append(s) else: # 讀這端的連線關閉 # Interpret empty result as closed connection print " closing", client_address if s in outputs: outputs.remove(s) inputs.remove(s) s.close() # remove message queue del message_queues[s] for s in writable: try: next_msg = message_queues[s].get_nowait() except Queue.Empty: print " ", s.getpeername(), 'queue empty' outputs.remove(s) else: print " sending ", next_msg, " to ", s.getpeername() s.send(next_msg) for s in exceptional: print " exception condition on ", s.getpeername() # stop listening for input on the connection inputs.remove(s) if s in outputs: outputs.remove(s) s.close() # Remove message queue del message_queues[s]
select模型客戶端
def start(self): messages = ["hello world"] print "Connect to the server" server_address = ("123.207.123.108",8080) #Create a TCP/IP sock socks = [] for i in range(3): socks.append(socket.socket(socket.AF_INET,socket.SOCK_STREAM)) for s in socks: s.connect(server_address) counter = 0 for message in messages : #Sending message from different sockets for s in socks: counter+=1 print " %s sending %s" % (s.getpeername(),message+" version "+str(counter)) s.send(message+" version "+str(counter)) #Read responses on both sockets for s in socks: data = s.recv(1024) print " %s received %s" % (s.getpeername(),data) if not data: print "%s closing socket "%s.getpeername() s.close()
分別執行服務端和客戶端,結果如下:
圖3.3 select模型服務端執行結果
圖3.4 select模型客戶端執行結果
3.2 poll模型
poll模型api如下[8]:
#include <poll.h> int poll(struct pollfd fds[], nfds_t nfds, int timeout); typedef struct pollfd { int fd; // 需要被檢測或選擇的檔案描述符 short events; // 對檔案描述符fd上感興趣的事件 short revents; // 檔案描述符fd上當前實際發生的事件*/ } pollfd_t;
1) poll()函式返回fds集合中就緒的讀、寫,或出錯的描述符數量,返回0表示超時,返回-1表示出錯;
2) fds是一個struct pollfd型別的陣列,用於存放需要檢測其狀態的socket描述符,並且呼叫poll函式之後fds陣列不會被清空;
3) nfds記錄陣列fds中描述符的總數量;
4) timeout是呼叫poll函式阻塞的超時時間,單位毫秒;
5) 一個pollfd結構體表示一個被監視的檔案描述符,通過傳遞fds[]指示 poll() 監視多個檔案描述符。其中,結構體的events域是監視該檔案描述符的事件掩碼,由使用者來設定這個域,結構體的revents域是檔案描述符的操作結果事件掩碼,核心在呼叫返回時設定這個域。events域中請求的任何事件都可能在revents域中返回。
合法的事件如下:
1) POLLIN 有資料可讀
2) POLLRDNORM 有普通資料可讀
3) POLLRDBAND 有優先資料可讀
4) POLLPRI 有緊迫資料可讀
5) POLLOUT 寫資料不會導致阻塞
6) POLLWRNORM 寫普通資料不會導致阻塞
7) POLLWRBAND 寫優先資料不會導致阻塞
8) POLLERR 發生錯誤
9) POLLHUP 發生掛起
當需要監聽多個事件時,使用POLLIN | POLLPRI設定 events 域;當poll呼叫之後檢測某事件是否發生時,fds[i].revents & POLLIN進行判斷
poll模型和select模型相似,poll模型同樣需要將所有監控的描述符重新拷貝到核心,並在核心中對所有描述符進行遍歷,沒有解決select模型的效能問題,但是poll模型沒有最大檔案描述符數量的限制。
select()和poll()將就緒的檔案描述符告訴程序後,如果程序沒有對其進行IO操作,那麼下次呼叫select()和poll()的時候將再次報告這些檔案描述符,所以它們一般不會丟失就緒的訊息,這種方式稱為水平觸發[5]。
poll模型python demo如下:
def start(self)://poll模型服務端 # Create a TCP/IP socket, and then bind and listen server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setblocking(False) server_address = ('', 8080) print "Starting up on %s port %s" % server_address server.bind(server_address) server.listen(5) message_queues = {} # The timeout value is represented in milliseconds, instead of seconds. timeout = 5000 # Create a limit for the event,POLLIN = POLLRDNORM | POLLRDBAND READ_ONLY = (select.POLLIN | select.POLLPRI) READ_WRITE = (READ_ONLY | select.POLLOUT) #POLLOUT=POLLWRNORM | POLLWRBAND # Set up the poller poller = select.poll() poller.register(server, READ_ONLY) # Map file descriptors to socket objects fd_to_socket = {server.fileno(): server, } while True: print "Waiting for the next event" events = poller.poll(timeout) if len(events) == 0: print 'Time out' break print "*" * 20 print len(events) print events print "*" * 20 for fd, flag in events: s = fd_to_socket[fd] if flag & (select.POLLIN | select.POLLPRI): if s is server: # A readable socket is ready to accept a connection connection, client_address = s.accept() print " Connection ", client_address connection.setblocking(False) fd_to_socket[connection.fileno()] = connection poller.register(connection, READ_ONLY) # Give the connection a queue to send data message_queues[connection] = Queue.Queue() else: data = s.recv(1024) if data: # A readable client socket has data print " received %s from %s " % (data, s.getpeername()) message_queues[s].put(data) poller.modify(s, READ_WRITE) else: # Close the connection print " closing", s.getpeername() # Stop listening for input on the connection poller.unregister(s) s.close() del message_queues[s] elif flag & select.POLLHUP: # A client that "hang up" , to be closed. print " Closing ", s.getpeername(), "(HUP)" poller.unregister(s) s.close() elif flag & select.POLLOUT: # Socket is ready to send data , if there is any to send try: next_msg = message_queues[s].get_nowait() except Queue.Empty: # No messages waiting so stop checking print s.getpeername(), " queue empty" poller.modify(s, READ_ONLY) else: print " sending %s to %s" % (next_msg, s.getpeername()) s.send(next_msg) elif flag & select.POLLERR: # Any events with POLLERR cause the server to close the # socket print " exception on", s.getpeername() poller.unregister(s) s.close() del message_queues[s]
3.3 epoll模型
epoll模型api包含三個系統呼叫[7]:
#include <sys/epoll.h> int epoll_create(int size); int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
1. epoll_create建立epoll控制代碼epfd。size表示在這個epoll fd上能關注的最大fd數,失敗時返回-1。
2. epoll_ctl註冊要監聽的事件。
1) epfd表示epoll控制代碼;
2) op表示fd操作型別:EPOLL_CTL_ADD(註冊新的fd到epfd中),EPOLL_CTL_MOD(修改已註冊的fd的監聽事件),EPOLL_CTL_DEL(從epfd中刪除一個fd)
3) fd是要監聽的描述符;
4) event表示要監聽的事件; EPOLLIN表示對應的檔案描述符可以讀(包括對端SOCKET正常關閉);EPOLLOUT表示對應的檔案描述符可以寫;EPOLLPRI表示對應的檔案描述符有緊急的資料可讀(這裡應該表示有帶外資料到來);EPOLLERR表示對應的檔案描述符發生錯誤;EPOLLHUP表示對應的檔案描述符被結束通話;EPOLLET將EPOLL設為邊緣觸發(Edge Triggered)模式,這是相對於水平觸發(Level Triggered)來說的。EPOLLONESHOT只監聽一次事件,當監聽完這次事件之後,如果還需要繼續監聽這個socket的話,需要再次把這個socket加入到EPOLL佇列裡[8]。
3. epoll_wait函式等待事件就緒,成功時返回就緒的事件數目,呼叫失敗時返回 -1,等待超時返回 0。
1) epfd是epoll控制代碼
2) events表示從核心得到的就緒事件集合
3) maxevents告訴核心events的大小
4) timeout表示等待的超時事件
epoll_event結構體定義如下:
struct epoll_event { __uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ }; typedef union epoll_data { void *ptr; int fd; __uint32_t u32; __uint64_t u64; } epoll_data_t;
epoll模型利用三個函式代替select和poll模型的三個函式,可以避免select模型的三個缺點。
1) 不需要每次都將相同的fd監聽事件重新拷貝到核心。epoll的解決方案在epoll_ctl函式中。每次註冊新的事件到epoll控制代碼中時(在epoll_ctl中指定EPOLL_CTL_ADD),會把所有的fd拷貝進核心,而不是在epoll_wait的時候重複拷貝。epoll保證了每個fd在整個過程中只會拷貝一次。
2) 不需要再核心中遍歷所有fd來看事件是否就緒。epoll的解決方案不像select或poll一樣每次都把current程序輪流加入fd對應的裝置等待佇列中,而只在epoll_ctl時把current程序掛一遍(這一遍必不可少)併為每個fd指定一個回撥函式,當裝置就緒,喚醒等待佇列上的等待者時,就會呼叫這個回撥函式,而這個回撥函式會把就緒的fd加入一個就緒連結串列)。epoll_wait的工作實際上就是在這個就緒連結串列中檢視有沒有就緒的fd。
3) 所監聽的檔案描述符的數目不像select有上限限制, 所支援的FD上限是最大可以開啟檔案的數目。
epoll對檔案描述符的操作有兩種模式:LT(level trigger,水平觸發)和ET(edge trigger)。
1) 水平觸發:預設工作模式,即當epoll_wait檢測到某描述符事件就緒並通知應用程式時,應用程式可以不立即處理該事件;下次呼叫epoll_wait時,會再次通知此事件。
2) 邊緣觸發:當epoll_wait檢測到某描述符事件就緒並通知應用程式時,應用程式必須立即處理該事件。如果不處理,下次呼叫epoll_wait時,不會再次通知此事件。(直到你做了某些操作導致該描述符變成未就緒狀態了,也就是說邊緣觸發只在狀態由未就緒變為就緒時通知一次)。
ET模式很大程度上減少了epoll事件的觸發次數,因此效率比LT模式高。
epoll模型python demo如下:
def start(self): # Create a TCP/IP socket, and then bind and listen server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setblocking(False) server_address = ('', 8080) print "Starting up on %s port %s" % server_address server.bind(server_address) server.listen(5) message_queues = {} # The timeout value is represented in milliseconds, instead of seconds. timeout = 5000 # Create a limit for the event READ_ONLY = (select.EPOLLIN) READ_WRITE = (READ_ONLY | select.EPOLLOUT) # Set up the epoll epoll = select.epoll() epoll.register(server.fileno(), READ_ONLY) # Map file descriptors to socket objects fd_to_socket = {server.fileno(): server, } while True: print "Waiting for the next event" events = epoll.poll(timeout) if len(events) == 0: print 'Time out' break print "*" * 20 print len(events) print events print "*" * 20 for fd, flag in events: s = fd_to_socket[fd] if flag & (select.EPOLLIN): if s is server: # A readable socket is ready to accept a connection connection, client_address = s.accept() print " Connection ", client_address connection.setblocking(False) fd_to_socket[connection.fileno()] = connection epoll.register(connection, READ_ONLY) # Give the connection a queue to send data message_queues[connection] = Queue.Queue() else: data = s.recv(1024) if data: # A readable client socket has data print " received %s from %s " % (data, s.getpeername()) message_queues[s].put(data) epoll.modify(s, READ_WRITE) else: # Close the connection print " closing", s.getpeername() # Stop listening for input on the connection epoll.unregister(s) s.close() del message_queues[s] elif flag & select.EPOLLHUP: # A client that "hang up" , to be closed. print " Closing ", s.getpeername(), "(HUP)" epoll.unregister(s) s.close() elif flag & select.EPOLLOUT: # Socket is ready to send data , if there is any to send try: next_msg = message_queues[s].get_nowait() except Queue.Empty: # No messages waiting so stop checking print s.getpeername(), " queue empty" epoll.modify(s, READ_ONLY) else: print " sending %s to %s" % (next_msg, s.getpeername()) s.send(next_msg) elif flag & select.epollERR: # Any events with epollR cause the server to close the # socket print " exception on", s.getpeername() epoll.unregister(s) s.close() del message_queues[s]
3.4 IO複用小結
沒有IO複用之前,用阻塞型IO,必須為每個建立的連線建立執行緒或執行緒,當面對大量連線時, 嚴重浪費系統資源,影響程序的響應效率,用非阻塞IO,需要輪詢測試socket集合是否已經讀寫就緒,在已經就緒和測試到就緒有一定的時延,資料得不到及時處理。利用IO複用, 同時可監控多個套接字描述符的狀態,而不用像阻塞型IO,每個套接字需要一個執行緒或程序處理,也不像非阻塞IO,存在處理時延,IO複用函式是阻塞函式,不用輪詢測試,有socket就緒或超時才會返回。
IO複用分為select、poll、epoll模型三種,select模型存在如下三個缺點:
1) 每次都需要將fd_set拷貝到核心空間,當fd_set較大時開銷很大
2) 每次都需要在核心中遍歷fd加入到等待佇列,fd較多開銷較大
3) select支援的檔案描述符太小,預設為1024
poll模型不存在同時監聽的描述符大小限制,但是仍然存在缺點1和2。epoll模型克服了這三個缺點,epoll模型對於加入監聽的socket描述符,會將描述符和監聽的事件記在核心,無需像select和poll每次都需要將檔案描述符集拷貝到核心。在判斷是否有讀寫就緒時。當有讀寫事件就緒時,核心會呼叫函式將就緒的fd加入就緒連結串列,因此epoll模型只需讀就緒連結串列,而不需要將所有fd遍歷一遍,效能會比select和poll模型高。
四 訊號驅動和非同步IO
4.1 訊號驅動IO
訊號驅動式IO模型原理如下圖4.1:
圖4.1 訊號驅動IO
Signal Driven I/O 的工作原理就是使用者程序首先和 kernel 之間建立訊號的通知機制,即使用者程序告訴 kernel,如果 kernel 中資料準備好了,就通過 SIGIO 訊號通知程序。然後使用者空間的程序就會呼叫 read 系統呼叫將準備好的資料從 kernel 拷貝到使用者空間。
但是這種 I/O 模型存在一個非常重大的缺陷問題:SIGIO 這種訊號對於每個程序來說只有一個!如果使該訊號對程序中的兩個描述符(這兩個檔案描述符都等待著 I/O 操作)都起作用,那麼程序在接到此訊號後就無法判別是哪一個檔案描述符準備好了。所以 Signal Driven I/O 模型在現實中用的非常少。
4.2 非同步IO
非同步IO模型原理如下圖:
圖4.2 非同步IO
在非同步IO中,使用者程序呼叫aio_read立即返回,直到核心將資料拷貝到程序緩衝區,然後通知程序完成,整個過程完全沒阻塞,連recvfrom都不用使用者程序呼叫。其它的IO模型都屬於同步IO。
在非同步非阻塞 I/O 中,可以同時發起多個傳輸操作。這需要每個傳輸操作都有惟一的上下文,這樣才能在它們完成時區分到底是哪個傳輸操作完成了。在 AIO 中,這是一個 aiocb(AIO I/O Control Block)結構。這個結構包含了有關傳輸的所有資訊,包括為資料準備的使用者緩衝區。在產生 I/O (稱為完成)通知時,aiocb 結構就被用來惟一標識所完成的 I/O 操作。這個 API 的展示顯示瞭如何使用它[10]。
aiocb結構如下:
struct aiocb { int aio_fildes; // File Descriptor int aio_lio_opcode; // Valid only for lio_listio (r/w/nop) volatile void *aio_buf; // Data Buffer size_t aio_nbytes; // Number of Bytes in Data Buffer struct sigevent aio_sigevent; // Notification Structure /* Internal fields */ ... };
sigevent 結構告訴 AIO 在 I/O 操作完成時應該執行什麼操作。Aio api如下:
1) int aio_read( struct aiocb *aiocbp ) 請求非同步讀操作
2) aio_error 檢查非同步請求的狀態
3) aio_return 獲得完成的非同步請求的返回狀態
4) aio_write 請求非同步寫操作
5) aio_suspend 掛起呼叫程序,直到一個或多個非同步請求已經完成(或失敗)
6) aio_cancel 取消非同步 I/O 請求
7) lio_listio 發起一系列 I/O 操作
為了便於理解,這裡使用c語言,使用 aio_read 進行非同步讀操作c例項如下:
//使用aio api讀例項
#include <aio.h> ... int fd, ret; struct aiocb my_aiocb; fd = open( "file.txt", O_RDONLY ); if (fd < 0) perror("open"); /* Zero out the aiocb structure (recommended) */ bzero( (char *)&my_aiocb, sizeof(struct aiocb) ); /* Allocate a data buffer for the aiocb request */ my_aiocb.aio_buf = malloc(BUFSIZE+1);// 清空了 aiocb 結構,分配一個數據緩衝區 if (!my_aiocb.aio_buf) perror("malloc"); /* Initialize the necessary fields in the aiocb */ my_aiocb.aio_fildes = fd; //檔案描述符 my_aiocb.aio_nbytes = BUFSIZE;//緩衝區大小 my_aiocb.aio_offset = 0;// // 將 aio_offset 設定成 0(該檔案中的第一個偏移量) ret = aio_read( &my_aiocb );//發起非同步讀請求 if (ret < 0) perror("aio_read"); while ( aio_error( &my_aiocb ) == EINPROGRESS ) ;//檢查非同步請求是否完成 if ((ret = aio_return( &my_iocb )) > 0) {//所傳輸的位元組數,如果發生錯誤,返回值就為 -1 /* got ret bytes on the read */ } else { /* read failed, consult errno */ }
當非同步請求完成時,核心有兩種方式通知程序,一種是通過訊號,另一種是呼叫回撥函式。
使用訊號作為AIO通知demo如下,應用程式對指定訊號註冊訊號處理函式, 在產生指定的訊號時就會呼叫這個處理程式。並指定AIO操作完成時,由核心發出指定訊號,將aiocb作為訊號的上下文,用來分辨多個IO請求。
AIO完成通知-訊號 void setup_io( ... ) { int fd; struct sigaction sig_act; struct aiocb my_aiocb; ... /* Set up the signal handler */ sigemptyset(&sig_act.sa_mask); sig_act.sa_flags = SA_SIGINFO; sig_act.sa_sigaction = aio_completion_handler; /* Set up the AIO request */ bzero( (char *)&my_aiocb, sizeof(struct aiocb) ); my_aiocb.aio_fildes = fd; my_aiocb.aio_buf = malloc(BUF_SIZE+1); my_aiocb.aio_nbytes = BUF_SIZE; my_aiocb.aio_offset = next_offset; /* Link the AIO request with the Signal Handler */ my_aiocb.aio_sigevent.sigev_notify = SIGEV_SIGNAL;//指定訊號作為通知方法 my_aiocb.aio_sigevent.sigev_signo = SIGIO; my_aiocb.aio_sigevent.sigev_value.sival_ptr = &my_aiocb; /* Map the Signal to the Signal Handler */ ret = sigaction( SIGIO, &sig_act, NULL ); ... ret = aio_read( &my_aiocb ); } void aio_completion_handler( int signo, siginfo_t *info, void *context ) { struct aiocb *req; /* Ensure it's our signal */ if (info->si_signo == SIGIO) { req = (struct aiocb *)info->si_value.sival_ptr; /* Did the request complete? */ if (aio_error( req ) == 0) { /* Request completed successfully, get the return status */ ret = aio_return( req ); } } return; }
使用回撥函式作為非同步請求通知demo如下, 這種機制不會為通知而產生一個訊號,而是會呼叫使用者空間的一個函式來實現通知功能.
//AIO完成通知-回撥函式 void setup_io( ... ) { int fd; struct aiocb my_aiocb; ... /* Set up the AIO request */ bzero( (char *)&my_aiocb, sizeof(struct aiocb) ); my_aiocb.aio_fildes = fd; my_aiocb.aio_buf = malloc(BUF_SIZE+1); my_aiocb.aio_nbytes = BUF_SIZE; my_aiocb.aio_offset = next_offset; /* Link the AIO request with a thread callback */ my_aiocb.aio_sigevent.sigev_notify = SIGEV_THREAD;// SIGEV_THREAD 指定執行緒回撥函式來作為通知方法 my_aiocb.aio_sigevent.notify_function = aio_completion_handler; my_aiocb.aio_sigevent.notify_attributes = NULL; my_aiocb.aio_sigevent.sigev_value.sival_ptr = &my_aiocb; ... ret = aio_read( &my_aiocb ); } void aio_completion_handler( sigval_t sigval ) { struct aiocb *req; req = (struct aiocb *)sigval.sival_ptr; /* Did the request complete? */ if (aio_error( req ) == 0) { /* Request completed successfully, get the return status */ ret = aio_return( req ); } return; }
總結
網路IO模型包括阻塞、非阻塞、IO複用、訊號驅動IO和非同步IO五種型別。阻塞IO無法應對多個連線的情形,單個socket操作阻塞會導致服務端無法接受其他連線,雖然可以用多執行緒、多程序的方式,將不同的連線放在不同的執行緒中和客戶端互動,並利用執行緒池和連線池進行優化。但建立程序和執行緒會佔用系統資源,當面對大規模連線時,系統資源浪費嚴重,系統響應效率不高。
非阻塞模型當socket讀寫操作未就緒時會立即返回,而不會阻塞等待,可以利用輪詢的方式來進行讀寫操作,但當核心收到資料報到應用程序感知並處理會有時延。
利用IO複用,將監控socket讀寫操作是否就緒和進行讀寫操作分開,且IO複用可監控socket集合,IO複用包含select、poll、epoll三種模型。
select模型存在如下三種缺點:
1) 每次都需要將fd_set拷貝到核心空間,當fd_set較大時開銷很大
2) 每次都需要在核心中遍歷fd加入到等待佇列,fd較多開銷較大
3) select支援的檔案描述符太小,預設為1024。
poll模型可同時監控的socket沒有上線限制,取決於系統資源,但poll模型不能避免缺點1和2。epoll模型可以避免select和poll模型的缺點。select,poll每次呼叫都要把fd集合從使用者態往核心態拷貝一次,並且要把current程序往裝置等待佇列中掛一次,而epoll只要一次拷貝,而且把current程序往等待佇列上掛也只掛一次。這也能節省不少的開銷。select,poll內部實現需要自己不斷輪詢所有fd集合,直到裝置就緒,期間可能要睡眠和喚醒多次交替。而epoll其實也需要呼叫epoll_wait不斷輪詢就緒連結串列,期間也可能多次睡眠和喚醒交替,但是它是裝置就緒時,呼叫回撥函式,把就緒fd放入就緒連結串列中,並喚醒在epoll_wait中進入睡眠的程序。雖然都要睡眠和交替,但是select和poll在“醒著”的時候要遍歷整個fd集合,而epoll在“醒著”的時候只要判斷一下就緒連結串列是否為空就行了,這節省了大量的CPU時間。這就是回撥機制帶來的效能提升。
訊號驅動式IO,當核心資料準備好時,發出訊號,呼叫程序提前註冊好的訊號處理函式,但當存在多個socket操作時,無法分清是哪個socket準備好,因此實際應用中較少。
無論是阻塞IO、非阻塞IO、IO複用還是訊號驅動IO模型,都是同步IO模型。其要麼是監控socket就緒,要麼是從核心拷貝資料到程序緩衝區,至少其中一個是阻塞的,不會立即返回。非同步IO模型發起讀寫操作後,立即返回,可以接著進行其它操作,核心完成將資料拷貝到應用程序後,通過訊號或者回調函式通知程序。
參考文獻
[1]. 阻塞IO(blocking IO). https://www.chenxie.net/archives/1956.html
[2]. Unix網路程式設計卷1.124~125.
[3]. linux select函式詳解. https://blog.csdn.net/lingfengtengfei/article/details/12392449
[4]. select,poll,epoll實現分析—結合核心原始碼. https://www.linuxidc.com/Linux/2012-05/59873.htm
[5]. Python網路程式設計中的select 和 poll I/O複用的簡單使用. https://www.cnblogs.com/coser/archive/2012/01/06/2315216.html
[6]. socket選項總結(setsocketopt). https://blog.csdn.net/c1520006273/article/details/50420408
[7]. Linux下I/O多路複用系統呼叫(select, poll, epoll)介紹. https://zhuanlan.zhihu.com/p/22834126
[8]. IO多路複用:select、poll、epoll示例. https://blog.csdn.net/lisonglisonglisong/article/details/51328062
[9]. Linux I/O 模型. https://woshijpf.github.io/linux/2017/07/10/Linux-IO%E6%A8%A1%E5%9E%8B.html.
[10]. 使用非同步 I/O 大大提高應用程式的效能. https://www.ibm.com/developerworks/cn/linux/l-async/
&n