1. 程式人生 > >Nginx 中 upstream 機制的實現

Nginx 中 upstream 機制的實現

概述

        upstream 機制使得 Nginx 成為一個反向代理伺服器,Nginx 接收來自下游客戶端的 http 請求,並處理該請求,同時根據該請求向上遊伺服器傳送 tcp 請求報文,上游伺服器會根據該請求返回相應地響應報文,Nginx 根據上游伺服器的響應報文,決定是否向下遊客戶端轉發響應報文。另外 upstream 機制提供了負載均衡的功能,可以將請求負載均衡到叢集伺服器的某個伺服器上面。

啟動 upstream

        在 Nginx 中呼叫 ngx_http_upstream_init 方法啟動 upstream 機制,但是在使用 upstream 機制之前必須呼叫 ngx_http_upstream_create 方法建立 ngx_http_upstream_t 結構體,因為預設情況下 ngx_http_request_t 結構體中的 upstream 成員是指向 NULL,該結構體的具體初始化工作還需由 HTTP 模組完成。有關 ngx_http_upstream_t 結構體 和ngx_http_upstream_conf_t 結構體的相關說明可參考文章《

Nginx 中 upstream 機制》。

        下面是函式 ngx_http_upstream_create 的實現:

/* 建立 ngx_http_upstream_t 結構體 */
ngx_int_t
ngx_http_upstream_create(ngx_http_request_t *r)
{
    ngx_http_upstream_t  *u;

    u = r->upstream;

    /*
     * 若已經建立過ngx_http_upstream_t 且定義了cleanup成員,
     * 則呼叫cleanup清理方法將原始結構體清除;
     */
    if (u && u->cleanup) {
        r->main->count++;
        ngx_http_upstream_cleanup(r);
    }

    /* 從記憶體池分配ngx_http_upstream_t 結構體空間 */
    u = ngx_pcalloc(r->pool, sizeof(ngx_http_upstream_t));
    if (u == NULL) {
        return NGX_ERROR;
    }

    /* 給ngx_http_request_t 結構體成員upstream賦值 */
    r->upstream = u;

    u->peer.log = r->connection->log;
    u->peer.log_error = NGX_ERROR_ERR;
#if (NGX_THREADS)
    u->peer.lock = &r->connection->lock;
#endif

#if (NGX_HTTP_CACHE)
    r->cache = NULL;
#endif

    u->headers_in.content_length_n = -1;

    return NGX_OK;
}

關於 upstream 機制的啟動方法 ngx_http_upstream_init 的執行流程如下:

  • 檢查 Nginx 與下游伺服器之間連線上的讀事件是否在定時器中,即檢查 timer_set 標誌位是否為 1,若該標誌位為 1,則把讀事件從定時器中移除;
  • 呼叫 ngx_http_upstream_init_request 方法啟動 upstream 機制;

ngx_http_upstream_init_request 方法執行流程如下所示:

  • 檢查 ngx_http_upstream_t 結構體中的 store 標誌位是否為 0;檢查 ngx_http_request_t 結構體中的 post_action 標誌位是否為0;檢查 ngx_http_upstream_conf_t 結構體中的ignore_client_abort 是否為 0;若上面的標誌位都為 0,則設定ngx_http_request_t 請求的讀事件的回撥方法為ngx_http_upstream_rd_check_broken_connection;設定寫事件的回撥方法為 ngx_http_upstream_wr_check_broken_connection;這兩個方法都會呼叫 ngx_http_upstream_check_broken_connection方法檢查 Nginx 與下游之間的連線是否正常,若出現錯誤,則終止連線;
  • 若不滿足上面的標誌位,即至少有一個不為 0 ,呼叫請求中ngx_http_upstream_t 結構體中某個 HTTP 模組實現的create_request 方法,構造發往上游伺服器的請求;
  • 呼叫 ngx_http_cleanup_add 方法向原始請求的 cleanup 連結串列尾端新增一個回撥 handler 方法,該回調方法設定為ngx_http_upstream_cleanup,若當前請求結束時會呼叫該方法做一些清理工作;
  • 呼叫 ngx_http_upstream_connect 方法向上遊伺服器發起連線請求;
