1. 程式人生 > >【技術乾貨】淺析State-Thread

【技術乾貨】淺析State-Thread

State-Thread(以下簡稱st),是一個由C語言編寫的小巧、簡潔卻高效的開源協程庫。這個庫基於單執行緒運作、不強制佔用使用者執行緒,給予了開發者最大程度的輕量級和較低的侵入性。本篇文章中,網易雲信音視訊研發大神將為大家簡要分析State-Thread,歡迎大家積極留言,和我們共同討論。

在開始這個話題之前,我們先來聊一聊協程。

什麼是協程?

協程是一種程式元件。通常我們把協程理解為是一種程式自己實現排程、用於提高執行效率、降低開發複雜度的東西。提高執行效率很好理解,因為在程式層自己完成了部分的排程,降低了對系統排程的依賴,減少了大量的中斷和換頁操作。而降低了開發複雜度,則是指對於開發者而言,可以使用同步的方式去進行程式碼開發(不需要考慮非同步模型的諸多回調),也不需要考慮多執行緒模型的執行緒排程和諸多的臨界資源問題。

很多語言都擁有協程,例如python或者golang。而對於c/c++而言,通常實現協程的常見方式,通常是依賴於glibc提供的setjump&longjump或者基於組合語言,當然還有基於語義實現(protothread)。linux上使用協程庫的方式,通常也會分為替換函式和更為暴力的替換so來實現。當然而各種方式有各自的優劣。而st選用的組合語言實現setjump&longjump和要求使用者呼叫st_打頭的函式來嵌入程式。所以st具備了跨平臺的能力,以及讓開發者們更開心的“與允許呼叫者自行選擇切換時機”的能力。

st究竟是如何實現了這一切?

首先我們先看看st的整體工作流程

在巨集觀的來看,ST的結構主要分成:

  1. vp_schedule。主要是負責了一個排程的能力。有點類似於linux核心當中的schedule()函式。每次當這個函式被呼叫的時候,都會完成一次執行緒的切換。
  2. 各種Queue。用於儲存各種狀態下等待被排程協程(st_thread)
  3. Timer。用於記錄各種超時和sleep。
  4. poll。用於監聽各種io事件,會根據系統能力不同而進行切換(kqueue、epoll、poll、select)。
  5. st_thread。用於儲存各種協程的資訊。

其中比較重要的是schedule模組和thread模組兩者。這兩者實現了一個完整的協程切換和排程。屬於st的核心。而schedule部分通常是開發者們最需要關心的部分。

接下來我們會深入到程式碼層,看一下具體在這個過程裡做了些什麼

通常對於st而言,所有暴露給使用者的除了init函式,就是一系列的st_xxx函數了。那麼先看看init函式。

int st_init(void)
{
 _st_thread_t *thread;
 if (_st_active_count) {
 /* Already initialized */
 return 0;
 }
 /* We can ignore return value here */
 st_set_eventsys(ST_EVENTSYS_DEFAULT);
 if (_st_io_init() < 0)
 return -1;
 memset(&_st_this_vp, 0, sizeof(_st_vp_t));
 ST_INIT_CLIST(&_ST_RUNQ);
 ST_INIT_CLIST(&_ST_IOQ);
 ST_INIT_CLIST(&_ST_ZOMBIEQ);
 if ((*_st_eventsys->init)() < 0)
 return -1;
 _st_this_vp.pagesize = getpagesize();
 _st_this_vp.last_clock = st_utime();
 /*
 * Create idle thread
 */
 _st_this_vp.idle_thread = st_thread_create(_st_idle_thread_start,
 NULL, 0, 0);
 if (!_st_this_vp.idle_thread)
  return -1;
 _st_this_vp.idle_thread->flags = _ST_FL_IDLE_THREAD;
 _st_active_count--;
 _ST_DEL_RUNQ(_st_this_vp.idle_thread);
 /*
 * Initialize primordial thread
 */
 thread = (_st_thread_t *) calloc(1, sizeof(_st_thread_t) +
 (ST_KEYS_MAX * sizeof(void *)));
 if (!thread)
 return -1;
 thread->private_data = (void **) (thread + 1);
 thread->state = _ST_ST_RUNNING;
 thread->flags = _ST_FL_PRIMORDIAL;
 _ST_SET_CURRENT_THREAD(thread);
 _st_active_count++;
 return 0;
}

