1. 程式人生 > >Redis原始碼剖析和註釋(十九)--- Redis 事件處理實現

Redis原始碼剖析和註釋(十九)--- Redis 事件處理實現

Redis 事件處理實現

1. Redis事件介紹

Redis伺服器是一個事件驅動程式。下面先來簡單介紹什麼是事件驅動。

所謂事件驅動,就是當你輸入一條命令並且按下回車,然後訊息被組裝成Redis協議的格式傳送給Redis伺服器,這就會產生一個事件,Redis伺服器會接收該命令,處理該命令和傳送回覆,而當你沒有與伺服器進行互動時,那麼伺服器就會處於阻塞等待狀態,會讓出CPU從而進入睡眠狀態,當事件觸發時,就會被作業系統喚醒。事件驅動使CPU更高效的利用。

事件驅動是一種概括和抽象,也可以稱為I/O多路複用(I/O multiplexing),它的實現方式各個系統都不同,一會會說到Redis的方式。

redis伺服器中,處理了兩類事件:

  • 檔案事件(file event)Redis伺服器通過套接字於客戶端(或其他Redis伺服器)進行連線,而檔案事件就是伺服器對套接字操作的抽象。
  • 時間事件(time event):Redis伺服器的一些操作需要在給定的事件點執行,而時間事件就是伺服器對這類定時操作的抽象。

2. 事件的抽象

Redis將這兩個事件分別抽象成一個數據結構來管理。redis 所有原始碼註釋

2.1 檔案事件結構

/* File event structure */
typedef struct aeFileEvent {
    // 檔案時間型別:AE_NONE,AE_READABLE,AE_WRITABLE
int mask; /* one of AE_(READABLE|WRITABLE) */ // 可讀處理函式 aeFileProc *rfileProc; // 可寫處理函式 aeFileProc *wfileProc; // 客戶端傳入的資料 void *clientData; } aeFileEvent; //檔案事件

其中rfileProcwfileProc成員分別為兩個函式指標,他們的原型為

typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int
mask);

這個函式是回撥函式,如果當前檔案事件所指定的事件型別發生時,則會呼叫對應的回撥函式處理該事件。函式指標與回撥函式詳解

當事件就緒時,我們需要知道檔案事件的檔案描述符還有事件型別才能對於鎖定該事件,因此定義了aeFiredEvent結構統一管理:

/* A fired event */
typedef struct aeFiredEvent {
    // 就緒事件的檔案描述符
    int fd;
    // 就緒事件型別:AE_NONE,AE_READABLE,AE_WRITABLE
    int mask;
} aeFiredEvent; //就緒事件

2.2 時間事件結構

/* Time event structure */
typedef struct aeTimeEvent {
    // 時間事件的id
    long long id; /* time event identifier. */
    // 時間事件到達的時間的秒數
    long when_sec; /* seconds */
    // 時間事件到達的時間的毫秒數
    long when_ms; /* milliseconds */
    // 時間事件處理函式
    aeTimeProc *timeProc;
    // 時間事件終結函式
    aeEventFinalizerProc *finalizerProc;
    // 客戶端傳入的資料
    void *clientData;
    // 指向下一個時間事件
    struct aeTimeEvent *next;
} aeTimeEvent;  //時間事件

從這個結構中可以看出,時間事件表是一個連結串列,因為它有一個next指標域,指向下一個時間事件。

和檔案事件一樣,當時間事件所指定的事件發生時,也會呼叫對應的回撥函式,結構成員timeProcfinalizerProc都是回撥函式,函式原型如下:函式指標與回撥函式詳解

typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);

雖然對檔案事件和時間事件都做了抽象,Redis仍然需要對事件做整體抽象,於是定義了aeEventLoop結構。

2.3 事件狀態結構

/* State of an event based program */
typedef struct aeEventLoop {
    // 當前已註冊的最大的檔案描述符
    int maxfd;   /* highest file descriptor currently registered */
    // 檔案描述符監聽集合的大小
    int setsize; /* max number of file descriptors tracked */
    // 下一個時間事件的ID
    long long timeEventNextId;
    // 最後一次執行事件的時間
    time_t lastTime;     /* Used to detect system clock skew */
    // 註冊的檔案事件表
    aeFileEvent *events; /* Registered events */
    // 已就緒的檔案事件表
    aeFiredEvent *fired; /* Fired events */
    // 時間事件的頭節點指標
    aeTimeEvent *timeEventHead;
    // 事件處理開關
    int stop;
    // 多路複用庫的事件狀態資料
    void *apidata; /* This is used for polling API specific data */
    // 執行處理事件之前的函式
    aeBeforeSleepProc *beforesleep;
} aeEventLoop;  //事件輪詢的狀態結構

