1. 程式人生 > >redis學習筆記(12)---server基本流程

redis學習筆記(12)---server基本流程

server工作流程

  當執行./redis-server後,redis資料庫的server端就會啟動。
  然後就會執行redis.c中的main()函式
  其中main()函式中的工作可以主要分為以下幾個部分:
  

  • 1、初始化server端的配置資訊- - -initServerConfig()
  • 2、解析執行時的命令引數,並根據引數進行處理,eg:./redis-server - -help
  • 3、如果設定了daemonize引數,則將server設為deamon程序- - -daemonize()
  • 4、啟動server- - -initServer()
  • 5、設定週期性處理函式beforeSleep()
  • 6、開始工作- - -aeMain()

1、initServerConfig()

  初始化server端的配置資訊,儲存在伺服器例項server中,包括監聽埠、DB數、命令表等資訊。
  

2、daemonize()  

  將程序設為守護程序。
  守護程序的相關知識之前在linux程序基礎 中已經進行了簡單介紹。  

void daemonize(void) {
    int fd;
    if (fork() != 0) exit(0); /* parent exits */
    setsid(); /* create a new session */
    if
((fd = open("/dev/null", O_RDWR, 0)) != -1) { dup2(fd, STDIN_FILENO); dup2(fd, STDOUT_FILENO); dup2(fd, STDERR_FILENO); if (fd > STDERR_FILENO) close(fd); } }

3、initServer()

  這個函式中完成了非常多的任務,包括設定訊號處理函式、 建立clients佇列、slaves佇列、建立資料庫、建立共享物件等。
  除此之外最重要的兩個任務是建立監聽socket並監聽client、以及建立週期性處理事件。


  我們知道,任何一個伺服器的的事件都可以分為IO讀寫事件和時間處理事件,redis同樣如此。
  1)IO讀寫事件,包括監聽客戶端的連線以及與客戶端進行資料互動等
  2)時間處理事件,在設定的時間處理相關事件,包括週期性重新整理資料庫等。
  這兩類事件都是通過server.el這個變數來儲存的。

3.1、IO讀寫事件

  首先redis會為伺服器建立一個監聽fd,來監聽來自客戶端的連線,主要會呼叫到以下兩個函式  

listenToPort(server.port,server.ipfd,&server.ipfd_count);
aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL);
  • listenToPort:根據傳入的port建立監聽描述符,同時呼叫fcntl()將fd設為非阻塞的,然後呼叫bind()、listen()函式。注意redis會分別根據IPv4、IPv6兩種地址分別建立一個socket
  • aeCreateFileEvent:建立讀寫事件。對於監聽描述符而言,只需要建立一個讀事件監聽來自client的連線即可。注意,監聽描述符的回撥函式為acceptTcpHandler
  最後將該事件加入到server.el結構中

3.2、時間處理事件

  對於server端,redis會週期性的執行serverCron()來完成一些處理,因此將這個事件也加入到server.el結構中  

aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
//每1ms執行一次serverCron()

4、beforeSleep()

  在redis的事件主迴圈中,每次迴圈都會執行一次,其中包括向所有slave傳送ACK、寫AOF檔案等操作

5、aeMain()  

  redis伺服器端最重要的函式,為redis的事件主迴圈。如果redis沒有接收到中斷訊號,那麼就會一直迴圈執行這個函式。  

aeMain(server.el);
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

  可以發現,該函式就是死迴圈的執行beforesleep()和aeProcessEvents()。
  作為redis伺服器端的核心流程,aeProcessEvents()的實現程式碼較長,但是主要也只有3個動作
  

  • 1、計算呼叫select、epoll等函式可以阻塞的時間
  • 2、呼叫aeApiPoll()等待IO事件發生,若有事件發生,則呼叫相應的回撥函式
  • 3、呼叫processTimeEvents()處理時間事件
  •   

  由於select、epoll等IO複用機制在一定時間內沒有事件發生時,會一直阻塞在那裡。因此為了不影響後面時間事件的處理,必須在最近的一個時間事件到來之前,完成IO複用機制的呼叫。因此首先找到最近一個時間事件,計算距離當前時間的時間差,來作為呼叫aeApiPoll()的引數。  

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { 
        //1、有時間事件時,計算時間差
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))  
            shortest = aeSearchNearestTimer(eventLoop); //找到最近一個時間事件
        if (shortest) {
            aeGetTime(&now_sec, &now_ms); //得到當前時間
            tvp = &tv;
            //計算時間差tvp
            tvp->tv_sec = shortest->when_sec - now_sec; 
            if (shortest->when_ms < now_ms) { 
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else {
                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }
            if (tvp->tv_sec < 0) tvp->tv_sec = 0;
            if (tvp->tv_usec < 0) tvp->tv_usec = 0;
        } else {
            if (flags & AE_DONT_WAIT) {  //此時不阻塞,立即返回
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else { //否則可以永遠等待
                tvp = NULL; /* wait forever */
            }
        }
        //2、呼叫IO複用機制,處理IO事件
        numevents = aeApiPoll(eventLoop, tvp);  
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;
            if (fe->mask & mask & AE_READABLE) { //處理讀事件
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) { //處理寫事件
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    } 
    //3、處理時間事件
    if (flags & AE_TIME_EVENTS)  
        processed += processTimeEvents(eventLoop);
    return processed;
}

  這就是redis伺服器側主要的工作流程了。

具體例子:

1、當有一個client連線到來時

  此時redis伺服器端的監聽描述符就會有事件發生,之前已經提到過該fd上只註冊了讀事件acceptTcpHandler(),因此執行fe->rfileProc(eventLoop,fd,fe->clientData,mask);就會呼叫acceptTcpHandler()函式
  1)首先acceptTcpHandler()會呼叫accept獲取連線描述符cfd
  2)然後呼叫acceptCommonHandler()建立一個client例項  
  3)在createClient()中會為每個連線描述符註冊讀事件readQueryFromClient() ,同時將每個client的預設資料庫設為0
  這樣client和server端的連線就建立好了。當有客戶端請求到來時,就會執行readQueryFromClient()函式 ,來處理該請求了。

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); //1、呼叫accept獲取連線描述符cfd
        if (cfd == ANET_ERR) {
            ......
        }
        acceptCommonHandler(cfd,0); 
    }
} 
static void acceptCommonHandler(int fd, int flags) {
    redisClient *c;
    if ((c = createClient(fd)) == NULL) { //2、建立client例項
        close(fd); 
        return;
    }
}
redisClient *createClient(int fd) {
    redisClient *c = zmalloc(sizeof(redisClient));
    if (fd != -1) {
        anetNonBlock(NULL,fd);
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR){  //3、註冊讀事件
            /*  ......  */
        }
    }
    selectDb(c,0);
    /*  ......  */
    return c;
}

