1. 程式人生 > >nginx upstream模組詳解(處理流程篇二 upstream與event_pipe互動)

nginx upstream模組詳解(處理流程篇二 upstream與event_pipe互動)

ngx_event_pipe 提供了upstream對上游伺服器返回包體資料 同時能做將包體資料傳送請求端 

ngx_event_pipe具體的結構在點選開啟連結

ngx_event_pipe函式負責在upstream包體資料處理過程中讀取上游伺服器包體資料 並且在處理上游包體資料的過程中 傳送到請求端 這種處理流程經過測試驗證 的確如此。

提供給upstream模組服務的函式只有ngx_event_pipe 其內建處理的函式有:

ngx_event_pipe_read_upstream  負責讀取上游返回的包體  ngx_event_pipe_write_to_downstream負責將包體傳送到請求端

ngx_event_pipe_write_chain_to_temp_file 會將包體的資料逐步寫入到一個臨時檔案 這裡的臨時檔案到後面被用作快取檔案

ngx_event_pipe_remove_shadow_links 

ngx_event_pipe_drain_chains

ngx_event_pipe函式處理過程

ngx_int_t
ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write)
{
    ...
    for ( ;; ) {
        if (do_write) {
            p->log->action = "sending to client";

            rc = ngx_event_pipe_write_to_downstream(p);      //寫標記 包體資料寫入到下游請求端

            if (rc == NGX_ABORT) {
                return NGX_ABORT;
            }

            if (rc == NGX_BUSY) {   //來不及處理
                return NGX_OK;
            }
        }

        p->read = 0;
        p->upstream_blocked = 0;

        p->log->action = "reading upstream";

        if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) {   //執行上游包體資料讀取處理
            return NGX_ABORT;
        }

        if (!p->read && !p->upstream_blocked) {   //在上游包體讀取未阻塞狀態下 沒有讀取到資料 跳出
            break;
        }

        do_write = 1;  //包體讀取一般先執行
    }

    if (p->upstream->fd != (ngx_socket_t) -1) {  //對上游連線的socket是有效的
        rev = p->upstream->read;

        flags = (rev->eof || rev->error) ? NGX_CLOSE_EVENT : 0;  //上游包體讀取出錯或者沒有資料可讀 事件會被清理 否則什麼也不做

        if (ngx_handle_read_event(rev, flags) != NGX_OK) {  //滿足flag標記對讀事件的處理
            return NGX_ABORT;
        }

      ...  //新增定時器
    }

    if (p->downstream->fd != (ngx_socket_t) -1   
        && p->downstream->data == p->output_ctx)  //同上
    {
        wev = p->downstream->write; 
        if (ngx_handle_write_event(wev, p->send_lowat) != NGX_OK) {     //對低潮值進行傳送(如果有)  同時會對請求端的寫事件進行處理
            return NGX_ABORT;
        }

        ... //定時器
    }

    return NGX_OK;
}

nginx_event_pipe_read_upstream 處理說明

