1. 程式人生 > >Redis的事件機制

Redis的事件機制

[toc] Redis程式的執行過程是一個處理事件的過程,也稱Redis是一個事件驅動的服務。Redis中的事件分兩類:檔案事件(File Event)、時間事件(Time Event)。檔案事件處理檔案的讀寫操作,特別是與客戶端通訊的Socket檔案描述符的讀寫操作;時間事件主要用於處理一些定時處理的任務。 本文首先介紹Redis的執行過程,闡明Redis程式是一個事件驅動的程式;接著介紹事件機制實現中涉及的資料結構以及事件的註冊;最後介紹了處理客戶端中涉及到的套接字檔案讀寫事件。 #### 一、Redis的執行過程 Redis的執行過程是一個事件處理的過程,可以通過下圖反映出來: ![](https://img2020.cnblogs.com/blog/339551/202007/339551-20200727222634160-1313294053.jpg) ​ 圖1 Redis的事件處理過程 從上圖可以看出:Redis伺服器的執行過程就是迴圈等待並處理事件的過程。通過時間事件將執行事件分成一個個的時間分片,如圖1的右半部分所示。如果在指定的時間分片中,有檔案事件發生,如:讀檔案描述符可讀、寫檔案描述符可寫,則呼叫相應的處理函式進行檔案的讀寫處理。檔案事件處理完成之後,處理期望發生時間在當前時間之前或正好是當前時刻的時間事件。然後再進入下一次迴圈迭代處理。 如果在指定的事件間隔中,沒有檔案事件發生,則不需要處理,直接進行時間事件的處理,如下圖所示。 ![](https://img2020.cnblogs.com/blog/339551/202007/339551-20200727222902809-586806562.jpg) ​ 圖2 Redis的事件處理過程(無檔案事件發生) #### 二、事件資料結構 ##### 2.1 檔案事件資料結構 Redis用如下結構體來記錄一個檔案事件: ```c /* File event structure */ typedef struct aeFileEvent { int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */ aeFileProc *rfileProc; aeFileProc *wfileProc; void *clientData; } aeFileEvent; ``` 通過mask來描述發生了什麼事件: - AE_READABLE:檔案描述符可讀; - AE_WRITABLE:檔案描述符可寫; - AE_BARRIER:檔案描述符阻塞 rfileProc和wfileProc分別為讀事件和寫事件發生時的回撥函式,其函式簽名如下: ```c typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask); ``` ##### 2.2 事件事件資料結構 Redis用如下結構體來記錄一個時間事件: ```c /* Time event structure */ typedef struct aeTimeEvent { long long id; /* time event identifier. */ long when_sec; /* seconds */ long when_ms; /* milliseconds */ aeTimeProc *timeProc; aeEventFinalizerProc *finalizerProc; void *clientData; struct aeTimeEvent *prev; struct aeTimeEvent *next; } aeTimeEvent; ``` when_sec和when_ms指定時間事件發生的時間,timeProc為時間事件發生時的處理函式,簽名如下: ```c typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData); ``` prev和next表明時間事件構成了一個雙向連結串列。 #### 3.3 事件迴圈 Redis用如下結構體來記錄系統中註冊的事件及其狀態: ```c /* State of an event based program */ typedef struct aeEventLoop { int maxfd; /* highest file descriptor currently registered */ int setsize; /* max number of file descriptors tracked */ long long timeEventNextId; time_t lastTime; /* Used to detect system clock skew */ aeFileEvent *events; /* Registered events */ aeFiredEvent *fired; /* Fired events */ aeTimeEvent *timeEventHead; int stop; void *apidata; /* This is used for polling API specific data */ aeBeforeSleepProc *beforesleep; aeBeforeSleepProc *aftersleep; } aeEventLoop; ``` 這一結構體中,最主要的就是檔案事件指標events和時間事件頭指標timeEventHead。檔案事件指標event指向一個固定大小(可配置)陣列,通過檔案描述符作為下標,可以獲取檔案對應的事件物件。 #### 三、事件的註冊過程 事件驅動的程式實際上就是在事件發生時,呼叫相應的處理函式(即:回撥函式)進行邏輯處理。因此關於事件,程式需要知道:①事件的發生;② 回撥函式。事件的註冊過程就是告訴程式這兩。下面我們分別從檔案事件、時間事件的註冊過程進行闡述。 ##### 3.1 檔案事件的註冊過程 對於檔案事件: - 事件的發生:應用程式需要知道哪些檔案描述符發生了哪些事件。感知檔案描述符上有事件發生是由作業系統的職責,應用程式需要告訴作業系統,它關心哪些檔案描述符的哪些事件,這樣通過相應的系統API就會返回發生了事件的檔案描述符。 - 回撥函式:應用程式知道了檔案描述符發生了事件之後,需要呼叫相應回撥函式進行處理,因而需要在事件發生之前將相應的回撥函式準備好。 這就是檔案事件的註冊過程,函式的實現如下: ```c int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) { if (fd >= eventLoop->setsize) { errno = ERANGE; return AE_ERR; } aeFileEvent *fe = &eventLoop->events[fd]; if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR; fe->mask |= mask; if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; fe->clientData = clientData; if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; return AE_OK; } ``` 這段程式碼邏輯非常清晰:首先根據檔案描述符獲得檔案事件物件,接著在作業系統中新增自己關心的檔案描述符(`addApiAddEvent`),最後將回調函式記錄到檔案事件物件中。因此,一個執行緒就可以同時監聽多個檔案事件,這就是IO多路複用。作業系統提供多種IO多路複用模型,如:Select模型、Poll模型、EPOLL模型等。Redis支援所有這些模型,使用者可以根據需要進行選擇。不同的模型,向作業系統新增檔案描述符方式也不同,Redis將這部分邏輯封裝在`aeApiAddEvent`中,下面程式碼是EPOLL模型的實現: ```c static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { aeApiState *state = eventLoop->apidata; struct epoll_event ee = {0}; /* avoid valgrind warning */ /* If the fd was already monitored for some event, we need a MOD * operation. Otherwise we need an ADD operation. */ int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; ee.events = 0; mask |= eventLoop->events[fd].mask; /* Merge old events */ if (mask & AE_READABLE) ee.events |= EPOLLIN; if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; ee.data.fd = fd; if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1; return 0; } ``` 這段程式碼就是對作業系統呼叫`epoll_ctl()`的封裝,`EPOLLIN`對應的是讀(輸入)事件,EPOLLOUT對應的是寫(輸出)事件。 ##### 3.2 時間事件的註冊過程 對於時間事件: - 事件的發生:當前時刻正好是事件期望發生的時刻,或者是晚於事件期望發生的時刻,所以需要讓程式知道事件期望發生的時刻; - 回撥函式:此時呼叫回撥函式進行處理,所以需要讓程式知道事件的回撥函式。 對應的事件事件註冊函式如下: ```c long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc) { long long id = eventLoop->timeEventNextId++; aeTimeEvent *te; te = zmalloc(sizeof(*te)); if (te == NULL) return AE_ERR; te->id = id; aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms); te->timeProc = proc; te->finalizerProc = finalizerProc; te->clientData = clientData; te->prev = NULL; te->next = eventLoop->timeEventHead; if (te->next) te->next->prev = te; eventLoop->timeEventHead = te; return id; } ``` 這段程式碼邏輯也是非常簡單:首先建立時間事件物件,接著設定事件,設定回撥函式,最後將事件事件物件插入到時間事件連結串列中。設定時間事件期望發生的時間比較簡單: ```c static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) { long cur_sec, cur_ms, when_sec, when_ms; aeGetTime(&cur_sec, &cur_ms); when_sec = cur_sec + milliseconds/1000; when_ms = cur_ms + milliseconds%1000; if (when_ms >= 1000) { when_sec ++; when_ms -= 1000; } *sec = when_sec; *ms = when_ms; } static void aeGetTime(long *seconds, long *milliseconds) { struct timeval tv; gettimeofday(&tv, NULL); *seconds = tv.tv_sec; *milliseconds = tv.tv_usec/1000; } ``` 當前時間加上期望的時間間隔,作為事件期望發生的時刻。 #### 四、套接字檔案事件 Redis為客戶端提供儲存資料和獲取資料的快取服務,監聽並處理來自請求,將結果返回給客戶端,這一過程將會發生以下檔案事件: ![](https://img2020.cnblogs.com/blog/339551/202007/339551-20200727222952441-228252876.jpg) 與上圖相對應,對於一個請求,Redis會註冊三個檔案事件: ##### 4.1 TCP連線建立事件 伺服器初始化時,在伺服器套接字上註冊TCP連線建立的事件。 ```c void initServer(void) { /* Create an event handler for accepting new connections in TCP and Unix * domain sockets. */ for (j = 0; j < server.ipfd_count; j++) { if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) { serverPanic( "Unrecoverable error creating server.ipfd file event."); } } } ``` 回撥函式為acceptTcpHandler,該函式最重要的職責是建立客戶端結構。 ##### 4.2 客戶端套接字讀事件 建立客戶端:在客戶端套接字上註冊客戶端套接字可讀事件。 ```c if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR) { close(fd); zfree(c); return NULL; } ``` 回撥函式為readQueryFromClient,顧名思義,此函式將從客戶端套接字中讀取資料。 ##### 4.3 向客戶端返回資料 Redis完成請求後,Redis並非處理完一個請求後就註冊一個寫檔案事件,然後事件回撥函式中往客戶端寫回結果。根據圖1,檢測到檔案事件發生後,Redis對這些檔案事件進行處理,即:呼叫rReadProc或writeProc回撥函式。處理完成後,對於需要向客戶端寫回的資料,先快取到記憶體中: ```c typedef struct client { // ...其他欄位 list *reply; /* List of reply objects to send to the client. */ /* Response buffer */ int bufpos; char buf[PROTO_REPLY_CHUNK_BYTES]; }; ``` 傳送給客戶端的資料會存放到兩個地方: - reply指標存放待發送的物件; - buf中存放待返回的資料,bufpos指示資料中的最後一個位元組所在位置。 這裡遵循一個原則:只要能存放在buf中,就儘量存入buf位元組陣列中,如果buf存不下了,才存放在reply物件陣列中。 寫回發生在進入下一次等待檔案事件之前,見圖1中【等待前處理】,會呼叫以下函式來處理客戶端資料寫回邏輯: ```c int writeToClient(int fd, client *c, int handler_installed) { while(clientHasPendingReplies(c)) { if (c->bufpos > 0) { nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen); if (nwritten <= 0) break; c->sentlen += nwritten; totwritten += nwritten; if ((int)c->sentlen == c->bufpos) { c->bufpos = 0; c->sentlen = 0; } } else { o = listNodeValue(listFirst(c->reply)); objlen = o->used; if (objlen == 0) { c->reply_bytes -= o->size; listDelNode(c->reply,listFirst(c->reply)); continue; } nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen); if (nwritten <= 0) break; c->sentlen += nwritten; totwritten += nwritten; } } } ``` 上述函式只截取了資料傳送部分,首先發送`buf`中的資料,然後傳送`reply`中的資料。 有讀者可能會疑惑:write()系統呼叫是阻塞式的介面,上述做法會不會在write()呼叫的地方有等待,從而導致效能低下?這裡就要介紹Redis是怎麼處理這個問題的。 首先,我們發現建立客戶端的程式碼: ```c client *createClient(int fd) { client *c = zmalloc(sizeof(client)); if (fd != -1) { anetNonBlock(NULL,fd); } } ``` 可以看到設定fd是非阻塞(NonBlock),這就保證了在套接字fd上的read()和write()系統呼叫不是阻塞的。 其次,和檔案事件的處理操作一樣,往客戶端寫資料的操作也是批量的,函式如下: ```c int handleClientsWithPendingWrites(void) { listRewind(server.clients_pending_write,&li); while((ln = listNext(&li))) { /* Try to write buffers to the client socket. */ if (writeToClient(c->fd,c,0) == C_ERR) continue; /* If after the synchronous writes above we still have data to * output to the client, we need to install the writable handler. */ if (clientHasPendingReplies(c)) { int ae_flags = AE_WRITABLE; if (aeCreateFileEvent(server.el, c->fd, ae_flags, sendReplyToClient, c) == AE_ERR) { freeClientAsync(c); } } } } ``` 可以看到,首先對每個客戶端呼叫剛才介紹的`writeToClient()`函式進行寫資料,如果還有資料沒寫完,那麼註冊寫事件,當套接字檔案描述符寫就緒時,呼叫`sendReplyToClient()`進行剩餘資料的寫操作: ```c void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { writeToClient(fd,privdata,1); } ``` 仔細想一下就明白了:處理完得到結果後,這時套接字的寫緩衝區一般是空的,因此write()函式呼叫成功,所以就不需要註冊寫檔案事件了。如果寫緩衝區滿了,還有資料沒寫完,此時再註冊寫檔案事件。並且在資料寫完後,將寫事件刪除: ```c int writeToClient(int fd, client *c, int handler_installed) { if (!clientHasPendingReplies(c)) { if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); } } ``` 注意到,在sendReplyToClient()函式實現中,第三個引數正