aeEventLoop結構儲存了一個void *型別的萬能指標apidata,是用來儲存輪詢事件的狀態的,也就是儲存底層呼叫的多路複用庫的事件狀態,關於Redis的多路複用庫的選擇,Redis包裝了常見的select epoll evport kqueue,他們在編譯階段,根據不同的系統選擇效能最高的一個多路複用庫作為Redis的多路複用程式的實現,而且所有庫實現的介面名稱都是相同的,因此Redis多路複用程式底層實現是可以互換的。具體選擇庫的原始碼為

// IO複用的選擇,效能依次下降,Linux支援 "ae_epoll.c" 和 "ae_select.c"
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

也可以通過Redis客戶端的命令來檢視當前選擇的多路複用庫,INFO server

127.0.0.1:6379> INFO server
# Server
……
multiplexing_api:epoll
……

那麼,既然知道了多路複用庫的選擇,那麼我們來檢視一下apidata儲存的epoll模型的事件狀態結構:ae_epoll.c檔案中

typedef struct aeApiState {
    // epoll事件的檔案描述符
    int epfd;
    // 事件表
    struct epoll_event *events;
} aeApiState;   //事件的狀態

epoll模型的struct epoll_event的結構中定義這自己的事件型別,例如EPOLLIN POLLOUT等等,但是Redis的檔案事件結構aeFileEvent中也在mask中定義了自己的事件型別,例如:AE_READABLE AE_WRITABLE等,於是,就需要實現一箇中間層將兩者的事件型別相聯絡起來,這也就是之前提到的ae_epoll.c檔案中實現的相同的API,我們列出來:

// 建立一個epoll例項,儲存到eventLoop中
static int aeApiCreate(aeEventLoop *eventLoop)
// 調整事件表的大小
static int aeApiResize(aeEventLoop *eventLoop, int setsize)  
// 釋放epoll例項和事件表空間
static void aeApiFree(aeEventLoop *eventLoop)
// 在epfd標識的事件表上註冊fd的事件
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)
// 在epfd標識的事件表上注刪除fd的事件
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask)
// 等待所監聽檔案描述符上有事件發生
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)
// 返回正在使用的IO多路複用庫的名字
static char *aeApiName(void)

這些API都是呼叫相應的底層多路複用庫來將Redis事件狀態結構aeEventLoop所關聯,就是將epoll的底層函式封裝起來,Redis實現事件時,只需呼叫這些介面即可。我們檢視兩個重要的函式的原始碼,看看是如何實現的

  • 向Redis事件狀態結構aeEventLoop的事件表event註冊一個事件,對應的是epoll_ctl函式
// 在epfd標識的事件表上註冊fd的事件
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    // EPOLL_CTL_ADD,向epfd註冊fd的上的event
    // EPOLL_CTL_MOD,修改fd已註冊的event
    // #define AE_NONE 0           //未設定
    // #define AE_READABLE 1       //事件可讀
    // #define AE_WRITABLE 2       //事件可寫
    // 判斷fd事件的操作,如果沒有設定事件,則進行關聯mask型別事件,否則進行修改
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    // struct epoll_event {
    //      uint32_t     events;      /* Epoll events */
    //      epoll_data_t data;        /* User data variable */
    // };
    ee.events = 0;
    // 如果是修改事件,合併之前的事件型別
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    // 根據mask對映epoll的事件型別
    if (mask & AE_READABLE) ee.events |= EPOLLIN;   //讀事件
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;  //寫事件
    ee.data.fd = fd;    //設定事件所從屬的目標檔案描述符
    // 將ee事件註冊到epoll中
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}
  • 等待所監聽檔案描述符上有事件發生,對應著底層epoll_wait函式
// 等待所監聽檔案描述符上有事件發生
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    // 監聽事件表上是否有事件發生
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    // 至少有一個就緒的事件
    if (retval > 0) {
        int j;

        numevents = retval;
        // 遍歷就緒的事件表,將其加入到eventLoop的就緒事件表中
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            // 根據就緒的事件型別,設定mask
            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            // 新增到就緒事件表中
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    // 返回就緒的事件個數
    return numevents;
}

3. 事件的原始碼實現

Redis事件的原始碼全部定義在ae.c檔案中,我們從事件的主函式aeMain說起,一步一步深入剖析。

