1. 程式人生 > >Libevent學習筆記(一):基本使用

Libevent學習筆記(一):基本使用

前言

其實,現在高效能網路伺服器基本都是非同步I/O模式構建的,而Libevent就是對select、poll、epoll等各類非同步模式介面的封裝,通過設定回撥函式的方式,在監聽檔案描述符和套接字讀寫事件的同時,還兼任定時器和訊號接收的管理工作。所以這貨對高效能伺服器後臺開發、跨平臺開發、網路開發都具有很大的參考學習價值。官方主頁顯示很多的專案都用到了Libevent庫,而且還可作為主機內部程序間通訊和資料互動。這貨也考慮到pthread執行緒模型的同步問題,保證關鍵資料結構在多執行緒並行下的資料安全,但是如果能夠封裝一個執行緒池模型就更爽了!
深入瞭解的第一步就是先學會用它。其實Libevent的主要維護者Nick的部落格有一本很好的教程 libevent book ,看完它後再加上Libevent本身附贈的HTTP和DNS伺服器的例子sample(Libevent本身封裝了evhttp和evdns),基本就可以耍起來啦,因為看過後會發現,如果對網路開發本身比較熟悉,Libevent還是比較容易理解的。除此之外,Libevent還有一個比較特色的東西,就是Bufferevent和evbuffer,而兩者的關係呢,算是Bufferevent是基於evbuffer封裝了I/O事件、I/O排程等內容,而evbuffer則是Bufferevent底層的資料承載。
需要注意的是由於手冊的作者就是維護者,所以手冊的內容十分的新,有些手冊內容在穩定版本2.0.22是沒有的,程式碼切換到穩定分支可以使用git branch stable release-2.0.22-stable建立一個穩定分支。

服務端使用步驟

這裡通過手冊描述的過程,對Libevent整個使用過程進行一個梳理。其實,實際使用很多步驟是不用考慮的,因為Libevent在設計上還算是比較智慧——當你沒有提供引數或者設定的時候,系統會自動給你一個最優的或者常用的配置。

2.1

event_base算是Libevent最基礎、最重要的物件,因為修改配置、新增事件等,基本都需要將它作為引數傳遞進去。
event算是Libevent最常用的元素,對於event在其生命週期有initialized、pending、active這幾種狀態,當通過event_new建立了事件並關聯到event_base上之後,其狀態是initialized;然後通過event_add之後,這個事件便是pending的狀態,開始偵聽了;然後當條件滿足之後,其變為active狀態,對應的callback函式被呼叫。
這個物件通過event_base_new建立,在建立之前還可以設定某些引數:

struct event_config *cfg;
struct event_base *base;

cfg = event_config_new();
event_config_avoid_method(cfg, "select");   //避免使用低效率select
event_config_require_features(cfg, EV_FEATURE_ET);  //使用邊沿觸發型別
//event_config_set_flag(cfg, EVENT_BASE_FLAG_PRECISE_TIMER);
//event_base_new(void); 為簡單版本,會根據系統選擇最快最合適的型別,用這個就行了
base = event_base_new_with_config(cfg); event_config_free(cfg); //顯示當前使用的非同步型別 st_d_print("Current Using Method: %s", event_base_get_method(base)); // epoll //可選設定優先順序數目,然後通過event_priority_set設定事件的優先順序 //0為最高,n_priority-1為最低,此後建立的事件預設優先順序為中間優先順序 event_base_priority_init(base, 3);

2.2 針對伺服器端和客戶端的操作

對於網路開發部分,Linux和Windows在網路方面的操作是有差異的,為此Libevent建立了evutil統一的介面來遮蔽兩個平臺底層的網路開發差異(後悔當時移植程式怎麼沒有參考這個有價值的東西)。由於服務端開發和客戶端開發一個主動一個被動,這裡分開進行示例。

2.2.1 服務端操作

