1. 程式人生 > >linux核心 RCU機制詳解

linux核心 RCU機制詳解

 簡介

        RCU(Read-Copy Update)是資料同步的一種方式,在當前的Linux核心中發揮著重要的作用。RCU主要針對的資料物件是連結串列,目的是提高遍歷讀取資料的效率,為了達到目的使用RCU機制讀取資料的時候不對連結串列進行耗時的加鎖操作。這樣在同一時間可以有多個執行緒同時讀取該連結串列,並且允許一個執行緒對連結串列進行修改(修改的時候,需要加鎖)。RCU適用於需要頻繁的讀取資料,而相應修改資料並不多的情景,例如在檔案系統中,經常需要查詢定位目錄,而對目錄的修改相對來說並不多,這就是RCU發揮作用的最佳場景。

       Linux核心原始碼當中,關於RCU的文件比較齊全,你可以在 /Documentation/RCU/ 目錄下找到這些檔案。Paul E. McKenney 是核心中RCU原始碼的主要實現者,他也寫了很多RCU方面的文章。他把這些文章和一些關於RCU的論文的連結整理到了一起。

http://www2.rdrop.com/users/paulmck/RCU/

       在RCU的實現過程中,我們主要解決以下問題:

       1,在讀取過程中,另外一個執行緒刪除了一個節點。刪除執行緒可以把這個節點從連結串列中移除,但它不能直接銷燬這個節點,必須等到所有的讀取執行緒讀取完成以後,才進行銷燬操作。RCU中把這個過程稱為寬限期(Grace period)。

       2,在讀取過程中,另外一個執行緒插入了一個新節點,而讀執行緒讀到了這個節點,那麼需要保證讀到的這個節點是完整的。這裡涉及到了釋出-訂閱機制(Publish-Subscribe Mechanism)。

       3, 保證讀取連結串列的完整性。新增或者刪除一個節點,不至於導致遍歷一個連結串列從中間斷開。但是RCU並不保證一定能讀到新增的節點或者不讀到要被刪除的節點。

       寬限期

        通過例子,方便理解這個內容。以下例子修改於Paul的文章。

[cpp] view plaincopyprint?
  1. struct foo {  
  2. int a;  
  3. char b;  
  4. long c;  
  5.  };  
  6. DEFINE_SPINLOCK(foo_mutex);  
  7. struct foo *gbl_foo;  
  8. void foo_read (void)  
  9. {  
  10.      foo *fp = gbl_foo;  
  11. if ( fp != NULL )  
  12.             dosomething(fp->a, fp->b , fp->c );  
  13. }  
  14. void foo_update( foo* new_fp )  
  15. {  
  16.      spin_lock(&foo_mutex);  
  17.      foo *old_fp = gbl_foo;  
  18.      gbl_foo = new_fp;  
  19.      spin_unlock(&foo_mutex);  
  20.      kfee(old_fp);  
  21. }  
struct foo {
           int a;
           char b;
           long c;
 };

DEFINE_SPINLOCK(foo_mutex);

struct foo *gbl_foo;

void foo_read (void)
{
     foo *fp = gbl_foo;
     if ( fp != NULL )
        	dosomething(fp->a, fp->b , fp->c );
}

void foo_update( foo* new_fp )
{
     spin_lock(&foo_mutex);
     foo *old_fp = gbl_foo;
     gbl_foo = new_fp;
     spin_unlock(&foo_mutex);
     kfee(old_fp);
}

      如上的程式,是針對於全域性變數gbl_foo的操作。假設以下場景。有兩個執行緒同時執行 foo_ read和foo_update的時候,當foo_ read執行完賦值操作後,執行緒發生切換;此時另一個執行緒開始執行foo_update並執行完成。當foo_ read執行的程序切換回來後,執行dosomething 的時候,fp已經被刪除,這將對系統造成危害。為了防止此類事件的發生,RCU裡增加了一個新的概念叫寬限期(Grace period)。如下圖所示:

         圖中每行代表一個執行緒,最下面的一行是刪除執行緒,當它執行完刪除操作後,執行緒進入了寬限期。寬限期的意義是,在一個刪除動作發生後,它必須等待所有在寬限期開始前已經開始的讀執行緒結束,才可以進行銷燬操作。這樣做的原因是這些執行緒有可能讀到了要刪除的元素。圖中的寬限期必須等待1和2結束;而讀執行緒5在寬限期開始前已經結束,不需要考慮;而3,4,6也不需要考慮,因為在寬限期結束後開始後的執行緒不可能讀到已刪除的元素。為此RCU機制提供了相應的API來實現這個功能。                             

[cpp] view plaincopyprint?
  1. void foo_read(void)  
  2. {  
  3.     rcu_read_lock();  
  4.     foo *fp = gbl_foo;  
  5. if ( fp != NULL )  
  6.             dosomething(fp->a,fp->b,fp->c);  
  7.     rcu_read_unlock();  
  8. }  
  9. void foo_update( foo* new_fp )  
  10. {  
  11.     spin_lock(&foo_mutex);  
  12.     foo *old_fp = gbl_foo;  
  13.     gbl_foo = new_fp;  
  14.     spin_unlock(&foo_mutex);  
  15.     synchronize_rcu();  
  16.     kfee(old_fp);  
  17. }  