這段函式一共做了3事情,建立了一個idle_thread, 初始化了_ST_RUNQ、_ST_IOQ、

_ST_ZOMBIEQ三個佇列,把當前呼叫者初始化成原始函式(通常st_init會在main裡面呼叫,所以這個原始的thread相當於是主執行緒)。idle_thread函式,其實就是整個IO和定時器相關的本體函數了。st會在每一次_ST_RUNQ執行完成後,呼叫idle_thread來獲取可讀寫的io和定時器。這個我們後續再說。

那麼,st_xxx一般會分成io類和延遲類(sleep)。兩者入口其實是同一個,只不過在io類的會多呼叫一層。我們這裡選擇st_send為代表。

int st_sendmsg(_st_netfd_t *fd, const struct msghdr *msg, int flags,
  st_utime_t timeout)
{
 int n;
 while ((n = sendmsg(fd->osfd, msg, flags)) < 0) {
 if (errno == EINTR)
 continue;
 if (!_IO_NOT_READY_ERROR)
 return -1;
 /* Wait until the socket becomes writable */
 if (st_netfd_poll(fd, POLLOUT, timeout) < 0)
 return -1;
 }
 return n;
}

本質上所有的st函式都是以非同步介面+ st_netfd_poll來實現的。在st_netfd_poll以內,會去呼叫st_poll,而st_poll本質上會呼叫並且切換執行緒。

int st_netfd_poll(_st_netfd_t *fd, int how, st_utime_t timeout)
{
 struct pollfd pd;
 int n;
 pd.fd = fd->osfd;
 pd.events = (short) how;
 pd.revents = 0;
 if ((n = st_poll(&pd, 1, timeout)) < 0)
 return -1;
 if (n == 0) {
 /* Timed out */
 errno = ETIME;
 return -1;
 }
 if (pd.revents & POLLNVAL) {
 errno = EBADF;
 return -1;
 }
 return 0;
}
int st_poll(struct pollfd *pds, int npds, st_utime_t timeout)
{
 struct pollfd *pd;
 struct pollfd *epd = pds + npds;
 _st_pollq_t pq;
 _st_thread_t *me = _ST_CURRENT_THREAD();
 int n;
 if (me->flags & _ST_FL_INTERRUPT) {
 me->flags &= ~_ST_FL_INTERRUPT;
 errno = EINTR;
 return -1;
 }
 if ((*_st_eventsys->pollset_add)(pds, npds) < 0)
 return -1;
 pq.pds = pds;
 pq.npds = npds;
 pq.thread = me;
 pq.on_ioq = 1;
 _ST_ADD_IOQ(pq);
 if (timeout != ST_UTIME_NO_TIMEOUT)
 _ST_ADD_SLEEPQ(me, timeout);
 me->state = _ST_ST_IO_WAIT;
 _ST_SWITCH_CONTEXT(me);
 n = 0;
 if (pq.on_ioq) {
 /* If we timed out, the pollq might still be on the ioq. Remove it */
 _ST_DEL_IOQ(pq);
 (*_st_eventsys->pollset_del)(pds, npds);
 } else {
 /* Count the number of ready descriptors */
 for (pd = pds; pd < epd; pd++) {
 if (pd->revents)
 n++;
 }
 }
 if (me->flags & _ST_FL_INTERRUPT) {
 me->flags &= ~_ST_FL_INTERRUPT;
 errno = EINTR;
 return -1;
 }
 return n;
}

那麼到此為止,st_poll中就出現了我們最關心的排程部分了

