1. 程式人生 > >中斷服務下半部之工作佇列【轉】

中斷服務下半部之工作佇列【轉】

1        工作佇列概述
工作佇列 (work queue) 是另外一種將工作推後執行的形式,它和我們前面討論的所有其他形式都不相同。工作佇列可以把工作推後,交由一個核心執行緒去執行—這個下半部分總是會在程序上下文執行,但由於是核心執行緒,其不能訪問使用者空間。 最重要特點的就是工作佇列允許重新排程甚至是睡眠。

通常,在工作佇列和軟中斷 /tasklet 中作出選擇非常容易。可使用以下規則:

²      如果推後執行的任務需要睡眠,那麼只能選擇工作佇列;

²      如果推後執行的任務需要延時指定的時間再觸發,那麼使用工作佇列,因為其可以利用 timer 延時;

²      如果推後執行的任務需要在一個 tick 之內處理,則使用軟中斷或 tasklet ,因為其可以搶佔普通程序和核心執行緒;

²      如果推後執行的任務對延遲的時間沒有任何要求,則使用工作佇列,此時通常為無關緊要的任務。

另外如果你需要用一個可以重新排程的實體來執行你的下半部處理,你應該使用工作佇列。它是惟一能在程序上下文執行的下半部實現的機制,也只有它才可以睡眠。這意味著在你需要獲得大量的記憶體時、在你需要獲取訊號量時,在你需要執行阻塞式的 I/O 操作時 ,它都會非常有用。

實際上,工作佇列的本質就是將工作交給核心執行緒處理,因此其可以用核心執行緒替換。但是核心執行緒的建立和銷燬對程式設計者的要求較高,而工作佇列實現了核心執行緒的封裝, 不易出錯,所以我們也推薦使用工作佇列。

2        工作佇列的實現
2.1     工作者執行緒
工作佇列子系統是一個用於建立核心執行緒的介面, 通過它建立的程序負責執行由核心其他部分排到佇列裡的任務。它建立的這些核心執行緒被稱作工作者執行緒 (worker thread) 。工作佇列可以讓你的驅動程式建立一個專門的工作者執行緒來處理需要推後的工作。不過,工作佇列子系統提供了一個預設的工作者執行緒來處理這些工作。 因此,工作佇列最基本的表現形式就轉變成了一個把需要推後執行的任務交給特定的通用執行緒這樣一種介面。

預設的工作者執行緒叫做 events/n ,這裡 n 是處理器的編號, 每個處理器對應一個執行緒。比如,單處理器的系統只有 events/0 這樣一個執行緒。而雙處理器的系統就會多一個 events/1 執行緒。

預設的工作者執行緒會從多個地方得到被推後的工作。許多核心驅動程式都把它們的下半部交給預設的工作者執行緒去做。除非一個驅動程式或者子系統必須建立一個屬於它自己的核心執行緒,否則最好使用預設執行緒 。不過並不存在什麼東西能夠阻止程式碼建立屬於自己的工作者執行緒。如果你需要在工作者執行緒中執行大量的處理操作,這樣做或許會帶來好處。處理器密集型和效能要求嚴格的任務會因為擁有自己的工作者執行緒而獲得好處。

2.2     工作佇列的組織結構
2.2.1       工作佇列 workqueue_struct
外部可見的工作佇列抽象,使用者介面,是由每個 CPU 的工作佇列組成的連結串列

  64 struct workqueue_struct {

  65         struct cpu_workqueue_struct *cpu_wq ;

  66         const char *name ;

  67         struct list_head list ;  /* Empty if single thread */

  68 };

²      cpu_wq :本佇列包含的工作者執行緒;

²      name :所有本佇列包含的執行緒的公共名稱部分,建立工作佇列時的唯一使用者標識;

²      list :連結本佇列的各個工作執行緒。

在早期的版本中, cpu_wq 是用陣列維護的,即對每個工作佇列,每個 CPU 包含一個此執行緒。改成連結串列的優勢在於,建立工作佇列的時候可以指定只建立一個核心執行緒 ,這樣消耗的資源較少。

在該結構體裡面,給每個執行緒分配一個 cpu_workqueue_struct ,因而也就是給每個處理器分配一個,因為每個處理器都有一個該型別的工作者執行緒。

2.2.2       工作者執行緒 cpu_workqueue_struct
這個結構是針對每個 CPU 的,屬於核心維護的結構,使用者不可見。

  43 struct cpu_workqueue_struct {

  44

  45         spinlock_t lock ;

  46

  47         long remove_sequence ;   /* Least-recently added (next to run) */

  48         long insert_sequence ;   /* Next to add */

  49

  50         struct list_head worklist ;

  51         wait_queue_head_t more_work ;

  52         wait_queue_head_t work_done ;

  53

  54         struct workqueue_struct *wq ;

  55         struct task_struct *thread ;

  56

  57         int run_depth ;          /* Detect run_workqueue() recursion depth */

  58 } ____cacheline_aligned ;

²      lock :操作該資料結構的互斥鎖

²      remove_sequence :下一個要執行的工作序號,用於 flush

²      insert_sequence :下一個要插入工作的序號

²      worklist :待處理的工作的連結串列頭

²      more_work :標識有工作待處理的等待佇列,插入新工作後喚醒對應的核心執行緒

²      work_done :處理完的等待佇列,沒完成一個工作後,喚醒可能等待通知處理完成通知的執行緒

²      wq :所屬的工作佇列節點

²      thread :關聯的核心執行緒指標

²      run_depth : run_workqueue() 迴圈深度,多處可能呼叫此函式