/* 初始化啟動upstream機制 */
void
ngx_http_upstream_init(ngx_http_request_t *r)
{
    ngx_connection_t     *c;

    /* 獲取當前請求所對應的連線 */
    c = r->connection;

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http init upstream, client timer: %d", c->read->timer_set);

#if (NGX_HTTP_SPDY)
    if (r->spdy_stream) {
        ngx_http_upstream_init_request(r);
        return;
    }
#endif

    /*
     * 檢查當前連線上讀事件的timer_set標誌位是否為1,若該標誌位為1,
     * 表示讀事件在定時器機制中,則需要把它從定時器機制中移除;
     * 因為在啟動upstream機制後,就不需要對客戶端的讀操作進行超時管理;
     */
    if (c->read->timer_set) {
        ngx_del_timer(c->read);
    }

    if (ngx_event_flags & NGX_USE_CLEAR_EVENT) {

        if (!c->write->active) {
            if (ngx_add_event(c->write, NGX_WRITE_EVENT, NGX_CLEAR_EVENT)
                == NGX_ERROR)
            {
                ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }
        }
    }

    ngx_http_upstream_init_request(r);
}
static void
ngx_http_upstream_init_request(ngx_http_request_t *r)
{
    ngx_str_t                      *host;
    ngx_uint_t                      i;
    ngx_resolver_ctx_t             *ctx, temp;
    ngx_http_cleanup_t             *cln;
    ngx_http_upstream_t            *u;
    ngx_http_core_loc_conf_t       *clcf;
    ngx_http_upstream_srv_conf_t   *uscf, **uscfp;
    ngx_http_upstream_main_conf_t  *umcf;

    if (r->aio) {
        return;
    }

    u = r->upstream;

#if (NGX_HTTP_CACHE)
    ...
    ...
#endif

    /* 檔案快取標誌位 */
    u->store = (u->conf->store || u->conf->store_lengths);

    /*
     * 檢查ngx_http_upstream_t 結構中標誌位 store;
     * 檢查ngx_http_request_t 結構中標誌位 post_action;
     * 檢查ngx_http_upstream_conf_t 結構中標誌位 ignore_client_abort;
     * 若上面這些標誌位為1,則表示需要檢查Nginx與下游(即客戶端)之間的TCP連線是否斷開;
     */
    if (!u->store && !r->post_action && !u->conf->ignore_client_abort) {
        r->read_event_handler = ngx_http_upstream_rd_check_broken_connection;
        r->write_event_handler = ngx_http_upstream_wr_check_broken_connection;
    }

    /* 把當前請求包體結構儲存在ngx_http_upstream_t 結構的request_bufs連結串列緩衝區中 */
    if (r->request_body) {
        u->request_bufs = r->request_body->bufs;
    }

    /* 呼叫create_request方法構造發往上游伺服器的請求 */
    if (u->create_request(r) != NGX_OK) {
        ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    /* 獲取ngx_http_upstream_t結構中主動連線結構peer的local本地地址資訊 */
    u->peer.local = ngx_http_upstream_get_local(r, u->conf->local);

    /* 獲取ngx_http_core_module模組的loc級別的配置項結構 */
    clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);

    /* 初始化ngx_http_upstream_t結構中成員output向下遊傳送響應的方式 */
    u->output.alignment = clcf->directio_alignment;
    u->output.pool = r->pool;
    u->output.bufs.num = 1;
    u->output.bufs.size = clcf->client_body_buffer_size;
    u->output.output_filter = ngx_chain_writer;
    u->output.filter_ctx = &u->writer;

    u->writer.pool = r->pool;

    /* 新增用於表示上游響應的狀態,例如:錯誤編碼、包體長度等 */
    if (r->upstream_states == NULL) {

        r->upstream_states = ngx_array_create(r->pool, 1,
                                            sizeof(ngx_http_upstream_state_t));
        if (r->upstream_states == NULL) {
            ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

    } else {

        u->state = ngx_array_push(r->upstream_states);
        if (u->state == NULL) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t));
    }

    /*
     * 呼叫ngx_http_cleanup_add方法原始請求的cleanup連結串列尾端新增一個回撥handler方法,
     * 該handler回撥方法設定為ngx_http_upstream_cleanup,若當前請求結束時會呼叫該方法做一些清理工作;
     */
    cln = ngx_http_cleanup_add(r, 0);
    if (cln == NULL) {
        ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    cln->handler = ngx_http_upstream_cleanup;
    cln->data = r;
    u->cleanup = &cln->handler;

    if (u->resolved == NULL) {

        /* 若沒有實現u->resolved標誌位,則定義上游伺服器的配置 */
        uscf = u->conf->upstream;

    } else {

        /*
         * 若實現了u->resolved標誌位,則解析主機域名,指定上游伺服器的地址;
         */


        /*
         * 若已經指定了上游伺服器地址,則不需要解析,
         * 直接呼叫ngx_http_upstream_connection方法向上遊伺服器發起連線;
         * 並return從當前函式返回;
         */
        if (u->resolved->sockaddr) {

            if (ngx_http_upstream_create_round_robin_peer(r, u->resolved)
                != NGX_OK)
            {
                ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }

            ngx_http_upstream_connect(r, u);

            return;
        }

        /*
         * 若還沒指定上游伺服器的地址,則需解析主機域名;
         * 若成功解析出上游伺服器的地址和埠號,
         * 則呼叫ngx_http_upstream_connection方法向上遊伺服器發起連線;
         */
        host = &u->resolved->host;

        umcf = ngx_http_get_module_main_conf(r, ngx_http_upstream_module);

        uscfp = umcf->upstreams.elts;

        for (i = 0; i < umcf->upstreams.nelts; i++) {

            uscf = uscfp[i];

            if (uscf->host.len == host->len
                && ((uscf->port == 0 && u->resolved->no_port)
                     || uscf->port == u->resolved->port)
                && ngx_strncasecmp(uscf->host.data, host->data, host->len) == 0)
            {
                goto found;
            }
        }

        if (u->resolved->port == 0) {
            ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                          "no port in upstream \"%V\"", host);
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        temp.name = *host;

        ctx = ngx_resolve_start(clcf->resolver, &temp);
        if (ctx == NULL) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        if (ctx == NGX_NO_RESOLVER) {
            ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                          "no resolver defined to resolve %V", host);

            ngx_http_upstream_finalize_request(r, u, NGX_HTTP_BAD_GATEWAY);
            return;
        }

        ctx->name = *host;
        ctx->handler = ngx_http_upstream_resolve_handler;
        ctx->data = r;
        ctx->timeout = clcf->resolver_timeout;

        u->resolved->ctx = ctx;

        if (ngx_resolve_name(ctx) != NGX_OK) {
            u->resolved->ctx = NULL;
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        return;
    }

found:

    if (uscf == NULL) {
        ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
                      "no upstream configuration");
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    if (uscf->peer.init(r, uscf) != NGX_OK) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    ngx_http_upstream_connect(r, u);
}

static void
ngx_http_upstream_rd_check_broken_connection(ngx_http_request_t *r)
{
    ngx_http_upstream_check_broken_connection(r, r->connection->read);
}


static void
ngx_http_upstream_wr_check_broken_connection(ngx_http_request_t *r)
{
    ngx_http_upstream_check_broken_connection(r, r->connection->write);
}

建立連線

        upstream 機制與上游伺服器建立 TCP 連線時,採用的是非阻塞模式的套接字,即發起連線請求之後立即返回,不管連線是否建立成功,若沒有立即建立成功,則需在 epoll 事件機制中監聽該套接字。向上遊伺服器發起連線請求由函式ngx_http_upstream_connect 實現。在分析 ngx_http_upstream_connect 方法之前,首先分析下 ngx_event_connect_peer 方法,因為該方法會被ngx_http_upstream_connect 方法呼叫。

ngx_event_connect_peer 方法的執行流程如下所示:

  • 呼叫 ngx_socket 方法建立一個 TCP 套接字;
  • 呼叫 ngx_nonblocking 方法設定該 TCP 套接字為非阻塞模式;
  • 設定套接字連線接收和傳送網路字元流的方法;
  • 設定套接字連線上讀、寫事件方法;
  • 將 TCP 套接字以期待 EPOLLIN | EPOLLOUT 事件的方式新增到epoll 事件機制中;
  • 呼叫 connect 方法向伺服器發起 TCP 連線請求;

ngx_http_upstream_connect 方法表示向上遊伺服器發起連線請求,其執行流程如下所示:

  • 呼叫 ngx_event_connect_peer 方法主動向上游伺服器發起連線請求,需要注意的是該方法已經將相應的套接字註冊到epoll事件機制來監聽讀、寫事件,該方法返回值為 rc;
    • 若 rc = NGX_ERROR,表示發起連線失敗,則呼叫ngx_http_upstream_finalize_request 方法關閉連線請求,並 return 從當前函式返回;
    • 若 rc = NGX_BUSY,表示當前上游伺服器處於不活躍狀態,則呼叫 ngx_http_upstream_next 方法根據傳入的引數嘗試重新發起連線請求,並 return 從當前函式返回;
    • 若 rc = NGX_DECLINED,表示當前上游伺服器負載過重,則呼叫 ngx_http_upstream_next 方法嘗試與其他上游伺服器建立連線,並 return 從當前函式返回;
    • 設定上游連線 ngx_connection_t 結構體的讀事件、寫事件的回撥方法 handler 都為 ngx_http_upstream_handler,設定 ngx_http_upstream_t 結構體的寫事件 write_event_handler 的回撥為 ngx_http_upstream_send_request_handler,讀事件 read_event_handler 的回撥方法為 ngx_http_upstream_process_header;
    • 若 rc = NGX_AGAIN,表示當前已經發起連線,但是沒有收到上游伺服器的確認應答報文,即上游連線的寫事件不可寫,則需呼叫 ngx_add_timer 方法將上游連線的寫事件新增到定時器中,管理超時確認應答;
    • 若 rc = NGX_OK,表示成功建立連線,則呼叫 ngx_http_upsream_send_request 方法向上遊伺服器傳送請求;