// 事件輪詢的主函式
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    // 一直處理事件
    while (!eventLoop->stop) {
        // 執行處理事件之前的函式
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        //處理到時的時間事件和就緒的檔案事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

這個事件的主函式aeMain很清楚的可以看到,如果伺服器一直處理事件,那麼就是一個死迴圈,而一個最典型的事件驅動,就是一個死迴圈。呼叫處理事件的函式aeProcessEvents,他們引數是一個事件狀態結構aeEventLoopAE_ALL_EVENTS,原始碼如下:

// 處理到時的時間事件和就緒的檔案事件
// 函式返回執行的事件個數
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    // 如果什麼事件都沒有設定則直接返回
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    // 請注意,既然我們要處理時間事件,即使沒有要處理的檔案事件,我們仍要呼叫select(),以便在下一次事件準備啟動之前進行休眠

    // 當前還沒有要處理的檔案事件,或者設定了時間時間但是沒有設定不阻塞標識
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        // 如果設定了時間事件而沒有設定不阻塞標識
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            // 獲取最近到時的時間事件
            shortest = aeSearchNearestTimer(eventLoop);
        // 獲取到了最早到時的時間事件
        if (shortest) {
            long now_sec, now_ms;
            // 獲取當前時間
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            // 等待該時間事件到時所需要的時長
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;

            // 如果沒到時
            if (ms > 0) {
                // 儲存時長到tvp中
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            // 如果已經到時,則將tvp的時間設定為0
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }

        // 沒有獲取到了最早到時的時間事件,時間事件連結串列為空
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            // 如果設定了不阻塞標識
            if (flags & AE_DONT_WAIT) {
                // 將tvp的時間設定為0,就不會阻塞
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                // 阻塞到第一個時間事件的到來
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        // 等待所監聽檔案描述符上有事件發生
        // 如果tvp為NULL,則阻塞在此,否則等待tvp設定阻塞的時間,就會有時間事件到時
        // 返回了就緒檔案事件的個數
        numevents = aeApiPoll(eventLoop, tvp);
        // 遍歷就緒檔案事件表
        for (j = 0; j < numevents; j++) {
            // 獲取就緒檔案事件的地址
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            // 獲取就緒檔案事件的型別,檔案描述符
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

        /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            // 如果是檔案可讀事件發生
            if (fe->mask & mask & AE_READABLE) {
                // 設定讀事件標識 且 呼叫讀事件方法處理讀事件
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            // 如果是檔案可寫事件發生
            if (fe->mask & mask & AE_WRITABLE) {
                // 讀寫事件的執行發法不同,則執行寫事件,避免重複執行相同的方法
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;    //執行的事件次數加1
        }
    }
    /* Check time events */
    // 執行時間事件
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

剛才提到該函式的一個引數是AE_ALL_EVENTS,他的定義在ae.h中,定義如下:

#define AE_FILE_EVENTS 1                                //檔案事件
#define AE_TIME_EVENTS 2                                //時間事件
#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)   //檔案和時間事件
#define AE_DONT_WAIT 4                                  //不阻塞等待標識

很明顯,flagsAE_FILE_EVENTSAE_TIME_EVENTS或的結果,他們的含義如下:

  • 如果flags = 0,函式什麼都不做,直接返回
  • 如果flags設定了 AE_ALL_EVENTS ,則執行所有型別的事件
  • 如果flags設定了 AE_FILE_EVENTS ,則執行檔案事件
  • 如果flags設定了 AE_TIME_EVENTS ,則執行時間事件
  • 如果flags設定了 AE_DONT_WAIT ,那麼函式處理完事件後直接返回,不阻塞等待

Redis伺服器在沒有被事件觸發時,就會阻塞等待,因為沒有設定AE_DONT_WAIT標識。但是他不會一直的死等待,等待檔案事件的到來,因為他還要處理時間時間,因此,在呼叫aeApiPoll進行監聽之前,先從時間事件表中獲取一個最近到達的時間時間,根據要等待的時間構建一個struct timeval tv, *tvp結構的變數,這個變數儲存著伺服器阻塞等待檔案事件的最長時間,一旦時間到達而沒有觸發檔案事件,aeApiPoll函式就會停止阻塞,進而呼叫processTimeEvents處理時間事件,因為Redis伺服器設定一個對自身資源和狀態進行檢查的週期性檢查的時間事件,而該函式就是timeProc所指向的回撥函式

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData)

如果在阻塞等待的最長時間之間,觸發了檔案事件,就會先執行檔案事件,後執行時間事件,因此處理時間事件通常比預設的會晚一點。

而執行檔案事件rfileProcwfileProc也是呼叫了回撥函式,Redis將檔案事件的處理分為了好幾種,用於處理不同的網路通訊需求,下面列出回撥函式的原型:

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask)
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask)
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask)
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask)
  • acceptTcpHandler:用於accept client的connect
  • acceptUnixHandler:用於acceptclient的本地connect
  • sendReplyToClient:用於向client傳送命令回覆。
  • readQueryFromClient:用於讀入client傳送的請求。

