1. 程式人生 > >Nginx程序分析(worker_process篇)

Nginx程序分析(worker_process篇)

Linux下程序的建立使用fork系統呼叫,fork在哪裡呢?答案是在ngx_spawn_process函式中。我們先不忙著看ngx_spawn_process函式,先看呼叫ngx_spawn_process的ngx_start_worker_processes函式。該函式在主執行緒迴圈中需要建立工作程序的地方被呼叫。

void ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type)
{
    ngx_int_t      i, s;
    ngx_channel_t  ch;

    ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start worker processes");

    ch.command = NGX_CMD_OPEN_CHANNEL;

    for (i = 0; i < n; i++) {
        cpu_affinity = ngx_get_cpu_affinity(i);

        ngx_spawn_process(cycle, ngx_worker_process_cycle, NULL,
                          "worker process", type);

        ch.pid = ngx_processes[ngx_process_slot].pid;
        ch.slot = ngx_process_slot;
        ch.fd = ngx_processes[ngx_process_slot].channel[0];

        for (s = 0; s < ngx_last_process; s++) {
            if (s == ngx_process_slot
                || ngx_processes[s].pid == -1
                || ngx_processes[s].channel[0] == -1)
            {
                continue;
            }
            ngx_log_debug6(NGX_LOG_DEBUG_CORE, cycle->log, 0,
                          "pass channel s:%d pid:%P fd:%d to s:%i pid:%P fd:%d",
                          ch.slot, ch.pid, ch.fd,
                          s, ngx_processes[s].pid,
                          ngx_processes[s].channel[0]);
            ngx_write_channel(ngx_processes[s].channel[0],
                              &ch, sizeof(ngx_channel_t), cycle->log);
        }
    }
}

ngx_start_worker_process函式還是很簡單的,基本就是呼叫ngx_spawn_process函式建立工作程序,然後通知所有程序。下面我們就具體看看主程序是如何建立工作程序的。

ngx_spawn_process函式首先需要為即將建立的工作程序分配一個程序資訊結構體。如果是重啟某個工作程序,直接使用respawn引數作為工作程序索引,否則需要從ngx_process陣列中找到一個空閒的元素。如果ngx_process陣列全部用光了,就返回錯誤。函式在fork之前會將新程序在ngx_process中的索引儲存在ngx_process_slot中。

ngx_pid_t ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data,
    char *name, ngx_int_t respawn)
{
    u_long     on;
    ngx_pid_t  pid;
    ngx_int_t  s;

    if (respawn >= 0) {
        s = respawn;
    } else {
        for (s = 0; s < ngx_last_process; s++) {
            if (ngx_processes[s].pid == -1) {
                break;
            }
        }

        if (s == NGX_MAX_PROCESSES) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                          "no more than %d processes can be spawned",
                          NGX_MAX_PROCESSES);
            return NGX_INVALID_PID;
        }
    }

    ...

    ngx_process_slot = s;
如果希望新程序與主程序是分離的(NGX_PROCESS_DETACHED),就只需要將程序資訊中的管道描述符設定為-1即可。否則,就需要建立一對Unix域套接字描述符,並將其設定為非同步非阻塞。
    if (respawn != NGX_PROCESS_DETACHED) {
        if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1)
        {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                          "socketpair() failed while spawning \"%s\"", name);
            return NGX_INVALID_PID;
        }

        ngx_log_debug2(NGX_LOG_DEBUG_CORE, cycle->log, 0,
                       "channel %d:%d",
                       ngx_processes[s].channel[0],
                       ngx_processes[s].channel[1]);
        // 設定channel為非阻塞
        if (ngx_nonblocking(ngx_processes[s].channel[0]) == -1) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                          ngx_nonblocking_n " failed while spawning \"%s\"",
                          name);
            ngx_close_channel(ngx_processes[s].channel, cycle->log);
            return NGX_INVALID_PID;
        }

        if (ngx_nonblocking(ngx_processes[s].channel[1]) == -1) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                          ngx_nonblocking_n " failed while spawning \"%s\"",
                          name);
            ngx_close_channel(ngx_processes[s].channel, cycle->log);
            return NGX_INVALID_PID;
        }
        // 設定channel[0]為非同步
        on = 1;
        if (ioctl(ngx_processes[s].channel[0], FIOASYNC, &on) == -1) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                          "ioctl(FIOASYNC) failed while spawning \"%s\"", name);
            ngx_close_channel(ngx_processes[s].channel, cycle->log);
            return NGX_INVALID_PID;
        }
        // <span style="font-family: Arial, Helvetica, sans-serif;">當channel[0]資料到來時,向主程序傳送SIGIO訊號</span>
        if (fcntl(ngx_processes[s].channel[0], F_SETOWN, ngx_pid) == -1) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                          "fcntl(F_SETOWN) failed while spawning \"%s\"", name);
            ngx_close_channel(ngx_processes[s].channel, cycle->log);
            return NGX_INVALID_PID;
        }
        // 希望程序呼叫exec執行外部程式的時候自動關閉channel
        if (fcntl(ngx_processes[s].channel[0], F_SETFD, FD_CLOEXEC) == -1) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                          "fcntl(FD_CLOEXEC) failed while spawning \"%s\"",
                           name);
            ngx_close_channel(ngx_processes[s].channel, cycle->log);
            return NGX_INVALID_PID;
        }

        if (fcntl(ngx_processes[s].channel[1], F_SETFD, FD_CLOEXEC) == -1) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                          "fcntl(FD_CLOEXEC) failed while spawning \"%s\"",
                           name);
            ngx_close_channel(ngx_processes[s].channel, cycle->log);
            return NGX_INVALID_PID;
        }

        ngx_channel = ngx_processes[s].channel[1];

    } else {
        ngx_processes[s].channel[0] = -1;
        ngx_processes[s].channel[1] = -1;
    }