所有的工作者執行緒都是用普通的核心執行緒實現的,它們都要執行 worker thread() 函式。在它初始化完以後,這個函式執行一個死迴圈並開始休眠。當有操作被插入到佇列裡的時候,執行緒就會被喚醒,以便執行這些操作。當沒有剩餘的操作時,它又會繼續休眠。

2.2.3       工作 work_struct
工作用 work_struct 結構體表示:

linux+v2.6.19/include/linux/workqueue.h

  14 struct work_struct {

  15         unsigned long pending ;

  16         struct list_head entry ;

  17          void (*func )(void *);

  18         void *data ;

  19         void *wq_data ;

  20         struct timer_list timer ;

  21 };

²      Pending :這個工作是否正在等待處理標誌,加入到工作佇列後置此標誌

²      Entry :該工作在連結串列中的入口點,連線所有工作

²      Func :該工作執行的回撥函式

²      Data :傳遞給處理函式的引數

²      wq_data :本工作所掛接的 cpu_workqueue_struct ;若需要使用定時器,則其為工作佇列傳遞給 timer

²      timer :延遲的工作佇列所用到的定時器,無需延遲是初始化為 NULL

 

位於最高一層的是工作佇列。系統允許有多種型別的工作佇列存在。每一個工作佇列具備一個 workqueue_struct ,而 SMP 機器上每個 CPU 都具備一個該類的工作者執行緒 cpu_workqueue_struct ,系統通過 CPU 號和 workqueue_struct 的連結串列指標及第一個成員 cpu_wq 可以得到每個 CPU 的 cpu_workqueue_struct 結構。

而每個工作提交時,將連結在當前 CPU 的 cpu_workqueue_struct 結構的 worklist 連結串列中。通常情況下由當前所註冊的 CPU 執行此工作,但在 flush_work 中可能由其他 CPU 來執行。 或者 CPU 熱插拔後也將進行工作的轉移。

核心中有些部分可以根據需要來建立工作佇列。而在預設情況下核心只有 events 這一種型別的工作佇列。大部分驅動程式都使用的是現存的預設工作者執行緒。它們使用起來簡單、方便。可是,在有些要求更嚴格的情況下,驅動程式需要自己的工作者執行緒。

2.3     工作佇列執行的細節
工作結構體被連線成連結串列,對於某個工作佇列,在每個處理器上都存在這樣一個連結串列。當一個工作者執行緒被喚醒時,它會執行它的連結串列上的所有工作。工作被執行完畢,它就將相應的 work_struct 物件從連結串列上移去。 當連結串列上不再有物件的時候,它就會繼續休眠 。

此為工作者執行緒的標準模板,所以工作者執行緒都使用此函式 。對於使用者自定義的核心執行緒可以參考此函式。

  233 static int worker_thread (void *__cwq )

  234 {

  235         struct cpu_workqueue_struct *cwq = __cwq ;

// 與該工作者執行緒關聯的 cpu_workqueue_struct 結構

  236         DECLARE_WAITQUEUE (wait , current );

// 宣告一個等待節點,若無工作,則睡眠

  237         struct k_sigaction sa ;

  238         sigset_t blocked ;

  239

  240         current ->flags |= PF_NOFREEZE ;

  241

  242         set_user_nice (current , -5);

// 設定較低的程序優先順序 , 工作程序不是個很緊急的程序,不和其他程序搶佔 CPU ,通常在系統空閒時執行

  244         /* 禁止並清除所有訊號 */

  245         sigfillset (&blocked );

  246         sigprocmask (SIG_BLOCK , &blocked , NULL );

  247         flush_signals (current );

  248

  255         /* SIG_IGN makes children autoreap: see do_notify_parent(). */

// 允許 SIGCHLD 訊號,並設定處理函式

  256         sa .sa .sa_handler = SIG_IGN ;

  257         sa .sa .sa_flags = 0;

  258         siginitset (&sa .sa .sa_mask , sigmask (SIGCHLD ));

  259         do_sigaction (SIGCHLD , &sa , (struct k_sigaction *)0);

  260

  261         set_current_state (TASK_INTERRUPTIBLE );

// 可被訊號中斷,適當的時刻可被殺死,若收到停止命令則退出返回,否則程序就一直執行,無工作可執行時,主動休眠

  262         while (!kthread_should_stop ()) {

// 為了便於 remove_wait_queue 的統一處理,將當前核心執行緒新增到 cpu_workqueue_struct 的 more_work 等待佇列中,當有新 work 結構鏈入佇列中時會啟用此等待佇列

  263                 add_wait_queue (&cwq ->more_work , &wait );

// 判斷是否有工作需要作,無則排程讓出 CPU 等待喚醒

  264                 if (list_empty (&cwq ->worklist ))

  265                         schedule ();

  266                 else

  267                         __set_current_state (TASK_RUNNING );

  268                 remove_wait_queue (&cwq ->more_work , &wait );

// 至此,執行緒肯定處於 TASK_RUNNING ,從等待佇列中移出

// 需要再次判斷是因為可能從 schedule 中被喚醒的。如果有工作做,則執行

  270                 if (!list_empty (&cwq ->worklist ))

  271                         run_workqueue (cwq );

// 無工作或者全部執行完畢了,迴圈整個過程,接著一般會休眠

  272                 set_current_state (TASK_INTERRUPTIBLE );

  273         }

  274         __set_current_state (TASK_RUNNING );

  275          return 0;

  276 }

該函式在死迴圈中完成了以下功能:

²      執行緒將自己設定為休眠狀態 TASK_INTERRUPTIBLE 並把自己加人到等待佇列上。

²      如果工作連結串列是空的,執行緒呼叫 schedule() 函式進入睡眠狀態。

²      如果連結串列中有物件,執行緒不會睡眠。相反,它將自己設定成 TASK_RUNNING ,脫離等待佇列。

²      如果連結串列非空,呼叫 run_workqueue 函式執行被推後的工作。

run_workqueue 執行具體的工作,多處會呼叫此函式。在呼叫 Flush_work 時為防止死鎖,主動呼叫 run_workqueue ,此時可能導致多層次遞迴。

  196 static void run_workqueue (struct cpu_workqueue_struct *cwq )

  197 {

  198         unsigned long flags ;

  199

  204         spin_lock_irqsave (&cwq ->lock , flags );

// 統計已經遞迴呼叫了多少次了

  205         cwq ->run_depth ++;

  206         if (cwq ->run_depth > 3) {

  207                 /* morton gets to eat his hat */

  208                 printk ("%s: recursion depth exceeded: %d/n",

  209                         __FUNCTION__ , cwq ->run_depth );

  210                 dump_stack ();

  211         }

  212         while (!list_empty (&cwq ->worklist )) {

  213                 struct work_struct *work = list_entry (cwq ->worklist .next ,

  214                                                 struct work_struct , entry );

  215                 void (*f ) (void *) = work ->func ;

  216                 void *data = work ->data ;

  217                         // 將當前節點從連結串列中刪除並初始化其 entry

  218                 list_del_init (cwq ->worklist .next );

  219                  spin_unlock_irqrestore (&cwq ->lock , flags );

  220

  221                 BUG_ON (work ->wq_data != cwq );

  222                 clear_bit (0, &work ->pending ); // 清除 pengding 位,標示已經執行

  223                 f (data );

  224

  225                 spin_lock_irqsave (&cwq ->lock , flags );

  226                 cwq ->remove_sequence ++;

// // 喚醒可能等待的程序,通知其工作已經執行完畢

  227                 wake_up (&cwq ->work_done );

  228         }

  229         cwq ->run_depth --;

  230         spin_unlock_irqrestore (&cwq ->lock , flags );

  231 }

3        工作佇列的 API
3.1     API 列表
 

功能描述
 對應 API 函式
 附註
 
靜態定義一個工作
 DECLARE_WORK (n , f , d )
  
 
動態建立一個工作
 INIT_WORK(_work, _func, _data)
  
 
工作原型
 void work_handler(void *data)
  
 
將工作新增到指定的工作佇列中
 queue_work(struct workqueue_struct *wq, struct work_struct *work)
  
 
將工作新增到 keventd_wq 佇列中
 schedule_work(struct work_struct *work)
  
 
延遲 delay 個 tick 後將工作新增到指定的工作佇列中
 queue_delayed_work(struct workqueue_struct *wq,

struct work_struct *work, unsigned long delay)
  
 
延遲 delay 個 tick 後將工作新增到 keventd_wq 佇列中
 schedule_delayed_work(struct work_struct *work, unsigned long delay)
  
 
重新整理等待指定佇列中的所有工作完成
 flush_workqueue(struct workqueue_struct *wq)
  
 
重新整理等待 keventd_wq 中的所有工作完成
 flush_scheduled_work(void)
  
 
取消指定佇列中所有延遲工作
 cancel_delayed_work(struct work_struct *work)
  
 
建立一個工作佇列
 create_workqueue(name)
  
 
建立一個單執行緒的工作佇列
 create_singlethread_workqueue(name)
  
 
銷燬指定的工作佇列
 destroy_workqueue(struct workqueue_struct *wq)
  
 

3.2     如何建立工作
首先要做的是實際建立一些需要推後完成的工作。可以通過 DECLARE_WORK 在編譯時靜態地建立該結構體:

  27 #define __WORK_INITIALIZER (n , f , d ) {           /

  28         .entry   = { &(n ).entry , &(n ).entry },         /

  29         .func = (f ),                              /

  30         .data = (d ),                              /

  31         .timer = TIMER_INITIALIZER (NULL , 0, 0),   /

  32         }

  33

  34 #define DECLARE_WORK (n , f , d )                /

  35         struct work_struct n = __WORK_INITIALIZER (n , f , d )

這樣就會靜態地建立一個名為 name ,處理函式為 func ,引數為 data 的 work_struct 結構體。

同樣,也可以在執行時通過指標建立一個工作:

  40 #define PREPARE_WORK (_work , _func , _data )       /

  41         do {                                    /

  42                 (_work )->func = _func ;             /

  43                 (_work )->data = _data ;            /

  44         } while (0)

  45

  49 #define INIT_WORK (_work , _func , _data )         /

  50         do {                                        /

  51                 INIT_LIST_HEAD (&(_work )->entry );      /

  52                 (_work )->pending = 0;                      /

  53                 PREPARE_WORK ((_work ), (_func ), (_data ));   /

  54                 init_timer (&(_work )->timer );                  /

  55         } while (0)

這會動態地初始化一個由 work 指向的工作,處理函式為 func ,引數為 data 。

無論是動態還是靜態建立,預設定時器初始化為 0 ,即不進行延時排程。

3.3     工作佇列處理函式
工作佇列處理函式的原型是:

void work_handler(void *data)

