1. 程式人生 > >Libevent原始碼分析-----bufferevent工作流程探究

Libevent原始碼分析-----bufferevent工作流程探究

        和之前的《Libevent工作流程探究》一樣,這裡也是用一個例子來探究bufferevent的工作流程。具體的例子可以參考《》,這裡就不列出了。其實要做的例子也就是bufferevent_socket_new、bufferevent_setcb、bufferevent_enable這幾個函式。

        因為本文會用到《Libevent工作流程探究》中提到的說法,比如將一個event插入到event_base中。所以讀者最好先讀一下那篇博文。此外,因為bufferevent結構體本身會使用evbuffer結構體和還會呼叫相應的一些操作,所以讀者還應該先閱讀《evbuffer結構與基本操作
》和《更多evbuffer操作函式》。

bufferevent結構體:

        bufferevent其實也就是在event_base的基礎上再進行一層封裝,其本質還是離不開event和event_base,從bufferevent的結構體就可以看到這一點。

        bufferevent結構體中有兩個event,分別用來監聽同一個fd的可讀事件和可寫事件。為什麼不用一個event同時監聽可讀和可寫呢?這是因為監聽可寫是困難的,下面會說到原因。讀者也可以自問一下,自己之前有沒有試過用最原始的event監聽一個fd的可寫。

        由於socket 是全雙工的,所以在bufferevent結構體中,也有兩個evbuffer成員,分別是讀緩衝區和寫緩衝區。 bufferevent結構體定義如下:

//bufferevent_struct.h檔案
struct bufferevent {
	struct event_base *ev_base;
	
	//操作結構體,成員有一些函式指標。類似struct eventop結構體
	const struct bufferevent_ops *be_ops;

	struct event ev_read;//讀事件event
	struct event ev_write;//寫事件event

	struct evbuffer *input;//讀緩衝區

	struct evbuffer *output; //寫緩衝區

	struct event_watermark wm_read;//讀水位
	struct event_watermark wm_write;//寫水位

	
	bufferevent_data_cb readcb;//可讀時的回撥函式指標
	bufferevent_data_cb writecb;//可寫時的回撥函式指標
	bufferevent_event_cb errorcb;//錯誤發生時的回撥函式指標
	void *cbarg;//回撥函式的引數

	struct timeval timeout_read;//讀事件event的超時值
	struct timeval timeout_write;//寫事件event的超時值

	/** Events that are currently enabled: currently EV_READ and EV_WRITE
	    are supported. */
	short enabled;
};

        如果看過Libevent的參考手冊的話,應該還會知道bufferevent除了用於socket外,還可以用於socketpair 和 filter。如果用面向物件的思維,應從這個三個應用中抽出相同的部分作為父類,然後派生出三個子類。

        Libevent雖然是用C語言寫的,不過它還是提取出一些公共部分,然後定義一個bufferevent_private結構體,用於儲存這些公共部分成員。從集合的角度來說,bufferevent_private應該是bufferevent的一個子集,即一部分。但在Libevent中,bufferevent確實bufferevent_private的一個成員。下面是bufferevent_private結構體。

//bufferevent-internal.h檔案
struct bufferevent_private {
	struct bufferevent bev;

	//設定input evbuffer的高水位時,需要一個evbuffer回撥函式配合工作
	struct evbuffer_cb_entry *read_watermarks_cb;

	/** If set, we should free the lock when we free the bufferevent. */
	//鎖是Libevent自動分配的,還是使用者分配的
	unsigned own_lock : 1;

	...

	//這個socket是否處理正在連線伺服器狀態
	unsigned connecting : 1;
	//標誌連線被拒絕
	unsigned connection_refused : 1;

	//標誌是什麼原因把 讀 掛起來
	bufferevent_suspend_flags read_suspended;
		//標誌是什麼原因把 寫 掛起來
	bufferevent_suspend_flags write_suspended;

	enum bufferevent_options options;
		int refcnt;// bufferevent的引用計數

	//鎖變數
	void *lock;
};



新建一個bufferevent:

        函式bufferevent_socket_new可以完成這個工作。

//bufferevent-internal.h檔案
struct bufferevent_ops {
	const char *type;//型別名稱

	off_t mem_offset;//成員bev的偏移量

	//啟動。將event加入到event_base中
	int (*enable)(struct bufferevent *, short);