準備好channel後就可以呼叫fork建立程序了,這段程式碼Linux程式設計師肯定很熟悉了:
    pid = fork();

    switch (pid) {

    case -1:
        ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                      "fork() failed while spawning \"%s\"", name);
        ngx_close_channel(ngx_processes[s].channel, cycle->log);
        return NGX_INVALID_PID;

    case 0:
        ngx_pid = ngx_getpid();
        proc(cycle, data);
        break;

    default:
        break;
    }

    ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start %s %P", name, pid);

    ngx_processes[s].pid = pid;
    ngx_processes[s].exited = 0;

    if (respawn >= 0) {
        return pid;
    }
工作程序在這裡呼叫proc進入程序迴圈,而主程序則將新程序的pid儲存到程序資訊中,對於重新啟動的程序,到這裡就結束了。接下來,也是最後一部分是填充程序結構體中的其他一部分資訊,對於重啟的程序這些資訊是保留著的。這部分是由主程序負責完成的,並不是本文討論的重點。
    ngx_processes[s].proc = proc;
    ngx_processes[s].data = data;
    ngx_processes[s].name = name;
    ngx_processes[s].exiting = 0;

    switch (respawn) {
    case NGX_PROCESS_RESPAWN:
        ngx_processes[s].respawn = 1;
        ngx_processes[s].just_respawn = 0;
        ngx_processes[s].detached = 0;
        break;
    case NGX_PROCESS_JUST_RESPAWN:
        ngx_processes[s].respawn = 1;
        ngx_processes[s].just_respawn = 1;
        ngx_processes[s].detached = 0;
        break;
    case NGX_PROCESS_DETACHED:
        ngx_processes[s].respawn = 0;
        ngx_processes[s].just_respawn = 0;
        ngx_processes[s].detached = 1;
        break;
    }
    // 到這裡才算真正在ngx_processes陣列中增加了新程序對應的元素
    if (s == ngx_last_process) {
        ngx_last_process++;
    }

    return pid;
}
講述了工作程序建立的過程之後,就可以看看工作程序的迴圈函數了。Nginx可以支援多執行緒,但本文不關心多執行緒的情況,下面程式碼中的省略號部分為支援多執行緒的程式碼,可以忽略而不影響我們理解。
static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
{
    ngx_uint_t         i;
    ngx_connection_t  *c;
    ...

    ngx_worker_process_init(cycle, 1);

    ngx_setproctitle("worker process");

    ...

    for ( ;; ) {

        if (ngx_exiting) {

            c = cycle->connections;

            for (i = 0; i < cycle->connection_n; i++) {

                /* THREAD: lock */

                if (c[i].fd != -1 && c[i].idle) {
                    c[i].close = 1;
                    c[i].read->handler(c[i].read);
                }
            }

            if (ngx_event_timer_rbtree.root == ngx_event_timer_rbtree.sentinel)
            {
                ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");

                ngx_worker_process_exit(cycle);
            }
        }

        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle");

        ngx_process_events_and_timers(cycle);

        if (ngx_terminate) {
            ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");

            ngx_worker_process_exit(cycle);
        }

        if (ngx_quit) {
            ngx_quit = 0;
            ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0,
                          "gracefully shutting down");
            ngx_setproctitle("worker process is shutting down");

            if (!ngx_exiting) {
                ngx_close_listening_sockets(cycle);
                ngx_exiting = 1;
            }
        }

        if (ngx_reopen) {
            ngx_reopen = 0;
            ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reopening logs");
            ngx_reopen_files(cycle, -1);
        }
    }
}

ngx_worker_process_cycle的主迴圈中最核心的是事件處理函式ngx_process_events_and_timers。

void ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
    ngx_uint_t  flags;
    ngx_msec_t  timer, delta;
    // timer是事件等待的超時值,如果設定了定時器解析度,則定時器會週期觸發事件,
    // 程序將不斷地被喚醒處理事件,因此不必設定超時值,也不需要設定NGX_TIMER_INFINITE。
    if (ngx_timer_resolution) {
        timer = NGX_TIMER_INFINITE;
        flags = 0;
    } else {
        timer = ngx_event_find_timer();
        flags = NGX_UPDATE_TIME;
    }
    // 以下語句塊處理“驚群”
    if (ngx_use_accept_mutex) {
        if (ngx_accept_disabled > 0) {
            // 工作程序處於過載,僅僅將ngx_accept_disabled減一,不請求鎖,從而避免過載時引發驚群效應
            ngx_accept_disabled--;
        } else {
            if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
                return;
            }

            if (ngx_accept_mutex_held) {
                flags |= NGX_POST_EVENTS;
            } else {
                if (timer == NGX_TIMER_INFINITE
                    || timer > ngx_accept_mutex_delay)
                {
                    timer = ngx_accept_mutex_delay;
                }
            }
        }
    }

    delta = ngx_current_msec;

    (void) ngx_process_events(cycle, timer, flags);

    delta = ngx_current_msec - delta;

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "timer delta: %M", delta);

    if (ngx_posted_accept_events) {
        ngx_event_process_posted(cycle, &ngx_posted_accept_events);
    }

    if (ngx_accept_mutex_held) {
        ngx_shmtx_unlock(&ngx_accept_mutex);
    }

    if (delta) {
        ngx_event_expire_timers();
    }

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "posted events %p", ngx_posted_events);

    if (ngx_posted_events) {
        if (ngx_threaded) {
            ngx_wakeup_worker_thread(cycle);
        } else {
            ngx_event_process_posted(cycle, &ngx_posted_events);
        }
    }
}