這個函式會由一個工作者執行緒執行,因此,函式會執行在程序上下文中。預設情況下,允許響應中斷,並且不持有任何鎖。如果需要,函式可以睡眠。需要注意的是,儘管操作處理函式執行在程序上下文中,但它不能訪問使用者空間,因為核心執行緒在使用者空間沒有相關的記憶體對映。 通常在系統呼叫發生時,核心會代表使用者空間的程序執行,此時它才能訪問使用者空間,也只有在此時它才會對映使用者空間的記憶體。

在工作佇列和核心其他部分之間使用鎖機制就像在其他的程序上下文中使用鎖機制一樣方便。這使編寫處理函式變得相對容易。

3.4     排程工作
3.4.1       queue_work
建立一個工作的時候無須考慮工作佇列的型別。在建立之後,可以呼叫下面列舉的函式。這些函式與 schedule-work() 以及 schedule-delayed-Work() 相近,惟一的區別就在於它們針對給定的工作佇列而不是預設的 event 佇列進行操作。

將工作新增到當前處理器對應的連結串列中,但並不能保證此工作由提交該工作的 CPU 執行。 Flushwork 時可能執行所有 CPU 上的工作 或者 CPU 熱插拔後將進行工作的轉移

  107 int fastcall queue_work (struct workqueue_struct *wq , struct work_struct *work )

  108 {

  109         int ret = 0, cpu = get_cpu ();

// 工作結構還沒在佇列 , 設定 pending 標誌表示把工作結構掛接到佇列中

  111         if (!test_and_set_bit (0, &work ->pending )) {

  112                 if (unlikely (is_single_threaded (wq )))

  113                         cpu = singlethread_cpu ;

  114                 BUG_ON (!list_empty (&work ->entry ));

  115                 __queue_work (per_cpu_ptr (wq ->cpu_wq , cpu ), work );

////////////////////////////////

  84 static void __queue_work (struct cpu_workqueue_struct *cwq ,

  85                          struct work_struct *work )

  86 {

  87         unsigned long flags ;

  88

  89         spin_lock_irqsave (&cwq ->lock , flags );

  //// 指向 CPU 工作佇列

  90         work ->wq_data = cwq ;

// 加到佇列尾部

  91         list_add_tail (&work ->entry , &cwq ->worklist );

  92         cwq ->insert_sequence ++;

// 喚醒工作佇列的核心處理執行緒

  93         wake_up (&cwq ->more_work );

  94         spin_unlock_irqrestore (&cwq ->lock , flags );

  95 }

////////////////////////////////////

  116                 ret = 1;

  117         }

  118         put_cpu ();

  119         return ret ;

  120 }

  121 EXPORT_SYMBOL_GPL (queue_work );

一旦其所在的處理器上的工作者執行緒被喚醒,它就會被執行。

3.4.2       schedule_work
在大多數情況下 , 並不需要自己建立工作佇列,而是隻定義工作 , 將工作結構掛接到核心預定義的事件工作佇列中排程 , 在 kernel/workqueue.c 中定義了一個靜態全域性量的工作佇列 static struct workqueue_struct *keventd_wq;

排程工作結構 , 將工作結構新增到全域性的事件工作佇列 keventd_wq ,呼叫了 queue_work 通用模組。對外遮蔽了 keventd_wq 的介面,使用者無需知道此引數,相當於使用了預設引數。 keventd_wq 由核心自己維護,建立,銷燬。

  455 static struct workqueue_struct *keventd_wq ;

  463 int fastcall schedule_work (struct work_struct *work )

  464 {

  465         return queue_work (keventd_wq , work );

  466 }

  467 EXPORT_SYMBOL (schedule_work );

3.4.3       queue_delayed_work
有時候並不希望工作馬上就被執行,而是希望它經過一段延遲以後再執行。在這種情況下,

同時也可以利用 timer 來進行延時排程,到期後才由預設的定時器回撥函式進行工作註冊。

延遲 delay 後,被定時器喚醒,將 work 新增到工作佇列 wq 中。

  143 int fastcall queue_delayed_work (struct workqueue_struct *wq ,

  144                         struct work_struct *work , unsigned long delay )

  145 {

  146         int ret = 0;

  147         struct timer_list *timer = &work ->timer ;

  148

  149         if (!test_and_set_bit (0, &work ->pending )) {

  150                 BUG_ON (timer_pending (timer ));

  151                 BUG_ON (!list_empty (&work ->entry ));

  152

  153                 /* This stores wq for the moment, for the timer_fn */

  154                 work ->wq_data = wq ;

  155                 timer ->expires = jiffies + delay ;

  156                 timer ->data = (unsigned long)work ;

  157                 timer ->function = delayed_work_timer_fn ;

////////////////////////////////////

定時器到期後執行的預設函式,其將某個 work 新增到一個工作佇列中,需兩個重要資訊:

Work : __data 定時器的唯一引數

待新增至的佇列:由 work ->wq_data 提供

  123 static void delayed_work_timer_fn (unsigned long __data )

  124 {

  125         struct work_struct *work = (struct work_struct *)__data ;

  126         struct workqueue_struct *wq = work ->wq_data ;

  127         int cpu = smp_processor_id ();

  128

  129         if (unlikely (is_single_threaded (wq )))

  130                 cpu = singlethread_cpu ;

  131

  132         __queue_work (per_cpu_ptr (wq ->cpu_wq , cpu ), work );

  133 }

////////////////////////////////////

  158                 add_timer (timer );

  159                 ret = 1;

  160         }

  161         return ret ;

  162 }

  163 EXPORT_SYMBOL_GPL (queue_delayed_work );