	//關閉。將event從event_base中刪除
	int (*disable)(struct bufferevent *, short);
	//銷燬
	void (*destruct)(struct bufferevent *);
	//調整event的超時值
	int (*adj_timeouts)(struct bufferevent *);
	/** Called to flush data. */
	int (*flush)(struct bufferevent *, short, enum bufferevent_flush_mode);
	//獲取成員的值。具體看實現
	int (*ctrl)(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
};


//bufferevent_sock.c檔案
const struct bufferevent_ops bufferevent_ops_socket = {
	"socket",
	evutil_offsetof(struct bufferevent_private, bev),
	be_socket_enable,
	be_socket_disable,
	be_socket_destruct,
	be_socket_adj_timeouts,
	be_socket_flush,
	be_socket_ctrl,
};

//由於有幾個不同型別的bufferevent,而且它們的enable、disable等操作是不同的。所以
//需要的一些函式指標指明某個型別的bufferevent應該使用哪些操作函式。結構體bufferevent_ops_socket
//就應運而生。對於socket,其操作函式如上。

//bufferevent_sock.c檔案
struct bufferevent *
bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,
    int options)
{
	struct bufferevent_private *bufev_p;
	struct bufferevent *bufev;

	...//win32

	//結構體記憶體清零,所有成員都為0
	if ((bufev_p = mm_calloc(1, sizeof(struct bufferevent_private)))== NULL)
		return NULL;

	//如果options中需要執行緒安全,那麼就會申請鎖
	//會新建一個輸入和輸出快取區
	if (bufferevent_init_common(bufev_p, base, &bufferevent_ops_socket,
				    options) < 0) {
		mm_free(bufev_p);
		return NULL;
	}
	bufev = &bufev_p->bev;
	//設定將evbuffer的資料向fd傳
	evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);

	//將fd與event相關聯。同一個fd關聯兩個event
	event_assign(&bufev->ev_read, bufev->ev_base, fd,
	    EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
	event_assign(&bufev->ev_write, bufev->ev_base, fd,
	    EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);

	//設定evbuffer的回撥函式,使得外界給寫緩衝區新增資料時,能觸發
	//寫操作,這個回撥對於寫事件的監聽是很重要的
	evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);

	//凍結讀緩衝區的尾部,未解凍之前不能往讀緩衝區追加資料
	//也就是說不能從socket fd中讀取資料
	evbuffer_freeze(bufev->input, 0);

	//凍結寫緩衝區的頭部,未解凍之前不能把寫緩衝區的頭部資料刪除
	//也就是說不能把資料寫到socket fd
	evbuffer_freeze(bufev->output, 1);

	return bufev;
}
        留意函式裡面的evbuffer_add_cb呼叫,後面會說到。

        函式在最後面會凍結兩個緩衝區。其實,雖然這裡凍結了,但實際上Libevent在讀資料或者寫資料之前會解凍的讀完或者寫完資料後,又會馬上凍結。這主要防止資料被意外修改。使用者一般不會直接呼叫evbuffer_freeze或者evbuffer_unfreeze函式。一切的凍結和解凍操作都由Libevent內部完成。還有一點要注意,因為這裡只是把寫緩衝區的頭部凍結了。所以還是可以往寫緩衝區的尾部追加資料。同樣,此時也是可以從讀緩衝區讀取資料。這個是必須的。因為在Libevent內部不解凍的時候,使用者需要從讀緩衝區中獲取資料(這相當於從socket fd中讀取資料),使用者也需要把資料寫到寫緩衝區中(這相當於把資料寫入到socket fd中)。

        在bufferevent_socket_new函式裡面會呼叫函式bufferevent_init_common完成公有部分的初始化。

//bufferevent.c檔案
int
bufferevent_init_common(struct bufferevent_private *bufev_private,
    struct event_base *base,
    const struct bufferevent_ops *ops,
    enum bufferevent_options options)
{
	struct bufferevent *bufev = &bufev_private->bev;

	//分配輸入緩衝區
	if (!bufev->input) {
		if ((bufev->input = evbuffer_new()) == NULL)
			return -1;
	}

	//分配輸出緩衝區
	if (!bufev->output) {
		if ((bufev->output = evbuffer_new()) == NULL) {
			evbuffer_free(bufev->input);
			return -1;
		}
	}

	bufev_private->refcnt = 1;//引用次數為1
	bufev->ev_base = base;

	/* Disable timeouts. */
	//預設情況下,讀和寫event都是不支援超時的
	evutil_timerclear(&bufev->timeout_read);
	evutil_timerclear(&bufev->timeout_write);

	bufev->be_ops = ops;

	/*
	 * Set to EV_WRITE so that using bufferevent_write is going to
	 * trigger a callback.  Reading needs to be explicitly enabled
	 * because otherwise no data will be available.
	 */
	 //可寫是預設支援的
	bufev->enabled = EV_WRITE;

#ifndef _EVENT_DISABLE_THREAD_SUPPORT
	if (options & BEV_OPT_THREADSAFE) {
		//申請鎖。
		if (bufferevent_enable_locking(bufev, NULL) < 0) {
			/* cleanup */
			evbuffer_free(bufev->input);
			evbuffer_free(bufev->output);
			bufev->input = NULL;
			bufev->output = NULL;
			return -1;
		}
	}
#endif
	...//延遲呼叫的初始化,一般不需要用到

	bufev_private->options = options;

	//將evbuffer和bufferevent相關聯
	evbuffer_set_parent(bufev->input, bufev);
	evbuffer_set_parent(bufev->output, bufev);

	return 0;
}

        程式碼中可以看到,預設是enable  EV_WRITE的。





設定回撥函式:

        函式bufferevent_setcb完成這個工作。該函式相當簡單,也就是進行一些賦值操作。

//bufferevent.c檔案
void
bufferevent_setcb(struct bufferevent *bufev,
    bufferevent_data_cb readcb, bufferevent_data_cb writecb,
    bufferevent_event_cb eventcb, void *cbarg)
{
	//bufferevent結構體內部有一個鎖變數
	BEV_LOCK(bufev);

	bufev->readcb = readcb;
	bufev->writecb = writecb;
	bufev->errorcb = eventcb;

	bufev->cbarg = cbarg;
	BEV_UNLOCK(bufev);
}

        如果不想設定某個操作的回撥函式,直接設定為NULL即可。