服務端流程:建立套接字、設定套接字引數(nonblocking等)、繫結地址埠、偵聽新連線。
這麼多操作,Libevent封裝到了evconnlistener_new_bind中,並建立了連線事件的相應函式accept_conn_cb,同時還可以設定錯誤回撥函式accept_error_cb。

struct evconnlistener *listener;
struct sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = htonl(0);  //繫結0.0.0.0地址
sin.sin_port = htons(8080); /* Port 8080 */

// -1表示backlog,無限制
listener = evconnlistener_new_bind(base, accept_conn_cb, NULL, LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE, -1, (struct sockaddr*)&sin, sizeof(sin));

evconnlistener_set_error_cb(listener, accept_error_cb);

對於上面引用的兩個回撥函式,其實現的模板為

static void accept_conn_cb(struct evconnlistener *listener,
    evutil_socket_t fd, struct sockaddr *address, int socklen, void *ctx)
{
    char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
    getnameinfo (address, socklen,
               hbuf, sizeof(hbuf),sbuf, sizeof(sbuf),
    //st_print是libevent裡的函式??
    //為新的客戶連線socket fd建立Bufferevent事件偵聽
   struct event_base *base = evconnlistener_get_base(listener);
   struct bufferevent *bev = bufferevent_socket_new(
            base, fd, BEV_OPT_CLOSE_ON_FREE);

   bufferevent_priority_set(bev, 2);
   bufferevent_enable(bev, EV_READ|EV_WRITE); //預設EV_WRITE是使能的,但EV_READ不是

}

static void accept_error_cb(struct evconnlistener *listener, void *ctx)
{
    struct event_base *base = evconnlistener_get_base(listener);
    int err = EVUTIL_SOCKET_ERROR();

    st_d_print( "Got an error %d (%s) on the listener. "
            "Shutting down.\n", err, evutil_socket_error_to_string(err));
    event_base_loopexit(base, NULL);
}

對於上面的accept_conn_cb函式中,為accept建立的新fd建立EV_READ|EV_WRITE事件偵聽。但是上面的回撥函式只設置了bufferread_cb和bufferevent_cb,而沒有對寫設定回撥函式。其實這也是現實中常用的情況,程式大多數都阻塞在讀的任務上,而一般的寫任務也都是基於讀到的結果產生對應的寫內容,如果為寫任務設定回撥函式,那麼系統檢測到輸出快取區可用,便一直呼叫寫回調函式,這可能不是你想要的。
可以從accept_error_cb中學習Libevent常見的錯誤處理方式。在Linux中,所有的錯誤都是通過全域性的errno來檢測錯誤資訊的,但是Windows使用WSAGetLastError()這種函式得到網路類的錯誤資訊,所以需要使用封裝後的EVUTIL_SOCKET_ERROR()和evutil_socket_error_to_string()來實現。
對於bufferread_cb,就是通用的網路I/O操作,跟伺服器端和客戶端沒有什麼差異,這裡貼出demo的程式碼來:

void bufferread_cb(struct bufferevent *bev, void *ptr)
{
    char *msg = "SERVER MESSAGE: WOSHINICOL 桃子大人";
    char buf[1024]; int n;
    struct evbuffer *input = bufferevent_get_input(bev);
    struct evbuffer *output = bufferevent_get_output(bev);

    while ((n = evbuffer_remove(input, buf, sizeof(buf))) > 0)
    {
        fwrite("BUFFERREAD_CB:", 1, strlen("BUFFERREAD_CB:"), stderr);
        fwrite(buf, 1, n, stderr);
    }

    //bufferevent_write(bev, msg, strlen(msg));
    evbuffer_add(output, msg, strlen(msg));
}

由上面可以看見,對於evbuffer操作,既可以呼叫Bufferevent層的封裝函式,也可以呼叫底層的evbuffer的函式介面,Bufferevent介面簡單,但是evbuffer類介面比較的底層,但是函式功能很多。具體的細節後文再行描述。

2.2.2 客戶端操作

客戶端的開發比較的簡單,主要就是建立套接字,連線服務端,就可以進行I/O操作了

struct bufferevent *bev;
struct sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
inet_aton("192.168.1.161", &sin.sin_addr.s_addr);
sin.sin_port = htons(8080); /* Port 8080 */

//int sockfd = socket(AF_INET, SOCK_STREAM, 0);
//evutil_make_listen_socket_reuseable(sockfd);
//evutil_make_socket_nonblocking(sockfd);
//bev = bufferevent_socket_new(base, sockfd, BEV_OPT_CLOSE_ON_FREE);
bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(bev, bufferread_cb, bufferwrite_cb, bufferevent_cb, base);
bufferevent_enable(bev, EV_READ|EV_WRITE);
bufferevent_socket_connect(bev, (struct sockaddr *)&sin, sizeof(sin))

如上面註釋的程式碼,對客戶端的操作,有兩個方法:
(1)首先建立套接字,然後設定套接字引數,在將這個套接字傳遞給bufferevent_socket_new函式建立Bufferevent事件,並呼叫bufferevent_socket_connect連結客戶端。
(2)直接呼叫bufferevent_socket_new並將套接字引數設定為-1,那麼系統會自動建立套接字並完成相應的設定。
這裡是否有誤?為什麼client端的sockfd還需要reuse

2.3偵聽訊號事件

ct event *ev_signal;
//evsignal_new(base, signum, cb, arg)為簡潔版本
ev_signal = event_new(base, SIGUSR1, EV_SIGNAL|EV_PERSIST, sigusr1_cb, base);
event_priority_set(ev_signal, 2);
event_add(ev_signal, NULL);

void sigusr1_cb(evutil_socket_t fd, short what, void *arg);

對於訊號事件,手冊說明:對於一個程序,如果有多個event_base,那麼請只使用一個event_base處理所有的訊號事件,一個程式只有一個event_base能接收到訊號事件。

2.4建立定時器回撥

struct event *ev_timer;
struct timeval one_sec = { 1, 0 }; //1s
int n_calls = 0;
//evtimer_new(base, callback, arg)為簡潔版本
//EV_TIMEOUT的引數實際是可被忽略的,不傳遞也是可以的
ev_timer = event_new(base, -1, EV_PERSIST, timer_cb, &one_sec);
event_priority_set(ev_timer, 2);
event_add(ev_timer, &one_sec);

void timer_cb(evutil_socket_t fd, short what, void *arg);

對於上面的signal和timer事件,其實都沒有關聯到某一個具體的socket或者fd,其實可以公用同一個callback,然後在處理的callback中,使用what引數來區分到底是由於訊號、定時器哪個事件激活了這個回撥函式。

2.5進入事件迴圈

就像是通常的epoll_wait在一個大的迴圈裡,Libevent提供如下函式進行事件迴圈檢測

event_base_loop(base, 0); //進入事件迴圈直到沒有pending的事件就返回
//EVLOOP_ONCE    阻塞直到有event啟用,執行回撥函式後返回
//EVLOOP_NONBLOCK 非阻塞型別,立即檢查event啟用,如果有執行最高優先順序的那一類,完畢後退出迴圈

如果後面的flags引數為0,那麼等價於呼叫event_base_dispatch。預設情況下,如果event_base有pending的事件,就不會結束迴圈,可以通過呼叫event_base_loopbreak、event_base_loopexit等函式來跳出終止迴圈。還需要注意的是,event_base_free只會呼叫event_del接觸event和本身的關係,不會釋放event相關的資源,所以如果優雅地寫程式碼的話,需要呼叫event_free、evconnlistener_free等函式來善後。

三、Bufferevent和evbuffer

如上面介紹的,如果上面的內容是讓系統的各類事件和對應回撥函式建立關聯,助力於整個系統的設計和架構的話,Bufferevent和evbuffer則是關注於整個系統的資料承載,以完成實際的I/O通訊。Bufferevent可以看作是基於evbuffer實現對EV_READ|EV_WRITE事件的偵聽,而evbuffer是底層實際資料的承載。

3.1 Bufferevent

Bufferevent支援的型別有:socket-based、asynchronous-IO、filtering、paired型別。socket-based算是最常見的型別,asynchronous-IO主要是Windows下的完成埠非同步非阻塞通訊型別,filtering和paired常常是針對特殊通訊需求的情況。
Bufferevent建立時候支援的重要標誌有:BEV_OPT_CLOSE_ON_FREE當bufferevent被釋放的時候,底層的傳輸也會被釋放,比如關閉套接字、釋放底層bufferevent等BEV_OPT_THREADSAFE為Bufferevent建立鎖結構,以保證執行緒安全的,當用戶提供的回撥函式被執行的時候,會持有這個鎖結構; BEV_OPT_DEFER_CALLBACKS延遲執行,事件的回撥函式會被排隊,當常規event回撥執行完之後,才會執行其回撥函式
Bufferevent的操作在上面已經有示例了,這裡將其資料介面整理出來

// 將data指向的資料新增到bufev的輸出緩衝區尾部
int bufferevent_write(struct bufferevent *bufev, const void *data, size_t size);
// 將buf的整個資料移除移動到bufev輸出緩衝區的尾部
int bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf);