3.4.4       schedule_delayed_work
其利用 queue_delayed_work 實現了預設執行緒 keventd_wq 中工作的排程。

  477 int fastcall schedule_delayed_work (struct work_struct *work , unsigned long delay )

  478 {

  479         return queue_delayed_work (keventd_wq , work , delay );

  480 }

  481 EXPORT_SYMBOL (schedule_delayed_work );

3.5     重新整理工作
3.5.1       flush_workqueue
排入佇列的工作會在工作者執行緒下一次被喚醒的時候執行。有時,在繼續下一步工作之前,你必須保證一些操作已經執行完畢了。這一點對模組來說就很重要,在解除安裝之前,它就有可能需要呼叫下面的函式。 而在核心的其他部分,為了防止竟爭條件的出現,也可能需要確保不再有待處理的工作。

出於以上目的,核心準備了一個用於重新整理指定工作佇列的函式 flush_workqueue 。其確保所有已經排程的工作已經完成了,否則阻塞直到其執行完畢,通常用於驅動模組的關閉處理。 其檢查已經每個 CPU 上執行完的序號是否大於此時已經待插入的序號。對於新的以後插入的工作,其不受影響。

  320 void fastcall flush_workqueue (struct workqueue_struct *wq )

  321 {

  322         might_sleep ();

  323

  324         if (is_single_threaded (wq )) {

  325                 /* Always use first cpu's area. */

  326                 flush_cpu_workqueue (per_cpu_ptr (wq ->cpu_wq , singlethread_cpu ));

  327         } else {

  328                 int cpu ;

// 被保護的程式碼可能休眠,故此處使用核心互斥鎖而非自旋鎖

  330                  mutex_lock (&workqueue_mutex );

// 將同時排程其他 CPU 上的工作,這說明了工作並非在其註冊的 CPU 上執行

  331                 for_each_online_cpu (cpu )

  332                         flush_cpu_workqueue (per_cpu_ptr (wq ->cpu_wq , cpu ));

//////////////////////////

  278 static void flush_cpu_workqueue (struct cpu_workqueue_struct * cwq )

  279 {

  280         if ( cwq -> thread == current ) {

// keventd 本身需要重新整理所有工作時,手動呼叫 run_workqueue ,否則將造成死鎖。

  285                 run_workqueue ( cwq );

  286         } else {

  287                 DEFINE_WAIT ( wait );

  288                 long sequence_needed ;

  289

  290                 spin_lock_irq (& cwq -> lock );

// 儲存佇列中當前已有的工作所處的位置,不用等待新插入的工作執行完畢

  291                 sequence_needed = cwq -> insert_sequence ;

  292

  293                  while ( sequence_needed - cwq -> remove_sequence > 0) {

// 如果佇列中還有未執行完的工作,則休眠

  294                          prepare_to_wait (& cwq -> work_done , & wait ,

  295                                         TASK_UNINTERRUPTIBLE );

  296                         spin_unlock_irq (& cwq -> lock );

  297                         schedule ();

  298                         spin_lock_irq (& cwq -> lock );

  299                 }

  300                  finish_wait (& cwq -> work_done , & wait );

  301                 spin_unlock_irq (& cwq -> lock );

  302         }

  303 }

//////////////////////////

  333                 mutex_unlock (&workqueue_mutex );

  334         }

  335 }

  336 EXPORT_SYMBOL_GPL (flush_workqueue );

函式會一直等待,直到佇列中所有物件都被執行以後才返回。在等待所有待處理的工作執行的時候,該函式會進入休眠狀態,所以只能在程序上下文中使用它。

注意,該函式並不取消任何延遲執行的工作。就是說,任何通過 schedule_delayed_work 排程的工作,如果其延遲時間未結束,它並不會因為呼叫 flush_scheduled_work() 而被重新整理掉 。

3.5.2       flush_scheduled_work
 

刷新系統預設工作執行緒的函式為 flush_scheduled_work ,其呼叫了上面通用的函式

  532 void flush_scheduled_work (void)

  533 {

  534         flush_workqueue (keventd_wq );

  535 }

  536 EXPORT_SYMBOL (flush_scheduled_work );

3.5.3       cancel_delayed_work
取消延遲執行的工作應該呼叫:

int cancel_delayed_work(struct work_struct *work);

這個函式可以取消任何與 work_struct 相關的掛起工作。

3.6     建立新的工作佇列
如果預設的佇列不能滿足你的需要,你應該建立一個新的工作佇列和與之相應的工作者執行緒。 由於這麼做會在每個處理器上都建立一個工作者執行緒,所以只有在你明確了必須要靠自己的一套執行緒來提高效能的情況下 ,再建立自己的工作佇列。

建立一個新的任務佇列和與之相關的工作者執行緒,只需呼叫一個簡單的函式: create_workqueue 。這個函式會建立所有的工作者執行緒 ( 系統中的每個處理器都有一個 ) 並且做好所有開始處理工作之前的準備工作。 name 引數用於該核心執行緒的命名。對於具體的執行緒會更加 CPU 號新增上序號。

create_workqueue 和 create_singlethread_workqueue 都是建立一個工作佇列,但是差別在於 create_singlethread_workqueue 可以指定為此工作佇列只建立一個核心執行緒,這樣可以節省資源,無需發揮 SMP 的並行處理優勢。

