nginx upstream模組詳解(處理流程篇二 upstream與event_pipe互動)
ngx_event_pipe 提供了upstream對上游伺服器返回包體資料 同時能做將包體資料傳送請求端
ngx_event_pipe具體的結構在點選開啟連結
ngx_event_pipe函式負責在upstream包體資料處理過程中讀取上游伺服器包體資料 並且在處理上游包體資料的過程中 傳送到請求端 這種處理流程經過測試驗證 的確如此。
提供給upstream模組服務的函式只有ngx_event_pipe 其內建處理的函式有:
ngx_event_pipe_read_upstream 負責讀取上游返回的包體 ngx_event_pipe_write_to_downstream負責將包體傳送到請求端
ngx_event_pipe_write_chain_to_temp_file 會將包體的資料逐步寫入到一個臨時檔案 這裡的臨時檔案到後面被用作快取檔案
ngx_event_pipe_remove_shadow_links
ngx_event_pipe_drain_chains
ngx_event_pipe函式處理過程
ngx_int_t ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write) { ... for ( ;; ) { if (do_write) { p->log->action = "sending to client"; rc = ngx_event_pipe_write_to_downstream(p); //寫標記 包體資料寫入到下游請求端 if (rc == NGX_ABORT) { return NGX_ABORT; } if (rc == NGX_BUSY) { //來不及處理 return NGX_OK; } } p->read = 0; p->upstream_blocked = 0; p->log->action = "reading upstream"; if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) { //執行上游包體資料讀取處理 return NGX_ABORT; } if (!p->read && !p->upstream_blocked) { //在上游包體讀取未阻塞狀態下 沒有讀取到資料 跳出 break; } do_write = 1; //包體讀取一般先執行 } if (p->upstream->fd != (ngx_socket_t) -1) { //對上游連線的socket是有效的 rev = p->upstream->read; flags = (rev->eof || rev->error) ? NGX_CLOSE_EVENT : 0; //上游包體讀取出錯或者沒有資料可讀 事件會被清理 否則什麼也不做 if (ngx_handle_read_event(rev, flags) != NGX_OK) { //滿足flag標記對讀事件的處理 return NGX_ABORT; } ... //新增定時器 } if (p->downstream->fd != (ngx_socket_t) -1 && p->downstream->data == p->output_ctx) //同上 { wev = p->downstream->write; if (ngx_handle_write_event(wev, p->send_lowat) != NGX_OK) { //對低潮值進行傳送(如果有) 同時會對請求端的寫事件進行處理 return NGX_ABORT; } ... //定時器 } return NGX_OK; }
nginx_event_pipe_read_upstream 處理說明
static ngx_int_t ngx_event_pipe_read_upstream(ngx_event_pipe_t *p) { ... if (p->upstream_eof || p->upstream_error || p->upstream_done) { return NGX_OK; } #if (NGX_THREADS) ... #endif ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "pipe read upstream: %d", p->upstream->read->ready); for ( ;; ) { if (p->upstream_eof || p->upstream_error || p->upstream_done) { break; } if (p->preread_bufs == NULL && !p->upstream->read->ready) { break; } if (p->preread_bufs) { //pre-read 預讀取buffer (包含了快取header頭部和key資訊 以及http頭) 實際由u->buffer傳入 /* use the pre-read bufs if they exist */ chain = p->preread_bufs; p->preread_bufs = NULL; n = p->preread_size; ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "pipe preread: %z", n); if (n) { //已經讀取了資料read標記 p->read = 1; } } else { #if (NGX_HAVE_KQUEUE) /* * kqueue notifies about the end of file or a pending error. * This test allows not to allocate a buf on these conditions * and not to call c->recv_chain(). */ if (p->upstream->read->available == 0 && p->upstream->read->pending_eof) { p->upstream->read->ready = 0; p->upstream->read->eof = 1; p->upstream_eof = 1; p->read = 1; if (p->upstream->read->kq_errno) { p->upstream->read->error = 1; p->upstream_error = 1; p->upstream_eof = 0; ngx_log_error(NGX_LOG_ERR, p->log, p->upstream->read->kq_errno, "kevent() reported that upstream " "closed connection"); } break; } #endif if (p->limit_rate) { //有限速設定 if (p->upstream->read->delayed) { break; } limit = (off_t) p->limit_rate * (ngx_time() - p->start_sec + 1) - p->read_length; if (limit <= 0) { p->upstream->read->delayed = 1; delay = (ngx_msec_t) (- limit * 1000 / p->limit_rate + 1); //設定上游讀取延時並加入到定時器 ngx_add_timer(p->upstream->read, delay); break; } } else { limit = 0; // } if (p->free_raw_bufs) { /* use the free bufs if they exist */ chain = p->free_raw_bufs; if (p->single_buf) { //只有一個buffer p->free_raw_bufs = p->free_raw_bufs->next; chain->next = NULL; } else { p->free_raw_bufs = NULL; } } else if (p->allocated < p->bufs.num) { //已經分配的buffer數量沒有超過配置的值 配置的數量的緩衝儘可能地申請 /* allocate a new buf if it's still allowed */ b = ngx_create_temp_buf(p->pool, p->bufs.size); if (b == NULL) { return NGX_ABORT; } p->allocated++; chain = ngx_alloc_chain_link(p->pool); if (chain == NULL) { return NGX_ABORT; } chain->buf = b; chain->next = NULL; } else if (!p->cacheable && p->downstream->data == p->output_ctx && p->downstream->write->ready && !p->downstream->write->delayed) { /* * if the bufs are not needed to be saved in a cache and * a downstream is ready then write the bufs to a downstream */ p->upstream_blocked = 1; ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0, "pipe downstream ready"); break; } else if (p->cacheable || p->temp_file->offset < p->max_temp_file_size) //滿足可緩衝的條件 { /* * if it is allowed, then save some bufs from p->in * to a temporary file, and add them to a p->out chain */ rc = ngx_event_pipe_write_chain_to_temp_file(p); //響應包體寫入臨時檔案 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "pipe temp offset: %O", p->temp_file->offset); if (rc == NGX_BUSY) { break; } if (rc != NGX_OK) { return rc; } chain = p->free_raw_bufs; if (p->single_buf) { p->free_raw_bufs = p->free_raw_bufs->next; chain->next = NULL; } else { p->free_raw_bufs = NULL; } } else { /* there are no bufs to read in */ //沒有buffer緩衝去讀取資料了 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0, "no pipe bufs to read in"); break; } n = p->upstream->recv_chain(p->upstream, chain, limit); //開始接收上游包體資料 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "pipe recv chain: %z", n); if (p->free_raw_bufs) { chain->next = p->free_raw_bufs; } p->free_raw_bufs = chain; if (n == NGX_ERROR) { //讀取包體出錯 p->upstream_error = 1; break; } if (n == NGX_AGAIN) { if (p->single_buf) { //只有一個buffer 移除掉buffer的shadow_link ngx_event_pipe_remove_shadow_links(chain->buf); } break; } p->read = 1; //讀取標記設定為真 if (n == 0) { //沒有資料可讀 upstream_eof標記 並且跳出 p->upstream_eof = 1; break; } } delay = p->limit_rate ? (ngx_msec_t) n * 1000 / p->limit_rate : 0; //根據讀取的位元組數及配置的值設定讀取延時 p->read_length += n; //讀取包體長度更新 cl = chain; p->free_raw_bufs = NULL; while (cl && n > 0) { //有效資料長度 這裡要先移除掉buffer的shadow_link ngx_event_pipe_remove_shadow_links(cl->buf); size = cl->buf->end - cl->buf->last; //buffer可用位元組數 if (n >= size) { //buffer不夠用 cl->buf->last = cl->buf->end; /* STUB */ cl->buf->num = p->num++; //stub資訊更新 if (p->input_filter(p, cl->buf) == NGX_ERROR) { return NGX_ABORT; } n -= size; ln = cl; cl = cl->next; ngx_free_chain(p->pool, ln); //緩衝chain釋放 } else { cl->buf->last += n; n = 0; } } if (cl) { for (ln = cl; ln->next; ln = ln->next) { /* void */ } //取得cl最尾部buffer chain ln->next = p->free_raw_bufs; p->free_raw_bufs = cl; } if (delay > 0) { //有設定延時 則加入到定時器中 p->upstream->read->delayed = 1; ngx_add_timer(p->upstream->read, delay); break; } } #if (NGX_DEBUG) ... //buffer chain緩衝資訊列印 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "pipe length: %O", p->length); #endif if (p->free_raw_bufs && p->length != -1) { //p->length 代表的是剩餘包體的長度 cl = p->free_raw_bufs; if (cl->buf->last - cl->buf->pos >= p->length) { p->free_raw_bufs = cl->next; /* STUB */ cl->buf->num = p->num++; if (p->input_filter(p, cl->buf) == NGX_ERROR) { //包體的input_filter處理 return NGX_ABORT; } ngx_free_chain(p->pool, cl); } } if (p->length == 0) { //包體資料讀取完成 p->upstream_done = 1; p->read = 1; } if ((p->upstream_eof || p->upstream_error) && p->free_raw_bufs) { //在沒有資料可讀或者讀取出錯情況下 對資料進行處理 /* STUB */ p->free_raw_bufs->buf->num = p->num++; if (p->input_filter(p, p->free_raw_bufs->buf) == NGX_ERROR) { // return NGX_ABORT; } p->free_raw_bufs = p->free_raw_bufs->next; if (p->free_bufs && p->buf_to_file == NULL) { //有空閒的buffer同時寫入檔案的buffer不存在時 for (cl = p->free_raw_bufs; cl; cl = cl->next) { if (cl->buf->shadow == NULL) { //清除沒有shadow的buffer ngx_pfree(p->pool, cl->buf->start); } } } } if (p->cacheable && (p->in || p->buf_to_file)) { ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0, "pipe write chain"); rc = ngx_event_pipe_write_chain_to_temp_file(p); if (rc != NGX_OK) { return rc; } } return NGX_OK; }
ngx_event_pipe_write_to_downstream處理說明
static ngx_int_t
ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
{
...
downstream = p->downstream;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write downstream: %d", downstream->write->ready);
#if (NGX_THREADS)
if (p->writing) {
rc = ngx_event_pipe_write_chain_to_temp_file(p);
if (rc == NGX_ABORT) {
return NGX_ABORT;
}
}
#endif
flushed = 0;
for ( ;; ) {
if (p->downstream_error) { //往請求端傳送出錯
return ngx_event_pipe_drain_chains(p); //busy, out, in 三個緩衝chain釋放 同時釋放shadow緩衝並將空閒的buffer加入到pipe中
}
if (p->upstream_eof || p->upstream_error || p->upstream_done) { //滿足上游包體資料讀取"完成條件"
/* pass the p->out and p->in chains to the output filter */
for (cl = p->busy; cl; cl = cl->next) {
cl->buf->recycled = 0;
}
if (p->out) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write downstream flush out");
for (cl = p->out; cl; cl = cl->next) {
cl->buf->recycled = 0;
}
rc = p->output_filter(p->output_ctx, p->out); //響應到請求段的包體putput_filter過濾處理
if (rc == NGX_ERROR) { //寫入到請求端出錯
p->downstream_error = 1;
return ngx_event_pipe_drain_chains(p);
}
p->out = NULL;
}
if (p->writing) { //還有往請求端寫入的緩衝鏈
break;
}
if (p->in) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write downstream flush in");
for (cl = p->in; cl; cl = cl->next) {
cl->buf->recycled = 0;
}
rc = p->output_filter(p->output_ctx, p->in); //upstream模組的output_filter會把資料傳送給請求端
if (rc == NGX_ERROR) {
p->downstream_error = 1;
return ngx_event_pipe_drain_chains(p); ... } } }
static ngx_int_t
ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p)
{
...
#if (NGX_THREADS)
if (p->writing) {
... //多執行緒處理
}
#endif
if (p->buf_to_file) { //寫往臨時檔案的buffer加入到為傳送到請求段的out 緩衝鏈
out = ngx_alloc_chain_link(p->pool);
if (out == NULL) {
return NGX_ABORT;
}
out->buf = p->buf_to_file;
out->next = p->in;
} else {
out = p->in;
}
if (!p->cacheable) {
size = 0;
cl = out;
ll = NULL;
prev_last_shadow = 1;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe offset: %O", p->temp_file->offset);
do {
bsize = cl->buf->last - cl->buf->pos;
ngx_log_debug4(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe buf ls:%d %p, pos %p, size: %z",
cl->buf->last_shadow, cl->buf->start,
cl->buf->pos, bsize);
if (prev_last_shadow
&& ((size + bsize > p->temp_file_write_size) //緩衝鏈中的資料大小超過了配置的temp_file_write_size的大小 或者最大臨時檔案大小
|| (p->temp_file->offset + size + bsize
> p->max_temp_file_size)))
{
break;
}
prev_last_shadow = cl->buf->last_shadow;
size += bsize;
ll = &cl->next;
cl = cl->next;
} while (cl); //while遍歷
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "size: %z", size);
if (ll == NULL) {
return NGX_BUSY;
}
if (cl) { //out緩衝鏈最後一個buffer不為空
p->in = cl;
*ll = NULL;
} else { //為空 更新
p->in = NULL;
p->last_in = &p->in;
}
} else {
p->in = NULL;
p->last_in = &p->in;
}
#if (NGX_THREADS)
if (p->thread_handler) {
p->temp_file->thread_write = 1;
p->temp_file->file.thread_task = p->thread_task;
p->temp_file->file.thread_handler = p->thread_handler;
p->temp_file->file.thread_ctx = p->thread_ctx;
}
#endif
n = ngx_write_chain_to_temp_file(p->temp_file, out); //out chain更新到臨時檔案中
if (n == NGX_ERROR) {
return NGX_ABORT;
}
#if (NGX_THREADS)
if (n == NGX_AGAIN) {
p->writing = out;
p->thread_task = p->temp_file->file.thread_task;
return NGX_AGAIN;
}
done:
#endif
if (p->buf_to_file) {
p->temp_file->offset = p->buf_to_file->last - p->buf_to_file->pos;
n -= p->buf_to_file->last - p->buf_to_file->pos;
p->buf_to_file = NULL;
out = out->next;
}
if (n > 0) { //成功有資料寫入到臨時檔案
/* update previous buffer or add new buffer */
if (p->out) {
for (cl = p->out; cl->next; cl = cl->next) { /* void */ }
b = cl->buf;
if (b->file_last == p->temp_file->offset) {
p->temp_file->offset += n;
b->file_last = p->temp_file->offset;
goto free;
}
last_out = &cl->next;
} else {
last_out = &p->out;
}
cl = ngx_chain_get_free_buf(p->pool, &p->free);
if (cl == NULL) {
return NGX_ABORT;
}
b = cl->buf;
ngx_memzero(b, sizeof(ngx_buf_t));
b->tag = p->tag;
b->file = &p->temp_file->file;
b->file_pos = p->temp_file->offset;
p->temp_file->offset += n; //增加臨時檔案偏移
b->file_last = p->temp_file->offset; //更新buffer檔案偏移資訊
b->in_file = 1;
b->temp_file = 1;
*last_out = cl;
}
free:
for (last_free = &p->free_raw_bufs;
*last_free != NULL;
last_free = &(*last_free)->next) //找到free_raw_bufs最後的空閒buffer
{
/* void */
}
for (cl = out; cl; cl = next) {
next = cl->next;
cl->next = p->free;
p->free = cl;
b = cl->buf;
if (b->last_shadow) {
tl = ngx_alloc_chain_link(p->pool);
if (tl == NULL) {
return NGX_ABORT;
}
tl->buf = b->shadow;
tl->next = NULL;
*last_free = tl;
last_free = &tl->next;
b->shadow->pos = b->shadow->start;
b->shadow->last = b->shadow->start;
ngx_event_pipe_remove_shadow_links(b->shadow);
}
}
return NGX_OK;
}
函式 ngx_event_pipe會被upstream模組設定好的ngx_http_upstream_process_upstream 上游有包體資料過來,進行觸發;同時也會被ngx_http_upstream_process_downstream這個設為下游請求端響應寫處理觸發。
ngx_event_pipe先處理上游包體資料 這裡的上游包體資料在event_pipe功能中由ngx_event_pipe_read_upstream負責處理;在處理讀取上游包體資料的同時,會得到向下遊請求端寫入響應包體的處理,實際寫入到下游請求端的功能是由ngx_event_pipe_write_to_downstream來完成。
下面是ngx_event_pipe處理流程的圖解說明