1. 程式人生 > >Libevent原始碼分析-----更多evbuffer操作函式

Libevent原始碼分析-----更多evbuffer操作函式

鎖操作:

        在前一篇博文可以看到很多函式在操作前都需要對這個evbuffer進行加鎖。同event_base不同,如果evbuffer支援鎖的話,要顯式地呼叫函式evbuffer_enable_locking。

//buffer.c檔案

int//引數可以是一個鎖變數也可以是NULL

evbuffer_enable_locking(struct evbuffer *buf, void *lock)

{

#ifdef _EVENT_DISABLE_THREAD_SUPPORT

return -1;

#else

if (buf->lock)

return -1;


if (!lock) {

//自己分配鎖變數

EVTHREAD_ALLOC_LOCK(lock, EVTHREAD_LOCKTYPE_RECURSIVE);

if (!lock)

return -1;

buf->lock = lock;

//該evbuffer擁有鎖,到時需要釋放鎖記憶體

buf->own_lock = 1;

} else {

buf->lock = lock;//使用引數提供的鎖

buf->own_lock = 0;//自己沒有擁有鎖。不需要釋放鎖記憶體

}


return 0;

#endif

}

        可以看到,第二個引數可以為NULL。此時函式內部會申請一個鎖。明顯如果要讓evbuffer能使用鎖,就必須在一開始就呼叫evthread_use_windows_threads()或者evthread_use_pthreads(),關於這個可以參考一篇博文

        因為使用者操控這個evbuffer,所以Libevent提供了加鎖和解鎖介面給使用者使用。

//buffer.c檔案

void

evbuffer_lock(struct evbuffer *buf)

{

EVBUFFER_LOCK(buf);

}


void

evbuffer_unlock(struct evbuffer *buf)

{

EVBUFFER_UNLOCK(buf);

}


//evbuffer-internal.h檔案

#define EVBUFFER_LOCK(buffer) \

do { \

EVLOCK_LOCK((buffer)->lock, 0); \

} while (0)

#define EVBUFFER_UNLOCK(buffer) \

do { \

EVLOCK_UNLOCK((buffer)->lock, 0); \

} while (0)

        在Libevent內部,一般不會使用這個兩個介面,而是直接使用EVBUFFER_LOCK(buf)和EVBUFFER_UNLOCK(buf)。

查詢操作:

下圖展示了evbuffer的一些查詢操作以及呼叫關係。

          

查詢結構體:

        對於一個數組或者一個檔案,只需一個下標或者偏移量就可以定位查找了。但對於evbuffer來說,它的資料是由一個個的evbuffer_chain用連結串列連在一起的。所以在evbuffer中定位,不僅僅要有一個偏移量,還要指明是哪個evbuffer_chain,甚至是在evbuffer_chain中的偏移量。因此Libevent定義了一個查詢(定位)結構體:

//buffer.h檔案

struct evbuffer_ptr {

ev_ssize_t pos;//總偏移量,相對於資料的開始位置


/* Do not alter the values of fields. */

struct {

void *chain;//指明是哪個evbuffer_chain

size_t pos_in_chain; //在evbuffer_chain中的偏移量

} _internal;

};

        有一點要注意,pos_in_chain是從misalign這個錯開空間之後計算的,也就是說其實際偏移量為:chain->buffer+ chain->misalign + pos_in_chain。

        定位結構體有一個對應的操作函式evbuffer_ptr_set,該函式就像fseek函式那樣,可以設定或者移動偏移量,並且可以絕對和相對地移動。

//buffer.h檔案

enum evbuffer_ptr_how {

EVBUFFER_PTR_SET, //偏移量是一個絕對位置

EVBUFFER_PTR_ADD //偏移量是一個相對位置

};



//buffer.c檔案

//設定evbuffer_ptr。evbuffer_ptr_set(buf, &pos, 0, EVBUFFER_PTR_SET)

//將這個pos指向連結串列的開頭

//position指明移動的偏移量,how指明該偏移量是絕對偏移量還是相對當前位置的偏移量。

int//這個函式的作用就像C語言中的fseek,設定檔案指標的偏移量

evbuffer_ptr_set(struct evbuffer *buf, struct evbuffer_ptr *pos,

size_t position, enum evbuffer_ptr_how how)

{

size_t left = position;

struct evbuffer_chain *chain = NULL;


EVBUFFER_LOCK(buf);


//這個switch的作用就是給pos設定新的總偏移量值。

switch (how) {

case EVBUFFER_PTR_SET://絕對位置

chain = buf->first;//從第一個evbuffer_chain算起

pos->pos = position; //設定總偏移量

position = 0;

break;

case EVBUFFER_PTR_ADD://相對位置

chain = pos->_internal.chain;//從當前evbuffer_chain算起

pos->pos += position;//加上相對偏移量

position = pos->_internal.pos_in_chain;

break;

}


//這個偏移量跨了evbuffer_chain。可能不止跨一個chain。

while (chain && position + left >= chain->off) {

left -= chain->off - position;

chain = chain->next;

position = 0;

}



if (chain) {//設定evbuffer_chain內的偏移量

pos->_internal.chain = chain;

pos->_internal.pos_in_chain = position + left;

} else {//跨過了所有的節點

pos->_internal.chain = NULL;

pos->pos = -1;

}


EVBUFFER_UNLOCK(buf);


return chain != NULL ? 0 : -1;

}

        可以看到,該函式只考慮了向後面的chain移動定位指標,不能向。當然如果引數position小於0,並且移動時並不會跨越當前的chain,還是可以的。不過最好不要這樣做。如果確實想移回頭,那麼可以考慮下面的操作。