令bufferevent可以工作:

        相信讀者也知道,即使呼叫了bufferevent_socket_new和bufferevent_setcb,這個bufferevent還是不能工作,必須呼叫bufferevent_enable。為什麼會這樣的呢?

        如果看過之前的那些博文,相信讀者知道,一個event能夠工作,不僅僅需要new出來,還要呼叫event_add函式,把這個event新增到event_base中。在本文前面的程式碼中,並沒有看到event_add函式的呼叫。所以還需要呼叫一個函式,把event新增到event_base中。函式bufferevent_enable就是完成這個工作的。

//bufferevent.c檔案
int
bufferevent_enable(struct bufferevent *bufev, short event)
{
	struct bufferevent_private *bufev_private =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
	short impl_events = event;
	int r = 0;

	//增加引用並加鎖
	//增加引用是為了防止其他執行緒呼叫bufferevent_free,釋放了bufferevent
	_bufferevent_incref_and_lock(bufev);

	//掛起了讀,此時不能監聽讀事件
	if (bufev_private->read_suspended)
		impl_events &= ~EV_READ;

	//掛起了寫,此時不能監聽寫事情
	if (bufev_private->write_suspended)
		impl_events &= ~EV_WRITE;

	bufev->enabled |= event;

	//呼叫對應型別的enbale函式。因為不同型別的bufferevent有不同的enable函式
	if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0)
		r = -1;

	//減少引用並解鎖
	_bufferevent_decref_and_unlock(bufev);
	return r;
}

        上面程式碼可以看到,最終會呼叫對應bufferevent型別的enable函式,對於socket bufferevent,其enable函式是be_socket_enable,程式碼如下:

//bufferevent.c檔案
int
_bufferevent_add_event(struct event *ev, const struct timeval *tv)
{
	if (tv->tv_sec == 0 && tv->tv_usec == 0)
		return event_add(ev, NULL);
	else
		return event_add(ev, tv);
}


//bufferevent_sock.c檔案
#define be_socket_add(ev, t)			\
	_bufferevent_add_event((ev), (t))


static int
be_socket_enable(struct bufferevent *bufev, short event)
{
	if (event & EV_READ) {
		if (be_socket_add(&bufev->ev_read,&bufev->timeout_read) == -1)
			return -1;
	}
	if (event & EV_WRITE) {
		if (be_socket_add(&bufev->ev_write,&bufev->timeout_write) == -1)
			return -1;
	}
	return 0;
}

        如果讀者熟悉Libevent的超時事件,那麼可以知道Libevent是在event_add函式裡面確定一個event的超時的。上面程式碼也展示了這一點,如果讀或者寫event設定了超時(即其超時值不為0),那麼就會作為引數傳給event_add函式。如果讀者不熟悉的Libevent的超時事件的話,可以參考《超時event的處理》。

        使用者可以呼叫函式bufferevent_set_timeouts,設定讀或者寫事件的超時。程式碼如下:

//bufferevent.c檔案
int
bufferevent_set_timeouts(struct bufferevent *bufev,
			 const struct timeval *tv_read,
			 const struct timeval *tv_write)
{
	int r = 0;
	BEV_LOCK(bufev);
	if (tv_read) {
		bufev->timeout_read = *tv_read;
	} else {
		evutil_timerclear(&bufev->timeout_read);
	}
	if (tv_write) {
		bufev->timeout_write = *tv_write;
	} else {
		evutil_timerclear(&bufev->timeout_write);
	}

	if (bufev->be_ops->adj_timeouts)
		r = bufev->be_ops->adj_timeouts(bufev);
	BEV_UNLOCK(bufev);

	return r;
}


//bufferevent_sock.c檔案
static int
be_socket_adj_timeouts(struct bufferevent *bufev)
{
	int r = 0;
	//使用者監聽了讀事件
	if (event_pending(&bufev->ev_read, EV_READ, NULL))
		if (be_socket_add(&bufev->ev_read, &bufev->timeout_read) < 0)
			r = -1;

	//使用者監聽了寫事件
	if (event_pending(&bufev->ev_write, EV_WRITE, NULL)) {
		if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) < 0)
			r = -1;
	}
	return r;
}

        從上面程式碼可以看到:使用者不僅僅可以設定超時值,還可以修改超時值,也是通過這個函式進行修的。當然也是可以刪除超時的,直接把超時引數設定成NULL即可。

        至此,已經完成了bufferevent的初始化工作,只需呼叫event_base_dispatch函式,啟動發動機就可以工作了。





處理讀事件:

        接下來的任務:底層的socket fd接收資料後,bufferevent是怎麼工作的。

讀事件的水位:

        在講解讀事件之前,先來看一下水位問題,函式bufferevent_setwatermark可以設定讀和寫的水位。這裡只講解讀事件的水位。

        水位有兩個:低水位和高水位。

        低水位比較容易懂,就是當可讀的資料量到達這個低水位後,才會呼叫使用者設定的回撥函式。比如使用者想每次讀取100位元組,那麼就可以把低水位設定為100。當可讀資料的位元組數小於100時,即使有資料都不會打擾使用者(即不會呼叫使用者設定的回撥函式)。可讀資料大於等於100位元組後,才會呼叫使用者的回撥函式。

        高水位是什麼呢?其實,這和使用者的回撥函式沒有關係。它的意義是:把讀事件的evbuffer的資料量限制在高水位之下。比如,使用者認為讀緩衝區不能太大(太大的話,連結串列會很長)。那麼使用者就會設定讀事件的高水位。當讀緩衝區的資料量達到這個高水位後,即使socket fd還有資料沒有讀,也不會讀進這個讀緩衝區裡面。一句話說,就是控制evbuffer的大小。

        雖然控制了evbuffer的大小,但socket fd可能還有資料。有資料就會觸發可讀事件,但處理可讀的時候,又會發現設定了高水位,不能讀取資料evbuffer。socket fd的資料沒有被讀完,又觸發……。這個貌似是一個死迴圈。實際上是不會出現這個死迴圈的,因為Libevent發現evbuffer的資料量到達高水位後,就會把可讀事件給掛起來,讓它不能再觸發了。Libevent使用函式bufferevent_wm_suspend_read把監聽讀事件的event掛起來。下面看一下Libevent是怎麼把一個event掛起來的。