// 將bufev的輸入緩衝區的資料移動到目標位置,注意bufferevent_read返回的是實際讀取的數目
size_t bufferevent_read(struct bufferevent *bufev, void *data, size_t size);
int bufferevent_read_buffer(struct bufferevent *bufev, struct evbuffer *buf);

然後,當建立Bufferevent的時候如果使用了BEV_OPT_THREADSAFE引數的時候,初始化這個bufferevent會給這個結構本身以及input、output、underlying等結構產生鎖,而且在讀寫callback以及defer延遲的callback時候,都會呼叫_bufferevent_incref_and_lock、_bufferevent_decref_and_unlock等來獲得和釋放鎖。只有當顯式指明BEV_OPT_UNLOCK_CALLBACKS的時候,才會在呼叫BEV_OPT_DEFER_CALLBACKS的時候不加鎖結構。或許也這種情況下才是使用者空間鎖操作鎖的時候,為了更小的鎖粒度獲得效率的提升?

void bufferevent_lock(struct bufferevent *bufev);
void bufferevent_unlock(struct bufferevent *bufev);

3.2 evbuffer

evbuffer是一個被優化為前端刪除,後端新增的bytes queue,用於方便高效的網路IO。其程式碼實現在buffer.c中,而標頭檔案evbuffer-internel.h為其內部資料結構定義的地方,而使用者呼叫介面的宣告主要在event2/buffer.h中。結構上,每個evbuffer內部有多個evbuffer_chain結構構成的連結串列組成,所以資料在其內部不一定是物理連續的。