pos.position -= 20;//移回頭20個位元組。

evbuffer_ptr_set(buf, &pos, 0, EVBUFFER_PTR_SET);

查詢一個字元:

        有下面程式碼的前一個函式是可以用來查詢一個字元的,第二個函式則是獲取對於位置的字元。

static inline ev_ssize_t

evbuffer_strchr(struct evbuffer_ptr *it, const char chr);


static inline char//獲取對應位置的字元

evbuffer_getchr(struct evbuffer_ptr *it);


static inline int

evbuffer_strspn(struct evbuffer_ptr *ptr, const char *chrset);

        函式evbuffer_strchr是從it指向的evbuffer_chain開始查詢,會往後面的連結串列查詢。it是一個值-結果引數,如果查詢到了,那麼it將會指明被查詢字元的位置,並返回相對於evbuffer的總偏移量(即it->pos)。如果沒有找到,就返回-1。由於實現都是一般的字元比較,所以就不列出程式碼了。函式evbuffer_getchr很容易理解也不列出程式碼了。

        第三個函式evbuffer_strspn的引數chrset雖然是一個字串,其實內部也是比較字元的。該函式所做的操作和C語言標準庫裡面的strspn函式是一樣的。這裡也不多說了。關於strspn函式的理解可以檢視這裡。  

查詢一個字串:

        字串的查詢函式有evbuffer_search_range和evbuffer_search,後者呼叫前者完成查詢。

        在講查詢前,先看一個字串比較函式evbuffer_ptr_memcmp。該函式是比較某一個字串和從evbuffer中某個位置開始的字元是否相等。明顯比較的時候需要考慮到跨evbuffer_chain的問題。

static int //匹配成功會返回0

evbuffer_ptr_memcmp(const struct evbuffer *buf, const struct evbuffer_ptr *pos,

const char *mem, size_t len)

{

struct evbuffer_chain *chain;

size_t position;

int r;


//連結串列資料不夠

if (pos->pos + len > buf->total_len)

return -1;


//需要考慮這個要匹配的字串被分散在兩個evbuffer_chain中

chain = pos->_internal.chain;

position = pos->_internal.pos_in_chain;//從evbuffer_chain中的這個位置開始

while (len && chain) {

size_t n_comparable;//該evbuffer_chain中可以比較的字元數

if (len + position > chain->off)

n_comparable = chain->off - position;

else

n_comparable = len;

r = memcmp(chain->buffer + chain->misalign + position, mem,

n_comparable);

if (r)//不匹配

return r;


//考慮跨evbuffer_chain

mem += n_comparable;

len -= n_comparable;//還有這些是沒有比較的

position = 0;

chain = chain->next;

}


return 0;//匹配成功

}

        該函式首先比較pos指向的當前evbuffer_chain,如果字元mem還有一些字元沒有參與比較,那麼就需要用下一個evbuffer_chain的資料。

        由於evbuffer的資料是由連結串列組成的,沒辦法直接用KMP查詢演算法或者直接呼叫strstr函式。有了evbuffer_ptr_memcmp函式,讀者可能會想,一個位元組一個位元組地挪動evbuffer的資料,依次呼叫evbuffer_ptr_memcmp函式。但evbuffer_search_range函式也不是直接呼叫函式evbuffer_ptr_memcmp的。而是先用字元查詢函式,找到要查詢字串中的第一個字元,然後才呼叫那個函式。下面是具體的程式碼。

//buffer.c檔案

struct evbuffer_ptr

evbuffer_search(struct evbuffer *buffer, const char *what, size_t len, const struct evbuffer_ptr *start)

{

return evbuffer_search_range(buffer, what, len, start, NULL);

}


//引數start和end指明瞭查詢的範圍

struct evbuffer_ptr

evbuffer_search_range(struct evbuffer *buffer, const char *what, size_t len, const struct evbuffer_ptr *start, const struct evbuffer_ptr *end)

