nginx 原始碼學習筆記(二十一)—— event 模組(二) ——事件驅動核心ngx_process_events_and_timers
阿新 • • 發佈:2018-11-11
首先繼續回憶下,之前子執行緒執行操作裡面有一個未涉及的內容ngx_process_events_and_timers,今天我們就來研究下這個函式。
本篇文章來自於:http://blog.csdn.net/lengzijian/article/details/7601730
先來看一下第十九節的部分截圖:
今天主要講解的就是事件驅動函式,圖中的紅色部分:
[cpp] view plaincopyprint?
- src/event/ngx_event.c
- void
- ngx_process_events_and_timers(ngx_cycle_t *cycle)
- {
- ngx_uint_t flags;
- ngx_msec_t timer, delta;
- if (ngx_timer_resolution) {
- timer = NGX_TIMER_INFINITE;
- flags = 0;
- } else {
- timer = ngx_event_find_timer();
- flags = NGX_UPDATE_TIME;
- }
- /*
- ngx_use_accept_mutex變數代表是否使用accept互斥體
- 預設是使用,可以通過accept_mutex off;指令關閉;
- accept mutex 的作用就是避免驚群,同時實現負載均衡
- */
- if (ngx_use_accept_mutex) {
- /*
- ngx_accept_disabled變數在ngx_event_accept函式中計算。
- 如果ngx_accept_disabled大於0,就表示該程序接受的連結過多,
- 因此放棄一次爭搶accept mutex的機會,同時將自己減一。
- 然後,繼續處理已有連線上的事件。
- nginx就利用這一點實現了繼承關於連線的基本負載均衡。
- */
- if (ngx_accept_disabled > 0) {
- ngx_accept_disabled--;
- } else {
- /*
- 嘗試鎖accept mutex,只有成功獲取鎖的程序,才會將listen套接字放到epoll中。
- 因此,這就保證了只有一個程序擁有監聽套介面,故所有程序阻塞在epoll_wait時,
- 才不會驚群現象。
- */
- if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
- return;
- }
- if (ngx_accept_mutex_held) {
- /*
- 如果程序獲得了鎖,將新增一個 NGX_POST_EVENTS 標誌。
- 這個標誌的作用是將所有產生的事件放入一個佇列中,等釋放後,在慢慢來處理事件。
- 因為,處理時間可能會很耗時,如果不先施放鎖再處理的話,該程序就長時間霸佔了鎖,
- 導致其他程序無法獲取鎖,這樣accept的效率就低了。
- */
- flags |= NGX_POST_EVENTS;
- } else {
- /*
- 沒有獲得所得程序,當然不需要NGX_POST_EVENTS標誌。
- 但需要設定延時多長時間,再去爭搶鎖。
- */
- if (timer == NGX_TIMER_INFINITE
- || timer > ngx_accept_mutex_delay)
- {
- timer = ngx_accept_mutex_delay;
- }
- }
- }
- }
- delta = ngx_current_msec;
- /*接下來,epoll要開始wait事件,
- ngx_process_events的具體實現是對應到epoll模組中的ngx_epoll_process_events函式
- 這裡之後會詳細講解的哦
- */
- (void) ngx_process_events(cycle, timer, flags);
- //統計本次wait事件的耗時
- delta = ngx_current_msec - delta;
- ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
- "timer delta: %M", delta);
- /*
- ngx_posted_accept_events是一個事件佇列,暫存epoll從監聽套介面wait到的accept事件。
- 前文提到的NGX_POST_EVENTS標誌被使用後,會將所有的accept事件暫存到這個佇列
- */
- if (ngx_posted_accept_events) {
- ngx_event_process_posted(cycle, &ngx_posted_accept_events);
- }
- //所有accept事件處理完之後,如果持有鎖的話,就釋放掉。
- if (ngx_accept_mutex_held) {
- ngx_shmtx_unlock(&ngx_accept_mutex);
- }
- /*
- delta是之前統計的耗時,存在毫秒級的耗時,就對所有時間的timer進行檢查,
- 如果timeout 就從time rbtree中刪除到期的timer,同時呼叫相應事件的handler函式處理
- */
- if (delta) {
- ngx_event_expire_timers();
- }
- ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
- "posted events %p", ngx_posted_events);
- /*
- 處理普通事件(連線上獲得的讀寫事件),
- 因為每個事件都有自己的handler方法,
- */
- if (ngx_posted_events) {
- if (ngx_threaded) {
- ngx_wakeup_worker_thread(cycle);
- } else {
- ngx_event_process_posted(cycle, &ngx_posted_events);
- }
- }
- }
之前有說過accept事件,其實他就是監聽套介面上是否有新來的事件,下面介紹下accept時間的handler方法:
ngx_event_accept:
[cpp] view plaincopyprint?
- src/event/ngx_event_accept.c
- void
- ngx_event_accept(ngx_event_t *ev)
- {
- socklen_t socklen;
- ngx_err_t err;
- ngx_log_t *log;
- ngx_socket_t s;
- ngx_event_t *rev, *wev;
- ngx_listening_t *ls;
- ngx_connection_t *c, *lc;
- ngx_event_conf_t *ecf;
- u_char sa[NGX_SOCKADDRLEN];
- //省略部分程式碼
- lc = ev->data;
- ls = lc->listening;
- ev->ready = 0;
- ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
- "accept on %V, ready: %d", &ls->addr_text, ev->available);
- do {
- socklen = NGX_SOCKADDRLEN;
- //accept一個新的連線
- s = accept(lc->fd, (struct sockaddr *) sa, &socklen);
- //省略部分程式碼
- /*
- accept到一個新的連線後,就重新計算ngx_accept_disabled的值,
- 它主要是用來做負載均衡,之前有提過。
- 這裡,我們可以看到他的就只方式
- “總連線數的八分之一 - 剩餘的連線數“
- 總連線指每個程序設定的最大連線數,這個數字可以再配置檔案中指定。
- 所以每個程序到總連線數的7/8後,ngx_accept_disabled就大於零,連線超載了
- */
- ngx_accept_disabled = ngx_cycle->connection_n / 8
- - ngx_cycle->free_connection_n;
- //獲取一個connection
- c = ngx_get_connection(s, ev->log);
- //為新的連結建立起一個memory pool
- //連線關閉的時候,才釋放pool
- c->pool = ngx_create_pool(ls->pool_size, ev->log);
- if (c->pool == NULL) {
- ngx_close_accepted_connection(c);
- return;
- }
- c->sockaddr = ngx_palloc(c->pool, socklen);
- if (c->sockaddr == NULL) {
- ngx_close_accepted_connection(c);
- return;
- }
- ngx_memcpy(c->sockaddr, sa, socklen);
- log = ngx_palloc(c->pool, sizeof(ngx_log_t));
- if (log == NULL) {
- ngx_close_accepted_connection(c);
- return;
- }
- /* set a blocking mode for aio and non-blocking mode for others */
- if (ngx_inherited_nonblocking) {
- if (ngx_event_flags & NGX_USE_AIO_EVENT) {
- if (ngx_blocking(s) == -1) {
- ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
- ngx_blocking_n " failed");
- ngx_close_accepted_connection(c);
- return;
- }
- }
- } else {
- //我們使用epoll模型,這裡我們設定連線為nonblocking
- if (!(ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_RTSIG_EVENT))) {
- if (ngx_nonblocking(s) == -1) {
- ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
- ngx_nonblocking_n " failed");
- ngx_close_accepted_connection(c);
- return;
- }
- }
- }
- *log = ls->log;
- //初始化新的連線
- c->recv = ngx_recv;
- c->send = ngx_send;
- c->recv_chain = ngx_recv_chain;
- c->send_chain = ngx_send_chain;
- c->log = log;
- c->pool->log = log;
- c->socklen = socklen;
- c->listening = ls;
- c->local_sockaddr = ls->sockaddr;
- c->unexpected_eof = 1;
- #if (NGX_HAVE_UNIX_DOMAIN)
- if (c->sockaddr->sa_family == AF_UNIX) {
- c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED;
- c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED;
- #if (NGX_SOLARIS)
- /* Solaris's sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */
- c->sendfile = 0;
- #endif
- }
- #endif
- rev = c->read;
- wev = c->write;
- wev->ready = 1;
- if (ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_RTSIG_EVENT)) {
- /* rtsig, aio, iocp */
- rev->ready = 1;
- }
- if (ev->deferred_accept) {
- rev->ready = 1;
- #if (NGX_HAVE_KQUEUE)
- rev->available = 1;
- #endif
- }
- rev->log = log;
- wev->log = log;
- /*
- * TODO: MT: - ngx_atomic_fetch_add()
- * or protection by critical section or light mutex
- *
- * TODO: MP: - allocated in a shared memory
- * - ngx_atomic_fetch_add()
- * or protection by critical section or light mutex
- */
- c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
- if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) {
- if (ngx_add_conn(c) == NGX_ERROR) {
- ngx_close_accepted_connection(c);
- return;
- }
- }
- log->data = NULL;
- log->handler = NULL;
- /*
- 這裡listen handler很重要,它將完成新連線的最後初始化工作,
- 同時將accept到的新的連線放入epoll中;掛在這個handler上的函式,
- 就是ngx_http_init_connection 在之後http模組中在詳細介紹
- */
- ls->handler(c);
- if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
- ev->available--;
- }
- } while (ev->available);
- }
accpt事件的handler方法也就是如此了。之後就是每個連線的讀寫事件handler方法,這一部分會直接將我們引入http模組,我們還不急,還要學習下nginx經典模組epoll。