3.2.1 執行緒安全

int evbuffer_enable_locking(struct evbuffer *buf, void *lock);
void evbuffer_lock(struct evbuffer *buf);
void evbuffer_unlock(struct evbuffer *buf);

多個執行緒訪問evbuffer是不安全的,所以如果要在多個執行緒中訪問,首先需要使用evbuffer_enable_locking來讓evbuffer支援鎖結構。通過檢視文件和程式碼,對於evbuffer的底層函式(比如evbuffer_read、evbuffer_write),都是自動加了鎖的,如果函式只調用了這些操作一次,那麼不需要額外的加鎖結構,如果在函式某個階段有多次的evbuffer操作,那麼需要使用上面的evbuffer_lock/evbuffer_unlock來加解鎖保護。

3.2.2 evbuffer常用介面羅列

size_t evbuffer_get_length(const struct evbuffer *buf);
size_t evbuffer_get_contiguous_space(const struct evbuffer *buf);

evbuffer_get_length返回evbuffer整體儲存了的資料的位元組數。evbuffer_get_contiguous_space返回第一個evbuffer_chain的offset位置,而offset=buffer+misalign+實際負載,實際就是開頭空餘空間+實際的負載位元組數,也就是末尾空閒空間開始的位置。

int evbuffer_add(struct evbuffer *buf, const void *data, size_t datlen);
int evbuffer_add_printf(struct evbuffer *buf, const char *fmt, ...);
int evbuffer_add_vprintf(struct evbuffer *buf, const char *fmt, va_list ap);
int evbuffer_prepend(struct evbuffer *buf, const void *data, size_t size);
int evbuffer_prepend_buffer(struct evbuffer *dst, struct evbuffer* src);