create_singlethread_workqueue 對外進行了封裝,相當於使用了預設引數。二者同時呼叫了統一的處理函式 __ create_workqueue ,其對外不可見。

  59 #define create_workqueue (name ) __create_workqueue ((name ), 0 )

  60 #define create_singlethread_workqueue (name ) __create_workqueue ((name ), 1)

  363 struct workqueue_struct *__ create_workqueue (const char *name ,

  364                                             int singlethread )

  365 {

  366         int cpu , destroy = 0;

  367         struct workqueue_struct *wq ;

  368         struct task_struct *p ;

  369

  370         wq = kzalloc (sizeof(*wq ), GFP_KERNEL );

  371         if (!wq )

  372                 return NULL ;

  373

  374         wq ->cpu_wq = alloc_percpu (struct cpu_workqueue_struct );

  375         if (!wq ->cpu_wq ) {

  376                 kfree (wq );

  377                 return NULL ;

  378         }

  379

  380         wq ->name = name ;

  381         mutex_lock (&workqueue_mutex );

  382         if (singlethread ) {

  383                 INIT_LIST_HEAD (&wq ->list ); // 終止連結串列

  384                 p = create_workqueue_thread (wq , singlethread_cpu ) ;

  385                 if (!p )

  386                         destroy = 1;

  387                 else

  388                         wake_up_process (p );

  389         } else {

  390                 list_add (&wq ->list , &workqueues );

  391                 for_each_online_cpu (cpu ) {

  392                         p = create_workqueue_thread (wq , cpu );

/////////////////////////////////

  338 static struct task_struct *create_workqueue_thread (struct workqueue_struct *wq ,

  339                                                    int cpu )

  340 {

  341         struct cpu_workqueue_struct *cwq = per_cpu_ptr (wq ->cpu_wq , cpu );

  342         struct task_struct *p ;

  343

  344         spin_lock_init (&cwq ->lock );

  345         cwq ->wq = wq ;

  346         cwq ->thread = NULL ;

  347         cwq ->insert_sequence = 0;

  348         cwq ->remove_sequence = 0;

  349         INIT_LIST_HEAD (&cwq ->worklist );

  350         init_waitqueue_head (&cwq ->more_work );

  351         init_waitqueue_head (&cwq ->work_done );

  352

  353         if (is_single_threaded (wq ))

  354                 p = kthread_create (worker_thread , cwq , "%s", wq ->name );

  355         else

  356                 p = kthread_create (worker_thread , cwq , "%s/%d", wq ->name , cpu );

  357         if (IS_ERR (p ))

  358                 return NULL ;

  359         cwq ->thread = p ;

  360         return p ;

  361 }

/////////////////////////////////

  393                         if (p ) {

  394                                 kthread_bind (p , cpu );

  395                                 wake_up_process (p );

  396                         } else

  397                                 destroy = 1;

  398                  }

  399         }

  400         mutex_unlock (&workqueue_mutex );

  401

  405         if (destroy ) {// 如果啟動任意一個執行緒失敗,則銷燬整個工作佇列

  406                 destroy_workqueue (wq );

  407                 wq = NULL ;

  408         }

  409         return wq ;

  410 }

  411 EXPORT_SYMBOL_GPL (__create_workqueue );

3.7     銷燬工作佇列
銷燬一個工作佇列,若有未完成的工作,則阻塞等待其完成。然後銷燬對應的核心執行緒。

  434 void destroy_workqueue (struct workqueue_struct *wq )

  435 {

  436         int cpu ;

  437

  438         flush_workqueue (wq ); // 等待所有工作完成

  439 /// 利用全域性的互斥鎖鎖定所有工作佇列的操作

  441         mutex_lock (&workqueue_mutex );

// 清除相關的核心執行緒

  442         if (is_single_threaded (wq ))

  443                 cleanup_workqueue_thread (wq , singlethread_cpu );

  444         else {

  445                 for_each_online_cpu (cpu )

  446                         cleanup_workqueue_thread (wq , cpu );

/////////////////////////////////

  413 static void cleanup_workqueue_thread (struct workqueue_struct *wq , int cpu )

  414 {

  415         struct cpu_workqueue_struct *cwq ;

  416         unsigned long flags ;

  417         struct task_struct *p ;

  418

  419         cwq = per_cpu_ptr (wq ->cpu_wq , cpu );

  420         spin_lock_irqsave (&cwq ->lock , flags );

  421         p = cwq ->thread ;

  422         cwq ->thread = NULL ;

  423         spin_unlock_irqrestore (&cwq ->lock , flags );

  424         if (p )

  425                 kthread_stop (p ); // 銷燬該執行緒,此處可能休眠

  426 }

/////////////////////////////////

  447                 list_del (&wq ->list );

  448         }

  449         mutex_unlock (&workqueue_mutex );

  450         free_percpu (wq ->cpu_wq );

  451         kfree (wq );

  452 }

  453 EXPORT_SYMBOL_GPL (destroy_workqueue );

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

這節介紹另外一種的下半部實現——工作佇列。相對於軟中斷/tasklet,工作對列執行在程序上下文,允許睡眠,接下來慢慢介紹。

xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

1、工作佇列的使用

按慣例,在介紹工作佇列如何實現之前,先說說如何使用工作佇列實現下半部。

步驟一、定義並初始化工作佇列:

建立工作佇列函式:

struct workqueue_struct *create_workqueue(const char *name)

函式傳參是核心中工作佇列的名稱,返回值是workqueue_struct結構體的指標,該結構體用來維護一個等待佇列。

我的程式碼如下:

/*6th_irq_3/4th/test.c*/

14 struct workqueue_struct *xiaobai_wq; //定義工作佇列

33 xiaobai_wq = create_workqueue("xiaobai");

步驟二、定義並初始化work結構體:

核心使用結構體來維護一個加入工作佇列的任務:

/*linux/workqueue.h*/

25 struct work_struct {

26 atomic_long_t data;

27 #define WORK_STRUCT_PENDING 0 /* T if work item pending execution */

28 #define WORK_STRUCT_FLAG_MASK (3UL)

29 #define WORK_STRUCT_WQ_DATA_MASK (~WORK_STRUCT_FLAG_MASK)

30 struct list_head entry;

31 work_func_t func; //這個是重點,下半部實現的處理函式指標就放在這

32 #ifdef CONFIG_LOCKDEP

33 struct lockdep_map lockdep_map;

34 #endif

35 };

同樣有靜態和動態兩種方法:

靜態定義並初始化work結構體:

/*linux/workqueue.h*/

72 #define DECLARE_WORK(n, f) /

73 struct work_struct n = __WORK_INITIALIZER(n, f)

定義並初始化一個叫nwork_struct資料結構,它對應的的處理函式是f

對應的動態初始化方法,該函式返回work_struct指標,所以需要先定義一個work_struct結構:

/*linux/workqueue.h*/

107 #define INIT_WORK(_work, _func) /

108 do { /

109 (_work)->data = (atomic_long_t) WORK_DATA_INIT(); /

110 INIT_LIST_HEAD(&(_work)->entry); /

111 PREPARE_WORK((_work), (_func)); /

112 } while (0)

113 #endif

tasklet一樣,在初始化的同時,需要將處理函式實現,程式碼如下:

/*6th_irq_3/4th/test.c*/

15 struct work_struct xiaobai_work; //定義work結構體

16

17 void xiaobai_func(struct work_struct *work) //處理函式

18 {

19 printk("hello xiaobai!/n"); //同樣什麼都沒幹,只是列印

20 }

34 INIT_WORK(&xiaobai_work, xiaobai_func); //初始化work結構體

步驟三、在中斷處理函式中排程任務:

工作佇列和worl結構體都已經實現了,接下來就可以排程了,使用一下函式:

/*kernel/workqueue.c*/

161 int queue_work(struct workqueue_struct *wq, struct work_struct *work)

將指定的任務(work_struct),新增到指定的工作佇列中。同樣的,排程並不代表處理函式能夠馬上執行,這由核心程序排程決定。

步驟四、在解除安裝模組時,重新整理並登出等待佇列:

重新整理等待佇列函式:

/*kernel/workqueue.c*/

411 void flush_workqueue(struct workqueue_struct *wq)

該函式會一直等待,知道指定的等待佇列中所有的任務都執行完畢並從等待佇列中移除。

登出等待佇列:

/*kernel/workqueue.c*/

904 void destroy_workqueue(struct workqueue_struct *wq)

該函式是是建立等待佇列的反操作,登出掉指定的等待佇列。

四個步驟講完,貼個程式碼:

/*6th_irq_3/4th/test.c*/

1 #include <linux/module.h>

2 #include <linux/init.h>

3

4 #include <linux/interrupt.h>

5 #include <linux/workqueue.h>

6

7 #define DEBUG_SWITCH 1

8 #if DEBUG_SWITCH

9 #define P_DEBUG(fmt, args...) printk("<1>" "<kernel>[%s]"fmt, __FUNCTI ON__, ##args)

10 #else

11 #define P_DEBUG(fmt, args...) printk("<7>" "<kernel>[%s]"fmt, __FUNCTI ON__, ##args)

12 #endif

13

14 struct workqueue_struct *xiaobai_wq; //1.定義工作佇列

15 struct work_struct xiaobai_work; //2定義work結構體

16

17 void xiaobai_func(struct work_struct *work) //2實現處理函式

18 {

19 printk("hello xiaobai!/n");

20 }

21

22 irqreturn_t irq_handler(int irqno, void *dev_id)

23 {

24 printk("key down/n");

25 queue_work(xiaobai_wq ,&xiaobai_work); //3排程任務

26 return IRQ_HANDLED;

27 }

28 static int __init test_init(void) //模組初始化函式

29 {

30 int ret;

31

32 /*work*/

33 xiaobai_wq = create_workqueue("xiaobai"); //1初始化工作對列

34 INIT_WORK(&xiaobai_work, xiaobai_func); //2初始化work結構體

35

36 ret = request_irq(IRQ_EINT1, irq_handler,

37 IRQF_TRIGGER_FALLING, "key INT_EINT1", NULL);

38 if(ret){

39 P_DEBUG("request irq failed!/n");

40 return ret;

41 }

42

43 printk("hello irq/n");

44 return 0;

45 }

46

相關推薦

中斷服務半部工作佇列

1        工作佇列概述 工作佇列 (work queue) 是另外一種將工作推後執行的形式,它和我們前面討論的所有其他形式都不相同。工作佇列可以把工作推後,交由一個核心執行緒去執行—這個下半部分總是會在程序上下文執行,但由於是核心執行緒,其不能訪問使用者空間。 最重要

(六)3中斷半部工作佇列

