Understanding redis, part 3: an analysis of redis's event-driven source code
Redis performs very well even though it is a single-threaded framework. This is mainly because redis relies on asynchronous I/O and multiplexing, using the reactor pattern: a single thread processes a large number of I/O operations one by one, driven by events, which makes very good use of the CPU. Since there are no blocking synchronous calls on this path, each request is handled very quickly, so redis-server achieves high concurrency when many clients access it. So how does redis actually implement this?
1 Redis's I/O multiplexing
Redis is a client/server framework, so multiple clients can access the server over the network. To handle database requests from many clients at the same time, redis-server uses I/O multiplexing.

Within a single thread, the system APIs provided by UNIX (select, poll, epoll, and so on) can watch n file descriptors (a socket can also be treated as a file descriptor) for read/write readiness at the same time. As soon as one of the watched fds has a readable/writable event, the API reports which fd it was, and the corresponding file event can be dispatched and handled.
It is a bit like one teacher (redis-server) looking after a class of n students (the sockets of n redis-cli instances): as soon as a student raises a hand (a readable/writable event on a socket fd), the teacher immediately handles that student's request (the file event dispatcher), then comes straight back, watches the whole class again to see whether anyone else has a hand up, and repeats this over and over.
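To make the pattern concrete before diving into redis itself, here is a minimal, stripped-down reactor loop on top of epoll. This is plain example code, not taken from redis; the port number is arbitrary and error handling is omitted:

/* Minimal sketch of the reactor pattern on epoll (not redis code):
 * one thread watches the listening socket plus every client socket,
 * and dispatches whichever fd becomes readable. */
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>

#define MAX_EVENTS 64

int main(void) {
    int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in addr = {0};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(6380);               /* arbitrary demo port */
    bind(listen_fd, (struct sockaddr *)&addr, sizeof(addr));
    listen(listen_fd, 128);

    int epfd = epoll_create(1024);             /* the size is only a hint */
    struct epoll_event ev = {0}, events[MAX_EVENTS];
    ev.events = EPOLLIN;
    ev.data.fd = listen_fd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);

    for (;;) {
        int n = epoll_wait(epfd, events, MAX_EVENTS, -1);
        for (int i = 0; i < n; i++) {
            int fd = events[i].data.fd;
            if (fd == listen_fd) {             /* a "student raised a hand" to connect */
                int cfd = accept(listen_fd, NULL, NULL);
                ev.events = EPOLLIN;
                ev.data.fd = cfd;
                epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev);
            } else {                           /* an existing client sent data */
                char buf[512];
                ssize_t nread = read(fd, buf, sizeof(buf));
                if (nread <= 0) { close(fd); continue; }
                write(fd, buf, nread);         /* echo back, standing in for command handling */
            }
        }
    }
}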
epoll, kqueue, select and evport are all UNIX multiplexing interfaces. Because redis takes compatibility with Unix-like operating systems seriously, it supports all of them; the corresponding implementations are ae_epoll.c, ae_kqueue.c, ae_select.c and ae_evport.c.
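Which backend actually gets compiled in is decided at build time in ae.c, roughly through conditional includes like the following (evport/epoll/kqueue are preferred, with select as the fallback); quoted from memory, so treat it as a sketch of the mechanism rather than an exact copy:

/* ae.c (abridged): pick the best multiplexing backend available on this platform. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif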

Since I am running Ubuntu, this article uses epoll as the example and looks at how redis's epoll-based event loop is implemented.
2 Redis epoll source code analysis
2.1 Initializing the redis event loop at startup
When redis-server starts, it reaches the initServer() function, which initializes the globally unique variable redisServer server;. This structure describes everything about the server and is quite complex, so we will not go through it all here, but note one member in particular:
aeEventLoop *el;    /* the event loop in which all of redis's events are registered */
/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
} aeEventLoop;
/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;
The relationships are easier to state than to read off the code: eventLoop->events is an array of aeFileEvent indexed by fd, each slot holding the registered mask and the read/write callbacks; eventLoop->fired is the array of fds that the poll call reported as ready in the current iteration; and eventLoop->apidata points to the backend-specific state (for epoll, the epoll fd plus a buffer of epoll_event structures).

The concrete initialization function aeCreateEventLoop looks like this:
aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;

    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->setsize = setsize;
    eventLoop->lastTime = time(NULL);
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    eventLoop->aftersleep = NULL;
    if (aeApiCreate(eventLoop) == -1) goto err; /* mainly initializes eventLoop->apidata */
    /* Events with mask == AE_NONE are not set.
     * So let's initialize the vector with it. */
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;

err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}
aeApiCreate is where the epoll-specific initialization happens:
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    eventLoop->apidata = state;
    return 0;
}
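For reference, the aeApiState that aeApiCreate fills in and stores in eventLoop->apidata is the epoll-specific state defined in ae_epoll.c; as far as I recall it is just the epoll fd plus the buffer handed to epoll_wait:

/* ae_epoll.c: per-event-loop state for the epoll backend. */
typedef struct aeApiState {
    int epfd;                     /* fd returned by epoll_create() */
    struct epoll_event *events;   /* setsize entries, filled in by epoll_wait() */
} aeApiState;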
Next, still inside initServer, redis tries to listen on the configured port:
/* Open the TCP listening socket for the user commands. */
if (server.port != 0 &&
    listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
    exit(1);
In listenToPort, redis may bind/listen on several addresses and handles both the IPv4 and the IPv6 case. The source:
int listenToPort(int port, int *fds, int *count) {
    int j;

    /* Force binding of 0.0.0.0 if no bind address is specified, always
     * entering the loop if j == 0. */
    if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
    for (j = 0; j < server.bindaddr_count || j == 0; j++) {
        if (server.bindaddr[j] == NULL) {
            int unsupported = 0;
            /* Bind * for both IPv6 and IPv4, we enter here only if
             * server.bindaddr_count == 0. */
            fds[*count] = anetTcp6Server(server.neterr,port,NULL,
                server.tcp_backlog);
            if (fds[*count] != ANET_ERR) {
                anetNonBlock(NULL,fds[*count]);
                (*count)++;
            } else if (errno == EAFNOSUPPORT) {
                unsupported++;
                serverLog(LL_WARNING,"Not listening to IPv6: unsupproted");
            }

            if (*count == 1 || unsupported) {
                /* Bind the IPv4 address as well. */
                fds[*count] = anetTcpServer(server.neterr,port,NULL,
                    server.tcp_backlog);
                if (fds[*count] != ANET_ERR) {
                    anetNonBlock(NULL,fds[*count]);
                    (*count)++;
                } else if (errno == EAFNOSUPPORT) {
                    unsupported++;
                    serverLog(LL_WARNING,"Not listening to IPv4: unsupproted");
                }
            }
            /* Exit the loop if we were able to bind * on IPv4 and IPv6,
             * otherwise fds[*count] will be ANET_ERR and we'll print an
             * error and return to the caller with an error. */
            if (*count + unsupported == 2) break;
        } else if (strchr(server.bindaddr[j],':')) {
            /* Bind IPv6 address. */
            fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j],
                server.tcp_backlog);
        } else {
            /* Bind IPv4 address. */
            fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j],
                server.tcp_backlog);
        }
        if (fds[*count] == ANET_ERR) {
            serverLog(LL_WARNING,
                "Creating Server TCP listening socket %s:%d: %s",
                server.bindaddr[j] ? server.bindaddr[j] : "*",
                port, server.neterr);
            return C_ERR;
        }
        anetNonBlock(NULL,fds[*count]);
        (*count)++;
    }
    return C_OK;
}
Once created, the listening server sockets' file descriptors are stored in the server's ipfd array:
int ipfd[CONFIG_BINDADDR_MAX]; /* TCP socket file descriptors */
Then, still in initServer, redis registers a file event for each of these listening socket fds:
/* Create an event handler for accepting new connections in TCP and Unix
 * domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
    if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
        acceptTcpHandler,NULL) == AE_ERR)
        {
            serverPanic(
                "Unrecoverable error creating server.ipfd file event.");
        }
}
As you can see, aeCreateFileEvent associates the file descriptor server.ipfd[j], the AE_READABLE event, and the callback acceptTcpHandler: whenever a client sends a TCP connection request, acceptTcpHandler is triggered. Let's see how this function uses the data structures described above to tie these pieces together.
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}
From this source we can see the function does two things: it stores the event mask and the callbacks in the eventLoop->events[fd] slot, and it calls aeApiAddEvent, which is just a thin wrapper around the epoll interface. The implementation:
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}
The logic is straightforward: the core is a call to epoll_ctl from the epoll interface, which puts the server socket's fd under epoll's monitoring.
2.2 The redis server's epoll loop
Once initialization is done, redis enters its main loop:
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}
The loop keeps calling aeProcessEvents, which handles all of redis's events, both file events and timer events. For file events, the core code is:
/* Call the multiplexing API, will return only on timeout or when
 * some event fires. */
numevents = aeApiPoll(eventLoop, tvp); /* ask the reactor whether any events are pending */

/* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
    eventLoop->aftersleep(eventLoop);

for (j = 0; j < numevents; j++) {
    aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
    int mask = eventLoop->fired[j].mask;
    int fd = eventLoop->fired[j].fd;
    int fired = 0; /* Number of events fired for current fd. */

    /* Normally we execute the readable event first, and the writable
     * event laster. This is useful as sometimes we may be able
     * to serve the reply of a query immediately after processing the
     * query.
     *
     * However if AE_BARRIER is set in the mask, our application is
     * asking us to do the reverse: never fire the writable event
     * after the readable. In such a case, we invert the calls.
     * This is useful when, for instance, we want to do things
     * in the beforeSleep() hook, like fsynching a file to disk,
     * before replying to a client. */
    int invert = fe->mask & AE_BARRIER;

    /* Note the fe->mask & mask & ... code: maybe an already
     * processed event removed an element that fired and we still
     * didnt processed, so we check if the event is still valid.
     *
     * Fire the readable event if the call sequence is not
     * inverted. */
    if (!invert && fe->mask & mask & AE_READABLE) {
        fe->rfileProc(eventLoop,fd,fe->clientData,mask);
        fired++;
    }

    /* Fire the writable event. */
    if (fe->mask & mask & AE_WRITABLE) {
        if (!fired || fe->wfileProc != fe->rfileProc) {
            fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            fired++;
        }
    }

    /* If we have to invert the call, fire the readable event now
     * after the writable one. */
    if (invert && fe->mask & mask & AE_READABLE) {
        if (!fired || fe->wfileProc != fe->rfileProc) {
            fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            fired++;
        }
    }

    processed++;
}
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}
aeApiPoll is called on every iteration, and once again it is just a wrapper around the epoll interface: it checks whether any of the monitored file descriptors has a pending event and fills in the fired array; the loop in aeProcessEvents shown above then invokes the matching callbacks.
2.3 Client connection setup and request handling
As mentioned in section 2.1, the server socket's file descriptor and the AE_READABLE event were associated with the callback acceptTcpHandler, which is triggered whenever the server socket becomes readable.
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    while(max--) { /* several clients may be trying to connect at the same time */
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(cfd,0,cip);
    }
}
As you can see, redis accepts the incoming TCP connection requests one by one with the socket accept call (via anetTcpAccept) and hands each new connection over to acceptCommonHandler.
#define MAX_ACCEPTS_PER_CALL 1000
static void acceptCommonHandler(int fd, int flags, char *ip) {
    client *c;
    if ((c = createClient(fd)) == NULL) {
        serverLog(LL_WARNING,
            "Error registering fd event for the new client: %s (fd=%d)",
            strerror(errno),fd);
        close(fd); /* May be already closed, just ignore errors */
        return;
    }
    /* ... the rest does not affect the main flow, so it is omitted here. */
Here a client structure is allocated to represent the new client. The logic:
client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));

    /* passing -1 as fd it is possible to create a non connected client.
     * This is useful since all the commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    if (fd != -1) {
        anetNonBlock(NULL,fd);
        anetEnableTcpNoDelay(NULL,fd);
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }

    selectDb(c,0);
    uint64_t client_id;
    atomicGetIncr(server.next_client_id,client_id,1);
    c->id = client_id;
    c->fd = fd;
    c->name = NULL;
    c->bufpos = 0;
    c->qb_pos = 0;
    c->querybuf = sdsempty();
    c->pending_querybuf = sdsempty();
    c->querybuf_peak = 0;
    c->reqtype = 0;
    c->argc = 0;
    c->argv = NULL;
    c->cmd = c->lastcmd = NULL;
    c->multibulklen = 0;
    c->bulklen = -1;
    c->sentlen = 0;
    c->flags = 0;
    c->ctime = c->lastinteraction = server.unixtime;
    c->authenticated = 0;
    c->replstate = REPL_STATE_NONE;
    c->repl_put_online_on_ack = 0;
    c->reploff = 0;
    c->read_reploff = 0;
    c->repl_ack_off = 0;
    c->repl_ack_time = 0;
    c->slave_listening_port = 0;
    c->slave_ip[0] = '\0';
    c->slave_capa = SLAVE_CAPA_NONE;
    c->reply = listCreate();
    c->reply_bytes = 0;
    c->obuf_soft_limit_reached_time = 0;
    listSetFreeMethod(c->reply,freeClientReplyValue);
    listSetDupMethod(c->reply,dupClientReplyValue);
    c->btype = BLOCKED_NONE;
    c->bpop.timeout = 0;
    c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL);
    c->bpop.target = NULL;
    c->bpop.xread_group = NULL;
    c->bpop.xread_consumer = NULL;
    c->bpop.xread_group_noack = 0;
    c->bpop.numreplicas = 0;
    c->bpop.reploffset = 0;
    c->woff = 0;
    c->watched_keys = listCreate();
    c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType,NULL);
    c->pubsub_patterns = listCreate();
    c->peerid = NULL;
    c->client_list_node = NULL;
    listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
    listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
    if (fd != -1) linkClient(c);
    initClientMultiState(c);
    return c;
}
createClient really does two things: it allocates and initializes the client structure itself (selecting database 0, creating the query buffer, the reply list and so on), and it registers an AE_READABLE file event on the client's fd with readQueryFromClient as the callback:
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
    readQueryFromClient, c) == AE_ERR)
{
    close(fd);
    zfree(c);
    return NULL;
}
When redis-server later receives a database request from that client, the following callback fires; it reads the data from the socket and starts processing it.
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *c = (client*) privdata;
    int nread, readlen;
    size_t qblen;
    UNUSED(el);
    UNUSED(mask);

    readlen = PROTO_IOBUF_LEN;
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);

        /* Note that the 'remaining' variable may be zero in some edge case,
         * for example once we resume a blocked client after CLIENT PAUSE. */
        if (remaining > 0 && remaining < readlen) readlen = remaining;
    }

    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = read(fd, c->querybuf+qblen, readlen); /* read from the client socket via the socket API, then process the data */
    if (nread == -1) {
        if (errno == EAGAIN) {
            return;
        } else {
            serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    } else if (c->flags & CLIENT_MASTER) {
        /* Append the query buffer to the pending (not applied) buffer
         * of the master. We'll use this buffer later in order to have a
         * copy of the string applied by the last command executed. */
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }

    sdsIncrLen(c->querybuf,nread);
    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    server.stat_net_input_bytes += nread;
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }

    /* Time to process the buffer. If the client is a master we need to
     * compute the difference between the applied offset before and after
     * processing the buffer, to understand how much of the replication stream
     * was actually applied to the master state: this quantity, and its
     * corresponding part of the replication stream, will be propagated to
     * the sub-slaves and to the replication backlog. */
    processInputBufferAndReplicate(c);
}
The function above makes sure the query buffer is large enough, reads the data from the client socket via the socket API, and finally hands the buffer to processInputBufferAndReplicate(c), which parses and executes the actual redis commands.
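For completeness, processInputBufferAndReplicate is essentially a thin wrapper around processInputBuffer; the extra branch only matters when the data comes from this server's replication master. The following is a paraphrased sketch from memory of this redis version's networking.c, not a guaranteed line-for-line copy:

/* Sketch: parse and execute commands from the query buffer; if the data came
 * from our master, forward the applied slice of the replication stream to our
 * own slaves and to the replication backlog. */
void processInputBufferAndReplicate(client *c) {
    if (!(c->flags & CLIENT_MASTER)) {
        processInputBuffer(c);
    } else {
        size_t prev_offset = c->reploff;
        processInputBuffer(c);
        size_t applied = c->reploff - prev_offset;
        if (applied) {
            replicationFeedSlavesFromMasterStream(server.slaves,
                    c->pending_querybuf, applied);
            sdsrange(c->pending_querybuf, applied, -1);
        }
    }
}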
At this point the whole basic flow is connected: the server listens on its port and starts serving, a client sends a connection request, and the database commands it then sends are read and processed through the event loop.