2、當有客戶端請求到來時(eg:執行命令 set key value )

2.1、readQueryFromClient()讀入請求並處理

  首先連線fd上的讀事件會被觸發,因此server端會呼叫readQueryFromClient()來進行處理。主要過程是:
    

  • 1、為接收快取區申請記憶體
  • 2、呼叫read從客戶端讀入請求資料到c->querybuf中
  • 3、呼叫processInputBuffer對請求進行處理
  •   
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);  //1、申請空間
    nread = read(fd, c->querybuf+qblen, readlen);  //2、讀請求
    processInputBuffer(c);   //3、處理輸入請求
}

  當執行命令 set key value後,列印c->querybuf得到如下結果:
  這裡寫圖片描述
  即其中內容的格式為:
  這裡寫圖片描述

2.2、processInputBuffer()處理請求

  在processInputBuffer()中
  1)首先呼叫processMultibulkBuffer,按照協議格式,將接收緩衝區中的內容解析出來,並為每個引數建立一個字串物件robject  

//主要處理如下 
ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll); //1、首先解析出引數個數並轉換成數字
c->multibulklen = ll; //將引數個數賦給multibulklen
c->argv = zmalloc(sizeof(robj*)*c->multibulklen);  //2、為物件分配記憶體
c->argv[c->argc++] = createStringObject(c->querybuf+pos,c->bulklen); //3、依次建立ll個物件

  對於本例,ll = 3,最終會生成3個字串物件,字串的內容分別為”set” 、”key” 、”value”。
  c->argv的型別為robj **argv; ,因此可以將其看作一個數組,陣列中的每一項指向一個robj。
  在上一章已經講過,當字串長度小於39位元組時,會採用embstr編碼方式來組織資料,因此最終c->argv的內容如下:
  這裡寫圖片描述
    
  2)呼叫processCommand對命令進行處理
  首先查詢命令,當命令不存在或引數個數不對時,錯誤則直接返回。
  然後中間會進行一系列判斷,暫時不管
  最後就呼叫call()處理命令
  如本例中的命令為set,因此會在redisCommandTable中找到set命令,然後執行set命令對應的函式setCommand()

int processCommand(redisClient *c) {
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    if (!c->cmd) {  //沒有找到命令
        return REDIS_OK;
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
               (c->argc < -c->cmd->arity)) { //命令引數個數錯誤
        return REDIS_OK;
    }
    if (c->flags & REDIS_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        call(c,REDIS_CALL_FULL);  //執行命令
    }
    return REDIS_OK;
}

  3)呼叫setCommand()執行命令
  setCommand()在上一篇中已經進行了簡單的介紹,需要注意的是最後setCommand()會執行
  addReply(c, ok_reply ? ok_reply : shared.ok); 將操作的結果返回給客戶端
  

2.3、呼叫addReply將結果返回給client

  1)該函式會呼叫prepareClientToWrite()首先將要返回給客戶端的結果按照一定的格式儲存到緩衝區中
  2)然後呼叫aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,sendReplyToClient, c)該連線fd上註冊寫事件。
  這樣當server下一次執行aeMain函式時,就會檢測到有寫事件發生,就會呼叫sendReplyToClient()函數了。
  3)在sendReplyToClient()函式中,就會呼叫write系統呼叫將結果通過socket返回給client了。

  這樣整個set key value命令就執行完了



本文所引用的原始碼全部來自Redis3.0.7版本