//bufferevent-internal.h檔案
#define bufferevent_wm_suspend_read(b) \
	bufferevent_suspend_read((b), BEV_SUSPEND_WM)


//bufferevent.c檔案
void
bufferevent_suspend_read(struct bufferevent *bufev, bufferevent_suspend_flags what)
{
	struct bufferevent_private *bufev_private =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
	BEV_LOCK(bufev);
	if (!bufev_private->read_suspended)//不能掛多次
		bufev->be_ops->disable(bufev, EV_READ);//實際呼叫be_socket_disable函式
	bufev_private->read_suspended |= what;//因何而被掛起
	BEV_UNLOCK(bufev);
}


//bufferevent_sock.c檔案
static int
be_socket_disable(struct bufferevent *bufev, short event)
{
	struct bufferevent_private *bufev_p =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
	if (event & EV_READ) {
		if (event_del(&bufev->ev_read) == -1)
			return -1;
	}
	/* Don't actually disable the write if we are trying to connect. */
	if ((event & EV_WRITE) && ! bufev_p->connecting) {
		if (event_del(&bufev->ev_write) == -1)//刪掉這個event
			return -1;
	}
	return 0;
}

        居然是直接刪除這個監聽讀事件的event,真的是掛了!!!

        看來不能隨便設定高水位,因為它會暫停讀。如果只想設定低水位而不想設定高水位,那麼在呼叫bufferevent_setwatermark函式時,高水位的引數設為0即可。

        那麼什麼時候取消掛起,讓bufferevent可以繼續讀socket 資料呢?從高水位的意義來說,當然是當evbuffer裡面的資料量小於高水位時,就能再次讀取socket資料了。現在來看一下Libevent是怎麼恢復讀的。看一下設定水位的函式bufferevent_setwatermark吧,它進行了一些為高水位埋下了一個回撥函式。對,就是evbuffer的回撥函式。前一篇博文說到,當evbuffer裡面的資料新增或者刪除時,是會觸發一些回撥函式的。當用戶移除evbuffer的一些資料量時,Libevent就會檢查這個evbuffer的資料量是否小於高水位,如果小於的話,那麼就恢復 讀事件。

        不說這麼多了,上程式碼。

//bufferevent.c檔案
void
bufferevent_setwatermark(struct bufferevent *bufev, short events,
    size_t lowmark, size_t highmark)
{
	struct bufferevent_private *bufev_private =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

	BEV_LOCK(bufev);

	if (events & EV_READ) {
		bufev->wm_read.low = lowmark;
		bufev->wm_read.high = highmark;

		if (highmark) {//高水位
			/* There is now a new high-water mark for read.
			   enable the callback if needed, and see if we should
			   suspend/bufferevent_wm_unsuspend. */

			//還沒設定高水位的回撥函式
			if (bufev_private->read_watermarks_cb == NULL) {
				bufev_private->read_watermarks_cb =
				    evbuffer_add_cb(bufev->input,
						    bufferevent_inbuf_wm_cb,
						    bufev);//添加回調函式
			}
			evbuffer_cb_set_flags(bufev->input,
				      bufev_private->read_watermarks_cb,
				      EVBUFFER_CB_ENABLED|EVBUFFER_CB_NODEFER);

			//設定(修改)高水位時,evbuffer的資料量已經超過了水位值
			//可能是把之前的高水位調高或者調低
			//掛起操作和取消掛起操作都是冪等的(即多次掛起的作用等同於掛起一次)
			if (evbuffer_get_length(bufev->input) > highmark)
				bufferevent_wm_suspend_read(bufev);
			else if (evbuffer_get_length(bufev->input) < highmark)//調低了
				bufferevent_wm_unsuspend_read(bufev);
		} else {
			//高水位值等於0,那麼就要取消掛起 讀事件
			//取消掛起操作是冪等的
			/* There is now no high-water mark for read. */
			if (bufev_private->read_watermarks_cb)
				evbuffer_cb_clear_flags(bufev->input,
				    bufev_private->read_watermarks_cb,
				    EVBUFFER_CB_ENABLED);
			bufferevent_wm_unsuspend_read(bufev);
		}
	}
	BEV_UNLOCK(bufev);
}

        這個函式,不僅僅為高水位設定回撥函式,還會檢查當前evbuffer的資料量是否超過了高水位。因為這個設定水位函式可能是在bufferevent工作一段時間後才新增的,所以evbuffer是有可能已經有資料的了,因此需要檢查。如果超過了水位值,那麼就需要掛起讀。當然也存在另外一種可能:使用者之前設定過了一個比較大的高水位,掛起了讀。現在發現錯了,就把高水位調低一點,此時就需要恢復讀。

        現在假設使用者移除了一些evbuffer的資料,進而觸發了evbuffer的回撥函式,當然也就呼叫了函式bufferevent_inbuf_wm_cb。下面看一下這個函式是怎麼恢復讀的。

