1. 程式人生 > >libco協程庫原理解析與應用

libco協程庫原理解析與應用

       最近在準備一個libco協程庫原理簡析與應用的分享,順便就整理下寫個部落格,一方面加深下自己對協程庫的理解,另一方面也希望能對想了解協程的學者有所幫助。廢話不多說了,言歸正傳吧。

        想去剖析libco協程庫的實現原理,首先我們要了解下什麼是協程。維基百科上給協程的定義是協程(coroutine)又稱微執行緒,是一個無優先順序的子程式(函式)排程元件,允許子程式在特定的地方掛起和恢復。我們可通過一個例子來理解下這句話的意思。函式呼叫大家都很熟悉吧,下面的這個是一個函式呼叫的例子。

    

    我們可以很容易的看出函式執行的結果是1 2 3 x y z。而協程的定義說是函式排程元件,但不同的是允許子程式在特定的地方掛起和恢復。也就是呼叫A函式時可以執行到一半然後掛起去執行B函式,之後可以從A函式掛起的地方繼續執行,例如下面的例子:

    

    這個程式執行的結果是1 2 x 3 y z。從這個例子中我們可以對協程的定義有個很清晰的認識了。

     瞭解了協程的定義,那麼我們為什麼需要協程呢,協程有什麼的作用呢,接下來我們來看下協程的由來

       起初人們喜歡同步程式設計,然後發現有一堆執行緒因為I/O卡在那裡,併發上不去,資源嚴重浪費。 網路伺服器模型一般為while(1){accept(); read(); do(); send()}。

        然後出了非同步(select,epoll,kqueue,etc),將I/O操作交給核心執行緒,自己註冊一個回撥函式處理最終結果。但由於這種方式回撥函式的應用,專案大了之後回撥函式的巢狀會層出不窮, 程式碼結構變得不清晰,例如下面這個例子:

    

        於是發明了協程,寫同步的程式碼,享受著非同步帶來的效能優勢。

          就實際使用理解來講,協程允許我們寫同步程式碼的邏輯,卻做著非同步的事,避免了回撥巢狀,使得程式碼邏輯清晰。協程是追求極限效能和優美的程式碼結構的產物。例如下面這個例子:

     

        那麼接下來我們就來具體分析下libco協程庫的實現吧

        libco是騰訊開源的一個協程庫,主要應用於微信後臺RPC框架。早期微信後臺因為業務需求複雜多變、產品要求快速迭代等需求,大部分模組都採用了半同步半非同步模型。接入層為非同步模型,業務邏輯層則是同步的多程序或多執行緒模型,業務邏輯的併發能力只有幾十到幾百。隨著微信業務的增長,系統規模變得越來越龐大,每個模組很容易受到後端服務/網路抖動的影響。於是微信就自己開發了一個libco協程庫來處理同步的rpc呼叫。

        首先我們看下libco的框架:


        我們看到libco框架分為三層,底層仍然用現在使用比較廣泛的i/o多路複用模型來實現非同步i/o。中間層是對系統函式的hook層,主要是將阻塞的系統i/o呼叫(如read,write)改為非同步的呼叫。最上層是使用者介面層,也是我們使用libco庫最直接接觸的一層。該層實現了協程原語(協程建立,執行,排程,等),且實現了一套協程間通訊的訊號量。libco實現協程的核心有兩點, 一點是對協程上下文的切換,另一點就是對同步介面的非同步化。接下來我們主要從這兩點來解析libco的實現。

        首先我們看下協程上下文切換的實現,從下面這個例子我們可以看出co_create,co_resume,co_yield_ct完成了協程的建立,啟動和掛起操作。那我們看下co_resume和co_yield內部都實現了什麼.

        

            

        我們看到這兩個函式都呼叫了coctx_swap()這個函式,這個函式就是libco協程上下文切換的實現。我們看下這個函式的原型:

        extern void coctx_swap( coctx_t *,coctx_t* ) asm("coctx_swap");、

        可以看出coctx_swap實際上是呼叫的是彙編函式,該函式有兩個型別為coctx_t的引數,分別代表掛起和恢復的協程,我們看下coctx_t這個結構體的實現:

           struct coctx_t{
              void *regs[14];
               size_t ss_size;
              char *ss_sp;

            };

        我們知道協程是使用者級執行緒,其共享同一套暫存器,所以當要掛起該協程時要把該協程的暫存器資訊儲存起來,regs就是用來儲存暫存器資訊的陣列。而ss_sp則是指向協程的棧幀資訊,libco為每個協程在堆上分配了128k的空間作為該協程的棧幀。那麼進行協程的切換其實是做了三件事,一是儲存掛起協程的暫存器資訊,二是恢復啟動協程的暫存器資訊,三是跳轉到啟動協程的返回地址繼續執行。

       我們來看下coctx_swap的彙編實現:

        

        在分析這個彙編函式前,我們先來了解下暫存器的一些知識。%rsp暫存器是棧頂暫存器,通過移動它可以指定棧資訊,對某些棧空間進行操作。%rax是返回地址暫存器,如在函式呼叫時儲存函式返回後繼續要執行的地址。%rdi,%rsi則分別儲存函式呼叫時的引數1和引數2,例如在coctx_swap這個函式中,%rdi,%rsi則分別儲存掛起和恢復的協程上下文資訊。其他則是一些引數和資料暫存器。知道上面這幾個暫存器的作用,就好分析coctx_swap這個彙編函數了。那麼接下來我們具體分析下。

        leaq 8(%rsp), %rax是將rsp的上一個地址儲存到%rax上,其實也就是把當前的%rsp地址先給儲存到%rax上,因為後面會通過移動%rsp來把暫存器資訊儲存到regs陣列中,這裡要說明下%rsp是棧頂暫存器,通過移動它可以指定棧幀地址。leaq 112(%rdi),%rsp試講%rsp指向了掛起協程的regs[13]處然後再該地址處壓入個暫存器資訊,即將暫存器資訊存入regs[13]中。這就是第一部分,儲存掛起協程的暫存器資訊。

        movq %rsi, %rsp則是將%rsp執行恢復協程的regs地址處,然後將regs陣列中儲存的資訊恢復到相應暫存器中。pushq %rax將返回地址入棧。這部分就是恢復要啟動的攜程的暫存器資訊。

           xorl %eax, %eax ret是子程式返回指令,彈出棧頂資料開始執行,而此時棧頂是上一步中push的返回地址,所以挑戰到了要啟動的協程上下文開始執行。整個上下文切換過程就是這麼處理的。

            接下來我們來分析下libco系統呼叫hook層的實現。這層主要是對rpc同步系統呼叫在不改變編碼風格的前提下非同步化。該層的原始碼實現在co_hook_sys_call.cpp中,通過dlsym函式對阻塞的系統呼叫加個鉤子,改造成非同步的呼叫。

            對rpc同步系統呼叫非同步化的大概步驟如下:

   1.通過fcntl將阻塞的檔案描述符設定為非阻塞

    2.通過poll函式向核心註冊I/O和超時事件並掛起協程,待I/O或超時事件觸發則恢復協程繼續進行非同步I/O操作(因為此時檔案描述符已設為非阻塞),這步即是將同步+超時的rpc呼叫改為非同步+超時的rpc呼叫。

    3.返回結果

    接下來我們具體分析下hook的原始碼:

     在co_hook_sys_call.cpp檔案中,為每個檔案描述符分配了一個rpchook_t結構的例項:

struct rpchook_t
{
int user_flag;
struct sockaddr_in dest; //maybe sockaddr_un;
int domain; //AF_LOCAL , AF_INET


struct timeval read_timeout;
struct timeval write_timeout;
};

static rpchook_t *g_rpchook_socket_fd[ 102400 ] = { 0 };

在rpchook_t結構體找那個user_flag代表使用者設定的檔案描述符是阻塞的還是非阻塞的標記,read_timeout, write_timeout分別表示該檔案描述符的讀寫超時時間預設設定是1秒,也可通過setsockopts設定讀寫超時時間。我們接下來通過read函式來詳細講述下阻塞的介面如何非同步化的。read的原始碼如下:

ssize_t read( int fd, void *buf, size_t nbyte )

{

        //未使用hook函式或是非阻塞呼叫或未分配rpchost_t結構的檔案描述符都呼叫系統呼叫

HOOK_SYS_FUNC( read );

if( !co_is_enable_sys_hook() )

{

return g_sys_read_func( fd,buf,nbyte );
}
rpchook_t *lp = get_by_fd( fd );


if( !lp || ( O_NONBLOCK & lp->user_flag ) ) 
{
ssize_t ret = g_sys_read_func( fd,buf,nbyte );
return ret;

}

        //同步的rpc呼叫向核心註冊該fd的讀事件和超時事件,若事件為發生掛起該協程處理其他協程,待事件到達後恢復協程

int timeout = ( lp->read_timeout.tv_sec * 1000 ) 
+ ( lp->read_timeout.tv_usec / 1000 );
struct pollfd pf = { 0 };
pf.fd = fd;
pf.events = ( POLLIN | POLLERR | POLLHUP );
int pollret = poll( &pf,1,timeout );

        //fd可讀或超時事件發生協程恢復進行讀操作,此時雖然呼叫的是系統為hook的呼叫但fd已設定為非阻塞模式,所以這是

        //一個非同步讀操作

ssize_t readret = g_sys_read_func( fd,(char*)buf ,nbyte );

if( readret < 0 )

{

                //fd不可讀,相當於rpc同步時的等待超時,只是在這裡是非同步超時因為執行緒並沒有掛起而是協程掛起去處理其他協程

co_log_err("CO_ERR: read fd %d ret %ld errno %d poll ret %d timeout %d",
fd,readret,errno,pollret,timeout);
}

return readret;

}

        其他的阻塞操作如connect, write等大概也是通過poll向核心註冊i/o和超時事件並掛起協程,等待事件的發生來完成非同步化的,在這裡就不一一介紹了。

        libco的兩個核心內容,即上下文切換和hook層同步呼叫非同步化基本上也就講完了,詳細的程式碼實現還要各位再好好研讀下,這裡就不多介紹原始碼了。