當一個執行緒進行排程的時候一般都是poll_add(如果是io操作),add_queue, _ST_SWITCH_CONTEXT完成一次排程。根據不同的型別,會add到不同的queue。例如需要超時,則會add到IOQ和SLEEPQ。而_ST_SWITCH_CONTEXT,則是最關鍵的切換執行緒操作了。

_ST_SWITCH_CONTEXT其實是一個巨集,它的本質是呼叫了MD_SETJMP和_st_vp_schedule().

#define _ST_SWITCH_CONTEXT(_thread) \
 ST_BEGIN_MACRO \
 ST_SWITCH_OUT_CB(_thread); \
 if (!MD_SETJMP((_thread)->context)) { \
 _st_vp_schedule(); \
 } \
 ST_DEBUG_ITERATE_THREADS(); \
 ST_SWITCH_IN_CB(_thread);  \
 ST_END_MACRO

這個函式其實就是一個完成的執行緒切換了。在st裡執行緒的切換會使用MD_SETJMP->_st_vp_schedule->MD_LONGJMP。MD_SETJMP和MD_LONGJMP其實就是st使用匯編自己寫的setjmp和longjmp函式(glibc),效果也是幾乎等效的。(因為st本身會做平臺適配,所以我們以x86-64的彙編為例)

#elif defined(__amd64__) || defined(__x86_64__)
/*
 * Internal __jmp_buf layout
 */
#define JB_RBX 0
#define JB_RBP 1
#define JB_R12 2
#define JB_R13 3
#define JB_R14 4
#define JB_R15 5
#define JB_RSP 6
#define JB_PC 7
 .file "md.S"
 .text
 /* _st_md_cxt_save(__jmp_buf env) */
.globl _st_md_cxt_save
 .type _st_md_cxt_save, @function
 .align 16
_st_md_cxt_save:
 /*
 * Save registers.
 */
 movq %rbx, (JB_RBX*8)(%rdi)
 movq %rbp, (JB_RBP*8)(%rdi)
 movq %r12, (JB_R12*8)(%rdi)
 movq %r13, (JB_R13*8)(%rdi)
 movq %r14, (JB_R14*8)(%rdi)
 movq %r15, (JB_R15*8)(%rdi)
 /* Save SP */
 leaq 8(%rsp), %rdx
 movq %rdx, (JB_RSP*8)(%rdi)
 /* Save PC we are returning to */
 movq (%rsp), %rax
 movq %rax, (JB_PC*8)(%rdi)
 xorq %rax, %rax
 ret
 .size _st_md_cxt_save, .-_st_md_cxt_save
/****************************************************************/
 /* _st_md_cxt_restore(__jmp_buf env, int val) */
.globl _st_md_cxt_restore
 .type _st_md_cxt_restore, @function
 .align 16
_st_md_cxt_restore:
 /*
 * Restore registers.
 */
 movq (JB_RBX*8)(%rdi), %rbx
 movq (JB_RBP*8)(%rdi), %rbp
 movq (JB_R12*8)(%rdi), %r12
 movq (JB_R13*8)(%rdi), %r13
 movq (JB_R14*8)(%rdi), %r14
 movq (JB_R15*8)(%rdi), %r15
 /* Set return value */
 test %esi, %esi
 mov $01, %eax
 cmove %eax, %esi
 mov %esi, %eax
 movq (JB_PC*8)(%rdi), %rdx
 movq (JB_RSP*8)(%rdi), %rsp
 /* Jump to saved PC */
 jmpq *%rdx
 .size _st_md_cxt_restore, .-_st_md_cxt_restore
/****************************************************************/

MD_SETJMP的時候,會使用匯編把所有暫存器的資訊保留下來,而MD_LONGJMP則會把所有的暫存器資訊重新加載出來。兩者配合使用的時候,可以完成一次函式間的跳轉。

那麼我們已經看到了MD_SETJMP的呼叫,MD_LONGJMP呼叫在哪兒呢?

