1. 程式人生 > >Linux內核同步 - RCU synchronize原理分析

Linux內核同步 - RCU synchronize原理分析

ron core rom con 更新 如何 || scn 常常

RCU(Read-Copy Update)是Linux內核比較成熟的新型讀寫鎖,具有較高的讀寫並發性能,常常用在需要互斥的性能關鍵路徑。在kernel中,rcu有tiny rcu和tree rcu兩種實現,tiny rcu更加簡潔,通常用在小型嵌入式系統中,tree rcu則被廣泛使用在了server, desktop以及android系統中。本文將以tree rcu為分析對象。

1 如何度過寬限期

RCU的核心理念是讀者訪問的同時,寫者可以更新訪問對象的副本,但寫者需要等待所有讀者完成訪問之後,才能刪除老對象。這個過程實現的關鍵和難點就在於如何判斷所有的讀者已經完成訪問。通常把寫者開始更新,到所有讀者完成訪問這段時間叫做寬限期(Grace Period)。內核中實現寬限期等待的函數是synchronize_rcu。

1.1 讀者鎖的標記

在普通的TREE RCU實現中,rcu_read_lock和rcu_read_unlock的實現非常簡單,分別是關閉搶占和打開搶占:

  1. static inline void __rcu_read_lock(void)
  2. {
  3. preempt_disable();
  4. }
  5. static inline void __rcu_read_unlock(void)
  6. {
  7. preempt_enable();
  8. }

這時是否度過寬限期的判斷就比較簡單:每個CPU都經過一次搶占。因為發生搶占,就說明不在rcu_read_lock和rcu_read_unlock之間,必然已經完成訪問或者還未開始訪問。

1.2 每個CPU度過quiescnet state

接下來我們看每個CPU上報完成搶占的過程。kernel把這個完成搶占的狀態稱為quiescent state。每個CPU在時鐘中斷的處理函數中,都會判斷當前CPU是否度過quiescent state。

  1. void update_process_times(int user_tick)
  2. {
  3. ......
  4. rcu_check_callbacks(cpu, user_tick);
  5. ......
  6. }
  7. void rcu_check_callbacks(int cpu, int user)
  8. {
  9. ......
  10. if (user || rcu_is_cpu_rrupt_from_idle()) {
  11. /*在用戶態上下文,或者idle上下文,說明已經發生過搶占*/
  12. rcu_sched_qs(cpu);
  13. rcu_bh_qs(cpu);
  14. } else if (!in_softirq()) {
  15. /*僅僅針對使用rcu_read_lock_bh類型的rcu,不在softirq,
  16. *說明已經不在read_lock關鍵區域*/
  17. rcu_bh_qs(cpu);
  18. }
  19. rcu_preempt_check_callbacks(cpu);
  20. if (rcu_pending(cpu))
  21. invoke_rcu_core();
  22. ......
  23. }

這裏補充一個細節說明,Tree RCU有多個類型的RCU State,用於不同的RCU場景,包括rcu_sched_state、rcu_bh_state和rcu_preempt_state。不同的場景使用不同的RCU API,度過寬限期的方式就有所區別。例如上面代碼中的rcu_sched_qs和rcu_bh_qs,就是為了標記不同的state度過quiescent state。普通的RCU例如內核線程、系統調用等場景,使用rcu_read_lock或者rcu_read_lock_sched,他們的實現是一樣的;軟中斷上下文則可以使用rcu_read_lock_bh,使得寬限期更快度過。

細分這些場景是為了提高RCU的效率。rcu_preempt_state將在下文進行說明。

1.3 匯報寬限期度過

每個CPU度過quiescent state之後,需要向上匯報直至所有CPU完成quiescent state,從而標識寬限期的完成,這個匯報過程在軟中斷RCU_SOFTIRQ中完成。軟中斷的喚醒則是在上述的時鐘中斷中進行。

update_process_times

-> rcu_check_callbacks

-> invoke_rcu_core

RCU_SOFTIRQ軟中斷處理的匯報流程如下:

rcu_process_callbacks

-> __rcu_process_callbacks

-> rcu_check_quiescent_state

-> rcu_report_qs_rdp

-> rcu_report_qs_rnp

其中rcu_report_qs_rnp是從葉子節點向根節點的遍歷過程,同一個節點的子節點都通過quiescent state後,該節點也設置為通過。

這個樹狀的匯報過程,也就是“Tree RCU”這個名字得來的緣由。