{

struct evbuffer_ptr pos;

struct evbuffer_chain *chain, *last_chain = NULL;

const unsigned char *p;

char first;


EVBUFFER_LOCK(buffer);


//初始化pos

if (start) {

memcpy(&pos, start, sizeof(pos));

chain = pos._internal.chain;

} else {

pos.pos = 0;

chain = pos._internal.chain = buffer->first;

pos._internal.pos_in_chain = 0;

}


if (end)

last_chain = end->_internal.chain;


if (!len || len > EV_SSIZE_MAX)

goto done;


first = what[0];


//在本函式裡面並不考慮到what的資料量比較連結串列的總資料量還多。

//但在evbuffer_ptr_memcmp函式中會考慮這個問題。此時該函式直接返回-1。

//本函式之所以沒有考慮這樣情況,可能是因為,在[start, end]之間有多少

//資料是不值得統計的,時間複雜度是O(n)。不是一個簡單的buffer->total_len

//就能獲取到的

while (chain) {

const unsigned char *start_at =

chain->buffer + chain->misalign +

pos._internal.pos_in_chain;

//const void * memchr ( const void * ptr, int value, size_t num );

//函式的作用是:在ptr指向的記憶體塊中(長度為num個位元組),需找字元value。

//如果找到就返回對應的位置,找不到返回NULL

p = memchr(start_at, first,

chain->off - pos._internal.pos_in_chain);

if (p) {//找到了what[0]

pos.pos += p - start_at;

pos._internal.pos_in_chain += p - start_at;

//經過上面的兩個 += 後,pos指向了這個chain中出現等於what[0]字元的位置

//但本函式是要匹配一個字串,而非一個字元


//evbuffer_ptr_memcmp比較整個字串。如果有需要的話,該函式會跨

//evbuffer_chain進行比較,但不會修改pos。如果成功匹配,那麼返回0

if (!evbuffer_ptr_memcmp(buffer, &pos, what, len)) {//匹配成功

//雖然匹配成功了,但可能是用到了end之後的連結串列資料。這也等於沒有找到

if (end && pos.pos + (ev_ssize_t)len > end->pos)

goto not_found;

else

goto done;

}


//跳過這個等於what[0]的字元

++pos.pos;

++pos._internal.pos_in_chain;


//這個evbuffer_chain已經全部都對比過了。沒有發現目標

if (pos._internal.pos_in_chain == chain->off) {

chain = pos._internal.chain = chain->next;

pos._internal.pos_in_chain = 0;//下一個chain從0開始

}

} else {//這個evbuffer_chain都沒有找到what[0]

if (chain == last_chain)

goto not_found;


//此時直接跳過這個evbuffer_chain

pos.pos += chain->off - pos._internal.pos_in_chain;

chain = pos._internal.chain = chain->next;

pos._internal.pos_in_chain = 0;//下一個chain從0開始

}

}


not_found:

pos.pos = -1;

pos._internal.chain = NULL;

done:

EVBUFFER_UNLOCK(buffer);

return pos;

}

        evbuffer_ptr_memcmp函式和evbuffer_search函式是有區別的。前者只會比較從pos指定位置開始的字串,不會在另外的地方找一個字串。而後者則會在後面另外找一個字串進行比較。

查詢換行符:

        換行符是一個比較重要的符號,例如http協議就基於行的。Libevent實現了一個簡單的http伺服器,因此在內部Libevent實現了一些讀取一行資料函式以及與行相關的操作。

        有些系統行尾用\r\n有些則直接用\n,這些不統一給程式設計造成了一些麻煩。因此在Libevent中定義了一個列舉型別,專門來用表示eol(end of line)的。

  • EVBUFFER_EOL_LF:行尾是’\n’字元
  • EVBUFFER_EOL_CRLF_STRICT:行尾是”\r\n”,一個回車符一個換行符
  • EVBUFFER_EOL_CRLF:行尾是”\r\n”或者’\n’。這個是很有用的,因為可能標準的協議裡面要求”\r\n”,但一些不遵循標準的使用者可能使用’\n’
  • EVBUFFER_EOL_ANY:行尾是任意次序或者任意數量的’\r’或者’\n’。這種格式不是很有用,只是用來向後相容而已

         函式evbuffer_readln是用來讀取evbuffer中的一行資料(不會讀取行尾符號)。

enum evbuffer_eol_style {

EVBUFFER_EOL_ANY,

EVBUFFER_EOL_CRLF,

EVBUFFER_EOL_CRLF_STRICT,

EVBUFFER_EOL_LF

};


//成功返回讀取到的一行資料。否則返回NULL。該行資料會自動加上'\0'結尾

//如果n_read_out不為NULL,則被賦值為讀取到的一行的字元數

char *

evbuffer_readln(struct evbuffer *buffer, size_t *n_read_out,

enum evbuffer_eol_style eol_style)

{

struct evbuffer_ptr it;

char *line;

size_t n_to_copy=0, extra_drain=0;

char *result = NULL;


EVBUFFER_LOCK(buffer);


if (buffer->freeze_start) {

goto done;

}


//根據eol_style行尾型別找到行尾。返回值的位置偏移量就指向那個行尾符號

//行尾符號前面的evbuffer資料就是一行的內容。extra_drain指明這個行尾

//有多少個字元。後面需要把這個行尾符號刪除,方便以後再次讀取一行

it = evbuffer_search_eol(buffer, NULL, &extra_drain, eol_style);

if (it.pos < 0)

goto done;

n_to_copy = it.pos;//並不包括換行符


if ((line = mm_malloc(n_to_copy+1)) == NULL) {

event_warn("%s: out of memory", __func__);

goto done;

}


//複製並刪除n_to_copy位元組

evbuffer_remove(buffer, line, n_to_copy);

line[n_to_copy] = '\0';


//extra_drain指明是行尾符號佔有的位元組數。現在要刪除之

evbuffer_drain(buffer, extra_drain);

result = line;

done:

EVBUFFER_UNLOCK(buffer);


if (n_read_out)

*n_read_out = result ? n_to_copy : 0;


return result;

}

        在函式內部會申請空間,並且從evbuffer中提取出一行資料。下面看看函式evbuffer_search_eol是怎麼實現的。該函式執行查詢行尾的工作,它在內部區分4種不同的行尾型別。

struct evbuffer_ptr

evbuffer_search_eol(struct evbuffer *buffer,

struct evbuffer_ptr *start, size_t *eol_len_out,

enum evbuffer_eol_style eol_style)