/* 向上遊伺服器建立連線 */
static void
ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    ngx_int_t          rc;
    ngx_time_t        *tp;
    ngx_connection_t  *c;

    r->connection->log->action = "connecting to upstream";

    if (u->state && u->state->response_sec) {
        tp = ngx_timeofday();
        u->state->response_sec = tp->sec - u->state->response_sec;
        u->state->response_msec = tp->msec - u->state->response_msec;
    }

    u->state = ngx_array_push(r->upstream_states);
    if (u->state == NULL) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t));

    tp = ngx_timeofday();
    u->state->response_sec = tp->sec;
    u->state->response_msec = tp->msec;

    /* 向上遊伺服器發起連線 */
    rc = ngx_event_connect_peer(&u->peer);

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "http upstream connect: %i", rc);

    /* 下面根據rc不同返回值進行分析 */

    /* 若建立連線失敗,則關閉當前請求,並return從當前函式返回 */
    if (rc == NGX_ERROR) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    u->state->peer = u->peer.name;

    /*
     * 若返回rc = NGX_BUSY,表示當前上游伺服器不活躍,
     * 則呼叫ngx_http_upstream_next向上遊伺服器重新發起連線,
     * 實際上,該方法最終還是呼叫ngx_http_upstream_connect方法;
     * 並return從當前函式返回;
     */
    if (rc == NGX_BUSY) {
        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no live upstreams");
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_NOLIVE);
        return;
    }

    /*
     * 若返回rc = NGX_DECLINED,表示當前上游伺服器負載過重,
     * 則呼叫ngx_http_upstream_next向上遊伺服器重新發起連線,
     * 實際上,該方法最終還是呼叫ngx_http_upstream_connect方法;
     * 並return從當前函式返回;
     */
    if (rc == NGX_DECLINED) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
        return;
    }

    /* rc == NGX_OK || rc == NGX_AGAIN || rc == NGX_DONE */

    c = u->peer.connection;

    c->data = r;

    /* 設定當前連線ngx_connection_t 上讀、寫事件的回撥方法 */
    c->write->handler = ngx_http_upstream_handler;
    c->read->handler = ngx_http_upstream_handler;

    /* 設定upstream機制的讀、寫事件的回撥方法 */
    u->write_event_handler = ngx_http_upstream_send_request_handler;
    u->read_event_handler = ngx_http_upstream_process_header;

    c->sendfile &= r->connection->sendfile;
    u->output.sendfile = c->sendfile;

    if (c->pool == NULL) {

        /* we need separate pool here to be able to cache SSL connections */

        c->pool = ngx_create_pool(128, r->connection->log);
        if (c->pool == NULL) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }
    }

    c->log = r->connection->log;
    c->pool->log = c->log;
    c->read->log = c->log;
    c->write->log = c->log;

    /* init or reinit the ngx_output_chain() and ngx_chain_writer() contexts */

    u->writer.out = NULL;
    u->writer.last = &u->writer.out;
    u->writer.connection = c;
    u->writer.limit = 0;

    /*
     * 檢查當前ngx_http_upstream_t 結構的request_sent標誌位,
     * 若該標誌位為1,則表示已經向上遊伺服器傳送請求,即本次發起連線失敗;
     * 則呼叫ngx_http_upstream_reinit方法重新向上遊伺服器發起連線;
     */
    if (u->request_sent) {
        if (ngx_http_upstream_reinit(r, u) != NGX_OK) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }
    }

    if (r->request_body
        && r->request_body->buf
        && r->request_body->temp_file
        && r == r->main)
    {
        /*
         * the r->request_body->buf can be reused for one request only,
         * the subrequests should allocate their own temporary bufs
         */

        u->output.free = ngx_alloc_chain_link(r->pool);
        if (u->output.free == NULL) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        u->output.free->buf = r->request_body->buf;
        u->output.free->next = NULL;
        u->output.allocated = 1;

        r->request_body->buf->pos = r->request_body->buf->start;
        r->request_body->buf->last = r->request_body->buf->start;
        r->request_body->buf->tag = u->output.tag;
    }

    u->request_sent = 0;

    /*
     * 若返回rc = NGX_AGAIN,表示沒有收到上游伺服器允許建立連線的應答;
     * 由於寫事件已經新增到epoll事件機制中等待可寫事件發生,
     * 所有在這裡只需將當前連線的寫事件新增到定時器機制中進行超時管理;
     * 並return從當前函式返回;
     */
    if (rc == NGX_AGAIN) {
        ngx_add_timer(c->write, u->conf->connect_timeout);
        return;
    }

#if (NGX_HTTP_SSL)

    if (u->ssl && c->ssl == NULL) {
        ngx_http_upstream_ssl_init_connection(r, u, c);
        return;
    }

#endif

    /*
     * 若返回值rc = NGX_OK,表示連線成功建立,
     * 呼叫此方法向上遊伺服器傳送請求 */
    ngx_http_upstream_send_request(r, u);
}

傳送請求

        當 Nginx 與上游伺服器成功建立連線之後,會呼叫 ngx_http_upstream_send_request 方法傳送請求,若是該方法不能一次性把請求內容傳送完成時,則需等待 epoll 事件機制的寫事件發生,若寫事件發生,則會呼叫寫事件 write_event_handler 的回撥方法 ngx_http_upstream_send_request_handler 繼續傳送請求,並且有可能會多次呼叫該寫事件的回撥方法, 直到把請求傳送完成。

