1. 程式人生 > >【轉】rt-thread的位圖調度算法分析

【轉】rt-thread的位圖調度算法分析

tools 檢查 span googl popu 調用函數 source != 雙層

序言

期待讀者

本文期待讀者有C語言編程基礎,後文中要分析代碼,對其中的一些C語言中的簡單語句不會介紹,但是並不要求讀者有過多的C基礎,比如指針和鏈表等不會要求太多,後面在分析代碼時,會附帶地介紹相關的知識,如果您已經精通了C語言的指針則可以略過相關的介紹。除此之外,不再假設讀者擁有任何知識。

如何閱讀代碼

就以rt-thread內核代碼為例(註,指rt-thread的kernel代碼),大約有8500行代碼。直接閱讀顯然是很容易陷入代碼中的。所謂工欲善其事,必先利其器,我推薦使用下面的工具來閱讀。

  • MDK/IAR/其他集成開發環境,最好支持軟件仿真,我使用MDK4.0
  • 強大的代碼閱讀軟件source insight
  • 一個筆記本,隨時用來記錄自己的一些想法,感悟,或者困惑

首先使用source insight創建一個rt-thread的工程,然後開始代碼閱讀。關於source insight的使用,暫不贅述。建議讀者自行google source insight教程。

一切就緒有以後,面對浩如煙海的代碼(8500行),從哪裏開始下手呢?我的建議是, 先閱讀工程中的頭文件開始,然後閱讀C文件。之所以采用這種閱讀方式,是c代碼中用的數據結構通常定義在.h的文件中。了解一個程序,首先先要了解這個程序所使用的數據結構。同樣,在閱讀C文件時,如果其中定義了數據結構,比如定義了某些結構體等,則先閱讀它門。

rt-thread的內核調度算法

rt-thread的內核調度算法采用位圖(bitmap)的調度算法。這個算法的好處是可以實現O(1)調度(註,O(1)定義,請參考《數據結構與算法分析》),大致來說,就是每次調度的時間是恒定的:無論當前的系統中存在多少個線程,多少個優先級,rt-thread的調度函數總是可以在一個恒定的時間內選擇出最高優先級的那個線程來執行。

rt-thread內核調度算法涉及的源碼文件主要是scheduler.c

《rt-thread編程指南》中已經大致介紹了,rt-thread的調度算法為基於優先級調度和基於時間片輪轉調度共存的策略。這裏再重復一下,rt-thread內核中存在多個線程優先級,具體的級別數目可以在rt_config.h中以宏定義的方式配置。而且rt-thread支持多個線程具有同樣的線程優先級 。

當系統存在多個線程時,可能的情況是,某些線程具有不同的線程優先級,但是還有一些線程具有相同的優先級。對於這種情況,rt-thread采用的調度策略是,對不同優先級的線程,采用可搶占的方式:即高優先級的線程會“立刻”搶占低優先級的線程,而對同線程優先級別的多個線程則采用時間片輪轉的方式。

在上面的情形中,擺在rt-thread面前的問題就,如何從多個線程優先級別中找出當前優先級最高的那個線程,並調度執行。

線程結構存儲

實際上,尋找當前線程優先級最高的線程並調度執行,首先需要解決線程數據結構的存儲問題。下面先來分析,rt-thread中如何存儲多個線程的數據結構。

現在讓我們做幾點說明:

  • 每一個線程的信息用線程控制塊來表示,線程控制塊,即Thread Control-Block,縮寫為TCB,它是在rtdef.h中定義一個struct結構體,這個結構體的作用就是用來描述一個線程所有必要信息。
  • 線程的優先級別用非負整數(即無符號整數)表示,並且優先級越高,對應的數越小
  • 系統的線程優先級的數目固定,最多支持256級
  • 系統中的線程數目不做任何限制,線程的數目僅受限於系統RAM的大小。 重點來考慮最後兩點,我們來思考一下,當系統存在多個的線程時,也就是說會有多個TCB時,我們怎麽來“排列”或者存儲這些TCB,才能實現上面的這兩點要求?