{

struct evbuffer_ptr it, it2;

size_t extra_drain = 0;

int ok = 0;


EVBUFFER_LOCK(buffer);//加鎖


if (start) {

memcpy(&it, start, sizeof(it));

} else {//從頭開始找

it.pos = 0;

it._internal.chain = buffer->first;

it._internal.pos_in_chain = 0;

}


switch (eol_style) {

case EVBUFFER_EOL_ANY:

if (evbuffer_find_eol_char(&it) < 0)

goto done;

memcpy(&it2, &it, sizeof(it));

//該case的就是尋找在最前面的任意數量的\r和\n。

//evbuffer_strspn返回第一個不是\r或者\n的下標。

//此時extra_drain前面的都是\r或者\n。直接刪除即可

extra_drain = evbuffer_strspn(&it2, "\r\n");

break;

case EVBUFFER_EOL_CRLF_STRICT: {//\r\n

it = evbuffer_search(buffer, "\r\n", 2, &it);

if (it.pos < 0)//沒有找到

goto done;

extra_drain = 2;

break;

}

case EVBUFFER_EOL_CRLF://\n或者\r\n

while (1) {//這個迴圈的一個chain一個chain地檢查的

//從it指向的evbuffer_chain開始往連結串列後面查詢\n和\r。

//找到兩個中的一個即可。兩個都有就優先查詢\n。

//會修改it的內容,使得it指向查詢到的位置。查詢失敗返回-1。

//如果一個evbuffer_chain有\n或者\r就馬上返回。

if (evbuffer_find_eol_char(&it) < 0)

goto done;

if (evbuffer_getchr(&it) == '\n') {//獲取對應位置的字元

extra_drain = 1;

break;

} else if (!evbuffer_ptr_memcmp(

buffer, &it, "\r\n", 2)) {//如果剛才找到的是\r就再測測是不是\r\n

extra_drain = 2;

break;

} else {

//\r的後面不是\n,此時跳過\r,繼續查詢

if (evbuffer_ptr_set(buffer, &it, 1,

EVBUFFER_PTR_ADD)<0)

goto done;

}

}

break;

case EVBUFFER_EOL_LF://\n

if (evbuffer_strchr(&it, '\n') < 0)//沒有找到

goto done;

extra_drain = 1;

break;

default:

goto done;

}


ok = 1;

done:

EVBUFFER_UNLOCK(buffer);


if (!ok) {

it.pos = -1;

}

if (eol_len_out)

*eol_len_out = extra_drain;


return it;

}

回撥函式:

        evbuffer有一個回撥函式佇列成員callbacks,向evbuffer刪除或者新增資料時,就會呼叫這些回撥函式。之所以是回撥函式佇列,是因為一個evbuffer是可以新增多個回撥函式的,而且同一個回撥函式可以被新增多次。

        使用回撥函式時有一點要注意:因為當evbuffer被新增或者刪除資料時,就會呼叫這些回撥函式,所以在回撥函式裡面不要新增或者刪除資料,不然將導致遞迴,死迴圈。

        evbuffer的回撥函式對bufferevent來說是非常重要的,bufferevent的一些重要功能都是基於evbuffer的回撥函式完成的。

回撥相關結構體:

//buffer.h檔案

struct evbuffer_cb_info {

//新增或者刪除資料之前的evbuffer有多少位元組的資料

size_t orig_size;

size_t n_added;//添加了多少資料

size_t n_deleted;//刪除了多少資料


//因為每次刪除或者新增資料都會呼叫回撥函式,所以上面的三個成員只能記錄從上一次

//回撥函式被呼叫後,到本次回撥函式被呼叫這段時間的情況。

};



//兩個回撥函式型別

typedef void (*evbuffer_cb_func)(struct evbuffer *buffer, const struct evbuffer_cb_info *info, void *arg);


//buffer_compat.h檔案。這個型別的回撥函式已經不被推薦使用了

typedef void (*evbuffer_cb)(struct evbuffer *buffer, size_t old_len, size_t new_len, void *arg);


//evbuffer-internal.h檔案

//內部結構體,結構體成員對使用者透明

struct evbuffer_cb_entry {

/** Structures to implement a doubly-linked queue of callbacks */

TAILQ_ENTRY(evbuffer_cb_entry) next;

/** The callback function to invoke when this callback is called.

If EVBUFFER_CB_OBSOLETE is set in flags, the cb_obsolete field is

valid; otherwise, cb_func is valid. */

union {//哪個回撥型別。一般都是evbuffer_cb_func

evbuffer_cb_func cb_func;

evbuffer_cb cb_obsolete;

} cb;

void *cbarg;//回撥函式的引數

ev_uint32_t flags;//該回調的標誌

};


struct evbuffer {

...

//可以新增多個回撥函式。所以需要一個佇列儲存

TAILQ_HEAD(evbuffer_cb_queue, evbuffer_cb_entry) callbacks;

};

設定回撥函式:

        下面看一下怎麼設定回撥函式。

struct evbuffer_cb_entry *

evbuffer_add_cb(struct evbuffer *buffer, evbuffer_cb_func cb, void *cbarg)