下面是 ngx_http_upstream_send_request 方法的執行流程:

  • 檢查 ngx_http_upstream_t 結構體中的標誌位 request_sent 是否為 0,若為 0 表示未向上遊傳送請求。 且此時呼叫 ngx_http_upstream_test_connect 方法測試是否與上游建立連線,若返回非 NGX_OK, 則需呼叫 ngx_http_upstream_next 方法試圖與上游建立連線,並return 從當前函式返回;
  • 呼叫 ngx_output_chain 方法向上遊傳送儲存在 request_bufs 連結串列中的請求資料,該方法返回值為 rc,並設定 request_sent 標誌位為 1,檢查連線上寫事件 timer_set 標誌位是否為1,若為 1 呼叫ngx_del_timer 方法將寫事件從定時器中移除;
  • 若 rc = NGX_ERROR,表示當前連線上出錯,則呼叫 ngx_http_upstream_next 方法嘗試再次與上游建立連線,並 return 從當前函式返回;
  • 若 rc = NGX_AGAIN,並是當前請求資料未完全傳送,則需將剩餘的請求資料儲存在 ngx_http_upstream_t 結構體的 output 成員中,並且呼叫 ngx_add_timer 方法將當前連線上的寫事件新增到定時器中,呼叫 ngx_handle_write_event 方法將寫事件註冊到 epoll 事件機制中,等待可寫事件發生,並return 從當前函式返回;
  • 若 rc = NGX_OK,表示已經發送全部請求資料,則準備接收來自上游伺服器的響應報文;
  • 先呼叫 ngx_add_timer 方法將當前連線的讀事件新增到定時器機制中,檢測接收響應是否超時,檢查當前連線上的讀事件是否準備就緒,即標誌位 ready 是否為1,若該標誌位為 1,則呼叫 ngx_http_upstream_process_header 方法開始處理響應頭部,並 return 從當前函式返回;
  • 若當前連線上讀事件的標誌位 ready 為0,表示暫時無可讀資料,則需等待讀事件再次被觸發,由於原始讀事件的回撥方法為 ngx_http_upstream_process_header,所有無需重新設定。由於請求已經全部發送,防止寫事件的回撥方法 ngx_http_upstream_send_request_handler 再次被觸發,因此需要重新設定寫事件的回撥方法為 ngx_http_upstream_dummy_handler,該方法實際上不執行任何操作,同時呼叫 ngx_handle_write_event 方法將寫事件註冊到 epoll 事件機制中;
/* 向上遊伺服器傳送請求 */
static void
ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    ngx_int_t          rc;
    ngx_connection_t  *c;

    /* 獲取當前連線 */
    c = u->peer.connection;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http upstream send request");

    /*
     * 若標誌位request_sent為0,表示還未傳送請求;
     * 且ngx_http_upstream_test_connect方法返回非NGX_OK,標誌當前還未與上游伺服器成功建立連線;
     * 則需要呼叫ngx_http_upstream_next方法嘗試與下一個上游伺服器建立連線;
     * 並return從當前函式返回;
     */
    if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
        return;
    }

    c->log->action = "sending request to upstream";

    /*
     * 呼叫ngx_output_chain方法向上遊傳送儲存在request_bufs連結串列中的請求資料;
     * 值得注意的是該方法的第二個引數可以是NULL也可以是request_bufs,那怎麼來區分呢?
     * 若是第一次呼叫該方法傳送request_bufs連結串列中的請求資料時,request_sent標誌位為0,
     * 此時,第二個引數自然就是request_bufs了,那麼為什麼會有NULL作為引數的情況呢?
     * 當在第一次呼叫該方法時,並不能一次性把所有request_bufs中的資料傳送完畢時,
     * 此時,會把剩餘的資料儲存在output結構裡面,並把標誌位request_sent設定為1,
     * 因此,再次傳送請求資料時,不用指定request_bufs引數,因為此時剩餘資料已經儲存在output中;
     */
    rc = ngx_output_chain(&u->output, u->request_sent ? NULL : u->request_bufs);

    /* 向上遊伺服器傳送請求之後,把request_sent標誌位設定為1 */
    u->request_sent = 1;

    /* 下面根據不同rc的返回值進行判斷 */

    /*
     * 若返回值rc=NGX_ERROR,表示當前連線上出錯,
     * 將錯誤資訊傳遞給ngx_http_upstream_next方法,
     * 該方法根據錯誤資訊決定是否重新向上遊伺服器發起連線;
     * 並return從當前函式返回;
     */
    if (rc == NGX_ERROR) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
        return;
    }

    /*
     * 檢查當前連線上寫事件的標誌位timer_set是否為1,
     * 若該標誌位為1,則需把寫事件從定時器機制中移除;
     */
    if (c->write->timer_set) {
        ngx_del_timer(c->write);
    }

    /*
     * 若返回值rc = NGX_AGAIN,表示請求資料並未完全傳送,
     * 即有剩餘的請求資料儲存在output中,但此時,寫事件已經不可寫,
     * 則呼叫ngx_add_timer方法把當前連線上的寫事件新增到定時器機制,
     * 並呼叫ngx_handle_write_event方法將寫事件註冊到epoll事件機制中;
     * 並return從當前函式返回;
     */
    if (rc == NGX_AGAIN) {
        ngx_add_timer(c->write, u->conf->send_timeout);

        if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        return;
    }

    /* rc == NGX_OK */

    /*
     * 若返回值 rc = NGX_OK,表示已經發送完全部請求資料,
     * 準備接收來自上游伺服器的響應報文,則執行以下程式;
     */
    if (c->tcp_nopush == NGX_TCP_NOPUSH_SET) {
        if (ngx_tcp_push(c->fd) == NGX_ERROR) {
            ngx_log_error(NGX_LOG_CRIT, c->log, ngx_socket_errno,
                          ngx_tcp_push_n " failed");
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        c->tcp_nopush = NGX_TCP_NOPUSH_UNSET;
    }

    /* 將當前連線上讀事件新增到定時器機制中 */
    ngx_add_timer(c->read, u->conf->read_timeout);

    /*
     * 若此時,讀事件已經準備就緒,
     * 則呼叫ngx_http_upstream_process_header方法開始接收並處理響應頭部;
     * 並return從當前函式返回;
     */
    if (c->read->ready) {
        ngx_http_upstream_process_header(r, u);
        return;
    }

    /*
     * 若當前讀事件未準備就緒;
     * 則把寫事件的回撥方法設定為ngx_http_upstream_dumy_handler方法(不進行任何實際操作);
     * 並把寫事件註冊到epoll事件機制中;
     */
    u->write_event_handler = ngx_http_upstream_dummy_handler;

    if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }
}

當無法一次性將請求內容全部發送完畢,則需等待 epoll 事件機制的寫事件發生,一旦發生就會呼叫回撥方法 ngx_http_upstream_send_request_handler。

ngx_http_upstream_send_request_handler 方法的執行流程如下所示:

  • 檢查連線上寫事件是否超時,即timedout 標誌位是否為 1,若為 1 表示已經超時,則呼叫 ngx_http_upstream_next 方法重新向上遊發起連線請求,並 return 從當前函式返回;
  • 若標誌位 timedout 為0,即不超時,檢查 header_sent 標誌位是否為 1,表示已經接收到來自上游伺服器的響應頭部,則不需要再向上游傳送請求,將寫事件的回撥方法設定為 ngx_http_upstream_dummy_handler,同時將寫事件註冊到 epoll 事件機制中,並return 從當前函式返回;
  • 若標誌位 header_sent 為 0,則呼叫 ngx_http_upstream_send_request 方法向上遊傳送請求資料;
