1. 程式人生 > >nginx原始碼閱讀(十三).新建連線並處理就緒事件

nginx原始碼閱讀(十三).新建連線並處理就緒事件

前言

本小節主要討論事件模組中如何接受新連線,其中涉及到了驚群以及負載均衡的處理。

如何建立新連線

之前在分析ngx_event_process_init函式時,將所有空閒連線形成連結串列之後,它會遍歷所有監聽埠並將其讀事件的回撥函式設定為ngx_event_accept

接著會把監聽連線的讀事件新增到ngx_epoll_module模組中。當執行ngx_epoll_process_events時,如果有新連線事件出現,則會呼叫ngx_event_accept來建立新的連線。

建立新連線的回撥函式

建立新連線的回撥函式是ngx_event_accept,每個監聽連線的讀事件的handler

都在ngx_event_process_init中設定為ngx_event_accept
下面直接看它的原始碼(省略了不必要的部分):

void
ngx_event_accept(ngx_event_t *ev)
{
    socklen_t          socklen;
    ngx_err_t          err;
    ngx_log_t         *log;
    ngx_socket_t       s;
    ngx_event_t       *rev, *wev;
    ngx_listening_t   *ls;
    ngx_connection_t  *c, *lc;
    ngx_event_conf_t  *ecf;
    u_char             sa[NGX_SOCKADDRLEN
]; ...... //獲取ngx_event_core_module模組的配置項結構體指標 ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module); if (ngx_event_flags & NGX_USE_RTSIG_EVENT) { ev->available = 1; } else if (!(ngx_event_flags & NGX_USE_KQUEUE_EVENT)) { ev->available = ecf->multi_accept; } lc = ev->data
;
ls = lc->listening; ev->ready = 0; ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0, "accept on %V, ready: %d", &ls->addr_text, ev->available); /* 這個迴圈以available標誌位作為判斷條件 * 當該標誌位被置1,則代表每次儘可能的多建立新連線 * 否則每次呼叫ngx_event_accept只建立一個新連線 */ do { socklen = NGX_SOCKADDRLEN; ...... //呼叫accept接受新連線 s = accept(lc->fd, (struct sockaddr *) sa, &socklen); if (s == -1) { err = ngx_socket_errno; if (err == NGX_EAGAIN) { ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, err, "accept() not ready"); return; } ngx_log_error((ngx_uint_t) ((err == NGX_ECONNABORTED) ? NGX_LOG_ERR : NGX_LOG_ALERT), ev->log, err, "accept() failed"); if (err == NGX_ECONNABORTED) { if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) { ev->available--; } if (ev->available) { continue; } } return; } ..... /* 這一步操作是設定負載均衡的閥值 * ngx_accept_disabled的初值是負數,為總連線的7/8 * 當ngx_accept_disabled為負數時, * 則不會觸發負載均衡的操作 * 當ngx_accept_disabled是正數時,就會觸發負載均衡的操作了(當前程序不會處理新連線,而是將ngx_accept_disabled減1) */ ngx_accept_disabled = ngx_cycle->connection_n / 8 - ngx_cycle->free_connection_n; //獲取一個空閒連線 c = ngx_get_connection(s, ev->log); if (c == NULL) { if (ngx_close_socket(s) == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_close_socket_n " failed"); } return; } ...... //每個連線都有其一個專屬的記憶體池 c->pool = ngx_create_pool(ls->pool_size, ev->log); if (c->pool == NULL) { ngx_close_accepted_connection(c); return; } c->sockaddr = ngx_palloc(c->pool, socklen); if (c->sockaddr == NULL) { ngx_close_accepted_connection(c); return; } ngx_memcpy(c->sockaddr, sa, socklen); log = ngx_palloc(c->pool, sizeof(ngx_log_t)); if (log == NULL) { ngx_close_accepted_connection(c); return; } /* set a blocking mode for aio and non-blocking mode for others */ /* 設定套接字的屬性 */ if (ngx_inherited_nonblocking) { if (ngx_event_flags & NGX_USE_AIO_EVENT) { if (ngx_blocking(s) == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_blocking_n " failed"); ngx_close_accepted_connection(c); return; } } } else { if (!(ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_RTSIG_EVENT))) { if (ngx_nonblocking(s) == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_nonblocking_n " failed"); ngx_close_accepted_connection(c); return; } } } *log = ls->log; /* 接下來設定連線的成員值 */ //設定接受、傳送字元流的方法 c->recv = ngx_recv; c->send = ngx_send; c->recv_chain = ngx_recv_chain; c->send_chain = ngx_send_chain; c->log = log; c->pool->log = log; c->socklen = socklen; c->listening = ls; c->local_sockaddr = ls->sockaddr; c->unexpected_eof = 1; #if (NGX_HAVE_UNIX_DOMAIN) if (c->sockaddr->sa_family == AF_UNIX) { c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED; c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED; ...... } #endif rev = c->read; wev = c->write; wev->ready = 1; if (ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_RTSIG_EVENT)) { /* rtsig, aio, iocp */ rev->ready = 1; } if (ev->deferred_accept) { rev->ready = 1; #if (NGX_HAVE_KQUEUE) rev->available = 1; #endif } rev->log = log; wev->log = log; /* * TODO: MT: - ngx_atomic_fetch_add() * or protection by critical section or light mutex * * TODO: MP: - allocated in a shared memory * - ngx_atomic_fetch_add() * or protection by critical section or light mutex */ c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1); ...... if (ls->addr_ntop) { c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len); if (c->addr_text.data == NULL) { ngx_close_accepted_connection(c); return; } c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->addr_text.data, ls->addr_text_max_len, 0); if (c->addr_text.len == 0) { ngx_close_accepted_connection(c); return; } } ...... ngx_log_debug3(NGX_LOG_DEBUG_EVENT, log, 0, "*%d accept: %V fd:%d", c->number, &c->addr_text, s); if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) { /* 將該連線對應的讀和寫事件都新增到epoll監聽中*/ if (ngx_add_conn(c) == NGX_ERROR) { ngx_close_accepted_connection(c); return; } } log->data = NULL; log->handler = NULL; //呼叫監聽埠上的ngx_listening_t結構體的handler方法處理新連線 ls->handler(c); if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) { ev->available--; } } while (ev->available); }

本來想畫一個大致的流程圖,不過發現《深入理解Nginx》中已經歸納的很好了,如圖:
這裡寫圖片描述

處理事件

之前分析ngx_epoll_module時,process_events對應的ngx_epoll_process_events函式還沒有分析,放在這裡比較合適,它用於處理事件,其中就呼叫了ngx_event_accept函式建立新連線。

這裡也會詳細說明關於如何跳過過期的事件。關於過期事件,是指呼叫epoll_wait返回的就緒的事件中包含一個連線上的多個事件,在處理前面的事件時關閉了該連線,這樣就會導致該連線上後面的事件過期失效。

前面的事件關閉了連線之後,將fd置為-1,然後再歸還到連線池中,這樣可以解決大多數過期事件的問題。但是由於核心分配描述符是從小到大依次分配的,因此如果關閉了連線之後,下一個事件新建連線,複用了剛釋放的連線,同時獲得的描述符也是剛歸回的描述符。這樣就導致原本過期的事件對應的是新建立的連線,它會以為自己並沒有過期,但是其實它對應的應該是已經關閉的連線,已經過期了。

關於這種特殊情況,nginx採用的處理方法是,利用ngx_event_t中的instance位,當重複使用連線的時候,就將該連線對應的該位取反,這樣早期獲得該連線的instance位,與馬上重用該連線對應instance就不相同了。關於如何儲存早期的instance位是一個問題,我們可以額外增加1位來專門儲存它,不過nginx有更好的辦法,沒有增加額外的成本,等下我們就可以看到這是如何做到的。

其實大部分超時事件可以通過該連線對應的fd置為-1解決,但是剛才的特殊情況,是重用了剛釋放的連線並且核心分配的描述符又恰好與原來的連線對應的描述符一樣。造成這種特殊情況有比較坎坷的要求,首先是需要重用同一個連線,其次是該連線對應的描述符要與原來連線對應的描述符相等,將instance位置反,就是專門為了處理這種特殊情況。

static ngx_int_t
ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
    int                events;
    uint32_t           revents;
    ngx_int_t          instance, i;
    ngx_uint_t         level;
    ngx_err_t          err;
    ngx_event_t       *rev, *wev, **queue;
    ngx_connection_t  *c;

    /* NGX_TIMER_INFINITE == INFTIM */

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "epoll timer: %M", timer);
    //若設定了timer_resolution,timer則設定為-1
    //這是為了防止長時間沒有啟用的事件導致阻塞在epoll_wait上
    //進而導致ngx_time_update得不到呼叫,以至時間的精度缺失
    events = epoll_wait(ep, event_list, (int) nevents, timer);

    err = (events == -1) ? ngx_errno : 0;
    //若NGX_UPDATE_TIME置位或者ngx_event_timer_alarm置位,則呼叫ngx_time_update更新時間快取
    //setitimer設定每過timer_resolution毫秒呼叫一次ngx_timer_signal_handler方法,將ngx_event_timer_alarm設定為1
    if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
        ngx_time_update();
    }

    if (err) {
        if (err == NGX_EINTR) {

            if (ngx_event_timer_alarm) {
                ngx_event_timer_alarm = 0;
                return NGX_OK;
            }

            level = NGX_LOG_INFO;

        } else {
            level = NGX_LOG_ALERT;
        }

        ngx_log_error(level, cycle->log, err, "epoll_wait() failed");
        return NGX_ERROR;
    }

    if (events == 0) {
        if (timer != NGX_TIMER_INFINITE) {
            return NGX_OK;
        }

        ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                      "epoll_wait() returned no events without timeout");
        return NGX_ERROR;
    }

    ngx_mutex_lock(ngx_posted_events_mutex);

    //處理就緒的事件
    for (i = 0; i < events; i++) {
        /* 這幾行程式碼解決了過期事件問題
         * ngx_event_t中有個標誌位instance,用它即可判斷事件是否過期
         * 主要是利用了記憶體中地址對齊從而指標指向的變數地址的最後一位永遠為0
         * 這樣我們就可以通過最後一位來儲存原連線的instance
         * 然後通過與該連線的讀事件的instance標誌位相與就可以判斷是否過期
         */

        //獲取到對應的連線
        c = event_list[i].data.ptr;

        //取得instance位
        instance = (uintptr_t) c & 1;
        //還原為正確的連線地址
        c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);

        rev = c->read;

        //判斷是否為過期事件,若是則退出本次的處理
        if (c->fd == -1 || rev->instance != instance) {

            /*
             * the stale event from a file descriptor
             * that was just closed in this iteration
             */

            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                           "epoll: stale event %p", c);
            continue;
        }

        revents = event_list[i].events;

        ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "epoll: fd:%d ev:%04XD d:%p",
                       c->fd, revents, event_list[i].data.ptr);

        if (revents & (EPOLLERR|EPOLLHUP)) {
            ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                           "epoll_wait() error on fd:%d ev:%04XD",
                           c->fd, revents);
        }