//bufferevent.c檔案
static void
bufferevent_inbuf_wm_cb(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo,
    void *arg)
{
	struct bufferevent *bufev = arg;
	size_t size;

	size = evbuffer_get_length(buf);

	if (size >= bufev->wm_read.high)
		bufferevent_wm_suspend_read(bufev);
	else
		bufferevent_wm_unsuspend_read(bufev);
}

//bufferevent-internal.h檔案
#define bufferevent_wm_unsuspend_read(b) \
	bufferevent_unsuspend_read((b), BEV_SUSPEND_WM)

//bufferevent.c檔案
void
bufferevent_unsuspend_read(struct bufferevent *bufev, bufferevent_suspend_flags what)
{
	struct bufferevent_private *bufev_private =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

	BEV_LOCK(bufev);
	bufev_private->read_suspended &= ~what;
	if (!bufev_private->read_suspended && (bufev->enabled & EV_READ))
		bufev->be_ops->enable(bufev, EV_READ);//重新把event插入到event_base中
	BEV_UNLOCK(bufev);
}

        因為使用者可以手動為這個evbuffer新增資料,此時也會呼叫bufferevent_inbuf_wm_cb函式。此時就要檢查evbuffer的資料量是否已經超過高水位了,而不能僅僅檢查是否低於高水位。

        高水位導致讀的掛起和之後讀的恢復,一切工作都是由Libevent內部完成的,使用者不用做任何工作。




從socket中讀取資料:

        從前面的一系列博文可以知道,如果一個socket可讀了,那麼監聽可讀事件的event的回撥函式就會被呼叫。這個回撥函式是在bufferevent_socket_new函式中被Libevent內部設定的,設定為bufferevent_readcb函式,使用者並不知情。

        當socket有資料可讀時,Libevent就會監聽到,然後呼叫bufferevent_readcb函式處理。該函式會呼叫evbuffer_read函式,把資料從socket fd中讀取到evbuffer中。然後再呼叫使用者在bufferevent_setcb函式中設定的讀事件回撥函式。所以,當用戶的讀事件回撥函式被呼叫時,資料已經在evbuffer中了,使用者拿來就用,無需呼叫read這類會阻塞的函式。

        下面看一下bufferevent_readcb函式的具體實現。
static void
bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
{
	struct bufferevent *bufev = arg;
	struct bufferevent_private *bufev_p =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
	struct evbuffer *input;
	int res = 0;
	short what = BEV_EVENT_READING;
	ev_ssize_t howmuch = -1, readmax=-1;

	_bufferevent_incref_and_lock(bufev);

	if (event == EV_TIMEOUT) {
		/* Note that we only check for event==EV_TIMEOUT. If
		 * event==EV_TIMEOUT|EV_READ, we can safely ignore the
		 * timeout, since a read has occurred */
		what |= BEV_EVENT_TIMEOUT;
		goto error;
	}

	input = bufev->input;


	//使用者設定了高水位
	if (bufev->wm_read.high != 0) {
		howmuch = bufev->wm_read.high - evbuffer_get_length(input);
		/* we somehow lowered the watermark, stop reading */
		if (howmuch <= 0) {
			bufferevent_wm_suspend_read(bufev);
			goto done;
		}
	}

	//因為使用者可以限速,所以這麼要檢測最大的可讀大小。
	//如果沒有限速的話,那麼將返回16384位元組,即16K
	//預設情況下是沒有限速的。
	readmax = _bufferevent_get_read_max(bufev_p);
	if (howmuch < 0 || howmuch > readmax) /* The use of -1 for "unlimited"
					       * uglifies this code. XXXX */
		howmuch = readmax;

	//一些原因導致讀 被掛起,比如加鎖了。
	if (bufev_p->read_suspended)
		goto done;

	//解凍,使得可以在input的後面追加資料
	evbuffer_unfreeze(input, 0);
	res = evbuffer_read(input, fd, (int)howmuch); //從socket fd中讀取資料
	evbuffer_freeze(input, 0);//凍結

	if (res == -1) {
		int err = evutil_socket_geterror(fd);
		if (EVUTIL_ERR_RW_RETRIABLE(err))//EINTER or EAGAIN
			goto reschedule;

		//不是 EINTER or EAGAIN 這兩個可以重試的錯誤,那麼就應該是其他致命的錯誤
		//此時,應該報告給使用者
		what |= BEV_EVENT_ERROR;/**< unrecoverable error encountered */
	} else if (res == 0) {//斷開了連線
		what |= BEV_EVENT_EOF;
	}

	if (res <= 0)
		goto error;

	//速率相關的操作
	_bufferevent_decrement_read_buckets(bufev_p, res);

	
	//evbuffer的資料量大於低水位值。
	if (evbuffer_get_length(input) >= bufev->wm_read.low)
		_bufferevent_run_readcb(bufev);//呼叫使用者設定的回撥函式

	goto done;

 reschedule:
	goto done;

 error:
 	//把監聽可讀事件的event從event_base的事件佇列中刪除掉.event_del
	bufferevent_disable(bufev, EV_READ);//會呼叫be_socket_disable函式
	_bufferevent_run_eventcb(bufev, what);//會呼叫使用者設定的錯誤處理函式

 done:
	_bufferevent_decref_and_unlock(bufev);
}

        細心的讀者可能會發現:對使用者的讀事件回撥函式的觸發是邊緣觸發的。這也就要求,在回撥函式中,使用者應該儘可能地把evbuffer的所有資料都讀出來。如果想等到下一次回撥時再讀,那麼需要等到下一次socketfd接收到資料才會觸發使用者的回撥函式。如果之後socket fd一直收不到任何資料,那麼即使evbuffer還有資料,使用者的回撥函式也不會被呼叫了。


