1. 程式人生 > >Redis之事件模型

Redis之事件模型

介紹

Memcached的底層網路驅動部分直接使用了libevent,其實Redis使用libevent也是可以,僅僅使用其event_base完全可以行得通。但是作者為什麼需要自己造個輪子,可能作者覺得libevent封裝得過於複雜實現的功能過多吧。這裡區別一下Redis作者的ae.c和libevent的區別吧:

  • libevent統一了三種事件的處理,IO事件、時間事件和訊號事件,並且每個事件可以註冊多個回撥函式,事件之間具有優先順序關係(通過將就緒連結串列依據優先順序設定為多條實現)。而Redis僅僅統一了IO事件和事件事件,且一個事件fd只能註冊一個回撥函式,並且IO事件之間不具備優先順序,按照epoll返回順序依次執行。因此Redis的封裝更加簡單明瞭。
  • 二者都是通過Reactor模式封裝epoll了,所以實現起來,基本就是對一個大結構體操作。所以很容易實現。
  • 再次發現,Redis的作者很喜歡造輪子。

實現

1、事件分發器及IO事件、定時事件結構體

通過三個結構體,直接看清楚這種Reactor模式實現起來多麼容易。

  • 時間事件通過單鏈表儲存,內部儲存距離1970年1月1日00:00:00的秒及微妙。每次時間事件是否需要處理就是通過獲取當前時間,與儲存的時間簡單比較而已。因為先處理IO事件,所以加上處理IO事件的時間,就會導致時間事件的處理稍微推遲。於是通常IO時間回撥函式儘量短。這樣就可以通過單執行緒實現高併發操作,牛逼了。
  • IO事件是通過陣列儲存的,儲存在陣列中的index=fd。於是當事件fd很大,作者就直接使用realloc擴容,簡單粗暴高效。而libevent是使用雙向連結串列儲存,相對複雜一點。
  • 通過以上,再去分析就很容易了。
/*
Ractor模式結構體,管理全部的事件以及epoll函式的狀態資訊資料
*/
typedef struct aeEventLoop {
    int maxfd;   /*已經註冊的最大檔案描述符 highest file descriptor currently registered */
    int setsize; /* 檔案描述符監聽集合大小max number of file descriptors tracked */
    long long timeEventNextId;/* 下一個時間事件的ID */
    time_t lastTime;     /* 最後一次執行事件的時間 ,快取上次事件Used to detect system clock skew */
/* 牛逼了,直接realloc,但是對於libevent是通過雙向連結串列儲存,陣列查詢快,連結串列刪除快。 */ aeFileEvent *events; /* 動態分配的陣列,儲存註冊的檔案事件,套接字讀和寫事件,Registered events */ aeFiredEvent *fired; /* 動態分配的陣列,就緒的事件,事件就緒,則將其插入就緒列表Fired events */ aeTimeEvent *timeEventHead;/*事件事件,通過單鏈表連線在一起*/ int stop;/*事件處理的開關*/ void *apidata; /* 多路複用庫的事件狀態,通常用來儲存epoll需要的特定資料This is used for polling API specific data */ aeBeforeSleepProc *beforesleep;/*執行處理事件之前的函式 */ aeBeforeSleepProc *aftersleep;/*執行處理事件之後的函式 */ } aeEventLoop; /* File event structure 一個IO事件的封裝 */ typedef struct aeFileEvent { int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */ aeFileProc *rfileProc;//讀檔案函式指標 aeFileProc *wfileProc;//寫檔案函式指標 void *clientData;//指向客戶端傳動的資料 } aeFileEvent; /* Time event structure 一個時間事件的封裝 */ typedef struct aeTimeEvent { long long id; /*事件事件ID time event identifier. */ long when_sec; /* seconds */ long when_ms; /* milliseconds */ aeTimeProc *timeProc;/*事件事件就緒回撥函式*/ aeEventFinalizerProc *finalizerProc;/* 事件事件終結回撥函式*/ void *clientData;/*使用者資料*/ struct aeTimeEvent *next;/*通過連結串列連線*/ } aeTimeEvent; /* 對於epoll,aeEventLoop內的apidata指向aeApiState,裡面儲存了epoll的檔案描述符以及epoll_wait返回儲存就緒事件結果的陣列。 */ typedef struct aeApiState {//epoll函式本身的檔案描述符 int epfd;//epoll對應的檔案描述符 struct epoll_event *events;//epoll_wait返回需要使用 } aeApiState;

依據上面的結構體,可以畫出下面事件處理框架:
這裡寫圖片描述
看清楚了上圖,再繼續講解API將變得異常簡單,所以在設計程式碼的時候,先選定好結構體,並畫出對應的記憶體模型之後,程式碼的設計就會變得異常簡單。

封裝epoll


/*
初始化epoll_create
*/
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));//分配需要儲存epoll本身資訊狀態的記憶體。

    if (!state) return -1;
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);//分配陣列,最大事件數目,用於儲存epoll的返回值。
    if (!state->events) {//每一步都必須判斷記憶體是否分配成功。
        zfree(state);
        return -1;
    }
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    eventLoop->apidata = state;//儲存到aeEventLoop中
    return 0;
}