static void
ngx_http_upstream_send_request_handler(ngx_http_request_t *r,
    ngx_http_upstream_t *u)
{
    ngx_connection_t  *c;

    c = u->peer.connection;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "http upstream send request handler");

    /* 檢查當前連線上寫事件的超時標誌位 */
    if (c->write->timedout) {
        /* 執行超時重連機制 */
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);
        return;
    }

#if (NGX_HTTP_SSL)

    if (u->ssl && c->ssl == NULL) {
        ngx_http_upstream_ssl_init_connection(r, u, c);
        return;
    }

#endif

    /* 已經接收到上游伺服器的響應頭部,則不需要再向上游伺服器傳送請求資料 */
    if (u->header_sent) {
        /* 將寫事件的回撥方法設定為不進行任何實際操作的方法ngx_http_upstream_dumy_handler */
        u->write_event_handler = ngx_http_upstream_dummy_handler;

        /* 將寫事件註冊到epoll事件機制中,並return從當前函式返回 */
        (void) ngx_handle_write_event(c->write, 0);

        return;
    }

    /* 若沒有接收來自上游伺服器的響應頭部,則需向上遊伺服器傳送請求資料 */
    ngx_http_upstream_send_request(r, u);
}

接收響應

接收響應頭部

當 Nginx 已經向上遊傳送請求,準備開始接收來自上游的響應頭部,由方法 ngx_http_upstream_process_header 實現,該方法接收並解析響應頭部。

ngx_http_upstream_process_header 方法的執行流程如下:

  • 檢查上游連線上的讀事件是否超時,若標誌位 timedout 為 1,則表示超時,此時呼叫 ngx_http_upstream_next 方法重新與上游建立連線,並 return 從當前函式返回;
  • 若標誌位 timedout 為 0,接著檢查 ngx_http_upstream_t 結構體中的標誌位 request_sent,若該標誌位為 0,表示未向上遊傳送請求,同時呼叫 ngx_http_upstream_test_connect 方法測試連線狀態,若該方法返回值為非 NGX_OK,表示與上游已經斷開連線,則呼叫 ngx_http_upstream_next 方法重新與上游建立連線,並 return 從當前函式返回;
  • 檢查 ngx_http_upstream_t 結構體中接收響應頭部的 buffer 緩衝區是否有記憶體空間以便接收響應頭部,若 buffer.start 為 NULL,表示該緩衝區為空,則需呼叫 ngx_palloc 方法分配記憶體,該記憶體大小 buffer_size 由 ngx_http_upstream_conf_t 配置結構體的 buffer_size 成員指定;
  • 呼叫 recv 方法開始接收來自上游伺服器的響應頭部,並根據該方法的返回值 n 進行判斷:
    • 若 n = NGX_AGAIN,表示讀事件未準備就緒,需要等待下次讀事件被觸發時繼續接收響應頭部,此時,呼叫 ngx_add_timer 方法將讀事件新增到定時器中,同時呼叫 ngx_handle_read_event 方法將讀事件註冊到epoll 事件機制中,並 return 從當前函式返回;
    • 若 n = NGX_ERROR 或 n = 0,表示上游連線發生錯誤 或 上游伺服器主動關閉連線,則呼叫 ngx_http_upstream_next 方法重新發起連線請求,並 return 從當前函式返回;
    • 若 n 大於 0,表示已經接收到響應頭部,此時,呼叫 ngx_http_upstream_t 結構體中由 HTTP 模組實現的 process_header 方法解析響應頭部,且返回 rc 值;
  • 若 rc = NGX_AGAIN,表示接收到的響應頭部不完整,檢查接收緩衝區 buffer 是否還有剩餘的記憶體空間,若緩衝區沒有剩餘的記憶體空間,表示接收到的響應頭部過大,此時呼叫 ngx_http_upstream_next 方法重新建立連線,並 return 從當前函式返回;若緩衝區還有剩餘的記憶體空間,則continue 繼續接收響應頭部;
  • 若 rc = NGX_HTTP_UPSTREAM_INVALID_HEADER,表示接收到的響應頭部是非法的,則呼叫 ngx_http_upstream_next 方法重新建立連線,並 return 從當前函式返回;
  • 若 rc = NGX_ERROR,表示連接出錯,此時呼叫 ngx_http_upstream_finalize_request 方法結束請求,並 return 從當前函式返回;
  • 若 rc = NGX_OK,表示已接收到完整的響應頭部,則呼叫 ngx_http_upstream_process_headers 方法處理已解析的響應頭部,該方法會將已解析出來的響應頭部儲存在 ngx_http_request_t 結構體中的 headers_out 成員;
  • 檢查 ngx_http_request_t 結構體的 subrequest_in_memory 成員決定是否需要轉發響應給下游伺服器;
    • 若 subrequest_in_memory 為 0,表示需要轉發響應給下游伺服器,則呼叫 ngx_http_upstream_send_response 方法開始轉發響應給下游伺服器,並 return 從當前函式返回;
    • 若 subrequest_in_memory 為 1,表示不需要將響應轉發給下游,此時檢查 HTTP 模組是否定義了 ngx_http_upstream_t 結構體中的 input_filter 方法處理響應包體;
  • 若沒有定義 input_filter 方法,則使用 upstream 機制預設方法 ngx_http_upstream_non_buffered_filter 代替 input_filter 方法;
  • 若定義了自己的 input_filter 方法,則首先呼叫 input_filter_init 方法為處理響應包體做初始化工作;
  • 檢查接收緩衝區 buffer 在解析完響應頭部之後剩餘的字元流,若有剩餘的字元流,則表示已經預接收了響應包體,此時呼叫 input_filter 方法處理響應包體;
  • 設定 upstream 機制讀事件 read_event_handler 的回撥方法為 ngx_http_upstream_process_body_in_memory,並呼叫該方法開始接收並解析響應包體;