處理寫事件:

        對一個可讀事件進行監聽是比較容易的,但對於一個可寫事件進行監聽則比較困難。為什麼呢?因為可讀監聽是監聽fd的讀緩衝區是否有資料了,如果沒有資料那麼就一直等待。對於可寫,首先要明白“什麼是可寫”,可寫就是fd的寫緩衝區(這個緩衝區在核心)還沒滿,可以往裡面放資料。這就有一個問題,如果寫緩衝區沒有滿,那麼就一直是可寫狀態。如果一個event監聽了可寫事件,那麼這個event就會一直被觸發(死迴圈)。因為一般情況下,如果不是發大量的資料這個寫緩衝區是不會滿的。

        也就是說,不能監聽可寫事件。但我們確實要往fd中寫資料,那怎麼辦?Libevent的做法是:當我們確實要寫入資料時,才監聽可寫事件。也就是說我們呼叫bufferevent_write寫入資料時,Libevent才會把監聽可寫事件的那個event註冊到event_base中。當Libevent把資料都寫入到fd的緩衝區後,Libevent又會把這個event從event_base中刪除。比較煩瑣。        

        bufferevent_writecb函式不僅僅要處理上面說到的那個問題,還要處理另外一個坑爹的問題。那就是:判斷socket fd是不是已經連線上伺服器了。這是因為這個socket fd是非阻塞的,所以它呼叫connect時,可能還沒連線上就返回了。對於非阻塞socket fd,一般是通過判斷這個socket是否可寫,從而得知這個socket是否已經連線上伺服器。如果可寫,那麼它就已經成功連線上伺服器了。這個問題,這裡先提一下,後面會詳細講。

        同前面的監聽可讀一樣,Libevent是在bufferevent_socket_new函式設定可寫的回撥函式,為bufferevent_writecb。

//bufferevent_sock.c檔案
static void
bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
{
	struct bufferevent *bufev = arg;
	struct bufferevent_private *bufev_p =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
	int res = 0;
	short what = BEV_EVENT_WRITING;
	int connected = 0;
	ev_ssize_t atmost = -1;

	_bufferevent_incref_and_lock(bufev);

	if (event == EV_TIMEOUT) {
		/* Note that we only check for event==EV_TIMEOUT. If
		 * event==EV_TIMEOUT|EV_WRITE, we can safely ignore the
		 * timeout, since a read has occurred */
		what |= BEV_EVENT_TIMEOUT;
		goto error;
	}

	...//判斷這個socket是否已經連線上伺服器了


	//使用者可能設定了限速,如果沒有限速,那麼atmost將返回16384(16K)
	atmost = _bufferevent_get_write_max(bufev_p);

	//一些原因導致寫被掛起來了
	if (bufev_p->write_suspended)
		goto done;

	//如果evbuffer有資料可以寫到sockfd中
	if (evbuffer_get_length(bufev->output)) {
		//解凍連結串列頭
		evbuffer_unfreeze(bufev->output, 1);
		//將output這個evbuffer的資料寫到socket fd 的緩衝區中
		//會把已經寫到socket fd緩衝區的資料,從evbuffer中刪除
		res = evbuffer_write_atmost(bufev->output, fd, atmost);
		evbuffer_freeze(bufev->output, 1);
		
		if (res == -1) {
			int err = evutil_socket_geterror(fd);
			if (EVUTIL_ERR_RW_RETRIABLE(err))//可以恢復的錯誤。一般是EINTR或者EAGAIN
				goto reschedule;
			what |= BEV_EVENT_ERROR;
		} else if (res == 0) {//該socket已經斷開連線了
			what |= BEV_EVENT_EOF;
		}
		if (res <= 0)
			goto error;
	}

	//如果把寫緩衝區的資料都寫完成了。為了防止event_base不斷地觸發可寫
	//事件,此時要把這個監聽可寫的event刪除。
	//前面的atmost限制了一次最大的可寫資料。如果還沒寫所有的資料
	//那麼就不能delete這個event,而是要繼續監聽可寫事情,知道把所有的
	//資料都寫到socket fd中。
	if (evbuffer_get_length(bufev->output) == 0) {
		event_del(&bufev->ev_write);
	}


	//如果evbuffer裡面的資料量已經寫得七七八八了,小於設定的低水位值,那麼
	//就會呼叫使用者設定的寫事件回撥函式
	if ((res || !connected) &&
	    evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
		_bufferevent_run_writecb(bufev);
	}

	goto done;

 reschedule:
	if (evbuffer_get_length(bufev->output) == 0) {
		event_del(&bufev->ev_write);
	}
	goto done;

 error:
	bufferevent_disable(bufev, EV_WRITE);//有錯誤。把這個寫event刪除
	_bufferevent_run_eventcb(bufev, what);

 done:
	_bufferevent_decref_and_unlock(bufev);
}

        上面程式碼的邏輯比較清晰,呼叫evbuffer_write_atmost函式把資料從evbuffer中寫到evbuffer緩衝區中,此時要注意函式的返回值,因為可能寫的時候發生錯誤。如果發生了錯誤,就要呼叫使用者設定的event回撥函式(網上也有人稱其為錯誤處理函式)。

        之後,還要判斷evbuffer的資料是否已經全部寫到socket 的緩衝區了。如果已經全部寫了,那麼就要把監聽寫事件的event從event_base的插入佇列中刪除。如果還沒寫完,那麼就不能刪除,因為還要繼續監聽可寫事件,下次接著寫。

        現在來看一下,把監聽寫事件的event從event_base的插入佇列中刪除後,如果下次使用者有資料要寫的時候,怎麼把這個event新增到event_base的插入佇列。

        使用者一般是通過bufferevent_write函式把資料寫入到evbuffer(寫入evbuffer後,接著就會被寫入socket,所以呼叫bufferevent_write就相當於把資料寫入到socket。)。而這個bufferevent_write函式是直接呼叫evbuffer_add函式的。函式evbuffer_add沒有呼叫什麼可疑的函式,能夠把監聽可寫的event新增到event_base中。唯一的可能就是那個回撥函式。對就是evbuffer的回撥函式。關於evbuffer的回撥函式,可以參考這裡