/*
往epoll裡面新增事件,新增fd和事件型別
*/
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;//事件全部採用預設的水平觸發。
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;//讀或者寫
    ee.data.fd = fd;//傳入fd,就緒的時候也通用返回fd,最後通過fd判斷就緒事件。
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;//新增或者修改
    return 0;
}

/*
往epoll中刪除事件
*/
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
    aeApiState *state = eventLoop->apidata;//獲取epoll資訊
    struct epoll_event ee = {0}; /* avoid valgrind warning,看來作者每個程式碼都用valgrind檢測過的 */
    int mask = eventLoop->events[fd].mask & (~delmask);

    ee.events = 0;
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (mask != AE_NONE) {//修改事件
        epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
    } else {
        /* Note, Kernel < 2.6.9 requires a non null event pointer even for
         * EPOLL_CTL_DEL. */
        epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);//刪除事件
    }
}


/*
epoll_wait迴圈等待就緒事件
*/
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);//結果全部儲存在state->events中,空間大小由setsize設定。
    if (retval > 0) {//大於0,則有就緒了,retval是就緒個數。
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {//直接遍歷就緒事件,將對應的事件加入到就緒事件中。
            int mask = 0;
            struct epoll_event *e = state->events+j;//訪問陣列,考察就緒事件

            if (e->events & EPOLLIN) mask |= AE_READABLE;//可讀
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;//可寫
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;//
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;//加入就緒事件的fd
            eventLoop->fired[j].mask = mask;//因為什麼事件就緒
        }
    }
    return numevents;//返回就緒事件的數量。
}

//儲存epoll的狀態資訊
typedef struct aeApiState {//epoll函式本身的檔案描述符
    int epfd;//epoll對應的檔案描述符
    struct epoll_event *events;//epoll_wait返回需要使用
} aeApiState;



/*
擴充套件儲存epoll_wait返回儲存結果的記憶體。
*/
static int aeApiResize(aeEventLoop *eventLoop, int setsize) {
    aeApiState *state = eventLoop->apidata;

    state->events = zrealloc(state->events, sizeof(struct epoll_event)*setsize);//直接realloc,很平常的哦。
    return 0;
}

事件模型

全部程式碼包含在ae.c中,也就是將前面的程式碼再次封裝成aeEventLoop,使得更加好用。下面看看就很清楚。

/*
一些重要的標誌資訊,通過此資訊標記當前事件的幾種狀態。
*/
#define AE_OK 0
#define AE_ERR -1

#define AE_NONE 0       /* No events registered. */
#define AE_READABLE 1   /* Fire when descriptor is readable. */
#define AE_WRITABLE 2   /* Fire when descriptor is writable. */
#define AE_BARRIER 4    /* With WRITABLE, never fire the event if the
                           READABLE event already fired in the same event
                           loop iteration. Useful when you want to persist
                           things to disk before sending replies, and want
                           to do that in a group fashion. */

#define AE_FILE_EVENTS 1
#define AE_TIME_EVENTS 2
#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)
#define AE_DONT_WAIT 4
#define AE_CALL_AFTER_SLEEP 8

#define AE_NOMORE -1

/*
選擇最好的IO複用函式。在Linux平臺,必定選擇高效率的epoll。
所以直接#include "ae_epoll.c",將c檔案裡面的static函式全部包含進來。
*/
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL//選定epoll
    #include "ae_epoll.c"//直接包含.c檔案,再次證明,include僅僅是將檔案對應的內容全部包含進來
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif


/*
1、初始化eventLoop結構體,setsize程式碼可以管理事件的個數,
因為作業系統fd是從小到大依次增加的,所以依據伺服器合理設定初始值,
後續可以使用realloc(原始不變,但新增加記憶體)動態擴大。
*/
aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;

    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;//分配記憶體
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);//分配儲存事件記憶體
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);//分配臨時儲存就緒事件記憶體。
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->setsize = setsize;//初始化setsize
    eventLoop->lastTime = time(NULL);
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    eventLoop->aftersleep = NULL;
    if (aeApiCreate(eventLoop) == -1) goto err;//初始化epoll。
    /* Events with mask == AE_NONE are not set. So let's initialize the
     * vector with it. */
    for (i = 0; i < setsize; i++)//將檔案事件全部標記為沒有註冊。
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;

err://錯誤直接全部釋放記憶體。
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}

/*
2、向aeEventLoop中新增一個IO事件,傳入事件fd,監聽事件mask、
回撥函式,客戶資料即可,將此事件註冊到epoll中,並通過eventLoop事件陣列管理。
*/
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {//fd比setsize還大,則肯定失敗。
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];//直接事件fd就當作陣列索引,很容易。直接取出記憶體,填充即可。

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)//將fd加入
        return AE_ERR;
    fe->mask |= mask;//事件
    if (mask & AE_READABLE) fe->rfileProc = proc;//讀回撥函式
    if (mask & AE_WRITABLE) fe->wfileProc = proc;//寫回調函式
    fe->clientData = clientData;//就緒傳遞給回撥函式的資料。
    if (fd > eventLoop->maxfd)//通常檔案fd是慢慢增加的
        eventLoop->maxfd = fd;
    return AE_OK;
}

/*
3、刪除IO事件,陣列不必縮小,直接將其標記為未註冊即可。
*/
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{
    if (fd >= eventLoop->setsize) return;
    aeFileEvent *fe = &eventLoop->events[fd];
    if (fe->mask == AE_NONE) return;//如果還沒有註冊,那麼直接返回,每個陣列槽,對應一個事件。

    /* We want to always remove AE_BARRIER if set when AE_WRITABLE
     * is removed. */
    if (mask & AE_WRITABLE) mask |= AE_BARRIER;

    aeApiDelEvent(eventLoop, fd, mask);
    fe->mask = fe->mask & (~mask);//直接清空

    //需要更新最大的fd
    if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
        /* Update the max fd */
        int j;

        for (j = eventLoop->maxfd-1; j >= 0; j--)
            if (eventLoop->events[j].mask != AE_NONE) break;
        eventLoop->maxfd = j;
    }
}


/*
4、向aeEventLoop中新增一個時間事件,距離當前多少ms後發生,回撥函式,事件終止回撥函式等。每次在事件迴圈中檢測時間是否到達,到達則執行就緒時間事件。
*/
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    long long id = eventLoop->timeEventNextId++;//下一個事件ID,每個事件對應一個ID,0 1 2 3 4 5
    aeTimeEvent *te;

    te = zmalloc(sizeof(*te));//分配一個事件結構體
    if (te == NULL) return AE_ERR;
    te->id = id;//ID從0 1 2 3 4 計數
    aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);//獲取當前距離1970.1.1:0.0.0的秒及微妙數
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;

    te->next = eventLoop->timeEventHead;//將這個事件插入連結串列中而已。
    eventLoop->timeEventHead = te;
    return id;//返回當前時間事件ID
}
/*
5、刪除時間事件,通過其id,遍歷連結串列,找到id,然後將其標記為AE_DELETED_EVENT_ID,採用惰性刪除。
*/
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
{
    aeTimeEvent *te = eventLoop->timeEventHead;
    while(te) {
        if (te->id == id) {
            te->id = AE_DELETED_EVENT_ID;
            return AE_OK;
        }
        te = te->next;
    }
    return AE_ERR; /* NO event with the specified ID found */
}
/*
6、開始事件迴圈處理及等待
*/