線程的優先級別數目固定,顯然我們可以使用一個數組來定義,數組的長度即為線程優先級的數目,數組的每個元素為一個指向TCB形數據結構的指針。即,我們定義了一個指針數組。

線程數目不受限制,那當某個線程優先級上存在多個線程時,這些TCB顯然沒辦法存儲在上面定義的數組對應的優先級位置上,我們使用鏈表來解決這個問題,鏈表是一種數據結構,每個元素彼此鏈接,TCB中有一個鏈接下一個TCB的“鏈表數據結構”,如同一個鉤子一樣。

這樣我們就可以達到上面提及的兩點設計要求,不同線程優先級的線程的TCB分別存在線程TCB數組對應優先級的位置上。對於相同優先級別的多個線程,我們只需要將該優先級的第一個就緒線程的TCB存儲在線程TCB數組中相關位置,後續同級線程通過鏈表依次連接。

scheduler.c 中




[cpp] view plain copy
 


  1. ...
  2. (1) rt_list_t rt_thread_priority_table[RT_THREAD_PRIORITY_MAX];
  3. (2) struct rt_thread *rt_current_thread;
  4. (3) rt_uint8_t rt_current_priority;
  5. #if RT_THREAD_PRIORITY_MAX > 32
  6. /* maximun priority level, 256 */
  7. (4) rt_uint32_t rt_thread_ready_priority_group;
  8. (5) rt_uint8_t rt_thread_ready_table[32];
  9. #else
  10. /* maximun priority level, 32 */
  11. (6)rt_uint32_t rt_thread_ready_priority_group;
  12. #endif
  13. ...

這裏我們假定RT_THREAD_PRIORITY_MAX這個宏為256,即條件編譯語句會編譯#if和#else之間的語句。

  • 語句(1)即定義了線程TCB數組。該數組的每一個元素是一個rt_list_t類型的元素,實際上這就是一個鏈表的數據結構。
  • 語句(2)中定義了一個指針,從名稱上來看,即當前線程,struct rt_thread就是線程TCB數據結構類型。
  • 語句(3)定了當前的優先級。
  • 語句(4)當前的ready優先級組。
  • 語句(5)定了一個數組,語句(6)定義了一個u32的變量,它們是做什麽用的呢?我們稍後分析。

rt-thread中的線程數據結構的存儲問題已經解決,下面開始位圖調度算法分析。

位圖調度算法

調度算法首先要找出所有線程優先級中優先級最高的那個線程優先級,當前系統中,某些線程優先級上可能不存在線程。也就說,rt_thread_priority_table數組中某些元素為空,因此要找出該數組中第一個非空的元素。

調度算法1




[cpp] view plain copy
 


  1. for(i=0; i<256; i++)
  2. {
  3. if(rt_thread_priority_table[i] != NULL)
  4. break;
  5. }
  6. highest_ready_priority = i;

上面這種做法是可以正確調度最高優先級的線程,但是它有一個問題,如果當前系統中具有最高優先級的線程對應的優先級的數字(根據上面的分析,數字越大,線程TCB越存儲在TCB數組的後面,其優先級別越低)如果為0級,顯然我們一次就可以找出,如果很不幸,這個從0級到254級上都沒有就緒的線程,僅在255級上有就緒的線程,我們卻不得不在檢查了數組這256個元素之後,才能找出可以運行的線程。

因此,我們要尋找一種具有恒定執行時間的調度算法 。

首先來考慮,每一個優先級上是否存在線程,這是一個是/否問題,即要麽存在線程,要麽不存在線程,這可以用一個bit位來表示。我們規定這個bit為1表示存在線程,為0表示不存在線程。