{

struct evbuffer_cb_entry *e;

if (! (e = mm_calloc(1, sizeof(struct evbuffer_cb_entry))))

return NULL;

EVBUFFER_LOCK(buffer);//加鎖

e->cb.cb_func = cb;

e->cbarg = cbarg;

e->flags = EVBUFFER_CB_ENABLED;//標誌位, 允許回撥

TAILQ_INSERT_HEAD(&buffer->callbacks, e, next);

EVBUFFER_UNLOCK(buffer);//解鎖

return e;

}

        引數cbarg就是回撥函式被呼叫時的那個arg引數,這點對於熟悉Libevent的讀者應該不難理解。上面這個函式是被一個evbuffer_cb_entry結構體指標插入到callbacks佇列的前面,有關TAILQ_HEAD佇列和相關的插入操作可以參考博文《TAILQ_QUEUE佇列》。

        上面函式返回一個evbuffer_cb_entry結構體指標。使用者可以利用這個返回的結構體作一些處理,因為這個結構體已經和新增的回撥函式綁定了。比如可以設定這個回撥函式的標誌值。或者利用這個結構體指標作為標識,從佇列中找到這個回撥函式並刪除之。如下面程式碼所示:

int //設定標誌

evbuffer_cb_set_flags(struct evbuffer *buffer,

struct evbuffer_cb_entry *cb, ev_uint32_t flags)

{

/* the user isn't allowed to mess with these. */

flags &= ~EVBUFFER_CB_INTERNAL_FLAGS;

EVBUFFER_LOCK(buffer);

cb->flags |= flags;

EVBUFFER_UNLOCK(buffer);

return 0;

}


int //清除某個標誌

evbuffer_cb_clear_flags(struct evbuffer *buffer,

struct evbuffer_cb_entry *cb, ev_uint32_t flags)

{

/* the user isn't allowed to mess with these. */

flags &= ~EVBUFFER_CB_INTERNAL_FLAGS;

EVBUFFER_LOCK(buffer);

cb->flags &= ~flags;

EVBUFFER_UNLOCK(buffer);

return 0;

}


int //從佇列中刪除這個回撥函式

evbuffer_remove_cb_entry(struct evbuffer *buffer,

struct evbuffer_cb_entry *ent)

{

EVBUFFER_LOCK(buffer);

TAILQ_REMOVE(&buffer->callbacks, ent, next);

EVBUFFER_UNLOCK(buffer);

mm_free(ent);

return 0;

}



int //根據使用者設定的回撥函式和回撥引數這兩個量 從佇列中刪除

evbuffer_remove_cb(struct evbuffer *buffer, evbuffer_cb_func cb, void *cbarg)

{

struct evbuffer_cb_entry *cbent;

int result = -1;

EVBUFFER_LOCK(buffer);

TAILQ_FOREACH(cbent, &buffer->callbacks, next) {

if (cb == cbent->cb.cb_func && cbarg == cbent->cbarg) {

result = evbuffer_remove_cb_entry(buffer, cbent);

goto done;

}

}

done:

EVBUFFER_UNLOCK(buffer);

return result;

}


//Libevent還是提供了一個刪除所有回撥函式的介面

static void

evbuffer_remove_all_callbacks(struct evbuffer *buffer)

{

struct evbuffer_cb_entry *cbent;


while ((cbent = TAILQ_FIRST(&buffer->callbacks))) {

TAILQ_REMOVE(&buffer->callbacks, cbent, next);

mm_free(cbent);

}

}

        前面的程式碼展示了兩個回撥函式的型別,分別是evbuffer_cb_func和evbuffer_cb。後者在回撥的時候可以得知刪除或者新增資料之前的資料量和之後的資料量。但這兩個數值都可以通過evbuffer_cb_info獲取。所以evbuffer_cb回撥型別的優勢沒有了。此外,還有一個問題。一般是通過evbuffer_setcb函式設定evbuffer_cb型別的回撥函式。而這個函式會先刪除之前新增的所有回撥函式。

void

evbuffer_setcb(struct evbuffer *buffer, evbuffer_cb cb, void *cbarg)

{

EVBUFFER_LOCK(buffer);


if (!TAILQ_EMPTY(&buffer->callbacks))

evbuffer_remove_all_callbacks(buffer);//清空之前的回撥函式


if (cb) {

struct evbuffer_cb_entry *ent =

evbuffer_add_cb(buffer, NULL, cbarg);

ent->cb.cb_obsolete = cb;

ent->flags |= EVBUFFER_CB_OBSOLETE;

}

EVBUFFER_UNLOCK(buffer);

}

        可以看到,evbuffer_setcb為標誌位flags加上了EVBUFFER_CB_OBSOLETE屬性。從名字可以看到這是一個已經過時的屬性。其實evbuffer_setcb已經被推薦使用了。

Libevent如何呼叫回撥函式:

        下面看一下是怎麼呼叫回撥函式的。其實現也簡單,直接遍歷回撥佇列,然後依次呼叫回撥函式。

static void //在evbuffer_add中呼叫的該函式,running_deferred為0

evbuffer_run_callbacks(struct evbuffer *buffer, int running_deferred)

