1. 程式人生 > >Redis網路庫原始碼分析(3)之ae.c

Redis網路庫原始碼分析(3)之ae.c

一、aeCreateEventLoop & aeCreateFileEvent

上一篇文章中,我們已經將伺服器啟動,只是其中有些細節我們跳過了,比如aeCreateEventLoop函式到底做了什麼? 接下來我們要分析ae.c檔案,它是整個Redis網路事件框架,其中定義了各個管理事件的函式,比如aeCreateFileEventaeDeleteFileEvent分別是註冊新的事件和刪除事件。

其實aeCreateEventLoop的作用主要是給server->loop申請空間。

//ae.c


aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop; 

    if
((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) { //給eventLoop申請空間 goto err; } eventLoop->events = zmalloc(sizeof(aeFileEvent) * setsize); //給events連結串列申請空間 eventLoop->fired = zmalloc(sizeof(aeFiredEvent) * setsize); //給fired連結串列申請空間 if (eventLoop->events == NULL || eventLoop->fired == NULL
) { goto err; } eventLoop->setsize = setsize; //設定大小 eventLoop->lastTime = time(NULL); //設定lastTime=now eventLoop->timeEventHead = NULL; //定時事件連結串列置空 eventLoop->timeEventNextId = 0; //定時事件的id為0 eventLoop->stop = 0; //stop為0
eventLoop->maxfd = -1; //最大檔案描述符為0 eventLoop->beforesleep = NULL; //beforesleep設定為NULL if (aeApiCreate(eventLoop) == -1) { //給EPOLL申請空間 goto err; } /* Events with mask == AE_NONE are not set. So let's initialize the vector with it. */ for (int i = 0; i < setsize; i++) { eventLoop->events[i].mask = AE_NONE; //將每一個fd的事件初始化為0 } return eventLoop; err: if (eventLoop) { zfree(eventLoop->events); zfree(eventLoop->fired); zfree(eventLoop); } return NULL; }

至此申請空間完成,我們整個EventLoop結構如下圖所示:

這裡寫圖片描述

我們完成了所有空間的申請和初始化工作:

loop->events : 是一個aeFileEvent 陣列,大小為 setsize 。
loop->fired :  是一個aeFiredEvent 陣列,大小也為 setsize 。
loop->timeEventHead :目前為NULL
loop->apidata:指向aeApiState,包含epfd和epoll_event陣列。

接著我們呼叫anetTcpServer返回了listen_fdanetTcpServer我們在anet.c分析的時候再說,接下來重點是我們呼叫aeCreateFileEventlisten_fd註冊到epfd上的過程。

//ae.c


int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) {
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];  //利用fe指向eventLoop->events[listen_fd]

    if (aeApiAddEvent(eventLoop, fd, mask) == -1) { //本質是呼叫epoll_ctl(epfd,EPOLL_CTL_ADD,fd,...);
        return AE_ERR;
    }

    fe->mask |= mask;                          //如果fe->mask之前不是空,現在就相當於同時監控兩個事件
    if (mask & AE_READABLE) {                
        fe->rfileProc = proc;                  //說明proc是讀操作的處理函式
    }

    if (mask & AE_WRITABLE) {
        fe->wfileProc = proc;                  //說明proc是寫操作的處理函式
    }

    fe->clientData = clientData;               //讓它們指向同一個client或者server例項
    if (fd > eventLoop->maxfd) {
        eventLoop->maxfd = fd;                 //如果新的fd大於maxfd,則更新maxfd
    }

    return AE_OK;
}

此時我們的整個EventLoop變成了下面這個樣子:

這裡寫圖片描述

可以看到:

1loop->events[4].mask  = 1 , 表示讀,rfileProc 為 acceptTcpHandler。 因為它是listen_fd,負責接受連線。為什麼是 4 呢?因為 3 已經作為 epfd 的檔案描述符了。

2 : 我們將 fd = 4 & EPOLLIN事件註冊給了epfd。

現在就等著來新的連線了,因為這樣的話一旦檢測到listen_fd上有資料可讀,那就會呼叫acceptTcpHandler接受連線,這也是回掉機制的一種體現:我們現在已經給listen_fd註冊了相應的回掉函數了,等著事件發生,然後去呼叫註冊好的函式。我們繼續往下走繼續看這個過程:

二、aeProcessEvents & acceptTcpHandler

繼續向下會進入aeMain,之後一直輪詢呼叫aeProcessEvents,接下來我們分析下aeProcessEvents到底是怎麼處理各類事件的:

//ae.c


int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) { 
        //如果flags什麼事件都沒有監聽,return 0
        return 0;
    }

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    /* 注意,我們即使沒有檔案事件,但是仍然想呼叫select/epoll,讓其阻塞直到我們想處理的
     * 定時事件發生為止*/

    if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { 
        //如果有定時事件處理
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) {
            // for 迴圈查詢到最近需要發生的定時事件
            shortest = aeSearchNearestTimer(eventLoop); 
        }

        if (shortest) {
            long now_sec, now_ms;
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            /* How many milliseconds we need to wait for the next time event to fire? */
            /* 計算我們需要等待的ms數,直到最近的定時事件發生*/
            long long ms = (shortest->when_sec - now_sec) * 1000 + shortest->when_ms - now_ms;


            if (ms > 0) {
                //如果定時事件沒有過期,計算出需要等待的時間,作為epoll_wait的第四個引數
                tvp->tv_sec = ms / 1000;
                tvp->tv_usec = (ms % 1000) * 1000;
            } else {
                //否則置為0,epoll_wait就不會阻塞
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout to zero */
            /*如果沒有找到定時事件 */
            if (flags & AE_DONT_WAIT) {  //設定了AE_DONT_WAIT操作,就不等
                tv.tv_sec = tv.tv_usec = 0; 
                tvp = &tv;
            } else {                     //否則就阻塞等待直到事件發生  
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        numevents = aeApiPoll(eventLoop, tvp);  //呼叫epoll_wait函式,返回需要處理的事件列表

        for (int i = 0; i < numevents; i++) {   //遍歷依次處理loop->fired
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[i].fd];
            int mask = eventLoop->fired[i].mask;
            int fd = eventLoop->fired[i].fd;

            int rfired = 0; 
            /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            if (fe->mask & mask & AE_READABLE) { 
                rfired = 1;     //確保讀或者寫只執行一個
                fe->rfileProc(eventLoop, fd, fe->clientData, mask); //執行讀處理
            }
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop, fd, fe->clientData, mask);
                }
            }
            processed++;
        }
    }
    /* Check time events */
    /* 處理所有的時間事件 */
    if (flags & AE_TIME_EVENTS) {
        processed += processTimeEvents(eventLoop);
    }

    return processed; /* return the number of processed file/time events */
}

假設 listen_fd此時發生了事件,那一定是有新的連線過來,fe->rfileProc(eventLoop, fd, fe->clientData, mask) 就會使用 acceptTcpHandler接受連線:

static void acceptTcpHandler(aeEventLoop *loop, int fd, void *data, int mask)
{
    char cip[64];
    int cport;

    server_t *server = (server_t *)data;

    int cfd = anetTcpAccept(NULL, fd, cip, sizeof(cip), &cport); //呼叫accept接受連線
    if (cfd != -1) {
        printf("accepted ip %s:%d\n", cip, cport);
        anetNonBlock(NULL, cfd);                                 //設定socket非阻塞
        anetEnableTcpNoDelay(NULL, cfd);                         //開啟TcpNoDelay選項
        client_t *client = alloc_client();                       //申請一個新的客戶端
        if (!client) {
            printf("alloc client error...close socket\n");
            close(fd);
            return;
        }

        client->loop = loop;    
        client->fd = cfd;

        if (aeCreateFileEvent(loop, cfd, AE_READABLE, readEventHandler, client) == AE_ERR) {
            //繼續呼叫aeCreateFileEvent給新連線的fd註冊可讀事件,並且註冊讀函式readEventHandler
            if (errno == ERANGE) {
                // or use aeResizeSetSize(server->loop, cfd) modify this limit
                printf("so many client, close new.");
            } else {
                printf("create socket readable event error, close it.");
            }
            free_client(client);
        }
    }
}

處理到這裡,算是接受了一個連線,至於以後的讀寫操作,等到下次再分析~