redis學習筆記(12)---server基本流程
server工作流程
當執行./redis-server後,redis資料庫的server端就會啟動。
然後就會執行redis.c中的main()函式
其中main()函式中的工作可以主要分為以下幾個部分:
- 1、初始化server端的配置資訊- - -initServerConfig()
- 2、解析執行時的命令引數,並根據引數進行處理,eg:./redis-server - -help
- 3、如果設定了daemonize引數,則將server設為deamon程序- - -daemonize()
- 4、啟動server- - -initServer()
- 5、設定週期性處理函式beforeSleep()
- 6、開始工作- - -aeMain()
1、initServerConfig()
初始化server端的配置資訊,儲存在伺服器例項server中,包括監聽埠、DB數、命令表等資訊。
2、daemonize()
將程序設為守護程序。
守護程序的相關知識之前在linux程序基礎 中已經進行了簡單介紹。
void daemonize(void) {
int fd;
if (fork() != 0) exit(0); /* parent exits */
setsid(); /* create a new session */
if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
dup2(fd, STDIN_FILENO);
dup2(fd, STDOUT_FILENO);
dup2(fd, STDERR_FILENO);
if (fd > STDERR_FILENO) close(fd);
}
}
3、initServer()
這個函式中完成了非常多的任務,包括設定訊號處理函式、 建立clients佇列、slaves佇列、建立資料庫、建立共享物件等。
除此之外最重要的兩個任務是建立監聽socket並監聽client、以及建立週期性處理事件。
我們知道,任何一個伺服器的的事件都可以分為IO讀寫事件和時間處理事件,redis同樣如此。
1)IO讀寫事件,包括監聽客戶端的連線以及與客戶端進行資料互動等
2)時間處理事件,在設定的時間處理相關事件,包括週期性重新整理資料庫等。
這兩類事件都是通過server.el這個變數來儲存的。
3.1、IO讀寫事件
首先redis會為伺服器建立一個監聽fd,來監聽來自客戶端的連線,主要會呼叫到以下兩個函式
listenToPort(server.port,server.ipfd,&server.ipfd_count);
aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL);
最後將該事件加入到server.el結構中
- listenToPort:根據傳入的port建立監聽描述符,同時呼叫fcntl()將fd設為非阻塞的,然後呼叫bind()、listen()函式。注意redis會分別根據IPv4、IPv6兩種地址分別建立一個socket
- aeCreateFileEvent:建立讀寫事件。對於監聽描述符而言,只需要建立一個讀事件監聽來自client的連線即可。注意,監聽描述符的回撥函式為acceptTcpHandler
3.2、時間處理事件
對於server端,redis會週期性的執行serverCron()來完成一些處理,因此將這個事件也加入到server.el結構中
aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
//每1ms執行一次serverCron()
4、beforeSleep()
在redis的事件主迴圈中,每次迴圈都會執行一次,其中包括向所有slave傳送ACK、寫AOF檔案等操作
5、aeMain()
redis伺服器端最重要的函式,為redis的事件主迴圈。如果redis沒有接收到中斷訊號,那麼就會一直迴圈執行這個函式。
aeMain(server.el);
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
可以發現,該函式就是死迴圈的執行beforesleep()和aeProcessEvents()。
作為redis伺服器端的核心流程,aeProcessEvents()的實現程式碼較長,但是主要也只有3個動作
- 1、計算呼叫select、epoll等函式可以阻塞的時間
- 2、呼叫aeApiPoll()等待IO事件發生,若有事件發生,則呼叫相應的回撥函式
- 3、呼叫processTimeEvents()處理時間事件
由於select、epoll等IO複用機制在一定時間內沒有事件發生時,會一直阻塞在那裡。因此為了不影響後面時間事件的處理,必須在最近的一個時間事件到來之前,完成IO複用機制的呼叫。因此首先找到最近一個時間事件,計算距離當前時間的時間差,來作為呼叫aeApiPoll()的引數。
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
//1、有時間事件時,計算時間差
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop); //找到最近一個時間事件
if (shortest) {
aeGetTime(&now_sec, &now_ms); //得到當前時間
tvp = &tv;
//計算時間差tvp
tvp->tv_sec = shortest->when_sec - now_sec;
if (shortest->when_ms < now_ms) {
tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
tvp->tv_sec --;
} else {
tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
}
if (tvp->tv_sec < 0) tvp->tv_sec = 0;
if (tvp->tv_usec < 0) tvp->tv_usec = 0;
} else {
if (flags & AE_DONT_WAIT) { //此時不阻塞,立即返回
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else { //否則可以永遠等待
tvp = NULL; /* wait forever */
}
}
//2、呼叫IO複用機制,處理IO事件
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
if (fe->mask & mask & AE_READABLE) { //處理讀事件
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) { //處理寫事件
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
//3、處理時間事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed;
}
這就是redis伺服器側主要的工作流程了。
具體例子:
1、當有一個client連線到來時
此時redis伺服器端的監聽描述符就會有事件發生,之前已經提到過該fd上只註冊了讀事件acceptTcpHandler(),因此執行fe->rfileProc(eventLoop,fd,fe->clientData,mask);
就會呼叫acceptTcpHandler()函式
1)首先acceptTcpHandler()會呼叫accept獲取連線描述符cfd
2)然後呼叫acceptCommonHandler()建立一個client例項
3)在createClient()中會為每個連線描述符註冊讀事件readQueryFromClient() ,同時將每個client的預設資料庫設為0
這樣client和server端的連線就建立好了。當有客戶端請求到來時,就會執行readQueryFromClient()函式 ,來處理該請求了。
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
while(max--) {
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); //1、呼叫accept獲取連線描述符cfd
if (cfd == ANET_ERR) {
......
}
acceptCommonHandler(cfd,0);
}
}
static void acceptCommonHandler(int fd, int flags) {
redisClient *c;
if ((c = createClient(fd)) == NULL) { //2、建立client例項
close(fd);
return;
}
}
redisClient *createClient(int fd) {
redisClient *c = zmalloc(sizeof(redisClient));
if (fd != -1) {
anetNonBlock(NULL,fd);
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR){ //3、註冊讀事件
/* ...... */
}
}
selectDb(c,0);
/* ...... */
return c;
}
2、當有客戶端請求到來時(eg:執行命令 set key value )
2.1、readQueryFromClient()讀入請求並處理
首先連線fd上的讀事件會被觸發,因此server端會呼叫readQueryFromClient()來進行處理。主要過程是:
- 1、為接收快取區申請記憶體
- 2、呼叫read從客戶端讀入請求資料到c->querybuf中
- 3、呼叫processInputBuffer對請求進行處理
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); //1、申請空間
nread = read(fd, c->querybuf+qblen, readlen); //2、讀請求
processInputBuffer(c); //3、處理輸入請求
}
當執行命令 set key value後,列印c->querybuf得到如下結果:
即其中內容的格式為:
2.2、processInputBuffer()處理請求
在processInputBuffer()中
1)首先呼叫processMultibulkBuffer,按照協議格式,將接收緩衝區中的內容解析出來,並為每個引數建立一個字串物件robject
//主要處理如下
ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll); //1、首先解析出引數個數並轉換成數字
c->multibulklen = ll; //將引數個數賦給multibulklen
c->argv = zmalloc(sizeof(robj*)*c->multibulklen); //2、為物件分配記憶體
c->argv[c->argc++] = createStringObject(c->querybuf+pos,c->bulklen); //3、依次建立ll個物件
對於本例,ll = 3,最終會生成3個字串物件,字串的內容分別為”set” 、”key” 、”value”。
c->argv的型別為robj **argv;
,因此可以將其看作一個數組,陣列中的每一項指向一個robj。
在上一章已經講過,當字串長度小於39位元組時,會採用embstr編碼方式來組織資料,因此最終c->argv的內容如下:
2)呼叫processCommand對命令進行處理
首先查詢命令,當命令不存在或引數個數不對時,錯誤則直接返回。
然後中間會進行一系列判斷,暫時不管
最後就呼叫call()處理命令
如本例中的命令為set,因此會在redisCommandTable中找到set命令,然後執行set命令對應的函式setCommand()
int processCommand(redisClient *c) {
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
if (!c->cmd) { //沒有找到命令
return REDIS_OK;
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) { //命令引數個數錯誤
return REDIS_OK;
}
if (c->flags & REDIS_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
call(c,REDIS_CALL_FULL); //執行命令
}
return REDIS_OK;
}
3)呼叫setCommand()執行命令
setCommand()在上一篇中已經進行了簡單的介紹,需要注意的是最後setCommand()會執行
addReply(c, ok_reply ? ok_reply : shared.ok);
將操作的結果返回給客戶端
2.3、呼叫addReply將結果返回給client
1)該函式會呼叫prepareClientToWrite()首先將要返回給客戶端的結果按照一定的格式儲存到緩衝區中
2)然後呼叫aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,sendReplyToClient, c)
在該連線fd上註冊寫事件。
這樣當server下一次執行aeMain函式時,就會檢測到有寫事件發生,就會呼叫sendReplyToClient()函數了。
3)在sendReplyToClient()函式中,就會呼叫write系統呼叫將結果通過socket返回給client了。
這樣整個set key value命令就執行完了
本文所引用的原始碼全部來自Redis3.0.7版本