/* Process every pending time event, then every pending file event
 * (that may be registered by time event callbacks just processed).
 * Without special flags the function sleeps until some file event
 * fires, or when the next time event occurs (if any).
 *
 * If flags is 0, the function does nothing and returns.
 * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
 * if flags has AE_FILE_EVENTS set, file events are processed.
 * if flags has AE_TIME_EVENTS set, time events are processed.
 * if flags has AE_DONT_WAIT set the function returns ASAP until all
 * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
 * the events that's possible to process without to wait are processed.
 *
 * The function returns the number of events processed. 
 */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    /* 同時為空,則直接返回*/
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
     /*
    請注意,既然我們要處理時間事件,即使沒有要處理的檔案事件,我們仍要呼叫select(),
    以便在下一次事件準備啟動之前進行休眠。
    當前還沒有要處理的檔案事件,或者設定了時間時間但是沒有設定不阻塞標識。
    */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);//找到最短註冊的時間事件。
        if (shortest) {
            /*
                修正需要等待的時間,因為執行aeSearchNearestTimer,需要花費一定得時間,所以可以修正epoll的時間
            */
            long now_sec, now_ms;
            //獲取當前時間
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;

            if (ms > 0) {//小於ms級別,則可以忽略
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {//epoll_wait逐個檢測事件一次,並立即返回
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {//一直阻塞等待,直到事件就緒
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        /*
            直接wait全部,並返回就緒的事件個數。
        */
        numevents = aeApiPoll(eventLoop, tvp);//沒有IO,就相等於延遲而已

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        for (j = 0; j < numevents; j++) {//開始依據fired中的fd迴圈events中註冊的回撥函式哦。
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];//獲取事件
            int mask = eventLoop->fired[j].mask;//事件狀態
            int fd = eventLoop->fired[j].fd;//事件fd
            int fired = 0; /* Number of events fired for current fd. */

            /* Normally we execute the readable event first, and the writable
             * event laster. This is useful as sometimes we may be able
             * to serve the reply of a query immediately after processing the
             * query.
             *
             * However if AE_BARRIER is set in the mask, our application is
             * asking us to do the reverse: never fire the writable event
             * after the readable. In such a case, we invert the calls.
             * This is useful when, for instance, we want to do things
             * in the beforeSleep() hook, like fsynching a file to disk,
             * before replying to a client. */
            int invert = fe->mask & AE_BARRIER;

        /* Note the "fe->mask & mask & ..." code: maybe an already
             * processed event removed an element that fired and we still
             * didn't processed, so we check if the event is still valid.
             *
             * Fire the readable event if the call sequence is not
             * inverted. */
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);//執行回撥函式,傳給回撥函式有fd,fd的clientdata ,和事件就緒標記
                fired++;
            }

            /* Fire the writable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert && fe->mask & mask & AE_READABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)//再檢查時間事件,獲取當前事件,然後和連結串列裡面的事件比較,大則就緒執行,小則等待即可。
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}
/*
7、優先處理IO事件,再通過對比事件,處理到時的時間事件,遍歷單鏈表,
找到時間到達的事件,並呼叫其回撥函式。如果不是定時時間,則將其標記為刪除
下次執行到此函式再刪除,這就是惰性刪除特性,如果是定時事件,則重新設定此事件多久後執行。
*/

/* Process time events */
/*
事件迴圈,重點考察epoll是如何處理事件事件的。

實際上,就是通過當前獲取的時間值,和先前註冊的時間值比較,小則就緒,然後可以執行對應的回撥函數了。
所以時間事件實際上不精確,類似於通過epoll延遲,然後比較時間值。
*/
static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te, *prev;
    long long maxId;
    time_t now = time(NULL);//返回當前日期和時間,距離1970.1.1:0:0:0的秒數累計值

    /* If the system clock is moved to the future, and then set back to the
     * right value, time events may be delayed in a random way. Often this
     * means that scheduled operations will not be performed soon enough.
     *
     * Here we try to detect system clock skews, and force all the time
     * events to be processed ASAP when this happens: the idea is that
     * processing events earlier is less dangerous than delaying them
     * indefinitely, and practice suggests it is. */
     /*
     處理系統Bug,如果上次時間比當前還大,那麼時間出了問題,讓全部時間就緒,並執行。
    這裡嘗試發現時間混亂的情況,上一次處理事件的時間比當前時間還要大
    重置最近一次處理事件的時間
    */
    if (now < eventLoop->lastTime) {
        te = eventLoop->timeEventHead;
        while(te) {
            te->when_sec = 0;
            te = te->next;
        }
    }
    eventLoop->lastTime = now;//記錄上次進入此函式的時間

    prev = NULL;
    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;//最大時間事件ID。
    while(te) {//遍歷時間事件,比較時間
        long now_sec, now_ms;
        long long id;

        /* Remove events scheduled for deletion. 
            如果時間被刪除了
        */
        if (te->id == AE_DELETED_EVENT_ID) {
            //刪除當前事件,prev儲存上個節點地址,單鏈表的刪除,easy
            aeTimeEvent *next = te->next;
            if (prev == NULL)//剛好是頭結點,則直接刪除
                eventLoop->timeEventHead = te->next;
            else
                prev->next = te->next;//否則通過prev刪除
            if (te->finalizerProc)//呼叫時間事件終結回撥函式
                te->finalizerProc(eventLoop, te->clientData);
            zfree(te);
            te = next;
            continue;//既然刪除當前時間,那麼這次迴圈不必要再繼續下去了
        }

        /* Make sure we don't process time events created by time events in
         * this iteration. Note that this check is currently useless: we always
         * add new timers on the head, however if we change the implementation
         * detail, this check may be useful again: we keep it here for future
         * defense. */
        if (te->id > maxId) {
            te = te->next;
            continue;
        }
        aeGetTime(&now_sec, &now_ms);//獲取當前時間
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))//當前時間比註冊的大,那麼執行其回撥函式
        {
            int retval;

            id = te->id;
            retval = te->timeProc(eventLoop, id, te->clientData);//執行回撥函式
            processed++;//時間次數+1
            if (retval != AE_NOMORE) {//如果不是定時時間,則繼續設定此時間時間。
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            } else {
                te->id = AE_DELETED_EVENT_ID;//否則設定功能為刪除,這是惰性刪除,下次執行到這裡再刪除。
            }
        }
        prev = te;//更新前驅節點值
        te = te->next;//為了下一次迭代
    }
    return processed;
}

