1. 程式人生 > >Redis原始碼學習(2):事件迴圈

Redis原始碼學習(2):事件迴圈

Redis中的事件迴圈是他的ae模組(advance eventloop?),這是個簡單的事件迴圈模組,自身實現了事件迴圈框架和時間事件邏輯,而具體事件處理則根據不同的系統編譯不同的模組。ae主要由這幾個模組組成:

  • ae.c
  • ae.h
  • ae_epoll.c
  • ae_evport.c
  • ae_kqueue.c
  • ae_select.c

可以看出來,ae實現了epoll,evport,kqueue和select模型幾種,需要設定引數來配置具體的模組,關於這幾種事件模型原理這裡就不細說了,本文僅以epoll來說。

/* Include the best multiplexing layer supported by this system.
 - The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT #include "ae_evport.c" #else #ifdef HAVE_EPOLL #include "ae_epoll.c" #else #ifdef HAVE_KQUEUE #include "ae_kqueue.c" #else #include "ae_select.c" #endif #endif #endif

所有epoll事件均是以陣列元素的形式新增到陣列中,fd作為陣列索引,整個事件迴圈框架會維護兩個陣列:註冊事件陣列和觸發事件陣列,在poll時會把註冊事件中完成的事件新增到觸發事件陣列中。而時間事件則是一個單向連結串列:

/* State of an event based program */
typedef struct aeEventLoop {
    ...
    //註冊的可以被epoll觸發的事件陣列
    aeFileEvent *events; /* Registered events */
    //已經觸發的epoll事件陣列
    aeFiredEvent *fired; /* Fired events */
    //時間事件連結串列,利用epoll超時時間來實現
    aeTimeEvent *timeEventHead;
    ...
} aeEventLoop;

typedef struct aeApiState {
    int epfd;
    //epoll內部陣列,用於接收以觸發事件陣列
    struct epoll_event *events;
} aeApiState;

這樣在新增新的事件時就需要把fd,屬性,回撥以及回撥引數等分別新增到這些陣列中。

aeFileEvent *fe = &eventLoop->events[fd];
//新增到epoll
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
    return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
//設定最大fd
if (fd > eventLoop->maxfd)
    eventLoop->maxfd = fd;

而對於時間事件,則只是簡單的新增到時間佇列中:

aeTimeEvent *te;

te = zmalloc(sizeof(*te));
...
te->next = eventLoop->timeEventHead;
eventLoop->timeEventHead = te;

ae框架中還有兩個回撥,一個是在一次事件處理前執行,一個是在事件處理後執行,這樣使用者就有足夠的自由度來處理一些自己的事情。:

/* State of an event based program */
typedef struct aeEventLoop {
    ......
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
} aeEventLoop;

在Redis中,預處理回撥(beforesleep)會在每次進入事件前:

  • 檢查是否停止事件迴圈。
  • 快速的檢查並清理超時鍵值對。
  • 向從伺服器傳送ACK請求。
  • 處理上一輪迴圈中阻塞的請求。
  • 寫入aof檔案。
  • 將傳送給client的緩衝區傳送掉。

完成這些後,ae將進入下一輪事件迴圈。

eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
     eventLoop->beforesleep(eventLoop);
     aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}

處理迴圈時,首先會在時間事件中找到離現在(now)的最近的待觸發時間事件,並將到這個事件所需的時間設定為epoll的超時時間,這樣epoll一旦超時,意味著剛剛找到的時間事件需要處理了。

        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        //找到最近的時間事件
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;

            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            //計算出和現在的差值並作為超時時間
            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;

            //如果差值小於0,那應該有個時間事件已經過了,應該趕緊處理
            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        //沒找到時間事件
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            //如果需要一直阻塞就直接設定NULL,否則超時時間0
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        //執行poll
        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        numevents = aeApiPoll(eventLoop, tvp);

aeApiPoll會把觸發的事件放到觸發陣列(fired)中:

retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
    int j;
    numevents = retval;
    for (j = 0; j < numevents; j++) {
        int mask = 0;
        struct epoll_event *e = state->events+j;
        ......
        eventLoop->fired[j].fd = e->data.fd;
        eventLoop->fired[j].mask = mask;
    }
}

然後框架依次去執行觸發陣列的回撥函式:

for (j = 0; j < numevents; j++) {
      aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
      int fd = eventLoop->fired[j].fd;
      int rfired = 0;

/* note the fe->mask & mask & ... code: maybe an already processed
 * event removed an element that fired and we still didn't
 * processed, so we check if the event is still valid. */
       if (fe->mask & mask & AE_READABLE) {
           rfired = 1;
           fe->rfileProc(eventLoop,fd,fe->clientData,mask);
       }
       if (fe->mask & mask & AE_WRITABLE) {
           if (!rfired || fe->wfileProc != fe->rfileProc)
              fe->wfileProc(eventLoop,fd,fe->clientData,mask);
       }
       processed++;
}