1、工作佇列的使用 按慣例,在介紹工作佇列如何實現之前,先說說如何使用工作佇列實現下半部。 步驟一、定義並初始化工作佇列: 建立工作佇列函式: struct workqueue_struct *create_workqueue(const char *nam

中斷半部_工作佇列(work queue)

1>work_queue:<linux/workqueue.h> __3.0.4 2>description: 中斷下半部,在核心執行緒的程序上下文中執行推後的工作. 它是唯一能在程序上下文執行的中斷下半部實現機制,也只有它才可以睡眠. 3>建

linux 觸控式螢幕驅動中斷半部實現-工作佇列

工作佇列(work queue)是Linux kernel中將工作推後執行的一種機制。這種機制和BH或Tasklets不同之處在於工作佇列是把推後的工作交由一個核心執行緒去執行,因此工作佇列的優勢就在於它允許重新排程甚至睡眠。工作佇列可以把工作推後,交由一個核心執行緒去執

eMMC分割槽管理、匯流排協議和工作模式

本文轉載自:https://blog.csdn.net/u013686019/article/details/66472291 一、eMMC 簡介 eMMC 是 embedded MultiMediaCard 的簡稱。MultiMediaCard,即MMC, 是一種快閃記憶體卡(Fla

Linuxgit源碼安裝

lac itl 允許 pla rom code ins 8.0 代碼 轉自:http://blog.csdn.net/u012889638/article/details/51167123 版權聲明:本文為博主原創文章,未經博主允許不得轉載。 版本信息:Cent

Linux(Centos )的網絡內核參數優化來提高服務器並發處理能力

正在 數量 庫服務器 sdn 正常 般的 優化 trie 字數 簡介 提高服務器性能有很多方法,比如劃分圖片服務器,主從數據庫服務器,和網站服務器在服務器。但是硬件資源額定有限的情況下,最大的壓榨服務器的性能,提高服務器的並發處理能力,是很多運維技術人員思考的問題。要提高

函式呼叫 壓棧的工作原理

(轉自:https://blog.csdn.net/u011555996/article/details/70211315) 1.開篇   本篇文章著重寫的是系統中棧的工作原理,以及函式呼叫過程中棧幀的產生與釋放的過程,有可能名字過大,如果不合適我可以換一個名字,希望大家能夠指正,小丁虛心求

多執行緒記憶體問題分析mprotect方法

轉自:https://blog.csdn.net/agwtpcbox/article/details/53230664 http://www.yebangyu.org/blog/2016/02/01/detectmemoryghostinmultithread/ 多執行緒中的記憶體問題,一直被認為是噩夢般

camera理論基礎和工作原理

轉自:http://www.cnblogs.com/fjutacm/p/220631977df995512d136e4dbd411951.html 寫在前面的話,本文是因為工作中需要編寫攝像頭程式,因為之前沒有做過這類產品,所以網上搜索的資料,先整理如下,主要參考文章如下,如果有侵權,請聯絡我;另外,轉載請

UNIX的LD_PRELOAD環境變數

前言        也許這個話題並不新鮮,因為LD_PRELOAD所產生的問題由來已久。不過,在這裡,我還是想討論一下這個環境變數。因為這個環境變數所帶來的安全問題非常嚴重,值得所有的Unix下的程式設計師的注意。 在開始講述為什麼要當心LD_PRELOAD環

Linux調整ext3分割槽大小

本文轉載自:https://blog.csdn.net/cruise_h/article/details/22403529 本文討論如何再不丟失資料的情況下調整已有ext3分割槽的大小,包括: 壓縮已有分割槽 增大已有分割槽 合併兩個ext3分割槽 這在沒有使用LVM(邏輯卷管理),而

在 Eclipse 利用 gradle 構建系統

在 Eclipse 下利用 gradle 構建系統 簡介:  構建系統時候常常要用到 Ant, Maven 等工具,對於初學者來說,它們還是過於複雜,上手還是需要時間的。本文將向讀者介紹一種全新的構建專案的方式 gradle,它簡單、上手快,能大大節省專案的時間

無向圖的最短路徑求解演算法——Dijkstra演算法

在準備ACM比賽的過程中,研究了圖論中一些演算法。首先研究的便是最短路的問題。《離散數學》第四版(清華大學出版社)一書中講解的Dijkstra演算法是我首先研究的源材料。       如何求圖中V0到V5的最短路徑呢?         java實現的方式如下:   

xv6 + Qemu 在Ubuntu編譯運行教程

x86_64 mpi 出現 system 兩種方法 開發 n-1 不出 soft 轉自:https://blog.csdn.net/yinglang19941010/article/details/49310111 如果想要離線看教程,可以下載該 文檔 一、使用工具說明

jvm虛擬機器工作機制

轉自:https://www.cnblogs.com/lao-liang/p/5110710.html 1 概述   眾所周知,Java支援平臺無關性、安全性和網路移動性。而Java平臺由Java虛擬機器和Java核心類所構成,它為純Java程式提供了統一的程式設計介面,而不管下層作業系統是什麼。正是得益

中斷半部機制工作佇列詳解

工作佇列的使用方法和tasklet 非常相似,下面的程式碼用於定義一個工作佇列和一個底半部執行函式。 struct work_struct my_wq; /*定義一個工作佇列*/ void my_wq_func(unsigned long); /*定義一個處理函式*/ 通過INIT_W

linux裝置驅動歸納總結(六):3.中斷半部tasklet

linux裝置驅動歸納總結(六):3.中斷的上半部和下半部——tasklet xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx 一、什麼是下半部 中斷是一個很霸道的東西,處理

搭建 Linux GitLab 服務

ssh認證 target libcurl 擁有 strong 部分 system shel tin 轉自:http://blog.csdn.net/passion_wu128/article/details/8216086 版權聲明:本文為博主原創文章,未經博主允許不

ArcGIS Server10.1服務新特性(WMTS1.0.0)

class href 知識 技術分享 restful cgi art 存在 alt http://blog.csdn.net/esrichinacd/article/details/7825587 ArcGIS Server10.1正式支持OGC的WMTS1.0.0版