寫在前面

OpenResty(後面簡稱:OR)是一個基於Nginx和Lua的高效能Web平臺,它內部整合大量的Lua API以及第三方模組,可以利用它快速搭建支援高併發、極具動態性和擴充套件性的Web應用、Web服務或動態閘道器。

OR最大的特點就是,將Lua協程與Nginx事件驅動模型及非阻塞I/O結合起來。使使用者可以在handler中使用 同步但是依然是非阻塞 的方式編寫其應用程式碼,而無需關心底層的協程排程以及與Nginx事件驅動模型的互動。

本文將先從總體上介紹OR的協程排程機制,然後結合原始碼以及Lua棧的情況來詳細瞭解各個部分是如何實現的,包括其異常保護、協程初始化、協程的恢復和執行、協程的掛起、協程的執行結束、協程出錯的情況。

本文主要關注排程函式內部的邏輯,如果想了解外部的呼叫流程。可以參看Openresty Lua鉤子呼叫完整流程

注:lua-nginx模組與stream-lua-nginx模組的主體部分類似,後者實現相對簡單一點。下面的討論將基於stream-lua模組。

為了防止歧義,文中用到的一些術語明確一下:

  • 主執行緒:表示外層呼叫run_thread()的OS執行緒
  • 入口執行緒:每個handler被呼叫時會建立一個入口執行緒,用於執行lua程式碼
  • 使用者執行緒:使用者在Lua程式碼中通過ngx.thread.spawn()建立的執行緒
  • 使用者協程:使用者在Lua程式碼中通過coroutine.create()建立的協程
  • 協程:泛指所有協程,包括入口執行緒、使用者執行緒和使用者協程
  • vm:表示Lua虛擬機器
  • L:視出現的上下文,一般表示父協程,在建立入口執行緒的時候表示Lua VM
  • co:一般表示新建立的協程
  • L棧: |協程表|新協程|頂|:表示Lua棧結構,最右邊是棧頂

關鍵資料結構