/* 接收並解析響應頭部 */
static void
ngx_http_upstream_process_header(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    ssize_t            n;
    ngx_int_t          rc;
    ngx_connection_t  *c;

    c = u->peer.connection;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http upstream process header");

    c->log->action = "reading response header from upstream";

    /* 檢查當前連線上的讀事件是否超時 */
    if (c->read->timedout) {
        /*
         * 若標誌位timedout為1,表示讀事件超時;
         * 則把超時錯誤傳遞給ngx_http_upstream_next方法,
         * 該方法根據允許的錯誤進行重連線策略;
         * 並return從當前函式返回;
         */
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);
        return;
    }

    /*
     * 若標誌位request_sent為0,表示還未傳送請求;
     * 且ngx_http_upstream_test_connect方法返回非NGX_OK,標誌當前還未與上游伺服器成功建立連線;
     * 則需要呼叫ngx_http_upstream_next方法嘗試與下一個上游伺服器建立連線;
     * 並return從當前函式返回;
     */
    if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
        return;
    }

    /*
     * 檢查ngx_http_upstream_t結構體中接收響應頭部的buffer緩衝區;
     * 若接收緩衝區buffer未分配記憶體,則呼叫ngx_palloce方法分配記憶體,
     * 該記憶體的大小buffer_size由ngx_http_upstream_conf_t配置結構的buffer_size指定;
     */
    if (u->buffer.start == NULL) {
        u->buffer.start = ngx_palloc(r->pool, u->conf->buffer_size);
        if (u->buffer.start == NULL) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        /* 調整接收緩衝區buffer,準備接收響應頭部 */
        u->buffer.pos = u->buffer.start;
        u->buffer.last = u->buffer.start;
        u->buffer.end = u->buffer.start + u->conf->buffer_size;
        /* 表示該緩衝區記憶體可被複用、資料可被改變 */
        u->buffer.temporary = 1;

        u->buffer.tag = u->output.tag;

        /* 初始化headers_in的成員headers連結串列 */
        if (ngx_list_init(&u->headers_in.headers, r->pool, 8,
                          sizeof(ngx_table_elt_t))
            != NGX_OK)
        {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

#if (NGX_HTTP_CACHE)

        if (r->cache) {
            u->buffer.pos += r->cache->header_start;
            u->buffer.last = u->buffer.pos;
        }
#endif
    }

    for ( ;; ) {

        /* 呼叫recv方法從當前連線上讀取響應頭部資料 */
        n = c->recv(c, u->buffer.last, u->buffer.end - u->buffer.last);

        /* 下面根據 recv 方法不同返回值 n 進行判斷 */

        /*
         * 若返回值 n = NGX_AGAIN,表示讀事件未準備就緒,
         * 需等待下次讀事件被觸發時繼續接收響應頭部,
         * 即將讀事件註冊到epoll事件機制中,等待可讀事件發生;
         * 並return從當前函式返回;
         */
        if (n == NGX_AGAIN) {
#if 0
            ngx_add_timer(rev, u->read_timeout);
#endif

            if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
                ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }

            return;
        }

        if (n == 0) {
            ngx_log_error(NGX_LOG_ERR, c->log, 0,
                          "upstream prematurely closed connection");
        }

        /*
         * 若返回值 n = NGX_ERROR 或 n = 0,則表示上游伺服器已經主動關閉連線;
         * 此時,呼叫ngx_http_upstream_next方法決定是否重新發起連線;
         * 並return從當前函式返回;
         */
        if (n == NGX_ERROR || n == 0) {
            ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
            return;
        }

        /* 若返回值 n 大於 0,表示已經接收到響應頭部 */
        u->buffer.last += n;

#if 0
        u->valid_header_in = 0;

        u->peer.cached = 0;
#endif

        /*
         * 呼叫ngx_http_upstream_t結構體中process_header方法開始解析響應頭部;
         * 並根據該方法返回值進行不同的判斷;
         */
        rc = u->process_header(r);

        /*
         * 若返回值 rc = NGX_AGAIN,表示接收到的響應頭部不完整,
         * 需等待下次讀事件被觸發時繼續接收響應頭部;
         * continue繼續接收響應;
         */
        if (rc == NGX_AGAIN) {

            if (u->buffer.last == u->buffer.end) {
                ngx_log_error(NGX_LOG_ERR, c->log, 0,
                              "upstream sent too big header");

                ngx_http_upstream_next(r, u,
                                       NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
                return;
            }

            continue;
        }

        break;
    }

    /*
     * 若返回值 rc = NGX_HTTP_UPSTREAM_INVALID_HEADER,
     * 則表示接收到的響應頭部是非法的,
     * 呼叫ngx_http_upstream_next方法決定是否重新發起連線;
     * 並return從當前函式返回;
     */
    if (rc == NGX_HTTP_UPSTREAM_INVALID_HEADER) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
        return;
    }

    /*
     * 若返回值 rc = NGX_ERROR,表示出錯,
     * 則呼叫ngx_http_upstream_finalize_request方法結束該請求;
     * 並return從當前函式返回;
     */
    if (rc == NGX_ERROR) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    /* rc == NGX_OK */

    /*
     * 若返回值 rc = NGX_OK,表示成功解析到完整的響應頭部;*/
    if (u->headers_in.status_n >= NGX_HTTP_SPECIAL_RESPONSE) {

        if (ngx_http_upstream_test_next(r, u) == NGX_OK) {
            return;
        }

        if (ngx_http_upstream_intercept_errors(r, u) == NGX_OK) {
            return;
        }
    }

    /* 呼叫ngx_http_upstream_process_headers方法處理已解析處理的響應頭部 */
    if (ngx_http_upstream_process_headers(r, u) != NGX_OK) {
        return;
    }

    /*
     * 檢查ngx_http_request_t 結構體的subrequest_in_memory成員決定是否轉發響應給下游伺服器;
     * 若該標誌位為0,則需呼叫ngx_http_upstream_send_response方法轉發響應給下游伺服器;
     * 並return從當前函式返回;
     */
    if (!r->subrequest_in_memory) {
        ngx_http_upstream_send_response(r, u);
        return;
    }

    /* 若不需要轉發響應,則呼叫ngx_http_upstream_t中的input_filter方法處理響應包體 */
    /* subrequest content in memory */

    /*
     * 若HTTP模組沒有定義ngx_http_upstream_t中的input_filter處理方法;
     * 則使用upstream機制預設方法ngx_http_upstream_non_buffered_filter;
     *
     * 若HTTP模組實現了input_filter方法,則不使用upstream預設的方法;
     */
    if (u->input_filter == NULL) {
        u->input_filter_init = ngx_http_upstream_non_buffered_filter_init;
        u->input_filter = ngx_http_upstream_non_buffered_filter;
        u->input_filter_ctx = r;
    }

    /*
     * 呼叫input_filter_init方法為處理包體做初始化工作;
     */
    if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) {
        ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
        return;
    }

    /*
     * 檢查接收緩衝區是否有剩餘的響應資料;
     * 因為響應頭部已經解析完畢,若接收緩衝區還有未被解析的剩餘資料,
     * 則該資料就是響應包體;
     */
    n = u->buffer.last - u->buffer.pos;

    /*
     * 若接收緩衝區有剩餘的響應包體,呼叫input_filter方法開始處理已接收到響應包體;
     */
    if (n) {
        u->buffer.last = u->buffer.pos;

        u->state->response_length += n;

        /* 呼叫input_filter方法處理響應包體 */
        if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
            ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
            return;
        }
    }

    if (u->length == 0) {
        ngx_http_upstream_finalize_request(r, u, 0);
        return;
    }

    /* 設定upstream機制的讀事件回撥方法read_event_handler為ngx_http_upstream_process_body_in_memory */
    u->read_event_handler = ngx_http_upstream_process_body_in_memory;

    /* 呼叫ngx_http_upstream_process_body_in_memory方法開始處理響應包體 */
    ngx_http_upstream_process_body_in_memory(r, u);
}