void foo_read(void)
{
	rcu_read_lock();
	foo *fp = gbl_foo;
	if ( fp != NULL )
			dosomething(fp->a,fp->b,fp->c);
	rcu_read_unlock();
}

void foo_update( foo* new_fp )
{
	spin_lock(&foo_mutex);
	foo *old_fp = gbl_foo;
	gbl_foo = new_fp;
	spin_unlock(&foo_mutex);
	synchronize_rcu();
	kfee(old_fp);
}

      其中foo_read中增加了rcu_read_lock和rcu_read_unlock,這兩個函式用來標記一個RCU讀過程的開始和結束。其實作用就是幫助檢測寬限期是否結束。foo_update增加了一個函式synchronize_rcu(),呼叫該函式意味著一個寬限期的開始,而直到寬限期結束,該函式才會返回。我們再對比著圖看一看,執行緒1和2,在synchronize_rcu之前可能得到了舊的gbl_foo,也就是foo_update中的old_fp,如果不等它們執行結束,就呼叫kfee(old_fp),極有可能造成系統崩潰。而3,4,6在synchronize_rcu之後執行,此時它們已經不可能得到old_fp,此次的kfee將不對它們產生影響。

     寬限期是RCU實現中最複雜的部分,原因是在提高讀資料效能的同時,刪除資料的效能也不能太差。


     訂閱——釋出機制 

      當前使用的編譯器大多會對程式碼做一定程度的優化,CPU也會對執行指令做一些優化調整,目的是提高程式碼的執行效率,但這樣的優化,有時候會帶來不期望的結果。如例:

[cpp] view plaincopyprint?
  1. void foo_update( foo* new_fp )  
  2. {  
  3.     spin_lock(&foo_mutex);  
  4.     foo *old_fp = gbl_foo;  
  5.     new_fp->a = 1;  
  6.     new_fp->b = ‘b’;  
  7.     new_fp->c = 100;  
  8.     gbl_foo = new_fp;  
  9.     spin_unlock(&foo_mutex);  
  10.     synchronize_rcu();  
  11.     kfee(old_fp);  
  12. }  
void foo_update( foo* new_fp )
{
	spin_lock(&foo_mutex);
	foo *old_fp = gbl_foo;
	
	new_fp->a = 1;
	new_fp->b = ‘b’;
	new_fp->c = 100;
	
	gbl_foo = new_fp;
	spin_unlock(&foo_mutex);
	synchronize_rcu();
	kfee(old_fp);
}

       這段程式碼中,我們期望的是6,7,8行的程式碼在第10行程式碼之前執行。但優化後的程式碼並不對執行順序做出保證。在這種情形下,一個讀執行緒很可能讀到 new_fp,但new_fp的成員賦值還沒執行完成。當讀執行緒執行dosomething(fp->a, fp->b , fp->c ) 的 時候,就有不確定的引數傳入到dosomething,極有可能造成不期望的結果,甚至程式崩潰。可以通過優化屏障來解決該問題,RCU機制對優化屏障做了包裝,提供了專用的API來解決該問題。這時候,第十行不再是直接的指標賦值,而應該改為 :

       rcu_assign_pointer(gbl_foo,new_fp);

       rcu_assign_pointer的實現比較簡單,如下:

      <include/linux/rcupdate.h>

[cpp] view plaincopyprint?
  1. #define rcu_assign_pointer(p, v) \
  2.          __rcu_assign_pointer((p), (v), __rcu)  
  3. #define __rcu_assign_pointer(p, v, space) \
  4. do { \  
  5.                  smp_wmb(); \  
  6.                  (p) = (typeof(*v) __force space *)(v); \  
  7.          } while (0)  
#define rcu_assign_pointer(p, v) \
         __rcu_assign_pointer((p), (v), __rcu)

#define __rcu_assign_pointer(p, v, space) \
         do { \
                 smp_wmb(); \
                 (p) = (typeof(*v) __force space *)(v); \
         } while (0)

      我們可以看到它的實現只是在賦值之前加了優化屏障 smp_wmb來確保程式碼的執行順序。另外就是巨集中用到的__rcu,只是作為編譯過程的檢測條件來使用的。

      在DEC Alpha CPU機器上還有一種更強悍的優化,如下所示:

[cpp] view plaincopyprint?
  1. void foo_read(void)  
  2. {         
  3.     rcu_read_lock();  
  4.     foo *fp = gbl_foo;  
  5. if ( fp != NULL )  
  6.         dosomething(fp->a, fp->b ,fp->c);  
  7.     rcu_read_unlock();  
  8. }  