static ngx_int_t
ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
{
    ...

    if (p->upstream_eof || p->upstream_error || p->upstream_done) {
        return NGX_OK;
    }

#if (NGX_THREADS)
    ...  
#endif

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                   "pipe read upstream: %d", p->upstream->read->ready);

    for ( ;; ) {

        if (p->upstream_eof || p->upstream_error || p->upstream_done) {
            break;
        }

        if (p->preread_bufs == NULL && !p->upstream->read->ready) {
            break;
        }

        if (p->preread_bufs) {     //pre-read 預讀取buffer (包含了快取header頭部和key資訊 以及http頭) 實際由u->buffer傳入
 
            /* use the pre-read bufs if they exist */

            chain = p->preread_bufs;
            p->preread_bufs = NULL;
            n = p->preread_size;

            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                           "pipe preread: %z", n);

            if (n) {  //已經讀取了資料read標記
                p->read = 1;
            }

        } else {

#if (NGX_HAVE_KQUEUE)

            /*
             * kqueue notifies about the end of file or a pending error.
             * This test allows not to allocate a buf on these conditions
             * and not to call c->recv_chain().
             */

            if (p->upstream->read->available == 0
                && p->upstream->read->pending_eof)
            {
                p->upstream->read->ready = 0;
                p->upstream->read->eof = 1;
                p->upstream_eof = 1;
                p->read = 1;

                if (p->upstream->read->kq_errno) {
                    p->upstream->read->error = 1;
                    p->upstream_error = 1;
                    p->upstream_eof = 0;

                    ngx_log_error(NGX_LOG_ERR, p->log,
                                  p->upstream->read->kq_errno,
                                  "kevent() reported that upstream "
                                  "closed connection");
                }

                break;
            }
#endif

            if (p->limit_rate) {  //有限速設定
                if (p->upstream->read->delayed) {
                    break;
                }

                limit = (off_t) p->limit_rate * (ngx_time() - p->start_sec + 1)
                        - p->read_length;

                if (limit <= 0) {
                    p->upstream->read->delayed = 1;
                    delay = (ngx_msec_t) (- limit * 1000 / p->limit_rate + 1);  //設定上游讀取延時並加入到定時器
                    ngx_add_timer(p->upstream->read, delay);
                    break;
                }

            } else {
                limit = 0; //
            }

            if (p->free_raw_bufs) {

                /* use the free bufs if they exist */

                chain = p->free_raw_bufs;
                if (p->single_buf) { //只有一個buffer
                    p->free_raw_bufs = p->free_raw_bufs->next;
                    chain->next = NULL;
                } else {
                    p->free_raw_bufs = NULL;
                }

            } else if (p->allocated < p->bufs.num) { //已經分配的buffer數量沒有超過配置的值 配置的數量的緩衝儘可能地申請

                /* allocate a new buf if it's still allowed */

                b = ngx_create_temp_buf(p->pool, p->bufs.size);
                if (b == NULL) {
                    return NGX_ABORT;
                }

                p->allocated++;

                chain = ngx_alloc_chain_link(p->pool);
                if (chain == NULL) {
                    return NGX_ABORT;
                }

                chain->buf = b;
                chain->next = NULL;

            } else if (!p->cacheable
                       && p->downstream->data == p->output_ctx
                       && p->downstream->write->ready
                       && !p->downstream->write->delayed)
            {
                /*
                 * if the bufs are not needed to be saved in a cache and
                 * a downstream is ready then write the bufs to a downstream
                 */

                p->upstream_blocked = 1;

                ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                               "pipe downstream ready");

                break;

            } else if (p->cacheable
                       || p->temp_file->offset < p->max_temp_file_size)    //滿足可緩衝的條件
            {

                /*
                 * if it is allowed, then save some bufs from p->in
                 * to a temporary file, and add them to a p->out chain
                 */

                rc = ngx_event_pipe_write_chain_to_temp_file(p);          //響應包體寫入臨時檔案

                ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                               "pipe temp offset: %O", p->temp_file->offset);

                if (rc == NGX_BUSY) {
                    break;
                }

                if (rc != NGX_OK) {
                    return rc;
                }

                chain = p->free_raw_bufs;
                if (p->single_buf) {
                    p->free_raw_bufs = p->free_raw_bufs->next;
                    chain->next = NULL;
                } else {
                    p->free_raw_bufs = NULL;
                }

            } else {

                /* there are no bufs to read in */     //沒有buffer緩衝去讀取資料了

                ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                               "no pipe bufs to read in");

                break;
            }

            n = p->upstream->recv_chain(p->upstream, chain, limit);  //開始接收上游包體資料

            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                           "pipe recv chain: %z", n);

            if (p->free_raw_bufs) {
                chain->next = p->free_raw_bufs;
            }
            p->free_raw_bufs = chain;

            if (n == NGX_ERROR) {       //讀取包體出錯
                p->upstream_error = 1;
                break;
            }

            if (n == NGX_AGAIN) {
                if (p->single_buf) {    //只有一個buffer 移除掉buffer的shadow_link
                    ngx_event_pipe_remove_shadow_links(chain->buf);    
                }

                break;
            }

            p->read = 1; //讀取標記設定為真

            if (n == 0) {          //沒有資料可讀 upstream_eof標記 並且跳出
                p->upstream_eof = 1;
                break;
            }
        }

        delay = p->limit_rate ? (ngx_msec_t) n * 1000 / p->limit_rate : 0;  //根據讀取的位元組數及配置的值設定讀取延時

        p->read_length += n;     //讀取包體長度更新
        cl = chain;
        p->free_raw_bufs = NULL;

        while (cl && n > 0) {    //有效資料長度 這裡要先移除掉buffer的shadow_link

            ngx_event_pipe_remove_shadow_links(cl->buf);

            size = cl->buf->end - cl->buf->last;  //buffer可用位元組數

            if (n >= size) {     //buffer不夠用
                cl->buf->last = cl->buf->end;

                /* STUB */ cl->buf->num = p->num++;  //stub資訊更新

                if (p->input_filter(p, cl->buf) == NGX_ERROR) {
                    return NGX_ABORT;
                }

                n -= size;
                ln = cl;
                cl = cl->next;
                ngx_free_chain(p->pool, ln);  //緩衝chain釋放

            } else {
                cl->buf->last += n;
                n = 0;
            }
        }

        if (cl) {
            for (ln = cl; ln->next; ln = ln->next) { /* void */ } //取得cl最尾部buffer chain

            ln->next = p->free_raw_bufs;
            p->free_raw_bufs = cl;
        }

        if (delay > 0) {   //有設定延時 則加入到定時器中
            p->upstream->read->delayed = 1;
            ngx_add_timer(p->upstream->read, delay);
            break;
        }
    }