//bufferevent.c檔案
int
bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
{
	if (evbuffer_add(bufev->output, data, size) == -1)
		return (-1);

	return 0;
}


//buffer.c檔案
int
evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen)
{
	...

out:
	evbuffer_invoke_callbacks(buf);//呼叫回撥函式
	result = 0;
done:
	return result;
}

        還記得本文前面的bufferevent_socket_new函式嗎?該函式裡面會有

evbuffer_add_cb(bufev->output,bufferevent_socket_outbuf_cb, bufev);

        當bufferevent的寫緩衝區output的資料發生變化時,函式bufferevent_socket_outbuf_cb就會被呼叫。現在馬上飛到這個函式。

//bufferevent_sock.c檔案
static void
bufferevent_socket_outbuf_cb(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo,
    void *arg)
{
	struct bufferevent *bufev = arg;
	struct bufferevent_private *bufev_p =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

	if (cbinfo->n_added && //evbuffer添加了資料
	    (bufev->enabled & EV_WRITE) && //預設情況下是enable EV_WRITE的
	    !event_pending(&bufev->ev_write, EV_WRITE, NULL) &&//這個event已經被踢出event_base了
	    !bufev_p->write_suspended) {//這個bufferevent的寫並沒有被掛起

		//把這個event新增到event_base中
		if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) == -1) {
		    /* Should we log this? */
		}
	}
}

        這個函式首先進行一些判斷,滿足條件後就會把這個監聽寫事件的event新增到event_base中。其中event_pending函式就是判斷這個bufev->ev_write是否已經被event_base刪除了。關於event_pending,可以參考這裡

        對於bufferevent_write,初次使用該函式的讀者可能會有疑問:呼叫該函式後,引數data指向的記憶體空間能不能馬上釋放,還是要等到Libevent把data指向的資料都寫到socket 快取區才能刪除?其實,從前一篇博文可以看到,evbuffer_add是直接複製一份使用者要傳送的資料到evbuffer快取區的。所以,呼叫完bufferevent_write,就可以馬上釋放參數data指向的記憶體空間

        網上的關於Libevent的一些使用例子,包括我寫的《 Libevent使用例子,從簡單到複雜》,都是在主執行緒中呼叫bufferevent_write函式寫入資料的。從上面的分析可以得知,是可以馬上把監聽可寫事件的event新增到event_base中。如果是在次執行緒呼叫該函式寫入資料呢?此時,主執行緒可能還睡眠在poll、epoll這類的多路IO複用函式上。這種情況下能不能及時喚醒主執行緒呢?其實是可以的,只要你的Libevent在一開始使用了執行緒功能。具體的分析過程可以參考《evthread_notify_base通知主執行緒》。上面程式碼中的be_socket_add會呼叫event_add,而在次執行緒呼叫event_add就會呼叫evthread_notify_base通知主執行緒。


bufferevent_socket_connect:

        使用者可以在呼叫bufferevent_socket_new函式時,傳一個-1作為socket的檔案描述符,然後呼叫bufferevent_socket_connect函式連線伺服器,無需自己寫程式碼呼叫connect函式連線伺服器。

        bufferevent_socket_connect函式會呼叫socket函式申請一個套接字fd,然後把這個fd設定成非阻塞的(這就導致了一些坑爹的事情)。接著就connect伺服器,因為該socket fd是非阻塞的,所以不會等待,而是馬上返回,連線這工作交給核心來完成。所以,返回後這個socket還沒有真正連線上伺服器。那麼什麼時候連線上呢?核心又是怎麼通知通知使用者呢?

        一般來說,當可以往socket fd寫東西了,那就說明已經連線上了。也就是說這個socket fd變成可寫狀態,就連線上了。

        所以,對於“非阻塞connect”比較流行的做法是:用select或者poll這類多路IO複用函式監聽該socket的可寫事件。當這個socket觸發了可寫事件,然後再對這個socket呼叫getsockopt函式,做進一步的判斷。

        Libevent也是這樣實現的,下面來看一下bufferevent_socket_connect函式。
