1. 程式人生 > >Libevent原始碼分析-----超時event的處理

Libevent原始碼分析-----超時event的處理

如何成為超時event:       

         Libevent允許建立一個超時event,使用evtimer_new巨集。

//event.h檔案

#define evtimer_new(b, cb, arg) event_new((b), -1, 0, (cb), (arg))

        從巨集的實現來看,它一樣是用到了一般的event_new,並且不使用任何的檔案描述符。從超時event巨集的實現來看,無論是evtimer建立的event還是一般event_new建立的event,都能使得Libevent進行超時監聽。其實,使得Libevent對一個event進行超時監聽的原因是:在呼叫event_add的時候,第二引數不能為NULL,要設定一個超時值。如果為NULL,那麼Libevent將不會為這個event監聽超時。下文統一稱設定了超時值的event為超時event。

超時event的原理:

        Libevent對超時進行監聽的原理不同於之前講到的對訊號的監聽,Libevent對超時的監聽的原理是,多路IO複用函式都是有一個超時值。如果使用者需要Libevent同時監聽多個超時event,那麼Libevent就把超時值最小的那個作為多路IO複用函式的超時值。自然,當時間一到,就會從多路IO複用函式返回。此時對超時event進行處理即可。

        Libevent執行使用者同時監聽多個超時event,那麼就必須要對這個超時值進行管理。Libevent提供了小根堆和通用超時(common timeout)這兩種管理方式。下文為了敘述方便,就假定使用的是小根堆。

工作流程:

        下面來看一下超時event的工作流程。

設定超時值:

        首先呼叫event_add時要設定一個超時值,這樣才能成為一個超時event。

//event.c檔案

//在event_add中,會把第三個引數設為0.使得使用的是相對時間

static inline int

event_add_internal(struct event *ev, const struct timeval *tv,

int tv_is_absolute)

{

struct event_base *base = ev->ev_base;

int res = 0;

int notify = 0;


//tv不為NULL,就說明是一個超時event,在小根堆中為其留一個位置

if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) {

if (min_heap_reserve(&base->timeheap,

1 + min_heap_size(&base->timeheap)) == -1)

return (-1); /* ENOMEM == errno */

}


...//將IO或者訊號event插入到對應的佇列中。



if (res != -1 && tv != NULL) {

struct timeval now;


//使用者把這個event設定成EV_PERSIST。即永久event.

//如果沒有這樣設定的話,那麼只會超時一次。設定了,那麼就

//可以超時多次。那麼就要記錄使用者設定的超時值。

if (ev->ev_closure == EV_CLOSURE_PERSIST && !tv_is_absolute)

ev->ev_io_timeout = *tv;


//該event之前被加入到超時佇列。使用者可以對同一個event呼叫多次event_add

//並且可以每次都用不同的超時值。

if (ev->ev_flags & EVLIST_TIMEOUT) {

/* XXX I believe this is needless. */

//之前為該event設定的超時值是所有超時中最小的。

//從下面的刪除可知,會刪除這個最小的超時值。此時多路IO複用函式

//的超時值引數就已經改變了。

if (min_heap_elt_is_top(ev))

notify = 1; //要通知主執行緒。可能是次執行緒為這個event呼叫本函式


//從超時佇列中刪除這個event。因為下次會再次加入。

//多次對同一個超時event呼叫event_add,那麼只能保留最後的那個。

event_queue_remove(base, ev, EVLIST_TIMEOUT);

}


//因為可以在次執行緒呼叫event_add。而主執行緒剛好在執行event_base_dispatch

if ((ev->ev_flags & EVLIST_ACTIVE) &&

(ev->ev_res & EV_TIMEOUT)) {//該event被啟用的原因是超時


...

event_queue_remove(base, ev, EVLIST_ACTIVE);

}


//獲取現在的時間

gettime(base, &now);


//雖然使用者在event_add時只需用一個相對時間,但實際上在Libevent內部

//還是要把這個時間轉換成絕對時間。從儲存的角度來說,存絕對時間只需

//一個變數。而相對時間則需兩個,一個存相對值,另一個存參照物。

if (tv_is_absolute) { //該引數指明時間是否為一個絕對時間

ev->ev_timeout = *tv;

} else {

//參照時間 + 相對時間 ev_timeout存的是絕對時間

evutil_timeradd(&now, tv, &ev->ev_timeout);

}



//將該超時event插入到超時佇列中

event_queue_insert(base, ev, EVLIST_TIMEOUT);


//本次插入的超時值,是所有超時中最小的。那麼此時就需要通知主執行緒。

if (min_heap_elt_is_top(ev))

notify = 1;

}


//如果程式碼邏輯中是需要通知的。並且本執行緒不是主執行緒。那麼就通知主執行緒

if (res != -1 && notify && EVBASE_NEED_NOTIFY(base))

evthread_notify_base(base);


