1. 程式人生 > >菜鳥學習nginx之事件模組epoll(1)

菜鳥學習nginx之事件模組epoll(1)

上一篇介紹核心事件模組,本篇介紹事件模組ngx_epoll_module。Nginx在linux環境下采用epoll網路模型,對於epoll網路型不瞭解的可自行百度查詢,本篇不在闡述。

一、問題

本篇要澄清以下幾個問題:

1、當客戶端發起TCP連線後,事件模組是如何管理新連線?

2、Nginx是如何接收到客戶端請求(只是TCP層請求非HTTP請求)?

3、Nginx是如何傳送響應給客戶端(只是TCP層響應)?

4、超時事件管理方式

二、事件模組ngx_epoll_module

2.0、介面定義

typedef struct
{
    ngx_int_t (*add)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
    ngx_int_t (*del)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);

    ngx_int_t (*enable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
    ngx_int_t (*disable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);

    ngx_int_t (*add_conn)(ngx_connection_t *c);
    ngx_int_t (*del_conn)(ngx_connection_t *c, ngx_uint_t flags);

    ngx_int_t (*notify)(ngx_event_handler_pt handler);

    ngx_int_t (*process_events)(ngx_cycle_t *cycle, ngx_msec_t timer,
                                ngx_uint_t flags);

    ngx_int_t (*init)(ngx_cycle_t *cycle, ngx_msec_t timer);
    void (*done)(ngx_cycle_t *cycle);
} ngx_event_actions_t;

typedef struct
{
    ngx_str_t *name;

    void *(*create_conf)(ngx_cycle_t *cycle);
    char *(*init_conf)(ngx_cycle_t *cycle, void *conf);

    ngx_event_actions_t actions;
} ngx_event_module_t;

 通過ngx_event_module_t定義一個新的事件模組。事件模組主要是用接收、傳送報文,通常情況下我們不需要自己建立一個新的事件模組。介面ngx_event_actions_t中add、del是一對操作,主要用於向epoll註冊事件、刪除事件。init用於初始化事件模組,對於epoll來說init回撥函式是ngx_epoll_init。

2.1、模組定義

static ngx_str_t epoll_name = ngx_string("epoll");

static ngx_command_t ngx_epoll_commands[] = {

    {ngx_string("epoll_events"),
     NGX_EVENT_CONF | NGX_CONF_TAKE1,
     ngx_conf_set_num_slot,
     0,
     offsetof(ngx_epoll_conf_t, events),
     NULL},

    {ngx_string("worker_aio_requests"),
     NGX_EVENT_CONF | NGX_CONF_TAKE1,
     ngx_conf_set_num_slot,
     0,
     offsetof(ngx_epoll_conf_t, aio_requests),
     NULL},

    ngx_null_command};

static ngx_event_module_t ngx_epoll_module_ctx = {
    &epoll_name,
    ngx_epoll_create_conf, /* create configuration */
    ngx_epoll_init_conf,   /* init configuration */

    {
        ngx_epoll_add_event,      /* add an event */
        ngx_epoll_del_event,      /* delete an event */
        ngx_epoll_add_event,      /* enable an event */
        ngx_epoll_del_event,      /* disable an event */
        ngx_epoll_add_connection, /* add an connection */
        ngx_epoll_del_connection, /* delete an connection */
#if (NGX_HAVE_EVENTFD)
        ngx_epoll_notify, /* trigger a notify */
#else
        NULL, /* trigger a notify */
#endif
        ngx_epoll_process_events, /* process the events */
        ngx_epoll_init,           /* init the events */
        ngx_epoll_done,           /* done the events */
    }};

ngx_module_t ngx_epoll_module = {
    NGX_MODULE_V1,
    &ngx_epoll_module_ctx, /* module context */
    ngx_epoll_commands,    /* module directives */
    NGX_EVENT_MODULE,      /* module type */
    NULL,                  /* init master */
    NULL,                  /* init module */
    NULL,                  /* init process */
    NULL,                  /* init thread */
    NULL,                  /* exit thread */
    NULL,                  /* exit process */
    NULL,                  /* exit master */
    NGX_MODULE_V1_PADDING};

2.2、事件模組初始化

在上一篇有介紹到,在初始化ngx_event_core_module時會呼叫具體事件模組的init回撥函式即ngx_epoll_init,如果對於epoll模型比較瞭解,看起來應該很容易,具體程式碼如下:

/**
 * epoll模型初始化
 * @param cycle 核心結構體
 * @param timer 定時器超時時間
 */