接收響應包體

接收並解析響應包體由 ngx_http_upstream_process_body_in_memory 方法實現;

ngx_http_upstream_process_body_in_memory 方法的執行流程如下所示:

  • 檢查上游連線上讀事件是否超時,若標誌位 timedout 為 1,則表示已經超時,此時呼叫 ngx_http_upstream_finalize_request 方法結束請求,並 return 從當前函式返回;
  • 檢查接收緩衝區 buffer 是否還有剩餘的記憶體空間,若沒有剩餘的記憶體空間,則呼叫 ngx_http_upstream_finalize_request 方法結束請求,並 return 從當前函式返回;若有剩餘的記憶體空間則呼叫 recv 方法開始接收響應包體;
    • 若返回值 n = NGX_AGAIN,表示等待下一次觸發讀事件再接收響應包體,呼叫 ngx_handle_read_event 方法將讀事件註冊到 epoll 事件機制中,同時將讀事件新增到定時器機制中;
    • 若返回值 n = 0 或 n = NGX_ERROR,則呼叫 ngx_http_upstream_finalize_request 方法結束請求,並 return 從當前函式返回;
    • 若返回值 n 大於 0,則表示成功接收到響應包體,呼叫 input_filter 方法開始處理響應包體,檢查讀事件的 ready 標誌位;
  • 若標誌位 ready 為 1,表示仍有可讀的響應包體資料,因此回到步驟 2 繼續呼叫 recv 方法讀取響應包體,直到讀取完畢;
  • 若標誌位 ready 為 0,則呼叫 ngx_handle_read_event 方法將讀事件註冊到epoll事件機制中,同時呼叫 ngx_add_timer 方法將讀事件新增到定時器機制中;
/* 接收並解析響應包體 */
static void
ngx_http_upstream_process_body_in_memory(ngx_http_request_t *r,
    ngx_http_upstream_t *u)
{
    size_t             size;
    ssize_t            n;
    ngx_buf_t         *b;
    ngx_event_t       *rev;
    ngx_connection_t  *c;

    c = u->peer.connection;
    rev = c->read;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http upstream process body on memory");

    /*
     * 檢查讀事件標誌位timedout是否超時,若該標誌位為1,表示響應已經超時;
     * 則呼叫ngx_http_upstream_finalize_request方法結束請求;
     * 並return從當前函式返回;
     */
    if (rev->timedout) {
        ngx_connection_error(c, NGX_ETIMEDOUT, "upstream timed out");
        ngx_http_upstream_finalize_request(r, u, NGX_HTTP_GATEWAY_TIME_OUT);
        return;
    }

    b = &u->buffer;

    for ( ;; ) {

        /* 檢查當前接收緩衝區是否剩餘的記憶體空間 */
        size = b->end - b->last;

        /*
         * 若接收緩衝區不存在空閒的記憶體空間,
         * 則呼叫ngx_http_upstream_finalize_request方法結束請求;
         * 並return從當前函式返回;
         */
        if (size == 0) {
            ngx_log_error(NGX_LOG_ALERT, c->log, 0,
                          "upstream buffer is too small to read response");
            ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
            return;
        }

        /*
         * 若接收緩衝區有可用的記憶體空間,
         * 則呼叫recv方法開始接收響應包體;
         */
        n = c->recv(c, b->last, size);

        /*
         * 若返回值 n = NGX_AGAIN,表示等待下一次觸發讀事件再接收響應包體;
         */
        if (n == NGX_AGAIN) {
            break;
        }

        /*
         * 若返回值n = 0(表示上游伺服器主動關閉連線),或n = NGX_ERROR(表示出錯);
         * 則呼叫ngx_http_upstream_finalize_request方法結束請求;
         * 並return從當前函式返回;
         */
        if (n == 0 || n == NGX_ERROR) {
            ngx_http_upstream_finalize_request(r, u, n);
            return;
        }

        /* 若返回值 n 大於0,表示成功讀取到響應包體 */
        u->state->response_length += n;

        /* 呼叫input_filter方法處理本次接收到的響應包體 */
        if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
            ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
            return;
        }

        /* 檢查讀事件的ready標誌位,若為1,繼續讀取響應包體 */
        if (!rev->ready) {
            break;
        }
    }

    if (u->length == 0) {
        ngx_http_upstream_finalize_request(r, u, 0);
        return;
    }

    /*
     * 若讀事件的ready標誌位為0,表示讀事件未準備就緒,
     * 則將讀事件註冊到epoll事件機制中,新增到定時器機制中;
     * 讀事件的回撥方法不改變,即依舊為ngx_http_upstream_process_body_in_memory;
     */
    if (ngx_handle_read_event(rev, 0) != NGX_OK) {
        ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
        return;
    }

    if (rev->active) {
        ngx_add_timer(rev, u->conf->read_timeout);

    } else if (rev->timer_set) {
        ngx_del_timer(rev);
    }
}

轉發響應

下面看下 upstream 處理上游響應包體的三種方式:

  1. 當請求結構體 ngx_http_request_t 中的成員subrequest_in_memory 標誌位為 1 時,upstream 不轉發響應包體到下游,並由HTTP 模組實現的input_filter() 方法處理包體;
  2. 當請求結構體 ngx_http_request_t 中的成員subrequest_in_memory 標誌位為 0 時,且ngx_http_upstream_conf_t 配置結構體中的成員buffering 標誌位為 1 時,upstream 將開啟更多的記憶體和磁碟檔案用於快取上游的響應包體(此時,上游網速更快),並轉發響應包體;
  3. 當請求結構體 ngx_http_request_t 中的成員subrequest_in_memory 標誌位為 0 時,且ngx_http_upstream_conf_t 配置結構體中的成員buffering 標誌位為 0 時,upstream 將使用固定大小的緩衝區來轉發響應包體;