return (res);

}

        對於同一個event,如果是IO event或者訊號event,那麼將無法多次新增。但如果是一個超時event,那麼是可以多次新增的。並且對應超時值會使用最後新增時指明的那個,之前的統統不要,即替換掉之前的超時值。

        程式碼中出現了多次使用了notify變數。這主要是用在:次執行緒在執行這個函式,而主執行緒在執行event_base_dispatch。前面說到Libevent能對超時event進行監聽的原理是:多路IO複用函式有一個超時引數。在次執行緒新增的event的超時值更小,又或者替換了之前最小的超時值。在這種情況下,都是要通知主執行緒,告訴主執行緒,最小超時值已經變了。關於通知主執行緒evthread_notify_base,可以參考博文《evthread_notify_base通知主執行緒》。

        程式碼中的第三個判斷體中用到了ev->ev_io_timeout。但event結構體中並沒有該變數。其實,ev_io_timeout是一個巨集定義。

//event-internal.h檔案

#define ev_io_timeout _ev.ev_io.ev_timeout

        要注意的一點是,在呼叫event_add時設定的超時值是一個時間段(可以認為隔多長時間就觸發一次),相對於現在,即呼叫event_add的時間,而不是呼叫event_base_dispatch的時間。

呼叫多路IO複用函式等待超時:

        現在來看一下event_base_loop函式,看其是怎麼處理超時event的。

//event.c檔案

int

event_base_loop(struct event_base *base, int flags)

{

const struct eventop *evsel = base->evsel;

struct timeval tv;

struct timeval *tv_p;

int res, done, retval = 0;


EVBASE_ACQUIRE_LOCK(base, th_base_lock);


base->running_loop = 1;


done = 0;

while (!done) {

tv_p = &tv;

if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {

// 根據Timer事件計算evsel->dispatch的最大等待時間(超時值最小)

timeout_next(base, &tv_p);

} else { //不進行等待

//把等待時間置為0,即可不進行等待,馬上觸發事件

evutil_timerclear(&tv);

}


res = evsel->dispatch(base, tv_p);


//處理超時事件,將超時事件插入到啟用連結串列中

timeout_process(base);


if (N_ACTIVE_CALLBACKS(base)) {

int n = event_process_active(base);

}

}


done:

base->running_loop = 0;

EVBASE_RELEASE_LOCK(base, th_base_lock);


return (retval);

}



//選出超時值最小的那個

static int

timeout_next(struct event_base *base, struct timeval **tv_p)

{

/* Caller must hold th_base_lock */

struct timeval now;

struct event *ev;

struct timeval *tv = *tv_p;

int res = 0;


// 堆的首元素具有最小的超時值,這個是小根堆的性質。

ev = min_heap_top(&base->timeheap);


//堆中沒有元素

if (ev == NULL) {

*tv_p = NULL;

goto out;

}


//獲取當然時間

if (gettime(base, &now) == -1) {

res = -1;

goto out;

}


// 如果超時時間<=當前時間,不能等待,需要立即返回

// 因為ev_timeout這個時間是由event_add呼叫時的絕對時間 + 相對時間。所以ev_timeout是

// 絕對時間。可能在呼叫event_add之後,過了一段時間才呼叫event_base_diapatch,所以

// 現在可能都過了使用者設定的超時時間。

if (evutil_timercmp(&ev->ev_timeout, &now, <=)) {

evutil_timerclear(tv); //清零,這樣可以讓dispatcht不會等待,馬上返回

goto out;

}


// 計算等待的時間=當前時間-最小的超時時間

evutil_timersub(&ev->ev_timeout, &now, tv);


out:

return (res);

}

        上面程式碼的流程是:計算出本次呼叫多路IO複用函式的等待時間,然後呼叫多路IO複用函式中等待超時。

啟用超了時的event:

        上面程式碼中的timeout_process函式就是處理超了時的event。

//event.c檔案

//把超時了的event,放到啟用佇列中。並且,其啟用原因設定為EV_TIMEOUT

static void

timeout_process(struct event_base *base)

{

/* Caller must hold lock. */

struct timeval now;

struct event *ev;


if (min_heap_empty(&base->timeheap)) {

return;

}


gettime(base, &now);


//遍歷小根堆的元素。之所以不是隻取堆頂那一個元素,是因為當主執行緒呼叫多路IO複用函式

//進入等待時,次執行緒可能添加了多個超時值更小的event

while ((ev = min_heap_top(&base->timeheap))) {

//ev->ev_timeout存的是絕對時間

//超時時間比此刻時間大,說明該event還沒超時。那麼餘下的小根堆元素更不用檢查了。

if (evutil_timercmp(&ev->ev_timeout, &now, >))

break;



//下面說到的del是等同於呼叫event_del.把event從這個event_base中(所有的佇列都)

//刪除。event_base不再監聽之。

//這裡是timeout_process函式。所以對於有超時的event,才會被del掉。

//對於有EV_PERSIST選項的event,在處理啟用event的時候,會再次新增進event_base的。

//這樣做的一個好處就是,再次新增的時候,又可以重新計算該event的超時時間(絕對時間)。

event_del_internal(ev);


//把這個event加入到event_base的啟用佇列中。

//event_base的啟用佇列又有該event了。所以如果該event是EV_PERSIST的,是可以

//再次新增進該event_base的

event_active_nolock(ev, EV_TIMEOUT, 1);

}

}

        當從多路IO複用函式返回時,就檢查時間小根堆,看有多少個event已經超時了。如果超時了,那就把這個event加入到event_base的啟用佇列中。並且把這個超時del(刪除)掉,這主要是用於非PERSIST 超時event的。刪除一個event的具體操作可以檢視這裡

        把一個event新增進啟用佇列後的工作流程可以參考《Libevent工作流程探究》一文。