add類函式都是將資料新增到evbuffer結尾的操作,而prepend類函式是將資料新增到evbuffer開始的操作。

int evbuffer_add_buffer(struct evbuffer *dst, struct evbuffer *src);
int evbuffer_remove_buffer(struct evbuffer *src, struct evbuffer *dst, size_t datlen);

都是將evbuffer的資料從src移動到dst中,這裡的函式都是優化過的,如果可能就只有evbuffer_chain結構的轉移,不會有底層實際資料的拷貝。

unsigned char *evbuffer_pullup(struct evbuffer *buf, ev_ssize_t size);

重新排列evbuffer,使得前size個位元組保證在同一個evbuffer_chain的連續位置,當size<0的時候,會對所有的資料進行重排,如果size等於0或者大於實際的datalen,不會進行任何操作,返回NULL;否則返回重排後實際資料的開始地址。

int evbuffer_drain(struct evbuffer *buf, size_t len);
int evbuffer_remove(struct evbuffer *buf, void *data, size_t datlen);
ev_ssize_t evbuffer_copyout(struct evbuffer *buf, void *data, size_t datlen);

evbuffer_remove會從開頭將資料拷貝到data指向的位置,evbuffer_drain會從開頭直接釋放len長度的buf資料。兩者當len的引數大於實際的資料長度的時候,會對所有的資料進行操作,返回實際拷貝/刪除的位元組數。evbuffer_copyout會將資料拷貝到data,但是不會drain刪除evbuffer的資料。

