1. 程式人生 > >處理大併發之 libevent demo詳細分析(對比epoll)

處理大併發之 libevent demo詳細分析(對比epoll)

libevent預設情況下是單執行緒,每個執行緒有且僅有一個event_base,對應一個struct event_base結構體,以及賦予其上的事件管理器,用來安排託管給它的一系列的事件。

當有一個事件發生的時候,event_base會在合適的時間去呼叫繫結在這個事件上的函式,直到這個函式執行完成,然後在返回安排其他事件。需要注意的是:合適的時間並不是立即。

例如:

  1. struct event_base *base;  
  2. base = event_base_new();//初始化libevent

event_base_new對比epoll,可以理解為epoll裡的epoll_create。

event_base內部有一個迴圈,迴圈阻塞在epoll呼叫上,當有一個事件發生的時候,才會去處理這個事件。其中,這個事件是被繫結在event_base上面的,每一個事件就會對應一個struct event,可以是監聽的fd。 

其中struct event 使用event_new 來建立和繫結,使用event_add來啟用,例如:

  1. struct event *listener_event;  
  2. listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);  

引數說明:

base:event_base型別,event_base_new的返回值

listener:監聽的fd,listen的fd

EV_READ|EV_PERSIST事件的型別及屬性

do_accept:繫結的回撥函式

(void*)base:給回撥函式的引數

event_add(listener_event, NULL);

對比epoll:

event_new相當於epoll中的epoll_wait,其中的epoll裡的while迴圈,在libevent裡使用event_base_dispatch。

event_add相當於epoll中的epoll_ctl,引數是EPOLL_CTL_ADD,新增事件。

注:libevent支援的事件及屬性包括(使用bitfield實現,所以要用 | 來讓它們合體)
EV_TIMEOUT: 超時
EV_READ: 只要網路緩衝中還有資料,回撥函式就會被觸發
EV_WRITE: 只要塞給網路緩衝的資料被寫完,回撥函式就會被觸發
EV_SIGNAL: POSIX訊號量
EV_PERSIST: 不指定這個屬性的話,回撥函式被觸發後事件會被刪除
EV_ET: Edge-Trigger邊緣觸發,相當於EPOLL的ET模式

事件建立新增之後,就可以處理髮生的事件了,相當於epoll裡的epoll_wait,在libevent裡使用event_base_dispatch啟動event_base迴圈,直到不再有需要關注的事件。

有了上面的分析,結合之前做的epoll服務端程式,對於一個伺服器程式,流程基本是這樣的:

1. 建立socketbindlisten,設定為非阻塞模式

2. 建立一個event_base,即

  1. struct event_base *  event_base_new(void)  

3. 建立一個event,將該socket託管給event_base,指定要監聽的事件型別,並繫結上相應的回撥函式(及需要給它的引數)

  1. struct event *  event_new(struct event_base *base, evutil_socket_t fd, short events, void (*cb)(evutil_socket_t, shortvoid *), void *arg)  

4. 啟用該事件,即

  1. int  event_add(struct event *ev, conststruct timeval *tv)  

5.  進入事件迴圈,即

  1. int  event_base_dispatch(struct event_base *event_base)  

首先來翻譯下例子上面的一段話:

對於select函式來說,不同的作業系統有不同的代替函式,它包括:poll,epoll,kqueue,evport/dev/poll。這些函式的效能都比select要好,其中epollIO中新增,刪除,通知socket準備好方面效能複雜度為O(1)

不幸的是,沒有一個有效的介面是一個普遍存在的標準,linux下有epollBSDSkqueueSolaris 有evport/dev/poll,等等。沒有任何一個作業系統有它們中所有的,所以如果你想做一個輕便的高效能的非同步應用程式,你就需要把這些介面抽象的封裝起來,並且無論哪一個系統使用它都是最高效的。

這對於你來說就是最低階的libevent API,它提供了統一的介面取代了select,當它在計算機上執行的時候,使用了最有效的版本。

這裡是ROT13伺服器的另外一個版本,這次,他使用了libevent代替了select。這意味著我們不再使用fd_sets,取而代之的使用event_base新增和刪除事件,它可能在selectpollepollkqueue等中執行。

程式碼分析:

這是一個服務端的程式,可以處理客戶端大併發的連線,當收到客戶端的連線後,將收到的資料做了一個變換,如果是 ’a’-‘m’之間的字元,將其增加13,如果是 ’n’-‘z’之間的字元,將其減少13,其他字元不變,然後將轉換後的資料傳送給客戶端。

例如:客戶端傳送:Client 0 send  Message!

服務端會回覆:Pyvrag 0 fraq  Zrffntr!

在這個程式碼中沒有使用bufferevent這個強大的東西,在一個結構體中自己管理了一個緩衝區。結構體為:

  1. struct fd_state {  
  2.     char buffer[MAX_LINE];//緩衝區的大小
  3.     size_t buffer_used;//接收到已經使用的buffer大小,每次將接收到的資料位元組數相加,當傳送的位元組數累計相加和buffer_used都相等時候,將它們都置為1
  4.     size_t n_written;//傳送的累加位元組數
  5.     size_t write_upto;//相當於一個臨時變數,當遇到換行符的時,將其收到的位元組數(換行符除外)賦給該值,當檢測到寫事件的時候,用已經發送的位元組數和該數值做比較,若收到的位元組總數小於該值,則傳送資料,等於該值,將結構體中3個位元組數統計變數都置為1,為什麼會置為1呢,因為有一個換行符吧。
  6.     struct event *read_event;  
  7.     struct event *write_event;  
  8. };  

程式碼中自己管理了一個緩衝區,用於存放接收到的資料,傳送的資料將其轉換後也放入該緩衝區中,程式碼晦澀難懂,我也是經過打日誌分析後,才明白點,這個緩衝區自己還得控制好。但是libevent 2已經提供了一個神器bufferevent,我們在使用的過程中最好不要自己管理這個緩衝區,之所以分析這個程式碼,是為了熟悉libevent 做服務端程式的流程及原理。

下面是程式碼,加有部分註釋和日誌:

程式碼:lowlevel_libevent_server.c 

  1. //說明,為了使我們的程式碼相容win32網路API,我們使用evutil_socket_t代替int,使用evutil_make_socket_nonblocking代替fcntl
  2. /* For sockaddr_in */
  3. #include <netinet/in.h>
  4. /* For socket functions */
  5. #include <sys/socket.h>
  6. /* For fcntl */
  7. #include <fcntl.h>
  8. #include <event2/event.h>
  9. #include <assert.h>
  10. #include <unistd.h>
  11. #include <string.h>
  12. #include <stdlib.h>
  13. #include <stdio.h>
  14. #include <errno.h>
  15. #define MAX_LINE 80
  16. void do_read(evutil_socket_t fd, short events, void *arg);  
  17. void do_write(evutil_socket_t fd, short events, void *arg);  
  18. char rot13_char(char c)  
  19. {  
  20.     /* We don't want to use isalpha here; setting the locale would change 
  21.      * which characters are considered alphabetical. */
  22.     if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))  
  23.         return c + 13;  
  24.     elseif ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))  
  25.         return c - 13;  
  26.     else
  27.         return c;  
  28. }  
  29. struct fd_state {  
  30.     char buffer[MAX_LINE];  
  31.     size_t buffer_used;  
  32.     size_t n_written;  
  33.     size_t write_upto;  
  34.     struct event *read_event;  
  35.     struct event *write_event;  
  36. };  
  37. struct fd_state * alloc_fd_state(struct event_base *base, evutil_socket_t fd)  
  38. {  
  39.     struct fd_state *state = malloc(sizeof(struct fd_state));  
  40.     if (!state)  
  41.         return NULL;  
  42.     state->read_event = event_new(base, fd, EV_READ|EV_PERSIST, do_read, state);  
  43.     if (!state->read_event)  
  44.     {  
  45.         free(state);  
  46.         return NULL;  
  47.     }  
  48.     state->write_event = event_new(base, fd, EV_WRITE|EV_PERSIST, do_write, state);  
  49.     if (!state->write_event)  
  50.     {  
  51.         event_free(state->read_event);  
  52.         free(state);  
  53.         return NULL;  
  54.     }  
  55.     state->buffer_used = state->n_written = state->write_upto = 0;  
  56.     assert(state->write_event);  
  57.     return state;  
  58. }  
  59. void fr