//bufferevent_sock.c檔案
int
bufferevent_socket_connect(struct bufferevent *bev,
    struct sockaddr *sa, int socklen)
{
	struct bufferevent_private *bufev_p =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);

	evutil_socket_t fd;
	int r = 0;
	int result=-1;
	int ownfd = 0;

	_bufferevent_incref_and_lock(bev);

	if (!bufev_p)
		goto done;


	fd = bufferevent_getfd(bev);
	if (fd < 0) {//該bufferevent還沒有設定fd
		if (!sa)
			goto done;
		fd = socket(sa->sa_family, SOCK_STREAM, 0);
		if (fd < 0)
			goto done;
		if (evutil_make_socket_nonblocking(fd)<0)//設定為非阻塞
			goto done;
		ownfd = 1;
	}
	if (sa) {
		r = evutil_socket_connect(&fd, sa, socklen);//非阻塞connect
		if (r < 0)
			goto freesock;
	}
	...

	//為bufferevent裡面的兩個event設定監聽的fd
	//後面會呼叫bufferevent_enable
	bufferevent_setfd(bev, fd);
	
	if (r == 0) {//暫時還沒連線上,因為fd是非阻塞的
		//此時需要監聽可寫事件,當可寫了,並且沒有錯誤的話,就成功連線上了
		if (! be_socket_enable(bev, EV_WRITE)) {
			bufev_p->connecting = 1;//標誌這個sockfd正在連線
			result = 0;
			goto done;
		}
	} else if (r == 1) {//已經連線上了
		/* The connect succeeded already. How very BSD of it. */
		result = 0;
		bufev_p->connecting = 1; 
		event_active(&bev->ev_write, EV_WRITE, 1);//手動啟用這個event
	} else {// connection refused
		/* The connect failed already.  How very BSD of it. */
		bufev_p->connection_refused = 1;
		bufev_p->connecting = 1;
		result = 0;
		event_active(&bev->ev_write, EV_WRITE, 1);//手動啟用這個event
	}

	goto done;

freesock:
	_bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);//出現錯誤
	if (ownfd)
		evutil_closesocket(fd);
done:
	_bufferevent_decref_and_unlock(bev);
	return result;
}

        這個函式比較多錯誤處理的程式碼,大致看一下就行了。有幾個地方要注意,即使connect的時候被拒絕,或者已經連線上了,都會手動啟用這個event。一個event即使沒有加入event_base,也是可以手動啟用的。具體原理參考這裡

        無論是手動啟用event,或者監聽到這個event可寫了,都是會呼叫bufferevent_writecb函式。現在再次看一下該函式,只看connect部分。

//bufferevent_sock.c檔案
static void
bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
{
	struct bufferevent_private *bufev_p =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
	int connected = 0;

	_bufferevent_incref_and_lock(bufev);

	...
	//正在連線。因為這個sockfd可能是非阻塞的,所以可能之前的connect還沒
	//連線上。而判斷該sockfd是否成功連線上了的一個方法是判斷這個sockfd是否可寫
	if (bufev_p->connecting) {
		//c等於1,說明已經連線成功
		//c等於0,說明還沒連線上
		//c等於-1,說明發生錯誤
		int c = evutil_socket_finished_connecting(fd);


		if (bufev_p->connection_refused) {//在bufferevent_socket_connect中被設定
		  bufev_p->connection_refused = 0;
		  c = -1;
		}

		if (c == 0)//還沒連線上,繼續監聽可寫吧
			goto done;


		//錯誤,或者已經連線上了
		bufev_p->connecting = 0;//修改標誌值
		
		if (c < 0) {//錯誤
			event_del(&bufev->ev_write);
			event_del(&bufev->ev_read);
			_bufferevent_run_eventcb(bufev, BEV_EVENT_ERROR);
			goto done;

		} else {//連線上了。
			connected = 1;
			...//win32

			//居然會呼叫使用者設定的錯誤處理函式。太神奇了
			_bufferevent_run_eventcb(bufev,
					BEV_EVENT_CONNECTED);
			if (!(bufev->enabled & EV_WRITE) || //預設都是enable EV_WRITE的
			    bufev_p->write_suspended) {
				event_del(&bufev->ev_write);//不再需要監聽可寫。因為已經連線上了
				goto done;
			}
		}
	}

	...


 done:
	_bufferevent_decref_and_unlock(bufev);
}

        可以看到無論是connect被拒絕、發生錯誤或者連線上了,都在這裡做統一的處理。

        如果已經連線上了,那麼會呼叫使用者設定event回撥函式(網上也稱之為錯誤處理函式),通知使用者已經連線上了。並且,還會把監聽可寫事件的event從event_base中刪除,其理由在前面已經說過了。

        函式evutil_socket_finished_connecting會檢查這個socket,從而得知這個socket是處於什麼狀態。在bufferevent_socket_connect函式中,出現的一些錯誤,比如被拒絕,也是能通過這個函式檢查出來的。所以可以在這裡做統一的處理。該函式的內部是使用。貼一下這個函式的程式碼吧。

//evutil.c檔案
//Return 1 for connected, 0 for not yet, -1 for error.
int
evutil_socket_finished_connecting(evutil_socket_t fd)
{
	int e;
	ev_socklen_t elen = sizeof(e);

	//用來檢測這個fd是否已經連線上了,這個fd是非阻塞的
	//如果e的值被設為0,那麼就說明連線上了。
	//否則e被設定為對應的錯誤值。
	if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (void*)&e, &elen) < 0)
		return -1;

	if (e) {
		if (EVUTIL_ERR_CONNECT_RETRIABLE(e))//還沒連線上
			return 0;
		EVUTIL_SET_SOCKET_ERROR(e);
		return -1;
	}

	return 1;
}


        好長啊!終於寫完了。

    謝絕推酷、第七城市的轉載!!!