轉發響應由函式 ngx_http_upstream_send_response 實現,該函式的執行流程如下:

  • 呼叫 ngx_http_send_header 方法轉發響應頭部,並將 ngx_http_upstream_t 結構體中的 header_sent 標誌位設定為 1,表示已經轉發響應頭部;
  • 若臨時檔案還儲存著請求包體,則需呼叫 ngx_pool_run_cleanup_filter 方法清理臨時檔案;
  • 檢查標誌位 buffering,若該標誌位為 1,表示需要開啟檔案快取,若該標誌位為 0,則不需要開啟檔案快取,只需要以固定的記憶體塊大小轉發響應包體即可;
  • 若標誌位 buffering 為0;
    • 則檢查 HTTP 模組是否實現了自己的 input_filter 方法,若沒有則使用 upstream 機制預設的方法 ngx_http_upstream_non_buffered_filter;
    • 設定 ngx_http_upstream_t 結構體中讀事件 read_event_handler 的回撥方法為 ngx_http_upstream_process_non_buffered_upstream,當接收上游響應時,會通過 ngx_http_upstream_handler 方法最終呼叫 ngx_http_upstream_process_non_buffered_uptream 來接收響應;
    • 設定 ngx_http_upstream_t 結構體中寫事件 write_event_handler 的回撥方法為 ngx_http_upstream_process_non_buffered_downstream,當向下遊傳送資料時,會通過 ngx_http_handler 方法最終呼叫 ngx_http_upstream_process_non_buffered_downstream 方法來發送響應包體;
    • 呼叫 input_filter_init 方法為 input_filter 方法處理響應包體做初始化工作;
    • 檢查接收緩衝區 buffer 在解析完響應頭部之後,是否還有剩餘的響應資料,若有表示預接收了響應包體:
      • 若在解析響應頭部區間,預接收了響應包體,則呼叫 input_filter 方法處理該部分預接收的響應包體,並呼叫 ngx_http_upstream_process_non_buffered_downstream 方法轉發本次接收到的響應包體給下游伺服器;
      • 若在解析響應頭部區間,沒有接收響應包體,則首先清空接收緩衝區 buffer 以便複用來接收響應包體,檢查上游連線上讀事件是否準備就緒,若標誌位 ready 為1,表示準備就緒,則呼叫 ngx_http_upstream_process_non_buffered_upstream 方法接收上游響應包體;若標誌位 ready 為 0,則 return 從當前函式返回;
  • 若標誌位 buffering 為1;
    • 初始化 ngx_http_upstream_t 結構體中的 ngx_event_pipe_t pipe 成員;
    • 呼叫 input_filter_init 方法為 input_filter 方法處理響應包體做初始化工作;
    • 設定上游連線上的讀事件 read_event_handler 的回撥方法為 ngx_http_upstream_process_upstream;
    • 設定上游連線上的寫事件 write_event_handler 的回撥方法為 ngx_http_upstream_process_downstream;
    • 呼叫 ngx_http_upstream_proess_upstream 方法處理由上游伺服器發來的響應包體;
/* 轉發響應包體 */
static void
ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    int                        tcp_nodelay;
    ssize_t                    n;
    ngx_int_t                  rc;
    ngx_event_pipe_t          *p;
    ngx_connection_t          *c;
    ngx_http_core_loc_conf_t  *clcf;

    /* 呼叫ngx_http_send_hander方法向下遊傳送響應頭部 */
    rc = ngx_http_send_header(r);

    if (rc == NGX_ERROR || rc > NGX_OK || r->post_action) {
        ngx_http_upstream_finalize_request(r, u, rc);
        return;
    }

    /* 將標誌位header_sent設定為1 */
    u->header_sent = 1;

    if (u->upgrade) {
        ngx_http_upstream_upgrade(r, u);
        return;
    }

    /* 獲取Nginx與下游之間的TCP連線 */
    c = r->connection;

    if (r->header_only) {

        if (u->cacheable || u->store) {

            if (ngx_shutdown_socket(c->fd, NGX_WRITE_SHUTDOWN) == -1) {
                ngx_connection_error(c, ngx_socket_errno,
                                     ngx_shutdown_socket_n " failed");
            }

            r->read_event_handler = ngx_http_request_empty_handler;
            r->write_event_handler = ngx_http_request_empty_handler;
            c->error = 1;

        } else {
            ngx_http_upstream_finalize_request(r, u, rc);
            return;
        }
    }

    /* 若臨時檔案儲存著請求包體,則呼叫ngx_pool_run_cleanup_file方法清理臨時檔案的請求包體 */
    if (r->request_body && r->request_body->temp_file) {
        ngx_pool_run_cleanup_file(r->pool, r->request_body->temp_file->file.fd);
        r->request_body->temp_file->file.fd = NGX_INVALID_FILE;
    }

    clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);

    /*
     * 若標誌位buffering為0,轉發響應時以下游伺服器網速優先;
     * 即只需分配固定的記憶體塊大小來接收來自上游伺服器的響應並轉發,
     * 當該記憶體塊已滿,則暫停接收來自上游伺服器的響應資料,
     * 等待把記憶體塊的響應資料轉發給下游伺服器後有剩餘記憶體空間再繼續接收響應;
     */
    if (!u->buffering) {

        /*
         * 若HTTP模組沒有實現input_filter方法,
         * 則採用upstream機制預設的方法ngx_http_upstream_non_buffered_filter;
         */
        if (u->input_filter == NULL) {
            u->input_filter_init = ngx_http_upstream_non_buffered_filter_init;
            u->input_filter = ngx_http_upstream_non_buffered_filter;
            u->input_filter_ctx = r;
        }

        /*
         * 設定ngx_http_upstream_t結構體中讀事件的回撥方法為ngx_http_upstream_non_buffered_upstream,(即讀取上游響應的方法);
         * 設定當前請求ngx_http_request_t結構體中寫事件的回撥方法為ngx_http_upstream_process_non_buffered_downstream,(即轉發響應到下游的方法);
         */
        u->read_event_handler = ngx_http_upstream_process_non_buffered_upstream;
        r->write_event_handler =
                             ngx_http_upstream_process_non_buffered_downstream;

        r->limit_rate = 0;

        /* 呼叫input_filter_init為input_filter方法處理響應包體做初始化工作 */
        if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) {
            ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
            return;
        }

        if (clcf->tcp_nodelay && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) {
            ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "tcp_nodelay");

            tcp_nodelay = 1;

            if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY,
                               (const void *) &tcp_nodelay, sizeof(int)) == -1)
            {
                ngx_connection_error(c, ngx_socket_errno,
                                     "setsockopt(TCP_NODELAY) failed");
                ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
                return;
            }

            c->tcp_nodelay = NGX_TCP_NODELAY_SET;
        }

        /* 檢查解析完響應頭部後接收緩衝區buffer是否已接收了響應包體 */
        n = u->buffer.last - u->buffer.pos;

        /* 若接收緩衝區已經接收了響應包體 */
        if (n) {
            u->buffer.last = u->buffer.pos;

            u->state->response_length += n;

            /* 呼叫input_filter方法開始處理響應包體 */
            if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
                ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
                return;