最後會處理一下時間事件連結串列,其實就是檢查一下這個連結串列,如果有需要刪除的就觸發回收回撥並刪除,如果有時間到的那就觸發時間回撥:

    while(te) {
        long now_sec, now_ms;
        long long id;

        //這應該是上一輪標記需要刪除的時間事件,現在刪除並觸發回收回撥
        /* Remove events scheduled for deletion. */
        if (te->id == AE_DELETED_EVENT_ID) {
            aeTimeEvent *next = te->next;
            if (prev == NULL)
                eventLoop->timeEventHead = te->next;
            else
                prev->next = te->next;
            if (te->finalizerProc)
                te->finalizerProc(eventLoop, te->clientData);
            zfree(te);
            te = next;
            continue;
        }

        /* Make sure we don't process time events created by time events in
         * this iteration. Note that this check is currently useless: we always
         * add new timers on the head, however if we change the implementation
         * detail, this check may be useful again: we keep it here for future
         * defense. */
        if (te->id > maxId) {
            te = te->next;
            continue;
        }
        //如果事件已到,就觸發時間回撥,如果時間回撥返回AE_NOMORE,那就標記需要刪除這個時間事件。
        aeGetTime(&now_sec, &now_ms);
        //這裡還是有可能超過設定時間的,這種也需要觸發
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
            int retval;

            id = te->id;
            retval = te->timeProc(eventLoop, id, te->clientData);
            processed++;
            if (retval != AE_NOMORE) {
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            } else {
                te->id = AE_DELETED_EVENT_ID;
            }
        }
        prev = te;
        te = te->next;
    }

這種將時間事件作為poll的超時時間的機制可以很好的在一個事件迴圈中同時新增檔案時間和時間事件併兼容多種事件模型。
因為fd是以陣列的形式儲存的,所以一開始需要分配足夠的空間,而相應的觸發陣列和內部陣列也需要相應的大小。同時這種結構也無法向Windows相容。

相關推薦

Redis原始碼學習(2):事件迴圈

Redis中的事件迴圈是他的ae模組(advance eventloop?),這是個簡單的事件迴圈模組,自身實現了事件迴圈框架和時間事件邏輯,而具體事件處理則根據不同的系統編譯不同的模組。ae主要由這幾個模組組成: ae.c ae.h ae_epoll.c

Redis2.2.2原始碼學習——aeEvent事件輪詢

背景                Redis的事件主要分為檔案事件和定時器事件,作者對這兩種事件處理的高階之處在於預先計算最近一個要超時的定時器距離當前的事件間隔,在這個時間間隔內呼叫poll函式處理檔案事件,之後再處理定時器事件。 “Redis在處理請求時完全是用單執

深度理解nodejs[2]-事件迴圈

程序與執行緒 我們在電腦中會執行多個程式,每一個程式中都會有多個執行緒。例如我們執行比特幣客戶端的時候,我們某一個執行緒要處理網路、某一個執行緒要處理挖礦、某一個執行緒要處理使用者輸入…執行緒的排程使用了作業系統級別的排程器來明確了哪一個執行緒應該被執行。執行緒也有優先順序之分,例如監聽滑鼠滑動的優先順

struts1原始碼學習2(initChain)