#if (NGX_DEBUG)
    ... //buffer chain緩衝資訊列印
    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                   "pipe length: %O", p->length);

#endif

    if (p->free_raw_bufs && p->length != -1) { //p->length 代表的是剩餘包體的長度
        cl = p->free_raw_bufs;

        if (cl->buf->last - cl->buf->pos >= p->length) {

            p->free_raw_bufs = cl->next;

            /* STUB */ cl->buf->num = p->num++;

            if (p->input_filter(p, cl->buf) == NGX_ERROR) {  //包體的input_filter處理
                return NGX_ABORT;
            }

            ngx_free_chain(p->pool, cl);
        }
    }

    if (p->length == 0) {           //包體資料讀取完成
        p->upstream_done = 1;
        p->read = 1;
    }

    if ((p->upstream_eof || p->upstream_error) && p->free_raw_bufs) { //在沒有資料可讀或者讀取出錯情況下 對資料進行處理

        /* STUB */ p->free_raw_bufs->buf->num = p->num++;

        if (p->input_filter(p, p->free_raw_bufs->buf) == NGX_ERROR) { //
            return NGX_ABORT;
        }

        p->free_raw_bufs = p->free_raw_bufs->next;

        if (p->free_bufs && p->buf_to_file == NULL) {         //有空閒的buffer同時寫入檔案的buffer不存在時  
            for (cl = p->free_raw_bufs; cl; cl = cl->next) {
                if (cl->buf->shadow == NULL) {   //清除沒有shadow的buffer
                    ngx_pfree(p->pool, cl->buf->start);
                }
            }
        }
    }

    if (p->cacheable && (p->in || p->buf_to_file)) {

        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe write chain");

        rc = ngx_event_pipe_write_chain_to_temp_file(p);

        if (rc != NGX_OK) {
            return rc;
        }
    }

    return NGX_OK;
}

ngx_event_pipe_write_to_downstream處理說明

static ngx_int_t
ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
{
    ...

    downstream = p->downstream;

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                   "pipe write downstream: %d", downstream->write->ready);

#if (NGX_THREADS)

    if (p->writing) {
        rc = ngx_event_pipe_write_chain_to_temp_file(p); 
        if (rc == NGX_ABORT) {
            return NGX_ABORT;
        }
    }

#endif

    flushed = 0;

    for ( ;; ) {
        if (p->downstream_error) {        //往請求端傳送出錯
            return ngx_event_pipe_drain_chains(p);   //busy, out, in 三個緩衝chain釋放 同時釋放shadow緩衝並將空閒的buffer加入到pipe中
        }

        if (p->upstream_eof || p->upstream_error || p->upstream_done) { //滿足上游包體資料讀取"完成條件"

            /* pass the p->out and p->in chains to the output filter */

            for (cl = p->busy; cl; cl = cl->next) {
                cl->buf->recycled = 0;
            }

            if (p->out) {
                ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                               "pipe write downstream flush out");

                for (cl = p->out; cl; cl = cl->next) {
                    cl->buf->recycled = 0;
                }

                rc = p->output_filter(p->output_ctx, p->out);     //響應到請求段的包體putput_filter過濾處理

                if (rc == NGX_ERROR) {                  //寫入到請求端出錯
                    p->downstream_error = 1;
                    return ngx_event_pipe_drain_chains(p);
                }

                p->out = NULL;
            }

            if (p->writing) { //還有往請求端寫入的緩衝鏈
                break;
            }

            if (p->in) {
                ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                               "pipe write downstream flush in");

                for (cl = p->in; cl; cl = cl->next) {
                    cl->buf->recycled = 0;
                }

                rc = p->output_filter(p->output_ctx, p->in);  //upstream模組的output_filter會把資料傳送給請求端

                if (rc == NGX_ERROR) {
                    p->downstream_error = 1;
                    return ngx_event_pipe_drain_chains(p);   ... } } }