void foo_read(void)
{		
	rcu_read_lock();
	foo *fp = gbl_foo;
	if ( fp != NULL )
		dosomething(fp->a, fp->b ,fp->c);
	rcu_read_unlock();
}

      第六行的 fp->a,fp->b,fp->c會在第3行還沒執行的時候就預先判斷執行,當他和foo_update同時執行的時候,可能導致傳入dosomething的一部分屬於舊的gbl_foo,而另外的屬於新的。這樣導致執行結果的錯誤。為了避免該類問題,RCU還是提供了巨集來解決該問題:

<include/linux/rcupdate.h>

[cpp] view plaincopyprint?
  1. #define rcu_dereference(p) rcu_dereference_check(p, 0)
  2. #define rcu_dereference_check(p, c) \
  3.          __rcu_dereference_check((p), rcu_read_lock_held() || (c), __rcu)  
  4. #define __rcu_dereference_check(p, c, space) \
  5.          ({ \  
  6.                  typeof(*p) *_________p1 = (typeof(*p)*__force )ACCESS_ONCE(p); \  
  7.                  rcu_lockdep_assert(c, "suspicious rcu_dereference_check()" \  
  8. " usage"); \  
  9.                  rcu_dereference_sparse(p, space); \  
  10.                  smp_read_barrier_depends(); \  
  11.                  ((typeof(*p) __force __kernel *)(_________p1)); \  
  12.          })  
  13. staticinlineint rcu_read_lock_held(void)  
  14. {  
  15. if (!debug_lockdep_rcu_enabled())  
  16. return 1;  
  17. if (rcu_is_cpu_idle())  
  18. return 0;  
  19. if (!rcu_lockdep_current_cpu_online())  
  20. return 0;  
  21. return lock_is_held(&rcu_lock_map);  
  22. }  
#define rcu_dereference(p) rcu_dereference_check(p, 0)


#define rcu_dereference_check(p, c) \
         __rcu_dereference_check((p), rcu_read_lock_held() || (c), __rcu)

#define __rcu_dereference_check(p, c, space) \
         ({ \
                 typeof(*p) *_________p1 = (typeof(*p)*__force )ACCESS_ONCE(p); \
                 rcu_lockdep_assert(c, "suspicious rcu_dereference_check()" \
                                       " usage"); \
                 rcu_dereference_sparse(p, space); \
                 smp_read_barrier_depends(); \
                 ((typeof(*p) __force __kernel *)(_________p1)); \
         })

static inline int rcu_read_lock_held(void)
{
         if (!debug_lockdep_rcu_enabled())
                 return 1;
         if (rcu_is_cpu_idle())
                 return 0;
         if (!rcu_lockdep_current_cpu_online())
                 return 0;
         return lock_is_held(&rcu_lock_map);
}

       這段程式碼中加入了除錯資訊,去除除錯資訊,可以是以下的形式(其實這也是舊版本中的程式碼):

[cpp] view plaincopyprint?
  1. #define rcu_dereference(p)     ({ \
  2.                     typeof(p) _________p1 = p; \  
  3.                     smp_read_barrier_depends(); \  
  4.                     (_________p1); \  
  5.                     })  
#define rcu_dereference(p)     ({ \
					typeof(p) _________p1 = p; \
					smp_read_barrier_depends(); \
					(_________p1); \
					})

       在賦值後加入優化屏障smp_read_barrier_depends()。

        我們之前的第四行程式碼改為 foo *fp = rcu_dereference(gbl_foo);,就可以防止上述問題。

       資料讀取的完整性

        還是通過例子來說明這個問題:

       

       如圖我們在原list中加入一個節點new到A之前,所要做的第一步是將new的指標指向A節點,第二步才是將Head的指標指向new。這樣做的目的是當插入操作完成第一步的時候,對於連結串列的讀取並不產生影響,而執行完第二步的時候,讀執行緒如果讀到new節點,也可以繼續遍歷連結串列。如果把這個過程反過來,第一步head指向new,而這時一個執行緒讀到new,由於new的指標指向的是Null,這樣將導致讀執行緒無法讀取到A,B等後續節點。從以上過程中,可以看出RCU並不保證讀執行緒讀取到new節點。如果該節點對程式產生影響,那麼就需要外部呼叫做相應的調整。如在檔案系統中,通過RCU定位後,如果查詢不到相應節點,就會進行其它形式的查詢,相關內容等分析到檔案系統的時候再進行敘述。

      我們再看一下刪除一個節點的例子:

     如圖我們希望刪除B,這時候要做的就是將A的指標指向C,保持B的指標,然後刪除程式將進入寬限期檢測。由於B的內容並沒有變更,讀到B的執行緒仍然可以繼續讀取B的後續節點。B不能立即銷燬,它必須等待寬限期結束後,才能進行相應銷燬操作。由於A的節點已經指向了C,當寬限期開始之後所有的後續讀操作通過A找到的是C,而B已經隱藏了,後續的讀執行緒都不會讀到它。這樣就確保寬限期過後,刪除B並不對系統造成影響。

     小結

       RCU的原理並不複雜,應用也很簡單。但程式碼的實現確並不是那麼容易,難點都集中在了寬限期的檢測上,後續分析原始碼的時候,我們可以看到一些極富技巧的實現方式。