{

struct evbuffer_cb_entry *cbent, *next;

struct evbuffer_cb_info info;

size_t new_size;

ev_uint32_t mask, masked_val;

int clear = 1;


if (running_deferred) {

mask = EVBUFFER_CB_NODEFER|EVBUFFER_CB_ENABLED;

masked_val = EVBUFFER_CB_ENABLED;

} else if (buffer->deferred_cbs) {

mask = EVBUFFER_CB_NODEFER|EVBUFFER_CB_ENABLED;

masked_val = EVBUFFER_CB_NODEFER|EVBUFFER_CB_ENABLED;

/* Don't zero-out n_add/n_del, since the deferred callbacks

will want to see them. */

clear = 0;

} else { //一般都是這種情況

mask = EVBUFFER_CB_ENABLED;

masked_val = EVBUFFER_CB_ENABLED;

}


if (TAILQ_EMPTY(&buffer->callbacks)) {//使用者沒有設定回撥函式

//清零

buffer->n_add_for_cb = buffer->n_del_for_cb = 0;

return;

}


//沒有新增或者刪除資料

if (buffer->n_add_for_cb == 0 && buffer->n_del_for_cb == 0)

return;


new_size = buffer->total_len;

info.orig_size = new_size + buffer->n_del_for_cb - buffer->n_add_for_cb;

info.n_added = buffer->n_add_for_cb;

info.n_deleted = buffer->n_del_for_cb;

if (clear) {//清零,為下次計算做準備

buffer->n_add_for_cb = 0;

buffer->n_del_for_cb = 0;

}


//遍歷回撥函式佇列,呼叫回撥函式

for (cbent = TAILQ_FIRST(&buffer->callbacks);

cbent != TAILQ_END(&buffer->callbacks);

cbent = next) {


next = TAILQ_NEXT(cbent, next);


//該回調函式沒有enable

if ((cbent->flags & mask) != masked_val)

continue;


if ((cbent->flags & EVBUFFER_CB_OBSOLETE))//已經不被推薦使用了

cbent->cb.cb_obsolete(buffer,

info.orig_size, new_size, cbent->cbarg);

else

cbent->cb.cb_func(buffer, &info, cbent->cbarg);//呼叫使用者設定的回撥函式

}

}

        無論是刪除資料還是新增資料的函式,例如evbuffer_add和evbuffer_drain函式,都是會呼叫evbuffer_invoke_callbacks函式的。而這個函式會呼叫evbuffer_run_callbacks函式。

evbuffer與網路IO:

從Socket中讀取資料:

        Libevent通過evbuffer_read函式從一個socket中讀取資料到evbuffer中。在讀取socket資料之前,Libevent會呼叫ioctl函式來獲取這個socket的讀緩衝區中有多少位元組,進而確定本次要讀多少位元組到evbuffer中。Libevent會根據要讀取的位元組數,在真正read之前會先把evbuffer擴容,免得在read的時候緩衝區不夠。

        在擴容的時候,如果所在的系統是支援類似readv這樣的可以把資料讀取到像一個iovec結構體函式,那麼Libevent就會選擇在n個evbuffer_chain中找到足夠的空閒空間(往往通過申請堆空間),因為這樣可以使用類似Linux的iovec結構體。把連結串列的各個evbuffer_chain的空閒空間的地址賦值給iovec陣列(如下圖所示),然後呼叫readv函式直接讀取,readv會把資料讀取到相應的chain中。

        

        上圖中,有一點是和Libevent的實際情況不符合的。在evbuffer_read中,最後的那個幾個evbuffer_chain一般是沒有資料的,只有空閒的區域。上圖為了好看,就加上了data這區域。

        將連結串列的各個evbuffer_chain的空閒空間的地址賦值給iovec陣列,這個操作是由函式_evbuffer_read_setup_vecs完成的。

//讓vecs陣列的指標指向evbuffer中的可用chain.標明哪個chain可用並且從chain的哪裡開始,以及可用的位元組數

//howmuch是要擴容的大小。vecs、n_vecs_avail分別是iovec陣列和陣列的大小

//chainp是值-結果引數,它最後指向第一個有可用空間的chain

int

_evbuffer_read_setup_vecs(struct evbuffer *buf, ev_ssize_t howmuch,

struct evbuffer_iovec *vecs, int n_vecs_avail,

struct evbuffer_chain ***chainp, int exact)

{

struct evbuffer_chain *chain;

struct evbuffer_chain **firstchainp;

size_t so_far;

int i;


if (howmuch < 0)

return -1;


so_far = 0;


//因為找的是evbuffer連結串列中的空閒空間,所以從最後一個有資料的chain中開始找

firstchainp = buf->last_with_datap;

if (CHAIN_SPACE_LEN(*firstchainp) == 0) {//這個chain已經沒有空間了

firstchainp = &(*firstchainp)->next;//那麼只能下一個chain了

}


//因為Libevent在呼叫本函式之前,一般會呼叫_evbuffer_expand_fast來擴大

//evbuffer的可用空間。所以下面的迴圈中並沒有判斷chain是否為NULL,就直接

//chain->next

chain = *firstchainp;

for (i = 0; i < n_vecs_avail && so_far < (size_t)howmuch; ++i) {

size_t avail = (size_t) CHAIN_SPACE_LEN(chain);

//如果exact為真,那麼即使這個chain有更多的可用空間,也不會使用。只會

//要自己正需要的空間

if (avail > (howmuch - so_far) && exact)

avail = howmuch - so_far;

vecs[i].iov_base = CHAIN_SPACE_PTR(chain);//這個chain的可用空間的開始位置

vecs[i].iov_len = avail;//可用長度

so_far += avail;

chain = chain->next;

}


*chainp = firstchainp; //指向第一個有可用空間的chain

return i;//返回需要多少個chain才能有howmuch這麼多的空閒空間

}

        在Windows系統,雖然沒有readv函式,但它有WSARecv函式,可以把資料讀取到一個類似iovec的結構體中,所以在Windows系統中,Libevent還是選擇在n個evbuffer_chain中找到足夠的空閒空間。所以在Libevent中有下面的巨集定義:

//buffer.c檔案

//sys/uio.h檔案定義了readv函式

#if defined(_EVENT_HAVE_SYS_UIO_H) || defined(WIN32)

#define USE_IOVEC_IMPL //該巨集標誌所在的系統支援類似readv的函式

#endif


//Windows系統定義了下面結構體

typedef struct __WSABUF {

u_long len;

char FAR *buf;

} WSABUF, *LPWSABUF;

        如果所在的系統不支援類似readv這樣的函式,那麼Libevent就只能在一個evbuffer_chain申請一個足夠大的空間,然後直接呼叫read函數了。

        前面說到的擴容,分別是由函式_evbuffer_expand_fast和函式evbuffer_expand_singlechain完成的。在《evbuffer結構與基本操作》一文中已經有對這兩個函式的介紹,這裡就不多說了。

        由於存在是否支援類似readv函式 這兩種情況,所以evbuffer_read在實現上也出現了兩種實現。

        上面說了這麼多,還是來看一下evbuffer_read的具體實現吧。

//buffer.c檔案

//返回讀取到的位元組數。錯誤返回-1,斷開了連線返回0

int //howmuch指出此時evbuffer可以使用的空間大小

evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)

{

struct evbuffer_chain **chainp;

int n;

int result;


#ifdef USE_IOVEC_IMPL //所在的系統支援iovec或者是Windows作業系統

int nvecs, i, remaining;

#else

struct evbuffer_chain *chain;

unsigned char *p;

#endif


EVBUFFER_LOCK(buf);//加鎖


//凍結緩衝區尾部,禁止在尾部追加資料

if (buf->freeze_end) {

result = -1;

goto done;

}


//獲取這個socket接收緩衝區裡面有多少位元組可讀.通過ioctl實現

n = get_n_bytes_readable_on_socket(fd);

if (n <= 0 || n > EVBUFFER_MAX_READ)//每次只讀EVBUFFER_MAX_READ(4096)個字元

n = EVBUFFER_MAX_READ;

if (howmuch < 0 || howmuch > n)

howmuch = n;


#ifdef USE_IOVEC_IMPL //所在的系統支援iovec或者是Windows作業系統

//NUM_READ_IOVEC等於4

//擴大evbuffer,使得其有howmuch位元組的空閒空間

//在NUM_READ_IOVEC個evbuffer_chain中擴容足夠的空閒空間

if (_evbuffer_expand_fast(buf, howmuch, NUM_READ_IOVEC) == -1) {

result = -1;

goto done;

} else {

//在posix中IOV_TYPE為iovec,在Windows中為WSABUF

IOV_TYPE vecs[NUM_READ_IOVEC];


#ifdef _EVBUFFER_IOVEC_IS_NATIVE //所在的系統支援iovec結構體

nvecs = _evbuffer_read_setup_vecs(buf, howmuch, vecs,

NUM_READ_IOVEC, &chainp, 1);

#else //Windows系統。因為沒有native的 iovec


//在Windows系統中,evbuffer_iovec定義得和posix中的iovec一樣.

//因為_evbuffer_read_setup_vecs函式只接受像iovec那樣結構體的引數

struct evbuffer_iovec ev_vecs[NUM_READ_IOVEC];

nvecs = _evbuffer_read_setup_vecs(buf, howmuch, ev_vecs, 2,

&chainp, 1);


//因為在Windows中,需要使用WSABUF結構體讀取資料,所以要從evbuffer_iovec

//中將一些值提出出來,放到vecs中

for (i=0; i < nvecs; ++i)

WSABUF_FROM_EVBUFFER_IOV(&vecs[i], &ev_vecs[i]);

#endif


//完成把資料從socket fd中讀取出來

#ifdef WIN32

{

DWORD bytesRead;

DWORD flags=0;

//雖然Windows支援類似readv的函式,但Windows沒有readv函式,只有下面的函式

if (WSARecv(fd, vecs, nvecs, &bytesRead, &flags, NULL, NULL)) {

if (WSAGetLastError() == WSAECONNABORTED)

n = 0;

else

n = -1;

} else

n = bytesRead;

}

#else

n = readv(fd, vecs, nvecs);//POSIX

#endif

}


//如果所在的系統不支援 iovec並且不是Windows系統。也就是說不支援類似

//readv這樣的函式。那麼只能把所有的資料都讀到一個chain中

#else /*!USE_IOVEC_IMPL*/


//把一個chain擴大得可以有howmuch位元組的空閒空間

if ((chain = evbuffer_expand_singlechain(buf, howmuch)) == NULL) {

result = -1;

goto done;

}


p = chain->buffer + chain->misalign + chain->off;


//讀取資料

#ifndef WIN32

n = read(fd, p, howmuch);

#else

n = recv(fd, p, howmuch, 0);

#endif


#endif /* USE_IOVEC_IMPL *///終止前面的if巨集



if (n == -1) {//錯誤

result = -1;

goto done;

}

if (n == 0) {//斷開了連線

result = 0;

goto done;

}



#ifdef USE_IOVEC_IMPL//使用了iovec結構體讀取資料。需要做一些額外的處理


//chainp是由_evbuffer_read_setup_vecs函式呼叫得到。它指向未從fd讀取資料時

//第一個有空閒空間位置的chain


remaining = n;//n等於讀取到的位元組數

//使用iovec讀取資料時,只是把資料往chain中填充,並沒有修改evbuffer_chain

//的成員,比如off偏移量成員。此時就需要把這個off修改到正確值

for (i=0; i < nvecs; ++i) {

//CHAIN_SPACE_LEN(*chainp)返回的是填充資料前的空閒空間。

//除了最後那個chain外,其他的chain都會被填滿的。所以對於非last

//chain,直接把off加上這個space即可。

ev_ssize_t space = (ev_ssize_t) CHAIN_SPACE_LEN(*chainp);

if (space < remaining) {//前面的chain

(*chainp)->off += space;

remaining -= (int)space;

} else {//最後那個chain

(*chainp)->off += remaining;

buf->last_with_datap = chainp;//指向最後一個有資料的chain

break;

}

chainp = &(*chainp)->next;

}

#else

chain->off += n;

//調整last_with_datap,使得*last_with_datap指向最後一個有資料的chain

advance_last_with_data(buf);

#endif


buf->total_len += n;

buf->n_add_for_cb += n;//添加了n位元組


/* Tell someone about changes in this buffer */

evbuffer_invoke_callbacks(buf);//evbuffer添加了資料,就需要呼叫回撥函式

result = n;

done:

EVBUFFER_UNLOCK(buf);

return result;

}