處理永久超時event:

        現在來看一下如果該超時event有EV_PERSIST選項,在後面是怎麼再次新增進event_base,因為前面的程式碼註釋中已經說了,在選出超時event時,會把超時的event從event_base中delete掉。

//event.c檔案

int

event_assign(struct event *ev, struct event_base *base, evutil_socket_t fd,

short events, void (*callback)(evutil_socket_t, short, void *), void *arg)

{

...

if (events & EV_PERSIST) {

ev->ev_closure = EV_CLOSURE_PERSIST;

} else {

ev->ev_closure = EV_CLOSURE_NONE;

}


return 0;

}



static int

event_process_active_single_queue(struct event_base *base,

struct event_list *activeq)

{

struct event *ev;


//遍歷同一優先順序的所有event

for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) {


//下面這個if else 是用於IO event的。這裡貼出,是為了瞭解一些非超時event是

//怎麼處理永久事件(EV_PERSIST)的。

//如果是永久事件,那麼只需從active佇列中刪除。

if (ev->ev_events & EV_PERSIST)

event_queue_remove(base, ev, EVLIST_ACTIVE);

else //不是的話,那麼就要把這個event刪除掉。

event_del_internal(ev);



switch (ev->ev_closure) {

//這個case只對超時event的EV_PERSIST才有用。IO的沒有用

case EV_CLOSURE_PERSIST:

event_persist_closure(base, ev);

break;


default: //預設是EV_CLOSURE_NONE

case EV_CLOSURE_NONE:

//沒有設定EV_PERSIST的超時event,就只有一次的監聽機會

(*ev->ev_callback)(

ev->ev_fd, ev->ev_res, ev->ev_arg);

break;

}

}

}



static inline void

event_persist_closure(struct event_base *base, struct event *ev)

{

//在event_add_internal函式中,如果是超時event並且有EV_PERSIST,那麼就會把ev_io_timeout設定成

//使用者設定的超時時間(相對時間)。否則為0。即不進入判斷體中。

//說明這個if只用於處理具有EV_PERSIST屬性的超時event

if (ev->ev_io_timeout.tv_sec || ev->ev_io_timeout.tv_usec) {

struct timeval run_at, relative_to, delay, now;

ev_uint32_t usec_mask = 0;


gettime(base, &now);


//delay是使用者設定的超時時間。event_add的第二個引數

delay = ev->ev_io_timeout;

//是因為超時才執行到這裡,event可以同時監聽多種事件。如果是由於可讀而執行

//到這裡,那麼就說明還沒超時。

if (ev->ev_res & EV_TIMEOUT) { //如果是因為超時而啟用,那麼下次超時就是本次超時的

relative_to = ev->ev_timeout; // 加上 delay 時間。

} else {

relative_to = now; //重新計算超時值

}


evutil_timeradd(&relative_to, &delay, &run_at);

//無論relative是哪個時間,run_at都不應該小於now。

//如果小於,則說明是使用者手動修改了系統時間,使得gettime()函式獲取了一個

//之前的時間。比如現在是9點,使用者手動調回到7點。

if (evutil_timercmp(&run_at, &now, <)) {

//那麼就以新的系統時間為準

evutil_timeradd(&now, &delay, &run_at);

}


//把這個event再次新增到event_base中。注意,此時第三個引數為1,說明是一個絕對時間

event_add_internal(ev, &run_at, 1);

}

EVBASE_RELEASE_LOCK(base, th_base_lock);

(*ev->ev_callback)(ev->ev_fd, ev->ev_res, ev->ev_arg);//執行回撥函式

}

   這段程式碼的處理流程是:如果使用者指定了EV_PERSIST,那麼在event_assign中就記錄下來。在event_process_active_single_queue函式中會針對永久event進行呼叫event_persist_closure函式對之進行處理。在event_persist_closure函式中,如果是一般的永久event,那麼就直接呼叫該event的回撥函式。如果是超時永久event,那麼就需要再次計算新的超時時間,並將這個event再次插入到event_base中。 

       這段程式碼也指明瞭,如果一個event因可讀而被啟用,那麼其超時時間就要重新計算。而不是之前的那個了。也就是說,如果一個event設定了3秒的超時,但1秒後就可讀了,那麼下一個超時值,就要重新計算設定,而不是2秒後。

        從前面的原始碼分析也可以得到:如果一個event監聽可讀的同時也設定了超時值,並且一直沒有資料可讀,最後超時了,那麼這個event將會被刪除掉,不會再等。