static ngx_int_t
ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer)
{
    ngx_epoll_conf_t *epcf;
    /* 獲取epoll模型下配置結構 */
    epcf = ngx_event_get_conf(cycle->conf_ctx, ngx_epoll_module);

    if (ep == -1)
    {   
        /**
         * 建立epoll物件 其中epoll_create引數沒有意義
         */
        ep = epoll_create(cycle->connection_n / 2);

        if (ep == -1)
        {
            ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
                          "epoll_create() failed");
            return NGX_ERROR;
        }

#if (NGX_HAVE_EVENTFD)
        /**
         * 使用eventfd實現 事件通知功能 主要用於多執行緒模式下  目前可忽略
         */
        if (ngx_epoll_notify_init(cycle->log) != NGX_OK)
        {
            ngx_epoll_module_ctx.actions.notify = NULL;
        }
#endif

#if (NGX_HAVE_FILE_AIO)
        ngx_epoll_aio_init(cycle, epcf);
#endif

#if (NGX_HAVE_EPOLLRDHUP)
        ngx_epoll_test_rdhup(cycle);
#endif
    }
    /**
     * 建立epoll_event陣列 用於儲存epoll返回事件
     * 陣列大小為512
     */
    if (nevents < epcf->events)
    {
        if (event_list)
        {
            ngx_free(event_list);
        }

        event_list = ngx_alloc(sizeof(struct epoll_event) * epcf->events,
                               cycle->log);
        if (event_list == NULL)
        {
            return NGX_ERROR;
        }
    }

    nevents = epcf->events;//一次性 最大可儲存 epoll事件數

    ngx_io = ngx_os_io;

    ngx_event_actions = ngx_epoll_module_ctx.actions;

#if (NGX_HAVE_CLEAR_EVENT)
    ngx_event_flags = NGX_USE_CLEAR_EVENT  /* ET模式 */
#else
    ngx_event_flags = NGX_USE_LEVEL_EVENT
#endif
                      | NGX_USE_GREEDY_EVENT | NGX_USE_EPOLL_EVENT;

    return NGX_OK;
}

2.3、事件模組關閉

事件模組關閉比較簡單,直接呼叫close方法,將對應的物件關閉即可

static void
ngx_epoll_done(ngx_cycle_t *cycle)
{
    /* 關閉epoll物件 */
    if (close(ep) == -1)
    {
        ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                      "epoll close() failed");
    }

    ep = -1;

#if (NGX_HAVE_EVENTFD)
    /* 關閉eventfd物件 */
    if (close(notify_fd) == -1)
    {
        ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                      "eventfd close() failed");
    }

    notify_fd = -1;

#endif

#if (NGX_HAVE_FILE_AIO)

    if (ngx_eventfd != -1)
    {

        if (io_destroy(ngx_aio_ctx) == -1)
        {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                          "io_destroy() failed");
        }

        if (close(ngx_eventfd) == -1)
        {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                          "eventfd close() failed");
        }

        ngx_eventfd = -1;
    }

    ngx_aio_ctx = 0;

#endif

    ngx_free(event_list); //釋放事件陣列

    event_list = NULL;
    nevents = 0;
}

三、事件迴圈

在上一小節介紹事件模組初始化和關閉流程,那最主要的事件迴圈是在哪裡實現的呢?其實在模組定義的時候就已經聲明瞭ngx_epoll_process_events,該函式在什麼地方呼叫呢?在介紹《菜鳥學習Nginx之啟動流程(3)》中有提到,如下所示:

static void
ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
{
    ...
    for (;;)
    {
        ...
        /* 阻塞 等待事件或者定時器超時事件 */
        ngx_process_events_and_timers(cycle);
    }
}

3.1、事件迴圈

事件迴圈具體流程圖如下:

從流程圖中可得,主要有三部分內容需要處理:

1)獲取定時器超時事件,因為epoll_wait需要指定返回時間。超時時間可能為-1(表示永遠不超時), 也可能是大於0

2)是否開啟程序間互斥鎖,當有多個worker程序時會開啟。當開啟互斥鎖時就需要處理"驚群"問題。

3)處理事件

具體程式碼如下:

/**
 * 此函式是處理事件的入口函式,所有業務流程起始函式
 * 《深入理解Nginx模組開發與架構解析》 P331 
 */
void ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
    ngx_uint_t flags;
    ngx_msec_t timer, delta;

    if (ngx_timer_resolution)
    {//使用者指定時間精度,超時事件由SIGALARM觸發
        timer = NGX_TIMER_INFINITE;
        flags = 0;
    }
    else
    {
        //獲取下一個超時時間 如果二叉樹中沒有超時事件則返回-1 代表永久不超時
        timer = ngx_event_find_timer();
        flags = NGX_UPDATE_TIME; //表示需要更新時間快取

#if (NGX_WIN32)

        /* handle signals from master in case of network inactivity */

        if (timer == NGX_TIMER_INFINITE || timer > 500)
        {
            timer = 500;
        }

#endif
    }
    
    /* 解決驚群 */
    if (ngx_use_accept_mutex)
    {
        if (ngx_accept_disabled > 0)
        {//實現worker程序間負載均衡
            ngx_accept_disabled--;
        }
        else
        {//解決驚群,通過程序間同步鎖
            if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR)
            {
                return;
            }

            if (ngx_accept_mutex_held)
            {
                flags |= NGX_POST_EVENTS;
            }
            else
            {
                if (timer == NGX_TIMER_INFINITE || timer > ngx_accept_mutex_delay)
                {
                    timer = ngx_accept_mutex_delay;
                }
            }
        }
    }

    /* 記錄時間差 */
    delta = ngx_current_msec;
    
    /**
     * 如果是epoll模型 此處實際呼叫函式是ngx_epoll_process_events
     * 阻塞在epoll_wait
     */    
    (void)ngx_process_events(cycle, timer, flags);

    delta = ngx_current_msec - delta; /* 記錄時間差 */

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "timer delta: %M", delta);
    /**
     * 如果是epoll模型 在ngx_epoll_process_events函式中可能會對事件進行劃分
     * 劃分到不同佇列中,其中accept佇列要優先處理
     */    
    ngx_event_process_posted(cycle, &ngx_posted_accept_events);
    
    if (ngx_accept_mutex_held)
    {//釋放程序間鎖
        ngx_shmtx_unlock(&ngx_accept_mutex);
    }

    if (delta)
    {//時間差 表示時間超時,需要處理超時事件
        ngx_event_expire_timers();
    }
    /* 處理其他事件 */
    ngx_event_process_posted(cycle, &ngx_posted_events);
}

對於驚群處理,有單獨文章介紹,本篇不深入剖析。 

3.2、事件驅動 

接下來看一下事件驅動是如何處理的。上一小節中ngx_process_events函式實際呼叫的是ngx_epoll_process_events函式。該函式邏輯相對複雜一些,採用分片展示程式碼邏輯:

/**
 * 事件驅動
 * @param cycle 核心結構體
 * @param timer 等待時間
 * @param flags
 *        取值: NGX_POST_EVENTS、NGX_UPDATE_TIME 
 */
static ngx_int_t
ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
    int events;
    uint32_t revents;
    ngx_int_t instance, i;
    ngx_uint_t level;
    ngx_err_t err;
    ngx_event_t *rev, *wev;
    ngx_queue_t *queue;
    ngx_connection_t *c;

    /* NGX_TIMER_INFINITE == INFTIM */

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "epoll timer: %M", timer);

   /**
     * timer不是固定不變的,如果沒有任何事件發生(空閒期),
     * timer可能是NGX_TIMER_INFINITE 即表示永久阻塞
     */
    events = epoll_wait(ep, event_list, (int)nevents, timer);

說明:

epoll_wait返回有三種場景:

1、超時時間到期,即timer > 0的場景,當超時時間到了仍然沒有事件發生(讀寫事件)

2、超時時間為timer=-1(永遠不超時),但是發生了SIGARLM訊號,當訊號處理函式結束後,epoll_wait返回,errno為NGX_EINTR

3、超時間未到期(timer=-1或者timer>0),發生了讀寫事件(一般正常情況)

所以下面的程式碼會針對這三種場景進行處理:

    /**
     * Nginx兩種時間策略: 
     *   1、如果nginx.conf檔案中定義時間精度timer_resolution,則表示nginx的時間
     *      快取精確到ngx_timer_resolution毫秒
     *   2、如果沒有定義時間精度 則嚴格按照系統時間
     * ----------------------------------------------------------------
     * 條件說明:
     *     flags & NGX_UPDATE_TIME  -- 表示強制更新系統時間
     *     ngx_event_timer_alarm  當採用時間精度時,nginx會啟動一個定時器,每次
     *                            超時,都會產生SIGALRM訊號。具體參考ngx_event_process_init
     */
    if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm)
    {//只要是時間相關的事件 就立即更新時間快取
        ngx_time_update(); //更新時間快取
    }

    if (err)
    {
        if (err == NGX_EINTR)
        {

            if (ngx_event_timer_alarm)
            {//表示發生了SIGALRM訊號中斷 認為是正常場景
                ngx_event_timer_alarm = 0;
                return NGX_OK;
            }

            level = NGX_LOG_INFO;
        }
        else
        {//異常場景
            level = NGX_LOG_ALERT;
        }

        ngx_log_error(level, cycle->log, err, "epoll_wait() failed");
        return NGX_ERROR;
    }

    if (events == 0)
    {
        if (timer != NGX_TIMER_INFINITE)
        {//表示時間超時
            return NGX_OK;
        }

        ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                      "epoll_wait() returned no events without timeout");
        return NGX_ERROR;
    }

上面這段程式碼主要是針對場景1,場景2的處理 ,相對簡單。接下來看一下重頭戲,對於正常讀寫事件的處理流程。