#if 0
        if (revents & ~(EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP)) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                          "strange epoll_wait() events fd:%d ev:%04XD",
                          c->fd, revents);
        }
#endif

        if ((revents & (EPOLLERR|EPOLLHUP))
             && (revents & (EPOLLIN|EPOLLOUT)) == 0)
        {
            /*
             * if the error events were returned without EPOLLIN or EPOLLOUT,
             * then add these flags to handle the events at least in one
             * active handler
             */

            revents |= EPOLLIN|EPOLLOUT;
        }

        //當前事件是讀事件,並且該事件是活躍的
        if ((revents & EPOLLIN) && rev->active) {

            if ((flags & NGX_POST_THREAD_EVENTS) && !rev->accept) {
                rev->posted_ready = 1;

            } else {
                rev->ready = 1;
            }

            /* 若延遲處理的標識位置位,則將事件新增到相應的佇列中
             * 新連線事件新增到ngx_posted_accept_events佇列中
             * 普通的事件新增到ngx_posted_events佇列中
             * 如果並不需要延時處理,則直接回調該事件的處理方法
             * 至於為什麼需要延遲處理,這是為了配合解決驚群和負載均衡問題,我們放在下一節講
             */
            if (flags & NGX_POST_EVENTS) {
                queue = (ngx_event_t **) (rev->accept ?
                               &ngx_posted_accept_events : &ngx_posted_events);

                ngx_locked_post_event(rev, queue);

            } else {
                //這裡的handler則可能是ngx_event_accept函式,建立新連線
                rev->handler(rev);
            }
        }
        //獲取寫事件
        wev = c->write;

        //當前是寫事件並且該事件為活躍事件
        if ((revents & EPOLLOUT) && wev->active) {

            //判斷事件是否過期
            if (c->fd == -1 || wev->instance != instance) {

                /*
                 * the stale event from a file descriptor
                 * that was just closed in this iteration
                 */

                ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                               "epoll: stale event %p", c);
                continue;
            }

            if (flags & NGX_POST_THREAD_EVENTS) {
                wev->posted_ready = 1;

            } else {
                wev->ready = 1;
            }
            //延遲處理
            if (flags & NGX_POST_EVENTS) {
                //將寫事件新增到ngx_posted_events佇列中
                ngx_locked_post_event(wev, &ngx_posted_events);

            } else {
                //呼叫回撥函式
                wev->handler(wev);
            }
        }
    }
    //解鎖
    ngx_mutex_unlock(ngx_posted_events_mutex);

    return NGX_OK;
}

ngx_epoll_process_events的主要工作就是呼叫epoll_wait,處理啟用的事件,並檢測事件是否過期,過期則不處理,接著判斷該事件是否需要延遲處理,如果需要,則加到相應的佇列中,否則呼叫連線上對應的讀/寫事件的回撥函式進行處理。

小結

本小節主要分析了新連線的處理以及ngx_epoll_process_events函式、過期事件的處理。接下來,我們將看到nginx是如何處理驚群以及負載均衡問題以及整個事件處理的流程。