在深入瞭解協程排程機制之前,我們先來認識一下主要的資料結構:

  • 協程上下文:ngx_stream_lua_co_ctx_t

    • 協程內部棧(coctx->co
    • 協程狀態(coctx->co_status
    • 維護協程之間關係的資料(父協程coctx->parent_co_ctx、殭屍子執行緒coctx->zombie_child_threads
    • 使用者相關資料(coctx->data
    • 在Lua的registry表中對應該執行緒指標的引用值(co_ref
    • 一些狀態標記(是否是使用者執行緒is_uthread、是否因建立新執行緒thread_spawn_yielded被yield)
  • 模組上下文:ngx_stream_lua_ctx_t
    • ctx->cur_co_ctx(當前排程協程上下文)
    • ctx->co_op(協程是以何種方式YIELD)
  • 核心排程函式:ngx_stream_lua_run_thread()

協程排程

首先你可能很好奇OR為什麼要在C引擎層面自己實現協程的排程?或者說這麼做的好處是什麼?我覺得最主要的原因還是減輕開發者的負擔。

原生Lua coroutine介面

我們知道Lua是個非常輕巧的語言,它不像Go有自己的排程器。Lua原生的對協程的操作無非就是coroutine.resume()coroutine.yield()。這兩者是成對出現的,協程coroutine.yield()之後肯定回到父協程coroutine.resume()的地方,恢復子協程需要顯式再次coroutine.resume()。如果要在Lua程式碼層面實現非阻塞I/O,那麼父協程必須處理子協程I/O等待的情況,並在事件發生時恢復子協程的執行。如果需要同時進行多個任務,那麼父協程就需要負責多個協程間的排程。因為協程的拓撲可能是一個複雜的樹狀結構,所以協程的排程管理將變得異常複雜。

OpenResty實現

OR在C引擎層幫我們把這些事情都做了,你無須再關心所有這些,只需專心寫你的業務邏輯。為了支援同步非阻塞的方式編寫應用程式碼,OR重寫了coroutine的介面函式,從而接管了協程的排程,並在coroutine基礎上封裝抽象出了thread的概念。無論是coroutine還是thread,I/O等待對於使用者都是透明的,使用者無需關心。兩者的主要區別是,coroutine父子之間的協作度更高,coroutine.yield()coroutine.resume()成對出現。在子協程執行完成(出錯)或者顯式coroutine.yield()之前,父協程一直處於等待狀態。而thread則由排程器進行排程,子thread一旦開始執行就不再受父協程控制了,在需要併發請求時很有用。thread提供了spawn()wait()等介面,spawn()執行引數中指定的函式,直到執行完畢、出錯或者I/O等待時返回。wait()則使父協程可以同步等待子執行緒執行完畢、獲取結果。

OR在對協程排程上,最核心的改動是其建立新協程時的行為(coroutine.resume(), ngx.thread.spawn())。它不會直接呼叫lua_resume(),而是先lua_yield()回到主執行緒,然後由主執行緒再根據情況lua_resume()下一個協程。Lua程式碼域內從來不會直接呼叫lua_resume(),理解了這一點你就理解了OpenResty協程排程的精髓。

所以OR中協程拓撲是一個單層的結構,它只有一個入口點。這樣使得協程排程更加靈活,I/O事件的觸發時回撥函式也更容易實現。

OR排程器根據lua_resume()的返回值,確定協程是掛起了、結束了還是出錯了。因為OR改動了建立新協程時行為,同時又抽象了thread概念,所以如果是協程掛起的情況,還需要知道是什麼原因掛起,以便做相應的不同處理。是繼續排程?還是返回上層?我們前面提到的ctx->co_op便是做這個用途。

協程的排程在核心排程函式ngx_stream_lua_run_thread()中進行,它是建立或恢復協程的唯一入口點。最初是由配置的Lua鉤子呼叫(圖中ssl_cert_handler()),如果碰到了I/O等待的情況,後續則由對應的事件handler(圖中的sleep_handler()read_handler())再次拉起。run_thread()裡面實現了一個排程迴圈,迴圈裡面先從ctx->cur_co_ctx獲取下一個待resume的協程上下文,然後lua_resume()執行或恢復該協程,其返回值LUA_YIELD表示協程掛起,0表示協程執行結束,其餘的表示協程出錯了。其中協程掛起又分為四種不同的情況:即等待I/O、新建thread、coroutine.resume()coroutine.yield()。根據不同的情況,決定是跳到迴圈前面繼續恢復下一個協程,還是返回上層函式。

下圖是協程排程主要邏輯的示意圖,可以看到在Lua程式碼域中無論是新建、掛起或恢復協程,都是先呼叫lua_yield()回到主執行緒。I/O操作例如ngx.tcp.receive()如果碰到了I/O等待,會在內部註冊epoll事件(對於sleep的情況是定時器),然後自動lua_yield(),當事件觸發時繼續未完成的I/O操作,完成之後再呼叫run_thread()恢復之前被掛起的協程。

異常保護

作為一個排程器,OpenResty扮演者類似作業系統核心的角色,不過它的排程物件是Lua協程。作為一個“核心”,無論其排程物件出了什麼問題,都不應該使這個系統崩潰,而是應該將錯誤資訊打印出來。

Openresty內部就做了一個這樣的異常保護,其原理就是用setjmplongjmp包住了run_thread()裡面的整個協程排程邏輯。

/* 首先註冊虛擬機器的panic回撥 */
lua_atpanic(L, ngx_stream_lua_atpanic);
/* setjmp儲存環境 */
NGX_LUA_EXCEPTION_TRY {
/* 執行排程邏輯 */
} NGX_LUA_EXCEPTION_CATCH {
/* 出現異常時走到這裡 */
dd("nginx execution restored");
}

ngx_stream_lua_atpanic()的實現也非常簡單,只是簡單地列印崩潰日誌,然後呼叫NGX_LUA_EXCEPTION_THROW(1);恢復nginx的執行。

int
ngx_stream_lua_atpanic(lua_State *L)
{
#ifdef NGX_LUA_ABORT_AT_PANIC
abort();
#else
u_char *s = NULL;
size_t len = 0; if (lua_type(L, -1) == LUA_TSTRING) {
s = (u_char *) lua_tolstring(L, -1, &len);
} if (s == NULL) {
s = (u_char *) "unknown reason";
len = sizeof("unknown reason") - 1;
} ngx_log_stderr(0, "lua atpanic: Lua VM crashed, reason: %*s", len, s);
ngx_quit = 1; /* restore nginx execution */
NGX_LUA_EXCEPTION_THROW(1); /* impossible to reach here */
#endif
}

這幾個巨集定義分別如下:

#define NGX_LUA_EXCEPTION_TRY                                                \
if (setjmp(ngx_stream_lua_exception) == 0) #define NGX_LUA_EXCEPTION_CATCH \
else #define NGX_LUA_EXCEPTION_THROW(x) \
longjmp(ngx_stream_lua_exception, (x))

協程初始化

鉤子的入口執行緒

ngx_stream_lua_new_thread()用於建立入口執行緒

OR中需要在Registry表中儲存每個創建出來的Lua執行緒的reference,這個儲存協程的表在Registry表中對應的key是全域性變數ngx_stream_lua_coroutines_key的地址,因此下面這段程式碼就是從Registry表中查詢這個儲存協程的表,返回到棧頂:

/* 返回棧頂元素的索引,等於棧中元素的個數 */
base = lua_gettop(L);
/* 將儲存協程的表對應的key壓棧 */
lua_pushlightuserdata(L, ngx_stream_lua_lightudata_mask(
coroutines_key));
/* 將key出棧,獲取Registry表中key對應的元素,然後將結果入棧 */
lua_rawget(L, LUA_REGISTRYINDEX);

接下來建立一個新的協程,同時初始化其全域性表:

/* 建立Lua協程,返回的新lua_State跟原有的lua_State共享所有的全域性物件(如表),
但是有一個獨立的執行棧。 協程依賴垃圾回收銷燬 */
/* L棧: |協程表|新協程|頂| */
co = lua_newthread(L);
/* 建立該協程的全域性表,設定_G field為全域性表自己 */
/* L棧: |協程表|新協程|協程新的全域性表|頂| */
ngx_stream_lua_create_new_globals_table(co, 0, 0);
/* 再建立一個新表 */
/* L棧: |協程表|新協程|協程新的全域性表|新表|頂| */
lua_createtable(co, 0, 1);
/* 拿到全域性表 */
/* L棧: |協程表|新協程|協程新的全域性表|新表|舊全域性表|頂| */
ngx_stream_lua_get_globals_table(co);
/* 新表的__index的值為棧頂的值,也即就全域性表 */
/* L棧: |協程表|新協程|協程新的全域性表|新表|頂| */
lua_setfield(co, -2, "__index");
/* 新表出棧,將其設為索引-2處即協程新的全域性表的元表 */
/* L棧: |協程表|新協程|協程新的全域性表|頂| */
lua_setmetatable(co, -2);
/* 設定協程新的全域性表到對應索引,其_G field是自己,
其元表是新表,新表的__index是父協程的全域性表 */
/* L棧: |協程表|新協程|頂| */
ngx_stream_lua_set_globals_table(co);

這一塊的邏輯有點繞,我們來稍微理一下,其實就是用新建的全域性表替換了舊的全域性表,其中新的全域性表的_G欄位是它自己,新全域性表的元表中__index元方法是舊的全域性表。

此時的Lua虛擬機器棧頂情況如下圖所示:

L->top      |   棧頂    |
L->top - 1 |Lua_State*| 新建立的協程
L->top -2 | Lua Table| 儲存協程引用的表

下面一步就是在Lua虛擬機器中為這個新協程建立一個reference:

/* 為棧頂物件(即新協程),建立並返回一個協程表中的引用 */
/* 當前棧: |協程表|頂| */
*ref = luaL_ref(L, -2);
if (*ref == LUA_NOREF) {
lua_settop(L, base); /* restore main thread stack */
return NULL;
}

最後恢復堆疊

/* 設定棧頂索引 */
/* 當前棧: |頂| */
lua_settop(L, base);
return co;

以上步驟還只是建立了一個什麼都不能做的Lua協程,回到_by_chunk()函式之後還需要把入口函式放入協程中。

/* 將lua虛擬機器VM棧上的入口函式閉包移到新建立的協程棧上,
這樣新協程就有了虛擬機器已經解析完畢的程式碼了。*/
lua_xmove(L, co, 1); /* 拿到co全域性表,放到棧頂 */
/* 當前棧: |入口closure|全域性表|頂| */
ngx_stream_lua_get_globals_table(co);
/* 將全域性表設為入口closure的環境表 */
/* 當前棧: |入口closure|頂|*/
lua_setfenv(co, -2);

至此,協程入口函式以及環境表已經設定好。接下來就是讓它能夠執行起來,讓排程器能夠排程它執行:

/* 將nginx請求儲存到協程全域性表 */
ngx_stream_lua_set_req(co, r); ctx->cur_co_ctx = &ctx->entry_co_ctx;
ctx->cur_co_ctx->co = co;
ctx->cur_co_ctx->co_ref = co_ref;

接下來就是註冊cleanup鉤子,然後ngx_stream_lua_run_thread()

使用者建立的uthread

使用者執行緒由ngx.thread.spawn()建立,對應的C實現是ngx_stream_lua_uthread_spawn()。首先它會調ngx_stream_lua_coroutine_create_helper()建立一個新的協程。

建立協程

注意協程都是在worker的虛擬機器上建立的(不考慮cache off的情況的話)。但是使用者協程會繼承父協程的全域性表,其父子關係由OR進行維護。

/* 獲取虛擬機器 */
vm = ngx_stream_lua_get_lua_vm(r, ctx);
/* 建立新協程 */
co = lua_newthread(vm);
/* 然後建立coctx,設定其co、co_status值 */
coctx = ngx_stream_lua_create_co_ctx;
coctx->co = co;
coctx->co_status = NGX_STREAM_LUA_CO_SUSPENDED;

此時父協程的棧如下:

/* 當前棧: |entry_func|args|頂| */

接下來將父協程的全域性表給新建立的協程:

/* make new coroutine share globals of the parent coroutine.
* NOTE: globals don't have to be separated! */
/* 拷貝父協程的全域性表到棧上 */
/* L棧: |entry_func|args|全域性表|頂| */
ngx_stream_lua_get_globals_table(L);
/* 將全域性表移動到新建立的協程co的棧上 */
/* L棧: |entry_func|args|頂| */
lua_xmove(L, co, 1);
/* 從新協程棧上寫入其的全域性表 */
ngx_stream_lua_set_globals_table(co); /* 將新協程從程序虛擬機器,移動到父協程中 */
/* L棧: |entry_func|args|新協程|頂| */
lua_xmove(vm, L, 1);
/* 入口函式拷貝到L棧頂 */
/* L棧: |entry_func|args|新協程|entry_func|頂|*/
lua_pushvalue(L, 1);
/* 將入口函式從L移到co棧中 */
/* L棧: |entry_func|args|新協程|頂| */
/* co棧: |entry_func|頂|*/
lua_xmove(L, co, 1);

create_helper函式返回之後,L的棧頂是新協程,co的棧頂是入口函式。

初始化uthread

ngx_stream_lua_coroutine_create_helper返回之後,進行uthread的初始化。

此時,父協程L是這樣的:

  • 棧頂是新建立的協程
  • 然後是引數和入口函式

在此之前,先在registry表中儲存一個該協程的ref。(到現在還沒搞明白這個ref是幹嘛用的?除了建立執行緒和刪除執行緒,貌似只有檢查執行緒是否活著的時候會查一下這個ref,只是檢查狀態用coctx->co_status不是也能做到麼?8.12更新,之所以要把執行緒錨定到登錄檔上,是為了防止被當成垃圾回收。這也解釋了為什麼只有執行緒需要錨定到登錄檔上,而使用者協程不需要。因為使用者協程肯定由其父協程保留著一個引用。)

/* anchor the newly created coroutine into the Lua registry */
/* 把新建立的協程寫入Lua registry表中 */
/* 將ngx_stream_lua_coroutines_key的地址壓入棧中 */
lua_pushlightuserdata(L, &ngx_stream_lua_coroutines_key);
/* 從registry表中獲取協程表 */
/* L棧: |entry_func|args|新協程|協程表|頂|*/
lua_rawget(L, LUA_REGISTRYINDEX);
/* 將新協程壓棧 */
/* L棧: |entry_func|args|新協程|協程表|新協程|頂|*/
lua_pushvalue(L, -2);
/* -2位置是登錄檔,為新協程建立在報表中的索引 */
/* L棧: |entry_func|args|新協程|協程表|頂| */
coctx->co_ref = luaL_ref(L, -2); //
/* 彈出協程表 */
/* L棧: |entry_func|args|新協程|頂| */
lua_pop(L, 1);

接下來是初始化執行環境:

此時的,L的棧情況如下:

     |entry_func|引數1|...|引數n|新協程|
1 2 ... -2 -1
if (n > 1) {
/* 由於lua函式壓棧順序是從左到右
* 因此1就是壓入的第一個引數,而spawn的第一個引數就是入口函式
* 把棧頂元素(即新協程)移動到1,覆蓋入口函式,入口函式前面已經拷貝到新協程棧上了
*/
/* L棧: |新協程|args|頂| */
lua_replace(L, 1);
/* 將引數移到新協程棧中 */
/* L棧: |新協程|頂|*/
/* co棧: |入口函式|args|頂| */
lua_xmove(L, coctx->co, n - 1);
}

設定狀態,將父協程放入post_thread佇列中,設定協程的父子關係,設定新協程為下一個排程的執行緒

/* 設定狀態 */
coctx->co_status = NGX_STREAM_LUA_CO_RUNNING;
ctx->co_op = NGX_STREAM_LUA_USER_THREAD_RESUME;
ctx->cur_co_ctx->thread_spawn_yielded = 1; /* 將父協程放入post_thread佇列中 */
ngx_stream_lua_post_thread(r, ctx, ctx->cur_co_ctx) /* 儲存子執行緒的父協程上下文為當前協程 */
coctx->parent_co_ctx = ctx->cur_co_ctx;
/* 切換當前協程為新建立的協程 */
ctx->cur_co_ctx = coctx;

最後,spawn函式的返回值是新建立的協程

/* 將原協程的執行權切換出去,這裡的引數1表示棧上留了一個值,這裡是指新建立的協程
* 主執行緒並不會取這個值,而是等到新執行緒spawn返回時作為返回值。
* 此時L棧中是新協程,co棧中是引數和入口函式。
*/
return lua_yield(L, 1);

使用者建立的coroutine

OR替換了原生的coroutine介面,當存在getfenv(0).__ngx_req時(全域性環境儲存了nginx請求),使用重寫後的coroutine介面函式。

coroutine.create()建立新協程部分跟uthread是一樣的,都是呼叫ngx_stream_lua_coroutine_create_helper()。Lua函式返回新協程。此時新協程棧中是入口函式。

coroutine.resume()用於開始或恢復新協程,其對應的C函式是ngx_http_lua_coroutine_resume()

/* 首先,獲取到協程 */
/* L棧: |co|引數|, co棧: |入口函式| */
co = lua_tothread(L, 1); /* 然後設定狀態和父子關係 */
/* 父協程為normal */
p_coctx->co_status = NGX_HTTP_LUA_CO_NORMAL; coctx->parent_co_ctx = p_coctx; dd("set coroutine to running");
/* 子協程為running */
coctx->co_status = NGX_HTTP_LUA_CO_RUNNING; /* 設定co_op告知主執行緒yield型別 */
ctx->co_op = NGX_HTTP_LUA_USER_CORO_RESUME;
/* 設定下一個排程協程為新協程 */
ctx->cur_co_ctx = coctx;

接下來,將控制權交還給主協程,並把引數傳給主執行緒。

/* 此時L棧: |co|引數|, co棧: |入口函式| */
/* lua_gettop(L) - 1表示留在棧中的返回值個數,
* 由主執行緒取用之後,在lua_resume新協程時傳遞 */
/* 減一個,表示不傳底下的co */
return lua_yield(L, lua_gettop(L) - 1);

協程執行和恢復

OR中協程的執行和恢復總是由主執行緒來進行,不管是coroutine.resume()還是ngx.thread.spawn(),都是先lua_yield()回到主執行緒之後,在主執行緒中lua_resume()

注意到前面建立階段,thread是lua_yield(L, 1),coroutine是lua_yield(L, lua_gettop(L) - 1)。yield到主執行緒之後,我們繼續看排程程式的處理。

uthread

先獲取引數個數

/* 因為入口函式和引數已經在新執行緒棧中了,所以從新協程中獲取引數個數,-1是除掉入口函式 */
nrets = lua_gettop(ctx->cur_co_ctx->co) - 1;

然後跳到主迴圈的前面,執行新執行緒

/* 儲存新協程coctx */
orig_coctx = ctx->cur_co_ctx;
/* 執行新執行緒,其中nrets為引數個數 */
rv = lua_resume(orig_coctx->co, nrets);

lua_resume中就會開始新執行緒的執行。當新執行緒執行完畢或因I/O中斷yield之後,會恢復父協程。在恢復父協程之前,先設定引數個數為1,即之前留在棧上的新協程co。恢復父協程之後,ngx.thread.spawn()函式就返回了。

if (ctx->cur_co_ctx->thread_spawn_yielded) {
ctx->cur_co_ctx->thread_spawn_yielded = 0;
nrets = 1;
}

coroutine

同樣是先獲取引數個數

/* 獲取父協程 */
old_co = ctx->cur_co_ctx->parent_co_ctx->co;
/* 因為引數還在父協程棧中,所以從父協程棧中獲取引數個數 */
nrets = lua_gettop(old_co);
if (nrets) {
/* 將引數從父協程移到子協程 */
lua_xmove(old_co, ctx->cur_co_ctx->co, nrets);
}

此時子協程棧中是引數和入口函式。

然後跳到主迴圈的前面,執行新協程,跟前面uthread時一樣。

協程掛起

協程的掛起分為兩種情況:

  • 一種是內部在I/O等待時自動掛起,這種情況使用者不用參與,OR會自動將相應的事件及其handler掛到事件驅動上,當事件被喚醒時繼續未完成的I/O操作,完成之後由排程器恢復之前掛起的協程。
  • 另一種是使用者在Lua程式碼主動呼叫coroutine.yield()掛起。此時由排程器根據情況決定執行下一個執行的協程。

顯式主動掛起

我們先來看使用者主動掛起的情況,coroutine.yield()對應的C函式為ngx_stream_lua_coroutine_yield()。我們先來看看它裡面幹了些什麼。

/* 首先修改當前協程的狀態為掛起 */
coctx = ctx->cur_co_ctx;
coctx->co_status = NGX_STREAM_LUA_CO_SUSPENDED;
/* 設定co_op */
ctx->co_op = NGX_STREAM_LUA_USER_CORO_YIELD; /* 如果不是使用者執行緒(也即是普通coroutine),且有父協程,
將其父協程狀態設定為running */
if (!coctx->is_uthread && coctx->parent_co_ctx) {
coctx->parent_co_ctx->co_status = NGX_STREAM_LUA_CO_RUNNING;
} /* 最後將控制權交還主執行緒,將所有yield引數傳遞給主執行緒 */
return lua_yield(L, lua_gettop(L));

回到主執行緒之後,根據待掛起協程是thread還是corotine進行不同處理。

thread

if (ngx_stream_lua_is_thread(ctx)) {
/* 丟棄coroutine.yield()的任何引數 */
lua_settop(ctx->cur_co_ctx->co, 0);
/* 因為thread由排程器負責排程,所以將當前執行緒的狀態改為running,為什麼不在前面一起改?*/
ctx->cur_co_ctx->co_status = NGX_STREAM_LUA_CO_RUNNING;
/* 如果已經有pending的執行緒,則放到佇列中 */
if (ctx->posted_threads) {
ngx_stream_lua_post_thread(r, ctx, ctx->cur_co_ctx);
ctx->cur_co_ctx = NULL;
return NULL;
}
/* 否則,立即恢復執行緒 */
}

coroutine

/* 獲取當前棧的高度,也即coroutine.yield()的引數個數 */
nrets = lua_gettop(ctx->cur_co_ctx->co);
/* 設定父協程為下一個排程的協程 */
next_coctx = ctx->cur_co_ctx->parent_co_ctx;
next_co = next_coctx->co;
/* 將引數從子協程棧中移到父協程棧中 */
if (nrets) {
dd("moving %d return values to next co", nrets);
lua_xmove(ctx->cur_co_ctx->co, next_co, nrets);
#ifdef NGX_LUA_USE_ASSERT
ctx->cur_co_ctx->co_top -= nrets;
#endif }
/* 如果不是wrap封裝的,還要加一個true,作為第一個引數 */
if (!ctx->cur_co_ctx->is_wrap) {
/* prepare return values for coroutine.resume
* (true plus any retvals)
*/
lua_pushboolean(next_co, 1);
/* 插入1的位置,作為第一個引數 */
lua_insert(next_co, 1);
nrets++; /* add the true boolean value */
} ctx->cur_co_ctx = next_coctx;
/* 回到主迴圈的前面,resume父協程 */
break;

I/O等待場景

I/O等待的場景有很多,不過其背後的原理都差不多:

  • 定義一個事件,設定恢復時的handler及對應協程上下文,然後lua_yield()回到run_thread()
  • 主執行緒將ctx->cur_co_ctx設為空之後,直接返回NGX_AGAIN,如果有posted_thread會繼續執行,否則將控制權交還給nginx層
  • 後續當事件發生時,繼續未完成的操作,完成之後將儲存的協程上下文設為ctx->cur_co_ctx,然後呼叫ngx_stream_lua_run_thread()恢復協程的執行。

這裡舉兩個典型的例子:

ngx.sleep()

它的C函式實現是ngx_stream_lua_ngx_sleep(),先定義設定好handler和coctx,掛上定時器,然後lua_yield()

    ngx_stream_lua_cleanup_pending_operation(coctx);
coctx->cleanup = ngx_stream_lua_sleep_cleanup;
coctx->data = r; /* 儲存恢復時的handler和協程上下文 */
coctx->sleep.handler = ngx_stream_lua_sleep_handler;
coctx->sleep.data = coctx;
coctx->sleep.log = r->connection->log; /* 當delay為0時,放入post_event佇列或新增定時器 */
if (delay == 0) {
#ifdef HAVE_POSTED_DELAYED_EVENTS_PATCH
dd("posting 0 sec sleep event to head of delayed queue"); coctx->sleep.delayed = 1;
ngx_post_event(&coctx->sleep, &ngx_posted_delayed_events);
#else
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "ngx.sleep(0)"
" called without delayed events patch, this will"
" hurt performance");
ngx_add_timer(&coctx->sleep, (ngx_msec_t) delay);
#endif } else { /* 新增定時器 */
ngx_add_timer(&coctx->sleep, (ngx_msec_t) delay);
}
/* 外層函式*/
return lua_yield(L, 0);

run_thread()裡將當前協程上下文置為NULL,然後返回NGX_AGAIN

by_chunk()裡會先檢查有沒有在post佇列裡的執行緒,如果沒有則返回

    rc = ngx_stream_lua_run_thread(L, r, ctx, 0);

    if (rc == NGX_ERROR || rc >= NGX_OK) {
/* do nothing */ } else if (rc == NGX_AGAIN) {
rc = ngx_stream_lua_content_run_posted_threads(L, r, ctx, 0); } else if (rc == NGX_DONE) { /* 這裡DONE的情況只有HTTP子請求的時候會出現 */
rc = ngx_stream_lua_content_run_posted_threads(L, r, ctx, 1); } else {
rc = NGX_OK;
}

當定時器超時時,它會執行sleep_handler(),設定ctx->cur_co_ctx然後執行run_thread()恢復協程排程。

ngx.tcp.receive()

其對應的C函式實現是ngx_stream_lua_socket_tcp_receive(),裡面會調ngx_stream_lua_socket_tcp_receive_helper()。碰到讀等待的情況,也是先設定好handler和coctx,然後lua_yield()。我們來看下里面程式碼:

    /* 這裡0表示還未進行協程切換 */
u->read_waiting = 0;
u->read_co_ctx = NULL; /* 讀取的主要邏輯由此函式處理 */
rc = ngx_stream_lua_socket_tcp_read(r, u);
/* 不管是成功、出錯或等待I/O,肯定會返回 */
if(rc == NGX_ERROR) {
/*...*/
}
if(rc == NGX_OK) {
/*...*/
} /* rc == NGX_AGAIN */
/* 如果是等待I/O的情況,設定事件觸發時的handler、當前協程上下文 */
u->read_event_handler = ngx_stream_lua_socket_read_handler;
coctx = lctx->cur_co_ctx; /* 設定請求的寫事件handler,這個是返回到Lua層前呼叫的handler */
r->write_event_handler = ngx_stream_lua_content_wev_handler; /* 儲存當前協程上下文到u上 */
u->read_co_ctx = coctx;
/* 表示是後續是需要協程恢復的 */
u->read_waiting = 1;
/* 設定準備返回值的回撥 */
u->read_prepare_retvals = ngx_stream_lua_socket_tcp_receive_retval_handler; return lua_yield(L, 0);

回到run_thread(),同樣是將當前協程上下文置為NULL,然後返回NGX_AGAIN

當事件被觸發時,執行前面設定的ngx_stream_lua_socket_read_handler(),裡面又會呼叫讀取操作核心函式ngx_stream_lua_socket_tcp_read()。如果繼續碰到等待I/O,handler直接結束,等待下一次事件。如果是完成或出錯,會執行如下操作:

/* 恢復該值為0 */
u->read_waiting = 0;
/* 獲取協程上下文 */
coctx = u->read_co_ctx; /* 設定協程恢復的handler */
ctx->resume_handler = ngx_stream_lua_socket_tcp_read_resume;
/* 設定下一個排程的上下文,為之前呼叫讀取操作的協程 */
ctx->cur_co_ctx = coctx; /* 這個handler就是yield之前設定的那個,它裡面呼叫 ctx->resume_handler */
r->write_event_handler(r);

r->write_event_handler(r);是返回Lua層前呼叫的handler,裡面會呼叫resume_handlerngx_stream_lua_socket_tcp_read_resume()只是封裝了一下,最終都是呼叫的ngx_stream_lua_socket_tcp_resume_helper(),我們看來下它的程式碼:

/* 待恢復協程上下文 */
coctx = ctx->cur_co_ctx; u = coctx->data;
prepare_retvals = u->read_prepare_retvals;
/* 準備返回值 */
nret = prepare_retvals(r, u, ctx->cur_co_ctx->co); /* 恢復協程排程,回到Lua層 */
rc = ngx_stream_lua_run_thread(vm, r, ctx, nret);

至於完成的條件,取決與不同的呼叫方式。如果是讀取固定位元組數的話,會維護一個剩餘待讀取的位元組數u->rest。如果是讀取一行,則讀取到\n就結束。如果是readall,則一直讀到u->eof為止。

協程執行完畢

為了不失完整性,再說一下正常結束和出錯時的情況。正常執行完畢時,會設定協程狀態,然後清理它的殭屍子執行緒:

/* 將當前協程狀態置為DEAD */
ctx->cur_co_ctx->co_status = NGX_STREAM_LUA_CO_DEAD;
/* 如果子執行緒有殭屍執行緒,則清理之 */
if (ctx->cur_co_ctx->zombie_child_threads) {
ngx_stream_lua_cleanup_zombie_child_uthreads(
r, L, ctx, ctx->cur_co_ctx);
}

接下來,根據結束的協程的型別不同執行不同的操作:

入口執行緒

此時直接刪除執行緒即可,然後根據是否還有使用者執行緒,選擇返回NGX_AGAINNGX_OK

if (ngx_stream_lua_is_entry_thread(ctx)) {
/* 將虛擬機器棧清空 */
lua_settop(L, 0);
/* 刪除當前執行緒,會從REGISTY表中解引用當前協程的`coctx->co_ref` */
ngx_stream_lua_del_thread(r, L, ctx, ctx->cur_co_ctx); /* 如果還有其他使用者執行緒,返回NGX_AGAIN */
if (ctx->uthreads) {
ctx->cur_co_ctx = NULL;
return NGX_AGAIN;
} /* all user threads terminated already */
goto done; /* 到這就圓滿結束了 return NGX_OK; */
}

使用者執行緒

此時如果父協程已經死了,處理方式跟入口執行緒一樣,即刪除執行緒,然後根據是否還有任何使用者執行緒或入口執行緒,選擇返回NGX_AGAINNGX_OK

如果父協程還活著,並且已經在wait它了,直接恢復父協程。否則,加入到父協程的殭屍執行緒列表中。

if (ctx->cur_co_ctx->is_uthread) {
/* 清空虛擬機器棧 */
lua_settop(L, 0);
/* 獲取父協程 */
parent_coctx = ctx->cur_co_ctx->parent_co_ctx;
/* 如果父協程還活著 */
if (ngx_stream_lua_coroutine_alive(parent_coctx)) {
/* 並且在wait當前執行緒,則恢復父協程 */
if (ctx->cur_co_ctx->waited_by_parent) {
ngx_stream_lua_probe_info("parent already waiting");
ctx->cur_co_ctx->waited_by_parent = 0;
success = 1;
goto user_co_done;
} /* 否則將當前執行緒掛到父協程的殭屍子執行緒中 */
if (ngx_stream_lua_post_zombie_thread(r, parent_coctx,
ctx->cur_co_ctx)
!= NGX_OK)
{
return NGX_ERROR;
}
/* 壓入第一個返回值true,以備後續wait時返回 */
lua_pushboolean(ctx->cur_co_ctx->co, 1);
lua_insert(ctx->cur_co_ctx->co, 1);
/* 設定當前執行緒狀態為ZOMBIE */
ctx->cur_co_ctx->co_status = NGX_STREAM_LUA_CO_ZOMBIE;
ctx->cur_co_ctx = NULL;
return NGX_AGAIN; /* 返回上層 */
}
/* 如果父協程已經死了,直接刪除當前執行緒
* 會從REGISTY表中解引用當前協程的`coctx->co_ref` */
ngx_stream_lua_del_thread(r, L, ctx, ctx->cur_co_ctx);
ctx->uthreads--;
/* 如果沒有使用者執行緒了 */
if (ctx->uthreads == 0) {
/* 入口執行緒在活著,返回上層 */
if (ngx_stream_lua_entry_thread_alive(ctx)) {
ctx->cur_co_ctx = NULL;
return NGX_AGAIN;
} /* all threads terminated already */
goto done; /* 到這就圓滿結束了 return NGX_OK; */
} /* 如果還有其他使用者執行緒,返回上層 */
ctx->cur_co_ctx = NULL;
return NGX_AGAIN;
}

使用者協程

剩下的就是使用者協程的情況,這個情況跟使用者執行緒被父協程wait的情況是一樣的。主要是將返回值移動到父協程棧中,然後跳到主迴圈前面恢復父協程的執行。

success = 1;
/* 獲取返回值個數 */
nrets = lua_gettop(ctx->cur_co_ctx->co);
next_coctx = ctx->cur_co_ctx->parent_co_ctx;
next_co = next_coctx->co;
/* 將返回值移到父協程棧中 */
if (nrets) {
lua_xmove(ctx->cur_co_ctx->co, next_co, nrets);
}
/* 如果是使用者執行緒,刪除之 */
if (ctx->cur_co_ctx->is_uthread) {
ngx_stream_lua_del_thread(r, L, ctx, ctx->cur_co_ctx);
ctx->uthreads--;
}
/* 除了wrap的使用者協程,加上第一個true的返回值 */
if (!ctx->cur_co_ctx->is_wrap) {
/* ended successfully, coroutine.resume returns true plus
* any return values
*/
lua_pushboolean(next_co, success);
lua_insert(next_co, 1);
nrets++;
} /* 設定父協程的狀態為RUNNING */
ctx->cur_co_ctx = next_coctx;
next_coctx->co_status = NGX_STREAM_LUA_CO_RUNNING;
/* 回到主迴圈前面,恢復父協程的執行 */
continue;

出錯的情況

大致處理步驟是,恢復cur_co_ctx,獲取虛擬機器L棧上錯誤資訊,獲取當前協程棧中錯誤資訊,後面的操作類似協程執行完畢時,根據不同的情況選擇恢復父協程或者返回上層。

/* 恢復cur_co_ctx */
if (ctx->cur_co_ctx != orig_coctx) {
ctx->cur_co_ctx = orig_coctx;
}
/* 設定當前協程狀態為DEAD */
ctx->cur_co_ctx->co_status = NGX_HTTP_LUA_CO_DEAD;
/* 獲取錯誤資訊 */
if (orig_coctx->is_uthread
|| orig_coctx->is_wrap
|| ngx_http_lua_is_entry_thread(ctx))
{
ngx_http_lua_thread_traceback(L, orig_coctx->co, orig_coctx);
trace = lua_tostring(L, -1); if (lua_isstring(orig_coctx->co, -1)) {
msg = lua_tostring(orig_coctx->co, -1);
dd("user custom error msg: %s", msg); } else {
msg = "unknown reason";
}
}

使用者執行緒

跟正常結束的處理一樣,除了第一個返回值是false。

此時如果父協程已經死了,直接刪除執行緒,然後根據是否還有任何使用者執行緒或入口執行緒,選擇返回NGX_AGAINNGX_OK

如果父協程還活著,並且已經在wait它了,直接恢復父協程。否則,加入到父協程的殭屍執行緒列表中。

入口執行緒

ngx_stream_lua_request_cleanup()清理當前請求,裡面會清理掉所有的使用者建立的協程,然後清理入口協程自己。最後返回錯誤碼。

使用者協程

如果是wrap的協程,將錯誤傳遞給父協程(就好像是父協程出錯了,然後父協程重新走一遍上面的出錯處理流程)。

如果是普通協程,則恢復父協程的執行,返回false和錯誤資訊。

參考資料

本部落格已經遷移至CatBro's Blog,那裡是我自己搭建的個人部落格,頁面效果比這邊更好,支援站內搜尋,評論回覆還支援郵件提醒,歡迎關注。這邊只會在有時間的時候不定期搬運一下。

本篇文章連結