接下來,我們檢視獲取最快達到的時間事件的函式aeSearchNearestTimer實現

// 尋找第一個快到時的時間事件
// 這個操作是有用的知道有多少時間可以選擇該事件設定為不用推遲任何事件的睡眠中。
// 如果事件連結串列沒有時間將返回NULL。
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
{
    // 時間事件頭節點地址
    aeTimeEvent *te = eventLoop->timeEventHead;
    aeTimeEvent *nearest = NULL;

    // 遍歷所有的時間事件
    while(te) {
        // 尋找第一個快到時的時間事件,儲存到nearest中
        if (!nearest || te->when_sec < nearest->when_sec ||
                (te->when_sec == nearest->when_sec &&
                 te->when_ms < nearest->when_ms))
            nearest = te;
        te = te->next;
    }
    return nearest;
}

這個函式沒什麼,就是遍歷連結串列,找到最小值。我們重點看執行時間事件的函式processTimeEvents實現

/* Process time events */
// 執行時間事件
static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te, *prev;
    long long maxId;
    time_t now = time(NULL);

    // 這裡嘗試發現時間混亂的情況,上一次處理事件的時間比當前時間還要大
    // 重置最近一次處理事件的時間
    if (now < eventLoop->lastTime) {
        te = eventLoop->timeEventHead;
        while(te) {
            te->when_sec = 0;
            te = te->next;
        }
    }
    // 設定上一次時間事件處理的時間為當前時間
    eventLoop->lastTime = now;

    prev = NULL;
    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;   //當前時間事件表中的最大ID
    // 遍歷時間事件連結串列
    while(te) {
        long now_sec, now_ms;
        long long id;

        /* Remove events scheduled for deletion. */
        // 如果時間事件已被刪除了
        if (te->id == AE_DELETED_EVENT_ID) {
            aeTimeEvent *next = te->next;
            // 從事件連結串列中刪除事件的節點
            if (prev == NULL)
                eventLoop->timeEventHead = te->next;
            else
                prev->next = te->next;
            // 呼叫時間事件終結方法清楚該事件
            if (te->finalizerProc)
                te->finalizerProc(eventLoop, te->clientData);
            zfree(te);
            te = next;
            continue;
        }

        // 確保我們不處理在此迭代中由時間事件建立的時間事件。 請注意,此檢查目前無效:我們總是在頭節點新增新的計時器,但是如果我們更改實施細節,則該檢查可能會再次有用:我們將其保留在未來的防禦
        if (te->id > maxId) {
            te = te->next;
            continue;
        }
        // 獲取當前時間
        aeGetTime(&now_sec, &now_ms);
        // 找到已經到時的時間事件
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
            int retval;

            id = te->id;
            // 呼叫時間事件處理方法
            retval = te->timeProc(eventLoop, id, te->clientData);
            // 時間事件次數加1
            processed++;
            // 如果不是定時事件,則繼續設定它的到時時間
            if (retval != AE_NOMORE) {
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            // 如果是定時時間,則retval為-1,則將其時間事件刪除,惰性刪除
            } else {
                te->id = AE_DELETED_EVENT_ID;
            }
        }
        // 更新前驅節點指標和後繼節點指標
        prev = te;
        te = te->next;
    }
    return processed;   //返回執行事件的次數
}

如果時間事件不存在,則就呼叫finalizerProc指向的回撥函式,刪除當前的時間事件。如果存在,就呼叫timeProc指向的回撥函式處理時間事件。Redis的時間事件分為兩類

  • 定時事件:讓一段程式在指定的時間後執行一次。
  • 週期性事件:讓一段程式每隔指定的時間後執行一次。

如果當前的時間事件是週期性,那麼就會在將時間週期新增到週期事件的到時時間中。如果是定時事件,則將該時間事件刪除。

至此,Redis事件的實現就剖析完畢,但是事件的其他API,例如:建立事件,刪除事件,調整事件表的大小等等都沒有列出,所有原始碼的剖析,可以上github上檢視:redis 所有原始碼註釋