從上面的一些程式碼可以看出,Redis的作者通過簡單的封裝epoll使其可以高效的管理IO事件和時間事件,所以作者並沒有使用顯得臃腫的libevent。

相關推薦

Redis事件模型

介紹 Memcached的底層網路驅動部分直接使用了libevent,其實Redis使用libevent也是可以,僅僅使用其event_base完全可以行得通。但是作者為什麼需要自己造個輪子,可能作者覺得libevent封裝得過於複雜實現的功能過多吧。這裡區別

輕松學習JavaScript二十七:DOM編程學習事件模型

經歷 學習 不存在 obj 發生 rip gb2 article 不支持 在介紹事件模型之前,我們先來看什麽是事件和什麽是event對象。 一事件介紹 JavaScript事件是由訪問Web頁面的用戶引起的一系列操作,使

Redis epoll事件模型

Redis的事件模型在這裡我們用ae_epoll.c,epoll詳細工作原理  https://blog.csdn.net/luolaifa000/article/details/84190836 一、redis對原始的epoll資料結構進行了封裝  二、

Redis事件

redis伺服器是一個事件驅動程式,伺服器需要處理以下兩類事件: 1、檔案事件:redis伺服器通過套接字與客戶端進行連線,而檔案事件就是伺服器對套接字操作的抽象,伺服器與客戶端的通訊會產生相應的檔案事件,,而伺服器則通過監聽並處理這些事件來完成一系列網路通訊操作。 2、時間事件:redis

redisredis記憶體模型

Redis之記憶體模型 Redis是目前最火爆的記憶體資料庫之一,通過在記憶體中讀寫資料,大大提高了讀寫速度,可以說Redis是實現網站高併發不可或缺的一部分。 我們使用Redis時,會接觸Redis的5種物件型別(字串、雜湊、列表、集合、有序集合),豐富的型別是Redi

redis事件模型詳解(結合Reactor設計模式)

        文章基於redis-4.0.1原始碼詳細介紹一下redis的事件模型。      一、redis事件模型概覽        redis是一個事件驅動的服務程式,在redis的服務程式中存在兩種型別的事件,分別是檔案事件和時間事件。檔案事件是對網路通訊操作的統稱