對於256級的線程,則共需要256個bit位。理想的情況是,我們創建一個具有256個bit的變量,然後操作系統使用這個變量來維護整個系統所有對應優先級上是否存在活動的線程。顯然,C語言不支持:-(,但是256個bit也就是32個字節,我們定義一個32字節長的數組即可,然後將這個數組看成一個整體。

現在需要約定,這32個字節即256個bit,和256個線程優先級的對應關系。一個字節的最高位為bit7,最低位為bit0,和上面的說明一致的是,我們用bit0表示更高的優先級,用BIT7表示稍低的優先級。

來考慮這32個字節中的第一個字節。第一個字節的bit0用來表示優先級0,bit7表示優先級7。第二個字節bit0表示優先級8,bit7表示優先級15。其他依次類推。可以參考的如下表格,它描述了這32個字節的各個bit是和系統的256個優先級的對應關系。

單元格中的內容表示對應的優先級。 每一行為對應的一個字節,每一列為各個bit位。

[cpp] view plain copy
  1. bit7 6 5 4 3 2 1 0
  2. byte0 |007|006|005|004|003|002|001|000|
  3. byte1 |0l5|014|013|012|011|010|009|008|
  4. .................................
  5. byte32|255|254|253|252|251|250|249|248|

上面這32個字節所組成的256個bit,他們的排列方式很像一張圖(map),所以這種方式就別稱為位圖(bit map)。這張圖就是前面scheduler.c中定義的32個字節的數組。如下

    (5) rt_uint8_t rt_thread_ready_table[32]; 

舉個例子,我們創建了一個線程,並且指定了它的優先級是125,然後將它設置為就緒(READY),實際上在我們在調用函數將它變為READY的函數中,RTT就會去上面這256個bit中(也即是這32個字節),找到第125個bit,我稱之為位圖的BIT125, 也即是字節15 (125/ 8 = 15,125%8 = 5)的第5個bit,將這個bit置1。 即位圖的BIT125 就是rt_thread_ready_table[125/8]的BIT5.我們可以用位代碼表示為 BITMPA.BIT_125 = rt_thread_ready_table[125/8].BIT5

優先級125 對應那個字節的哪個bit呢?

這裏有個換算關系。其計算公式 :
(優先級別 / 8 )商取整數即對應位圖中的字節 (優先級別 % 8 )就是對應位圖字節中的bit位

即優先級125, 125 / 8 = 15 , 125 %8 = 5. 位圖的BIT125就是 rt_thread_ready_table[15]的BIT5

為了下面敘述的方便,做如下說明:

  • 位圖,就指的是數組rt_uint8_t rt_thread_ready_table[32]這32個字節組成的256個bit。

我們的系統需要根據各個線程的狀態,實時的更新這個位圖。舉個例子,優先級為125的不再存在就緒的線程時,操作系統就需要將位圖的BIT125清0,當一個線程狀態為READY後,則需要將這個線程的優先級在位圖中對應的BIT位置1。

自然,我們面臨的問題是,尋找優先級最高的線程的問題,就變成從位圖中找出第一個為1的bit的位置。 比如說,我們系統中,存在三個線程A, B, C, 優先級分別為 線程A,優先級5 線程B,優先級25 線程C,優先級125 顯然,此時位圖中BIT5, BIT25,BIT125分別為1,其余bit位全部為0。故我們需要編寫一個調度程序,它能夠找出當前優先級最高的BIT位,也就是BIT5,對應的優先級為5。

下面是一種顯然調度思路的思路,即依次遍歷數組rt_thread_priority_table,找出第一個非0的bit,這就是當前存在就緒線程的最高優先級。根據指針取出當前線程TCB,進而調度執行。


調度算法2




[cpp] view plain copy
 


  1. for(i=0; i<32; i++)
  2. {
  3. for(j=0; j<8; j++)
  4. {
  5. if (rt_thread_priority_table[i] & (1<<j) )
  6. break;//這就是找到最低的那個為1的bit位置了。
  7. }
  8. //下面就是我們找到的最高優先級
  9. highest_ready_priority = i * 8 + j;
  10. }

算法2可以工作,但依然存在問題,雙層for循環可能只循環一次,也可能會循環256次,這取決於位圖中位圖中為1的最低BIT的位置。如果BIT0為1,則執行一次即跳出循環,如果BIT0-BIT254都是0,僅BIT255為1,則循環256次。 平均來說, 雙層for循環的次數大約是 255/2 次。即與優先級數目N成正比。

算法2的問題就是,每次調度函數執行的時間不恒定,取決於當前線程的優先級分布狀況。這種調度策略從整體上說執行的時間是O(n)的,即調度算法的平均執行時間跟優先級數目成正比。這種方式本質上跟調度算法1一樣,依然不能實現在恒定時間完成調度的目標。

我們來改進調度算法2,它之所以耗費時間為O(N),就是因為每次我們都要對位圖的各個bit位做檢驗。

現在我們將位圖看作一個變量,並假定當前優先級別為8,則位圖變量可以用一個字節表示。考慮位圖變量的取值範圍,當位圖所有BIT0全為0時,位圖變量的值就是0,當位圖所有BIT位都是1時(表示所有線程優先級上都存在就緒的線程,此時最高優先級為0級),則位圖變量的值是255。反過來,如果當位圖變量為1時,此時位圖的BIT0為1,即最高優先級為優先級0,同樣,位圖變量為255時,最高優先級依然是0。 當位圖變量為6時,BIT2=1,BIT1=1,即最高優先級為1。因此當位圖變量取0-255之間的任意一個數字時,它的最低為1的BIT位置都是預知的。我們可以預先將這位圖變量的所有取值所對應的最高優先級計算出來,並存成一張表格,然後就可以避免算法2中的for循環,而只需要查表即可,這個執行時間自然是恒定的。實際上,查表法就是一種常用的用空間換取時間的方法。

位圖取值 最低為1的bit位

  • 0x01 0 (第0個bit位為1)
  • 0x02 1 (第1個bit位為1)
  • 0x03 0 (第0個bit位為1)
  • ....
  • 0xff 0 (第0個bit為1)

註意0x0比較特殊,全部bit位都是0,我們返回0,但不表示其第0位為1。只是為了數組整齊所以填充一個0。

我們可以寫一個簡單的程序來生成這個表格。這裏我使用了python編寫這個程序(C語言版本就留給讀者完成)。


gettab.py




[python] view plain copy
 


  1. #coding=gbk
  2. #打印一個字節的最低bit位,可能的值為0,1,2,3,4,5,6,7
  3. samples = 256
  4. def getlowbit(byte):
  5. c = 0
  6. for i in range(0,8):
  7. if(byte & 0x01):
  8. return c
  9. c = c+1
  10. byte = byte >> 1
  11. return 0
  12. line =""
  13. for i in range(0,samples):
  14. print "%d," %getlowbit(i),
  15. if((i+1)%16 == 0):
  16. print "\n"

就可以得到如下的表了:




[cpp] view plain copy
 


  1. const rt_uint8_t rt_lowest_bitmap[] =
  2. {
  3. /* 00 */ 0, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
  4. /* 10 */ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
  5. /* 20 */ 5, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
  6. /* 30 */ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
  7. /* 40 */ 6, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
  8. /* 50 */ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
  9. /* 60 */ 5, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
  10. /* 70 */ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
  11. /* 80 */ 7, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
  12. /* 90 */ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
  13. /* A0 */ 5, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
  14. /* B0 */ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
  15. /* C0 */ 6, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
  16. /* D0 */ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
  17. /* E0 */ 5, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
  18. /* F0 */ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0
  19. };

註意,我們的問題依然沒有解決,當進程優先級為8時,我們可以查表直接解決,當系統存在32個優先級時,如果直接制作表格的話,這個表格的元素個數將是 2**32 = 4294967296L= 4G字節。顯然這是不可接受的。

32個優先級,即優先級位圖變量可以使用u32型,也就是等價於4個字節,我們可以對這4個字節從字節0開始依次查表,如果字節0中非0,則最高優先級一定存在於字節0中,我們對字節0查表rt_lowest_bitmap,即可以得到最高優先級。 如果字節0為0,字節1非0,我們對字節1查表得到的是字節1中為1的最低bit位,然後加上8,就是系統的最高優先級。對字節2,字節3同樣處理。

  • 假定當前u32 rt_thread_priority_bitmap維護著當前系統優先級位圖。

調度算法3

 
[cpp] view plain copy
  1. /*
  2. * rt_thread_priority_bitmap 用來表示當前系統優先級位圖。
  3. * highest_ready_priority表示當前系統中最高優先級
  4. */
  5. if (rt_thread_priority_bitmap & 0xff)
  6. {
  7. highest_ready_priority = rt_lowest_bitmap[rt_thread_priority_bitmap & 0xff];
  8. }
  9. else if (rt_thread_priority_bitmap & 0xff00)
  10. {
  11. highest_ready_priority = rt_lowest_bitmap[(rt_thread_priority_bitmap >> 8) & 0xff] + 8;
  12. }
  13. else if (rt_thread_priority_bitmap & 0xff0000)
  14. {
  15. highest_ready_priority = rt_lowest_bitmap[(rt_thread_priority_bitmap >> 16) & 0xff] + 16;
  16. }
  17. else
  18. {
  19. highest_ready_priority = rt_lowest_bitmap[(rt_thread_priority_bitmap >> 24) & 0xff] + 24;
  20. }

現在我們解決了32個系統優先級時的調度問題,現在來考慮線程優先級為256的情況。讀者可能會想了,這沒什麽不同,256個bit=32個字節,依然采用算法3的思路,對著32個字節依次查表。問題是,當位圖變量有32個字節時,對這32個字節依次查表耗費的時間就不可以忽略了,為了提升系統實時調度的性能,我們需要對算法3進行改進。

為了解決這個問題,我們使用二級位圖。

即,256個bit由32個字節存儲,每一個字節的8個bit代表著位圖變量中的8個優先級,如果某個字節非0,則表示其中必有非0的bit位。

rtt中對應的數組為rt_uint8_t rt_thread_ready_table[32]

所謂二級位圖,即我們先確定32個字節中最低的非0的字節。為了實現這個效果,我們需要對這32個字節引入一個32個bit的位圖變量,每一個bit位表示對應的字節是否為0。例如,這個32bit的位圖變量的BIT5為0,表示系統線程優先級256bit所分成的32個字節中的 字節5 非0。 為了區分,稱這個32個bit的位圖變量 字節位圖變量 ,rt-thread中使用的是rt_thread_ready_priority_group. 顯然我們查找系統系統最高優先級時,先確定非0的最低字節,這實際上依然是算法3,然後再對該字節進行查表,即得到該字節內最低為1的bit位,然後兩者疊加(註意不是簡單的加)即可。

根據上面的分析,要想使用這個二級位圖算法,rtt在跟蹤線程的狀態轉換時,不僅需要維護256bit的位圖變量數組rt_thread_ready_table[thread->number] |= thread->high_mask,還需要維護32bit的 字節位圖變量 rt_thread_ready_priority_group。參看如下代碼。

thread.c




[cpp] view plain copy
 


  1. rt_err_t rt_thread_startup(rt_thread_t thread)
  2. {
  3. ...
  4. /* set current priority to init priority */
  5. thread->current_priority = thread->init_priority;
  6. (1) thread->number = thread->current_priority >> 3; /* 5bit */
  7. (2) thread->number_mask = 1L << thread->number;
  8. (3) thread->high_mask = 1L << (thread->current_priority & 0x07); /* 3bit */
  9. ...
  10. }
  11. void rt_schedule_insert_thread(struct rt_thread *thread)
  12. {
  13. ...
  14. #if RT_THREAD_PRIORITY_MAX > 32
  15. (4) rt_thread_ready_table[thread->number] |= thread->high_mask;
  16. #endif
  17. (5) rt_thread_ready_priority_group |= thread->number_mask;
  18. ....
  19. }

初始化線程時,我們指定了一個線程的優先級別thread->init_priority,由於線程優先級為0到255,一個字節就可以表示。但是我們的bitmap是32個字節。為了調高效率,我們最好能快速向位圖的對應的bit寫1。

  • 語句(1)thread->current_priority >> 3,這裏的>>3就是除以8,因為一個字節表示8個優先級。這樣就可以得到當前這個優先級對應的位圖32個字節中的第幾個字節,這裏用thread->number表示,顯然,number範圍是0到31。這裏為了提高效率,采用移位完成除法。
  • 上面除法的余數,就表示這個優先級在上面字節中的第幾個bit。這個余數可以使用 (thread->current_priority & 0x07)來表示。
  • 語句(3)是得到該bit對應的權值。例如一個字節的bit7對應的權值即 (1<<7),這樣做是為了使用“位與,或,非”等位運算,可以提高運行速度,即語句(4)。
  • 語句(4)清楚表示了這幾個變量作用。可見,根據某個表示優先級的數字向位圖中相應的bit位寫入了1。
  • 那麽語句(2)和(5)是做什麽用的呢? 這個number_mask實際上是為了加快查找位圖的速度而創建的。它將在rt_schedule函數中發揮作用。

上文已說明,thread->number就表示當前線程優先級在32個字節的位圖數組中的字節位置。為了提高效率,rt-thread另外使用了一個u32類型的變量rt_thread_ready_priority_group 來加快速度。如果這32個bit中某一個bit為1,就表示對應的某個字節非0(想想看,這意味著該字節所表示的8個優先級中存在就緒線程)。

rt_thread_ready_priority_group變量為32位寬度,長度上等於4個字節,因此可以對每一個字節查表(上面生成的表格)就可以得到為1的最低的bit位置。

概括起來就是,rtt首先確定32個字節的位圖中,非0的最低的那個字節,然後再查表得到這個字節非0的最低那個bit。這兩步驟正好可以利用兩次上面的表格rt_lowest_bitmap。

下面附上rt_schedule的代碼。非必要的代碼被我隱去。讀者可以對比下面的代碼理解上面的思路。

[cpp] view plain copy
  1. void rt_schedule(void)
  2. {
  3. ....
  4. register rt_ubase_t highest_ready_priority;
  5. #if RT_THREAD_PRIORITY_MAX == 8
  6. highest_ready_priority = rt_lowest_bitmap[rt_thread_ready_priority_group];
  7. #else
  8. register rt_ubase_t number;
  9. /* find out the highest priority task */
  10. if (rt_thread_ready_priority_group & 0xff)
  11. {
  12. number = rt_lowest_bitmap[rt_thread_ready_priority_group & 0xff];
  13. }
  14. else if (rt_thread_ready_priority_group & 0xff00)
  15. {
  16. number = rt_lowest_bitmap[(rt_thread_ready_priority_group >> 8) & 0xff] + 8;
  17. }
  18. else if (rt_thread_ready_priority_group & 0xff0000)
  19. {
  20. number = rt_lowest_bitmap[(rt_thread_ready_priority_group >> 16) & 0xff] + 16;
  21. }
  22. else
  23. {
  24. number = rt_lowest_bitmap[(rt_thread_ready_priority_group >> 24) & 0xff] + 24;
  25. }
  26. highest_ready_priority = (number << 3) + rt_lowest_bitmap[rt_thread_ready_table[number]];
  27. ....
  28. }

【來源】

【轉】rt-thread的位圖調度算法分析