static ngx_int_t
ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p)
{
  ...

#if (NGX_THREADS)

    if (p->writing) {

    ... //多執行緒處理
    }

#endif

    if (p->buf_to_file) {  //寫往臨時檔案的buffer加入到為傳送到請求段的out 緩衝鏈
        out = ngx_alloc_chain_link(p->pool);
        if (out == NULL) {
            return NGX_ABORT;
        }

        out->buf = p->buf_to_file;
        out->next = p->in;

    } else {
        out = p->in;
    }

    if (!p->cacheable) {

        size = 0;
        cl = out;
        ll = NULL;
        prev_last_shadow = 1;

        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe offset: %O", p->temp_file->offset);

        do {
            bsize = cl->buf->last - cl->buf->pos;

            ngx_log_debug4(NGX_LOG_DEBUG_EVENT, p->log, 0,
                           "pipe buf ls:%d %p, pos %p, size: %z",
                           cl->buf->last_shadow, cl->buf->start,
                           cl->buf->pos, bsize);

            if (prev_last_shadow
                && ((size + bsize > p->temp_file_write_size) //緩衝鏈中的資料大小超過了配置的temp_file_write_size的大小 或者最大臨時檔案大小
                    || (p->temp_file->offset + size + bsize
                        > p->max_temp_file_size)))
            {
                break;
            }

            prev_last_shadow = cl->buf->last_shadow;

            size += bsize;
            ll = &cl->next;
            cl = cl->next;

        } while (cl); //while遍歷

        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "size: %z", size);

        if (ll == NULL) {
            return NGX_BUSY;
        }

        if (cl) { //out緩衝鏈最後一個buffer不為空 
            p->in = cl;
            *ll = NULL;

        } else { //為空 更新
            p->in = NULL;
            p->last_in = &p->in;
        }

    } else {
        p->in = NULL;
        p->last_in = &p->in;
    }

#if (NGX_THREADS)
    if (p->thread_handler) {
        p->temp_file->thread_write = 1;
        p->temp_file->file.thread_task = p->thread_task;
        p->temp_file->file.thread_handler = p->thread_handler;
        p->temp_file->file.thread_ctx = p->thread_ctx;
    }
#endif

    n = ngx_write_chain_to_temp_file(p->temp_file, out); //out chain更新到臨時檔案中

    if (n == NGX_ERROR) {
        return NGX_ABORT;
    }

#if (NGX_THREADS)

    if (n == NGX_AGAIN) {
        p->writing = out;
        p->thread_task = p->temp_file->file.thread_task;
        return NGX_AGAIN;
    }

done:

#endif

    if (p->buf_to_file) {
        p->temp_file->offset = p->buf_to_file->last - p->buf_to_file->pos;
        n -= p->buf_to_file->last - p->buf_to_file->pos;
        p->buf_to_file = NULL;
        out = out->next;
    }

    if (n > 0) { //成功有資料寫入到臨時檔案
        /* update previous buffer or add new buffer */

        if (p->out) {
            for (cl = p->out; cl->next; cl = cl->next) { /* void */ }

            b = cl->buf;

            if (b->file_last == p->temp_file->offset) {
                p->temp_file->offset += n;
                b->file_last = p->temp_file->offset;
                goto free;
            }

            last_out = &cl->next;

        } else {
            last_out = &p->out;
        }

        cl = ngx_chain_get_free_buf(p->pool, &p->free);
        if (cl == NULL) {
            return NGX_ABORT;
        }

        b = cl->buf;

        ngx_memzero(b, sizeof(ngx_buf_t));

        b->tag = p->tag;

        b->file = &p->temp_file->file;   
        b->file_pos = p->temp_file->offset;
        p->temp_file->offset += n; //增加臨時檔案偏移
        b->file_last = p->temp_file->offset; //更新buffer檔案偏移資訊

        b->in_file = 1;
        b->temp_file = 1;

        *last_out = cl;
    }

free:

    for (last_free = &p->free_raw_bufs;
         *last_free != NULL;
         last_free = &(*last_free)->next) //找到free_raw_bufs最後的空閒buffer
    {
        /* void */
    }

    for (cl = out; cl; cl = next) {
        next = cl->next;

        cl->next = p->free;
        p->free = cl;

        b = cl->buf;

        if (b->last_shadow) {

            tl = ngx_alloc_chain_link(p->pool);
            if (tl == NULL) {
                return NGX_ABORT;
            }

            tl->buf = b->shadow;
            tl->next = NULL;

            *last_free = tl;
            last_free = &tl->next;

            b->shadow->pos = b->shadow->start;
            b->shadow->last = b->shadow->start;

            ngx_event_pipe_remove_shadow_links(b->shadow);
        }
    }

    return NGX_OK;
}

    函式 ngx_event_pipe會被upstream模組設定好的ngx_http_upstream_process_upstream 上游有包體資料過來,進行觸發;同時也會被ngx_http_upstream_process_downstream這個設為下游請求端響應寫處理觸發。

    ngx_event_pipe先處理上游包體資料 這裡的上游包體資料在event_pipe功能中由ngx_event_pipe_read_upstream負責處理;在處理讀取上游包體資料的同時,會得到向下遊請求端寫入響應包體的處理,實際寫入到下游請求端的功能是由ngx_event_pipe_write_to_downstream來完成。

    下面是ngx_event_pipe處理流程的圖解說明