redis 文件事件模型

desc edi org sockaddr sel 事件處理 CI sizeof logs 參考文獻: 深入剖析 redis 事件驅動 Redis 中的事件循環 深入了解epoll (轉) Redis自己的事件模型 ae EPOLL(7) Linux IO模式及

Redis單執行緒模型

Redis客戶端對服務端的每次呼叫都經歷了傳送命令,執行命令,返回結果三個過程。其中執行命令階段,由於Redis是單執行緒來處理命令的,所有每一條到達服務端的命令不會立刻執行,所有的命令都會進入一個佇列中,然後逐個被執行。並且多個客戶端傳送的命令的執行順序是不確定的。但是可以確定的是不會有兩條命

Redis進階記憶體模型

前言 Redis是目前最火爆的記憶體資料庫之一,通過在記憶體中讀寫資料,大大提高了讀寫速度,可以說Redis是實現網站高併發不可或缺的一部分。 我們使用Redis時,會接觸Redis的5種物件型別(字串、雜湊、列表、集合、有序集合),豐富的型別是Redis相對於Mem

Redis原始碼閱讀——基於epoll的事件模型

Redis的事件模型實現基於linux的epoll,sun的export,FreeBSD和Mac osx的queue,還有select;我們簡單分析下Redis基於epoll實現的事件模型。main函式呼叫initServer實現服務初始化:void initServer(v

Nodejs事件驅動+非阻塞io模型

node是js執行環境。 基於v8引擎 特點:事件驅動、無阻塞的io模型 優勢:輕量、高效 node是門技術不是語言 java java .net c# node js。 1什麼是i/o?

Socket程式設計模型事件選擇模型

一 原理與關鍵函式 Winsock提供了另一個有用的非同步I/O模型。和WSAAsyncSelect模型類似的是,它也允許應用程式在一個或多個套接字上,接收以事件為基礎的網路事件通知。對於表1總結的、由WSAAsyncSelect模型採用的網路事件來說,它們均可原封不動地移

Spring原始碼事件驅動模型

SpringContext中初始化事件釋出者 ### //spring初始化事件的地方 //spring初始化事件的地方 public abstract class AbstractApplicationContext extends DefaultResou

13.Redis事件模型

void aeMain(aeEventLoop *eventLoop) {     eventLoop->stop = 0;     while (!eventLoop->stop) {         // 進入事件迴圈可能會進入睡眠狀態。在睡眠之前,執行預設定的函式 aeSetBe

Spring事件監聽(觀察者模型)

respond reg nal @override ace sin 繼承 abstract http 目錄 Spring事件監聽 一、事件監聽案例 1.事件類 2.事件監聽

5Python全棧路系列Django模型

模型續Python全棧之路系列之Django模型續連表操作一對一在app的models.py文件內添加以下內容用戶創建一對多關系的表:from django.db import models # Create your models here. class UserType(models.Model):

redis redis簡介及下載安裝

移動 持久化 文檔 acl inf zxvf osql .cn ted 1. 數據庫的分類:關系型數據庫,非關系型數據庫(Nosql) 2.非關系型數據庫: 鍵值型數據庫:redis 等。 列式存儲數據庫: hbase 等。 文檔型數據庫 : mongoDB 等。 圖形數

redis 使用java操作redis

main print 數據庫 cli 防火墻 images enc png red 1. 在java操作redis需要使用jedis插件,並且linux要開啟相關的防火墻。 重啟防火墻服務 : 2. 新建maven項目: 3.添加項目依賴: <dependenc

委托的應用事件

con 代碼 eve null awake his 增加 del 問題: 前言   上一個章節介紹了委托,這裏我們再通過一個例子繼續鞏固一下委托的用法並引入事件的用法 。   為了便於理解,這裏還是引入一個很老的例子: 因為一只貓叫了一聲 接著老鼠被嚇跑 小孩

優化 Nginx 處理事件模型

標準 root 高效 nginx pre conf icop con div Nginx 的連接處理機制在不同的操作系統會采用不同的 I/O 模型,要根據不同的系統選擇不同的事件處理模型,可供選擇的事件處理模型有:kqueue 、rtsig 、epoll 、/dev/pol