樹結構每層的節點數量和葉子節點數量由一系列的宏定義來決定:

  1. #define MAX_RCU_LVLS 4
  2. #define RCU_FANOUT_1 (CONFIG_RCU_FANOUT_LEAF)
  3. #define RCU_FANOUT_2 (RCU_FANOUT_1 * CONFIG_RCU_FANOUT)
  4. #define RCU_FANOUT_3 (RCU_FANOUT_2 * CONFIG_RCU_FANOUT)
  5. #define RCU_FANOUT_4 (RCU_FANOUT_3 * CONFIG_RCU_FANOUT)
  6. #if NR_CPUS <= RCU_FANOUT_1
  7. # define RCU_NUM_LVLS 1
  8. # define NUM_RCU_LVL_0 1
  9. # define NUM_RCU_LVL_1 (NR_CPUS)
  10. # define NUM_RCU_LVL_2 0
  11. # define NUM_RCU_LVL_3 0
  12. # define NUM_RCU_LVL_4 0
  13. #elif NR_CPUS <= RCU_FANOUT_2
  14. # define RCU_NUM_LVLS 2
  15. # define NUM_RCU_LVL_0 1
  16. # define NUM_RCU_LVL_1 DIV_ROUND_UP(NR_CPUS, RCU_FANOUT_1)
  17. # define NUM_RCU_LVL_2 (NR_CPUS)
  18. # define NUM_RCU_LVL_3 0
  19. # define NUM_RCU_LVL_4 0
  20. #elif NR_CPUS <= RCU_FANOUT_3
  21. # define RCU_NUM_LVLS 3
  22. # define NUM_RCU_LVL_0 1
  23. # define NUM_RCU_LVL_1 DIV_ROUND_UP(NR_CPUS, RCU_FANOUT_2)
  24. # define NUM_RCU_LVL_2 DIV_ROUND_UP(NR_CPUS, RCU_FANOUT_1)
  25. # define NUM_RCU_LVL_3 (NR_CPUS)
  26. # define NUM_RCU_LVL_4 0
  27. #elif NR_CPUS <= RCU_FANOUT_4
  28. # define RCU_NUM_LVLS 4
  29. # define NUM_RCU_LVL_0 1
  30. # define NUM_RCU_LVL_1 DIV_ROUND_UP(NR_CPUS, RCU_FANOUT_3)
  31. # define NUM_RCU_LVL_2 DIV_ROUND_UP(NR_CPUS, RCU_FANOUT_2)
  32. # define NUM_RCU_LVL_3 DIV_ROUND_UP(NR_CPUS, RCU_FANOUT_1)
  33. # define NUM_RCU_LVL_4 (NR_CPUS)

1.3 寬限期的發起與完成

所有寬限期的發起和完成都是由同一個內核線程rcu_gp_kthread來完成。通過判斷rsp->gp_flags & RCU_GP_FLAG_INIT來決定是否發起一個gp;通過判斷! (rnp->qsmask) && !rcu_preempt_blocked_readers_cgp(rnp))來決定是否結束一個gp。

發起一個GP時,rsp->gpnum++;結束一個GP時,rsp->completed = rsp->gpnum。

1.4 rcu callbacks處理

rcu的callback通常是在sychronize_rcu中添加的wakeme_after_rcu,也就是喚醒synchronize_rcu的進程,它正在等待GP的結束。

callbacks的處理同樣在軟中斷RCU_SOFTIRQ中完成

rcu_process_callbacks

-> __rcu_process_callbacks

-> invoke_rcu_callbacks

-> rcu_do_batch

-> __rcu_reclaim

這裏RCU的callbacks鏈表采用了一種分段鏈表的方式,整個callback鏈表,根據具體GP結束的時間,分成若幹段:nxtlist -- *nxttail[RCU_DONE_TAIL] -- *nxttail[RCU_WAIT_TAIL] -- *nxttail[RCU_NEXT_READY_TAIL] -- *nxttail[RCU_NEXT_TAIL]。

rcu_do_batch只處理nxtlist -- *nxttail[RCU_DONE_TAIL]之間的callbacks。每個GP結束都會重新調整callback所處的段位,每個新的callback將會添加在末尾,也就是*nxttail[RCU_NEXT_TAIL]。

2 可搶占的RCU

如果config文件定義了CONFIG_TREE_PREEMPT_RCU=y,那麽sychronize_rcu將默認使用rcu_preempt_state。這類rcu的特點就在於read_lock期間是允許其它進程搶占的,因此它判斷寬限期度過的方法就不太一樣。

從rcu_read_lock和rcu_read_unlock的定義就可以知道,TREE_PREEMPT_RCU並不是以簡單的經過搶占為CPU渡過GP的標準,而是有個rcu_read_lock_nesting計數

  1. void __rcu_read_lock(void)
  2. {
  3. current->rcu_read_lock_nesting++;
  4. barrier(); /* critical section after entry code. */
  5. }
  6. void __rcu_read_unlock(void)
  7. {
  8. struct task_struct *t = current;
  9. if (t->rcu_read_lock_nesting != 1) {
  10. --t->rcu_read_lock_nesting;
  11. } else {
  12. barrier(); /* critical section before exit code. */
  13. t->rcu_read_lock_nesting = INT_MIN;
  14. barrier(); /* assign before ->rcu_read_unlock_special load */
  15. if (unlikely(ACCESS_ONCE(t->rcu_read_unlock_special)))
  16. rcu_read_unlock_special(t);
  17. barrier(); /* ->rcu_read_unlock_special load before assign */
  18. t->rcu_read_lock_nesting = 0;
  19. }
  20. }

當搶占發生時,__schedule函數會調用rcu_note_context_switch來通知RCU更新狀態,如果當前CPU處於rcu_read_lock狀態,當前進程將會放入rnp->blkd_tasks阻塞隊列,並呈現在rnp->gp_tasks鏈表中。

從上文1.3節寬限期的結束處理過程我們可以知道,rcu_gp_kthread會判斷! (rnp->qsmask) && !rcu_preempt_blocked_readers_cgp(rnp))兩個條件來決定GP是否完成,其中!rnp->qsmask代表每個CPU都經過一次quiescent state,quiescent state的定義與傳統RCU一致;!rcu_preempt_blocked_readers_cgp(rnp)這個條件就代表了rcu是否還有阻塞的進程。

Linux內核同步 - RCU synchronize原理分析