先上程式碼 protected void initChain()         throws ServletException {    &nb

redis原始碼分析—AE事件處理機制

redis中有兩類事件,一類是IO事件統一封裝成aeFileEvent,底層呼叫系統支援的多路複用層(evport、epoll、kqueue、select),一類是定時器事件,以aeTimeEvent描述,巧妙借用系統函式epoll_wait等阻塞函式設定阻塞時間以達到觸發條

Redis原始碼學習(1):adlist

redis中的adlist是一個簡單的無環雙向連結串列,主要邏輯在這兩個檔案中實現: adlist.h adlist.c 結構也很簡單,資料結構中最基本的結構。 新增節點程式碼: if (after) { node->prev =

libevent原始碼學習-----統一事件源及訊號繫結函式

libevent在對檔案描述符,套接字進行監控時直接放到event,這些event通過io多路複用函式進行監控,然而對應訊號來說io複用函式卻無能為力,為了解決問題,libevent採用統一事件源的方式,即將訊號也表現成event的形式,用到了socketpai

結合redis設計與實現的redis原始碼學習-1-記憶體分配(zmalloc)

在進入公司後的第一個任務就是使用redis的快取功能實現伺服器的雲託管功能,在瞭解了大致需求後,依靠之前對redis的瞭解封裝了常用的redis命令,並使用單例的連線池來維護與redis的連線,使用連線池來獲取redis的連線物件,依靠這些功能基本可以實現要求的

Redis原始碼學習-AOF

前言 網路上也有許多介紹redis的AOF機制的文章,但是從巨集觀上介紹aof的流程,沒有具體分析在AOF過程中涉及到的資料結構和控制機制。昨晚特別看了2.8原始碼,感覺原始碼中的許多細節是值得細細深究的。特別是list *aof_rewrite_buf_blocks結構。

Redis原始碼學習之【命令協議格式】

介紹 本來這篇要介紹Redis的命令解析的,但是要想對Redis的命令解析有更直觀的瞭解,必須先了解Redis的命令協議格式。 原始碼 暫無(或者是是在network.c中吧) 分析 Requests *<number of arguments> CR LF $

Vue.js原始碼學習三 —— 事件 Event 學習

早上好!繼續學習Vue原始碼~這次我們來學習 event 事件。 原始碼簡析 其實看了前兩篇的同學已經知道原始碼怎麼找了,這裡再提一下。 先找到Vue核心原始碼index方法 src/core/instance/index.js func

px4原生原始碼學習-(2)--實時作業系統篇

/**************************************************************************************************************   po上我使用到的硬體和開發環境   px4硬體

Redis原始碼學習之【epoll封裝】

介紹 在上一篇博文中說到了在Redis 的事件處理中使用到了底層的linux epoll,根據Redis的實現可以使用其他的多路通訊層,但是在一般的linux伺服器中使用的最多的還是epoll所以這裡主要介紹一下epoll。Redis並沒有直接的使用linux的epoll而

Asp.NetCore原始碼學習[2-1]:配置[Configuration]

Asp.NetCore原始碼學習[2-1]:配置[Configuration] 在Asp. NetCore中,配置系統支援不同的配置源(檔案、環境變數等),雖然有多種的配置源,但是最終提供給系統使用的只有一個物件,那就是ConfigurationRoot。其內部維護了一個集合,用於儲存各種配置源的ICo

Asp.NetCore原始碼學習[2-1]:日誌

Asp.NetCore原始碼學習[2-1]:日誌 在一個系統中,日誌是不可或缺的部分。對於.net而言有許多成熟的日誌框架,包括Log4Net、NLog、Serilog 等等。你可以在系統中直接使用這些第三方的日誌框架,也可以通過這些框架去適配ILoggerProvider 和 ILogger介面。適配

曹工說Redis原始碼2)-- redis server 啟動過程解析及簡單c語言基礎知識補充

文章導航 Redis原始碼系列的初衷,是幫助我們更好地理解Redis,更懂Redis,而怎麼才能懂,光看是不夠的,建議跟著下面的這一篇,把環境搭建起來,後續可以自己閱讀原始碼,或者跟著我這邊一起閱讀。由於我用c也是好幾年以前了,些許錯誤在所難免,希望讀者能不吝指出。 曹工說Redis原始碼(1)-- redi

redis原始碼學習之工作流程初探

[toc] ## 背景 redis是當下比較流行的KV資料庫之一,是抵禦高併發的一把利器,本著知其然還要知其所以然的目的,我決定花一點時間來研究其原始碼,希望最後能向自己解釋清楚“redis為什麼這麼快”這個疑惑,第一篇主要介紹環境搭建和redis工作流程初探,後期會陸續獻上其他有意思的章節。 ## 環境

redis原始碼學習之slowlog

[toc] ##背景 redis雖說是一個基於記憶體的KV資料庫,以高效能著稱,但是依然存在一些耗時比較高的命令,比如keys *,lrem等,更有甚者會在lua中寫一些比較耗時的操作,比如大迴圈裡面執行命令等,鑑於此,本篇將從原始碼角度分析redis慢日誌的記錄原理,並給出一些自己的看法。 #

co_routine.cpp/.h/inner.h(第四部分:定時器和事件迴圈)—— libco原始碼分析、學習筆記

由於本原始碼蠻長的,所以按照功能劃分模組來分析,分為若干部分,詳見二級目錄↑ 定時器和事件迴圈 libco管理定時事件便是使用時間輪這種資料結構 定時器前驅知識本篇只稍微提一下,具體知識請參考《Linux高效能伺服器程式設計 遊雙 著》第11章,或其他方式學

微信小程序學習Course 2 事件

類型 family 函數代碼 傳輸數據 tail function 冒泡 微信 log 微信小程序學習Course 2 事件 事件是用來打通邏輯層與視圖層的樞紐,我們一般在視圖層(WXML文件)對某個控件綁定事件函數,在邏輯層(JS文件)編寫事件函數代碼。 2.1 事件類型