for (i = 0; i < events; i++)
    {
        c = event_list[i].data.ptr;
        /* 指標變數 最後一位始終為0  節省記憶體空間 */
        instance = (uintptr_t)c & 1;
        c = (ngx_connection_t *)((uintptr_t)c & (uintptr_t)~1);

        rev = c->read;

        if (c->fd == -1 || rev->instance != instance)
        {/**
          * 當fd=-1 或者instance不一致表示 當前事件是過期事件不需要處理
          * 《深入理解Nginx模組開發與架構解析》一書:318頁 
          */
            /*
             * the stale event from a file descriptor
             * that was just closed in this iteration
             */

            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                           "epoll: stale event %p", c);
            continue;
        }

        revents = event_list[i].events;

        ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "epoll: fd:%d ev:%04XD d:%p",
                       c->fd, revents, event_list[i].data.ptr);

        if (revents & (EPOLLERR | EPOLLHUP))
        {
            ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                           "epoll_wait() error on fd:%d ev:%04XD",
                           c->fd, revents);

            /*
             * if the error events were returned, add EPOLLIN and EPOLLOUT
             * to handle the events at least in one active handler
             */

            revents |= EPOLLIN | EPOLLOUT;
        }

迴圈遍歷事件佇列events_list中事件,從私有資料欄位中取出instance和conection物件。

說明:

1、instance值,利用指標最後一位始終為0的特性儲存。為什麼指標最後一位始終為0呢?作業系統在分配記憶體,為了提升訪問速度,經常是4位元組/8位元組對其,這樣分配出來的地址最後一位始終為0。

2、為什麼需要判斷rev->instance 和 instance是否相等?下面這段話摘自《深入理解Nginx模組開發與架構解析》一書:318頁:

那麼,過期事件又是怎麼回事呢?舉個例子,假設epoll_wait一次返回3個事件,在第1個事件的處理過程中,由於業務的需要,所以關閉了一個連線,而這個連線恰好對應第3個事件。這樣的話,在處理到第3個事件時,這個事件就已經是過期事件了,一旦處理必然出錯。既然如此,把關閉的這個連線的fd套接字置為–1能解決問題嗎?答案是不能處理所有情況。

下面先來看看這種貌似不可能發生的場景到底是怎麼發生的:假設第3個事件對應的ngx_connection_t連線中的fd套接字原先是50,處理第1個事件時把這個連線的套接字關閉了,同時置為–1,並且呼叫ngx_free_connection將該連線歸還給連線池。在
ngx_epoll_process_events方法的迴圈中開始處理第2個事件,恰好第2個事件是建立新連線事件,呼叫ngx_get_connection從連線池中取出的連線非常可能就是剛剛釋放的第3個事件對應的連線。由於套接字50剛剛被釋放,Linux核心非常有可能把剛剛釋放的套接字50又分配給新建立的連線。因此,在迴圈中處理第3個事件時,這個事件就是過期的了!它對應的事件是關閉的連線,而不是新建立的連線。

下面是讀事件處理,若是NGX_POST_EVENTS事件則延遲處理並且按照優先順序進行區分,Accept事件優先順序高於其他的讀寫事件,所以下面在處理讀事件進行區分處理,如下所示:

        if ((revents & EPOLLIN) && rev->active)
        {
#if (NGX_HAVE_EPOLLRDHUP)
            if (revents & EPOLLRDHUP)
            {
                rev->pending_eof = 1;
            }
            rev->available = 1;
#endif
            rev->ready = 1;
            if (flags & NGX_POST_EVENTS)
            { //需要延遲處理該事件
                queue = rev->accept ? &ngx_posted_accept_events
                                    : &ngx_posted_events;
                ngx_post_event(rev, queue); //將事件加入到事件佇列中
            }
            else
            {//立即處理該事件
                rev->handler(rev); //ngx_http_keepalive_handler
            }
        }

 寫事件處理與讀事件處理大同小異,如下:

        wev = c->write;
        if ((revents & EPOLLOUT) && wev->active)
        {
            if (c->fd == -1 || wev->instance != instance)
            {
                /*
                 * the stale event from a file descriptor
                 * that was just closed in this iteration
                 */

                ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                               "epoll: stale event %p", c);
                continue;
            }

            wev->ready = 1;
#if (NGX_THREADS)
            wev->complete = 1;
#endif

            if (flags & NGX_POST_EVENTS)
            {//延遲處理該事件
                ngx_post_event(wev, &ngx_posted_events);
            }
            else
            {//立即處理該事件
                wev->handler(wev);
            }
        }

四、總結

到這裡把epoll事件驅動處理流程介紹完畢。但是事件是如何註冊到epoll中呢又如何刪除呢?在下一篇介紹。