往socket寫入資料:

        因為evbuffer是用連結串列的形式存放資料,所以要把這些連結串列上的資料寫入socket,那麼使用writev這個函式是十分有效的。同前面一樣,使用iovec結構體陣列,就需要設定陣列元素的指標。這個工作由evbuffer_write_iovec函式完成。

        正如前面的從socket讀出資料,可能所在的系統並不支援writev這樣的函式。此時就只能使用一般的write函數了,但這個函式要求資料放在一個連續的空間。所以Libevent有一個函式evbuffer_pullup,用來把連結串列記憶體拉直,即把一定數量的資料從連結串列中copy到一個連續的記憶體空間。這個連續的空間也是由某個evbuffer_chain的buffer指標指向,並且這個evbuffer_chain會被插入到連結串列中。這個時候就可以直接使用write或者send函式傳送這特定數量的資料了。

        不同於讀,寫操作還有第三種可能。那就是sendfile。如果所在的系統支援sendfile,並且使用者是通過evbuffer_add_file新增資料的,那麼此時Libevent就是所在系統的sendfile函式傳送資料。

        Libevent內部一般通過evbuffer_write函式把資料寫入到socket fd中。下面是具體的實現。

//buffer.c檔案

int

evbuffer_write(struct evbuffer *buffer, evutil_socket_t fd)

{

//把evbuffer的所有資料都寫入到fd中

return evbuffer_write_atmost(buffer, fd, -1);

}



int//howmuch是要寫的位元組數。如果小於0,那麼就把buffer裡的所有資料都寫入fd

evbuffer_write_atmost(struct evbuffer *buffer, evutil_socket_t fd,

ev_ssize_t howmuch)

{

int n = -1;


EVBUFFER_LOCK(buffer);


//凍結了連結串列頭,無法往fd寫資料。因為寫之後,還要把資料從evbuffer中刪除

if (buffer->freeze_start) {

goto done;

}


if (howmuch < 0 || (size_t)howmuch > buffer->total_len)

howmuch = buffer->total_len;


if (howmuch > 0) {

#ifdef USE_SENDFILE //所在的系統支援sendfile

struct evbuffer_chain *chain = buffer->first;

//需通過evbuffer_add_file新增資料,才會使用sendfile

if (chain != NULL && (chain->flags & EVBUFFER_SENDFILE)) //並且要求使用sendfile

n = evbuffer_write_sendfile(buffer, fd, howmuch);

else {

#endif

#ifdef USE_IOVEC_IMPL //所在的系統支援writev這類函式

//函式內部會設定陣列元素的成員指標,以及長度成員

n = evbuffer_write_iovec(buffer, fd, howmuch);

#elif defined(WIN32)

/* XXX(nickm) Don't disable this code until we know if

* the WSARecv code above works. */

//把evbuffer前面的howmuch位元組拉直。使得這howmuch位元組都放在一個chain裡面

//也就是放在一個連續的空間,不再是之前的多個連結串列節點。這樣就能直接用

//send函式傳送了。

void *p = evbuffer_pullup(buffer, howmuch);

n = send(fd, p, howmuch, 0);

#else

void *p = evbuffer_pullup(buffer, howmuch);

n = write(fd, p, howmuch);

#endif

#ifdef USE_SENDFILE

}

#endif

}


if (n > 0)

evbuffer_drain(buffer, n);//從連結串列中刪除已經寫入到socket的n個位元組


done:

EVBUFFER_UNLOCK(buffer);

return (n);

}

參考:

本文來自 luotuo44 的CSDN 部落格 ,全文地址請點選:https://blog.csdn.net/luotuo44/article/details/39325447?utm_source=copy