enum evbuffer_eol_style {
EVBUFFER_EOL_ANY,  //不建議使用
EVBUFFER_EOL_CRLF,  //"\r\n"或者"\n"
EVBUFFER_EOL_CRLF_STRICT,  //"\r\n"
EVBUFFER_EOL_LF  //"\n" };
char *evbuffer_readln(struct evbuffer *buffer, size_t *n_read_out, enum evbuffer_eol_style eol_style);

這是基於某些程式基於行處理而設計的,比如HTTP協議的頭部資訊等。其會從頭提取並drain到一行內容,然後通過malloc分配的空間,拷貝出不帶換行符的資訊出來(換行符會在evbuffer中刪除掉),並在結尾新增’\0’結束符,拷貝的資料長度會儲存在n_read_out引數中。用完之後,使用者記得free返回的記憶體空間。

struct evbuffer_ptr {
       ev_ssize_t pos;
       /* Do not alter the values of fields. */
       struct {
          void *chain;
          size_t pos_in_chain;
    } _internal;
};
struct evbuffer_ptr evbuffer_search(struct evbuffer *buffer, const char *what, size_t len, const struct evbuffer_ptr *start);
struct evbuffer_ptr evbuffer_search_range(struct evbuffer *buffer, const char *what, size_t len, const struct evbuffer_ptr *start, const struct evbuffer_ptr *end);
struct evbuffer_ptr evbuffer_search_eol(struct evbuffer *buffer, struct evbuffer_ptr *start, size_t *eol_len_out, enum evbuffer_eol_style eol_style);

提供在evbuffer中搜索特定長度len字串what的功能。Libevent使用了struct evbuffer_ptr這麼一個結構,將evbuffer內部離散的evbufferchain的buffer對映成pos這麼一個連續的偏移空間。如果搜尋到,那麼pos為其位置,否則-1表示沒有搜尋到。

enum evbuffer_ptr_how {
        EVBUFFER_PTR_SET,
        EVBUFFER_PTR_ADD
};
int evbuffer_ptr_set(struct evbuffer *buffer, struct evbuffer_ptr *pos, size_t position, enum evbuffer_ptr_how how);
evbuffer_ptr_set(buf, &p, 0, EVBUFFER_PTR_SET);

類似檔案系統seek的方式來操作pos位置,由於evbuffer內部不一定是連續的位置,所以不能簡單的修改pos的位置,只能通過這種方式,將pos的更改更新到內部結構的evbufferchain和偏移上去。返回0表示修改成功,否則-1。

struct evbuffer_iovec {
        void *iov_base;
        size_t iov_len;
};
int evbuffer_peek(struct evbuffer *buffer, ev_ssize_t len, struct evbuffer_ptr *start_at, struct evbuffer_iovec *vec_out, int n_vec);

n = evbuffer_peek(buf, 4096, NULL, NULL, 0);
v = malloc(sizeof(struct evbuffer_iovec)*n);
n = evbuffer_peek(buf, 4096, NULL, v, n);

高速網路的一個關鍵就是避免資料的拷貝,為此,Libevent建立了一個evbuffer_iovec的結構,然後通過evbuffer_peek,可以將evbuffer內部的evbuffer_chain結構的資料位置暴露到evbuffer_iovec,使用者可以直接訪問讀取evbuffer的內部資料了。需要注意的是這裡只作讀取,修改資料會導致不可預料的結果。evbuffer_peek會在要麼指定的位元組數都映射了,或者傳遞evbuffer_iovec使用完了就會返回。通常使用方式如上文,是先呼叫evbuffer_peek決定需要多少個evbuffer_iovec結構數目,然後再進行對映操作,保證需要的位元組數目都能對映完成。

int evbuffer_reserve_space(struct evbuffer *buf, ev_ssize_t size, struct evbuffer_iovec *vec, int n_vecs);
int evbuffer_commit_space(struct evbuffer *buf, struct evbuffer_iovec *vec, int n_vecs);

這是一個evbuffer高速寫入的方式,因為先前的evbuffer_add實際也是先將資料準備好,然後再memcpy拷貝到evbuffer內部的,而這裡先通過evbuffer_reserve_space在evbuffer的結尾先預留出需要寫的資料空間,然後將空間的地址通過evbuffer_iovec返回,應用程式就可以直接操作這些地址,最後通過evbuffer_commit_space提交就可以了。這裡n_vecs可用的只有1、2兩個數字,通常推薦2,因為1很有可能會導致資料的重排,降低效率。
evbuffer_commit_space成功返回0,失敗返回-1。
使用這些函式的時候必須格外的小心,在呼叫evbuffer_reserve_space之後和evbuffer_commit_space之前,不能呼叫任何重排或者追加evbuffer的操作,那樣會導致之前evbuffer_reserve_space返回的地址不一致了。在多執行緒中也要注意用鎖保護相應的資料。

int evbuffer_write(struct evbuffer *buffer, evutil_socket_t fd);
int evbuffer_write_atmost(struct evbuffer *buffer, evutil_socket_t fd, ev_ssize_t howmuch);
int evbuffer_read(struct evbuffer *buffer, evutil_socket_t fd, int howmuch);

實際的I/O網路操作,注意如果evbuffer被關聯到了bufferevent,那麼網路I/O是自動觸發的,使用者不需要使用這些函式。他們會返回實際讀取或者寫入的位元組數目,需要注意的是,如果fd是非阻塞的套接字/檔案描述符,需要檢查錯誤的型別來決定是因為I/O當前無法完成還是別的錯誤型別。

int evbuffer_add_file(struct evbuffer *output, int fd, ev_off_t offset, size_t length);

直接將開啟的檔案描述符fd作為引數,然後將檔案中的資料用於讀取、網路傳送等操作。其內部運用sendfile/mmap等機制,避免資料拷貝到使用者空間再拷貝到核心空間,增加了操作的效率。

int evbuffer_freeze(struct evbuffer *buf, int at_front);
int evbuffer_unfreeze(struct evbuffer *buf, int at_front);

會禁止對evbuffer的頭部/尾部的修改操作,通常在內部使用。