讓我們繼續看下去,在最一開始,我們就提及過_st_vp_schedule()這個核心函式。

void _st_vp_schedule(void)
{
 _st_thread_t *thread;
 if (_ST_RUNQ.next != &_ST_RUNQ) {
 /* Pull thread off of the run queue */
 thread = _ST_THREAD_PTR(_ST_RUNQ.next);
 _ST_DEL_RUNQ(thread);
 } else {
 /* If there are no threads to run, switch to the idle thread */
 thread = _st_this_vp.idle_thread;
 }
 ST_ASSERT(thread->state == _ST_ST_RUNNABLE);
 /* Resume the thread */
 thread->state = _ST_ST_RUNNING;
 _ST_RESTORE_CONTEXT(thread);
}

這個函式其實非常簡單,基本工作原理可以認為是執行以下幾步: 1.檢視當前RUNQ是否有可以呼叫的,如果有,則RUNQ pop一個thread。 2. 如果沒有,則執行idle_thread。 3. 呼叫_ST_RESTORE_CONTEXT。

那麼_ST_RESTORE_CONTEXT做了什麼呢?

#define _ST_RESTORE_CONTEXT(_thread) \
 ST_BEGIN_MACRO \
 _ST_SET_CURRENT_THREAD(_thread); \
 MD_LONGJMP((_thread)->context, 1); \
 ST_END_MACRO

簡單來說,_ST_RESTORE_CONTEXT就是呼叫了我們之前所沒有看到的MD_LONGJMP。

所以,我們可以簡單地認為,在攜程需要schedule的時候,會先把自身當前的棧通過MD_SETJMP儲存起來,當執行緒被schedule再次排程出來的時候,則會使用MD_SETJMP來還原棧,完成一次協程切換。

然後我們來看看idle_thread做了什麼。

雖然這個協程名字叫做idle,但是其實做了很多的事情。

void *_st_idle_thread_start(void *arg)
{
 _st_thread_t *me = _ST_CURRENT_THREAD();
 while (_st_active_count > 0) {
 /* Idle vp till I/O is ready or the smallest timeout expired */
 _ST_VP_IDLE();
 /* Check sleep queue for expired threads */
 _st_vp_check_clock();
 me->state = _ST_ST_RUNNABLE;
 _ST_SWITCH_CONTEXT(me);
 }
 /* No more threads */
 exit(0);
 /* NOTREACHED */
 return NULL;
}

總的來說,idle_thread做了兩件事情。1. _ST_VP_IDLE() 2. _st_vp_check_clock()。_st_vp_check_clock很好理解,就是檢查定時器是否超時,如果超時了,則設定超時標記之後,放回RUNQ。而_ST_VP_IDLE,其實就是檢視io是否已經ready了。例如linux的話,則會呼叫epoll_wait(_st_epoll_data->epfd, _st_epoll_data->evtlist,

_st_epoll_data->evtlist_size, timeout)去檢視是否有可響應的io。timeout值會根據當前空閒情況進行變化,通常來說會是一個極小的值。

那麼看到這裡,整體的執行緒排程已經全部走完了。(詳見前面最一開始的流程圖)總體流程總結來說基本上是func() -> st_xxxx() -> AddQ -> MD_SETJMP -> schedule() -> MD_LONG -> func()。

所以對於st而言,所以的排程,是基於使用者呼叫。那麼如果使用者一直不呼叫st_xxx()(例如計算密集性服務),st也就無法進行協程切換,那麼其他協程也就產生極大的阻塞了。這也是為什麼st並不太合適計算密集型的原因(其實單執行緒框架大多都不合適計算密集型)

想要閱讀更多技術乾貨文章,歡迎關注

瞭解網易雲信,來自網易核心架構的通訊與視訊雲服務。

【招聘資訊】

前端主管|前端組長|資深/高階Java|資深ios|資深Android,都有崗位空缺

同時負責多個二級部門招聘 技術|產品|銷售|商務 歡迎郵件聯絡~

郵箱:[email protected]