1. 程式人生 > >Concurrency Managed Workqueue(一)workqueue基本概念

Concurrency Managed Workqueue(一)workqueue基本概念

使用 似的 stat lba 插入 hand 然而 基於 我們

一、前言

workqueue是一個驅動工程師常用的工具,在舊的內核中(指2.6.36之前的內核版本)workqueue代碼比較簡單(大概800行),在2.6.36內核版本中引入了CMWQ(Concurrency Managed Workqueue),workqueue.c的代碼膨脹到5000多行,為了深入的理解CMWQ,單單一份文檔很難將其描述的清楚,因此CMWQ作為一個主題將會產生一系列的文檔,本文是這一系列文檔中的第一篇,主要是基於2.6.23內核的代碼實現來講述workqueue的一些基本概念(之所以選擇較低版本的內核,主要是因為代碼簡單,適合理解基本概念)。

二、為何需要workqueue

1、什麽是中斷上下文和進程上下文?

在繼續描述workqueue之前,我們先梳理一下中斷上下文和進程上下文。對於中斷上下文,主要包括兩種情況:

(1)執行該中斷的處理函數(我們一般稱之interrupt handler或者叫做top half),也就是hard interrupt context

(2)執行軟中斷處理函數,執行tasklet函數,執行timer callback函數。(或者統稱bottom half),也就是software interrupt context。

top half當然是絕對的interrupt context,但對於上面的第二種情況,稍微有些復雜,其執行的現場包括:

(1)執行完top half,立刻啟動bottom half的執行

(2)當負荷比較重的時候(中斷產生的比較多),系統在一段時間內都在處理interrupt handler以及相關的softirq,從而導致無法調度到進程執行,這時候,linux kernel采用了將softirq推遲到softirqd這個內核線程中執行

(3)進程在內核態運行的時候,由於內核同步的需求,需要使用local_bh_disable/local_bh_enable來保護臨界區。在臨界區代碼執行的時候,有可能中斷觸發並raise softirq,但是由於softirq處於disable狀態從而在中斷返回的時候沒有辦法invoke softirq的執行,當調用local_bh_enable的時候,會調用已經觸發的那個softirq handler。

對於上面的情況1和情況3,毫無疑問,絕對的中斷上下文,執行現場的current task和softirq handler沒有任何的關系。對於情況2,雖然是在專屬的內核線程中執行,但是我也傾向將其歸入software interrupt context。

對於linux而言,中斷上下文都是驚鴻一瞥,只有進程(線程、或者叫做task)是永恒的。整個kernel都是在各種進程中切來切去,一會兒運行在進程的用戶空間,一會兒通過系統調用進入內核空間。當然,系統不是封閉的,還是需要通過外設和User或者其他的系統進行交互,這裏就需要中斷上下文了,在中斷上下文中,完成硬件的交互,最終把數據交付進程或者進程將數據傳遞給外設。進程上下文有豐富的、屬於自己的資源:例如有硬件上下文,有用戶棧、有內核棧,有用戶空間的正文段、數據段等等。而中斷上下文什麽也沒有,只有一段執行代碼及其附屬的數據。那麽問題來了:中斷執行thread中的臨時變量應該保存在棧上,那麽中斷上下文的棧在哪裏?中斷上下文沒有屬於自己的棧,腫麽辦?那麽只能借了,當中斷發生的時候,遇到哪一個進程就借用哪一個進程的資源(遇到就是緣分吶)。

2、如何判定當前的context?

OK,上一節描述中斷上下文和進程上下文的含義,那麽代碼如何知道自己的上下文呢?下面我們結合代碼來進一步分析。in_irq()是用來判斷是否在hard interrupt context的,我們一起來來看看in_irq()是如何定義的:

#define in_irq() (hardirq_count())

#define hardirq_count() (preempt_count() & HARDIRQ_MASK)

top half的處理是被irq_enter()和irq_exit()所包圍,在irq_enter函數中會調用preempt_count_add(HARDIRQ_OFFSET),為hardirq count的bit field增加1。在irq_exit函數中,會調用preempt_count_sub(HARDIRQ_OFFSET),為hardirq count的bit field減去1。因此,只要in_irq非零,則說明在中斷上下文並且處於top half部分。

解決了hard interrupt context,我們來看software interrupt context。如何判定代碼當前正在執行bottom half(softirq、tasklet、timer)呢?in_serving_softirq給出了答案:

#define in_serving_softirq() (softirq_count() & SOFTIRQ_OFFSET)

需要註意的是:在2.6.23內核中沒有這個定義(上面的代碼來自4.0的內核)。內核中還有一個類似的定義:

#define in_softirq() (softirq_count())

#define softirq_count() (preempt_count() & SOFTIRQ_MASK)

in_softirq定義了更大的一個區域,不僅僅包括了in_serving_softirq上下文,還包括了disable bottom half的場景。我們用下面一個圖片來描述:

技術分享圖片

我們知道,在進程上下文中,由於內核同步的要求可能會禁止softirq。這時候,kernel提供了local_bf_enable和local_bf_disable這樣的接口函數,這種場景下,在local_bf_enable函數中會執行軟中斷handler(在臨界區中,雖然raise了softirq,但是由於disable了bottom half,因此無法執行,只有等到enable的時候第一時間執行該softirq handler)。in_softirq包括了進程上下文中disable bottom half的臨界區部分,而in_serving_softirq精準的命中了software interrupt context。

內核中還有一個in_interrupt的宏定義,從它的名字上看似乎是定義了hard interrupt context和software interrupt context,到底是怎樣的呢?我們來看看定義:

#define in_interrupt() (irq_count())
#define irq_count() (preempt_count() & (HARDIRQ_MASK | SOFTIRQ_MASK \
| NMI_MASK))

註:上面的代碼來自4.0的內核。HARDIRQ_MASK定義了hard interrupt contxt,NMI_MASK定義了NMI(對於ARM是FIQ)類型的hard interrupt context,SOFTIRQ_MASK包括software interrupt context加上禁止softirq情況下的進程上下文。因此,in_interrupt()除了包括了中斷上下文的場景,還包括了進程上下文禁止softirq的場景。

還有一個in_atomic的宏定義,大家可以自行學習,這裏不再描述了。

3、為何中斷上下文不能sleep?

linux驅動工程師應該都會聽說過這句話:中斷上下文不能sleep,但是為什麽呢?這個問題可以仔細思考一下。所謂sleep就是調度器掛起當前的task,然後在run queue中選擇另外一個合適的task運行。規則很簡單,不過實際操作就沒有那麽容易了。有一次,我們調試wifi驅動的時候,有一個issue很有意思:正常工作的時候一切都是OK的,但是當進行壓力測試的時候,系統就會down掉。最後發現是在timer的callback函數中輾轉多次調用了kmalloc函數,我們都知道,在某些情況下,kmalloc會導致當前進程被block。

從操作系統設計的角度來看,大部分的OS都規定中斷上下文不能sleep,有些是例外的,比如solaris,每個中斷的handler都是在它自己的task中處理的,因此可以在中斷handler中sleep。不過在這樣的系統中(很多RTOS也是如此處理的),實際的中斷上下文非常的薄,可能就是向該中斷handler對應的task發送一個message,所有的處理(ack中斷、mask中斷、copy FIFO等)都是在該中斷的task中處理。這樣的系統中,當然可以在中斷handler中sleep,不過這有點偷換概念,畢竟這時候的上下文不是interrupt context,更準確的說是中斷處理的process context,這樣的系統interrupt context非常非常的簡單,幾乎沒有。

當然,linux的設計並非如此(其實在rt linux中已經有了這樣的苗頭,可以參考中斷線程化的文章),中斷handler以及bottom half(不包括workqueue)都是在interrupt context中執行。當然一提到context,各種資源還是要存在的,例如說內核棧、例如說memory space等,interrupt context雖然單薄,但是可以借屍還魂。當中斷產生的那一個時刻,當前進程有幸成為interrupt context的殼,提供了內核棧,保存了hardware context,此外各種資源(例如mm_struct)也是借用當前進程的。本來呢interrupt context身輕如燕,沒有依賴的task,調度器其實是不知道如何調度interrupt context的(它處理的都是task),在interrupt context借了一個外殼後,從理論上將,調度器是完全可以block該interrupt context執行,並將其他的task調入進入running狀態。然而,block該interrupt context執行也就block其外殼task的執行,多麽的不公平,多麽的不確定,中斷命中你,你就活該被schedule out,擁有正常思維的linux應該不會這麽做的。

因此,在中斷上下文中(包括hard interrupt context和software interrupt context)不能睡眠。

4、為何需要workqueue

workqueue和其他的bottom half最大的不同是它是運行在進程上下文中的,它可以睡眠,這和其他bottom half機制有本質的不同,大大方便了驅動工程師撰寫中斷處理代碼。當然,驅動模塊也可以自己創建一個kernel thread來解決defering work,但是,如果每個driver都創建自己的kernel thread,那麽內核線程數量過多,這會影響整體的性能。因此,最好的方法就是把這些需求匯集起來,提供一個統一的機制,也就是傳說中的work queue了。

三、數據抽象

1、workqueue。定義如下:

struct workqueue_struct {
struct cpu_workqueue_struct *cpu_wq; -----per-cpu work queue struct
struct list_head list; ---workqueue list
const char *name;
int singlethread; ----single thread or multi thread
int freezeable; ----和電源管理相關的一個flag
};

我們知道,workqueue就是一種把某些任務(work)推遲到一個或者一組內核線程中去執行,那個內核線程被稱作worker thread(每個processor上有一個work thread)。系統中所有的workqueue會掛入一個全局鏈表,鏈表頭定義如下:

static LIST_HEAD(workqueues);

list成員就是用來掛入workqueue鏈表的。singlethread是workqueue的一個特殊模式,一般而言,當創建一個workqueue的時候會為每一個系統內的processor創建一個內核線程,該線程處理本cpu調度的work。但是有些場景中,創建per-cpu的worker thread有些浪費(或者有一些其他特殊的考量),這時候創建single-threaded workqueue是一個更合適的選擇。freezeable成員是一個和電源管理相關的一個flag,當系統suspend的時候,有一個階段會將所有的用戶空間的進程凍結,那麽是否也凍結內核線程(包括workqueue)呢?缺省情況下,所有的內核線程都是nofrezable的,當然也可以調用set_freezable讓一個內核線程是可以被凍結的。具體是否需要設定該flag是和程序邏輯相關的,具體情況具體分析。OK,上面描述的都是workqueue中各個processor共享的成員,下面我們看看per-cpu的數據結構:

struct cpu_workqueue_struct {

spinlock_t lock; ----用來保護worklist資源的訪問

struct list_head worklist;
wait_queue_head_t more_work; -----等待隊列頭
struct work_struct *current_work; ----當前正在處理的work

struct workqueue_struct *wq; ------指向work queue struct
struct task_struct *thread; -------worker thread task

int run_depth; /* Detect run_workqueue() recursion depth */
} ____cacheline_aligned;

worker thread要處理work,這些work被掛入work queue中的鏈表結構。由於每個processor都需要處理自己的work,因此這個work list是per cpu的。worklist成員就是這個per cpu的鏈表頭,當worker thread被調度到的時候,就從這個隊列中一個個的摘下work來處理。

2、work。定義如下:

struct work_struct {
atomic_long_t data;
struct list_head entry;
work_func_t func;
};

所謂work就是異步執行的函數。你可能會覺得,反正是函數,直接調用不就OK了嗎?但是,事情沒有那麽簡單,如果該函數的代碼中有些需要sleep的場景的時候,那麽在中斷上下文中直接調用將產生嚴重的問題。這時候,就需要到進程上下文中異步執行。下面我們仔細看看各個成員:func就是這個異步執行的函數,當work被調度執行的時候其實就是調用func這個callback函數,該函數的定義如下:

typedef void (*work_func_t)(struct work_struct *work);

work對應的callback函數需要傳遞該work的struct作為callback函數的參數。work是被組織成隊列的,entry成員就是掛入隊列的那個節點,data包含了該work的狀態flag和掛入workqueue的信息。

3、總結

我們把上文中描述的各個數據結構集合在一起,具體請參考下圖:

技術分享圖片

我們自上而下來描述各個數據結構。首先,系統中包括若幹的workqueue,最著名的workqueue就是系統缺省的的workqueue了,定義如下:

static struct workqueue_struct *keventd_wq __read_mostly;

如果沒有特別的性能需求,那麽一般驅動使用keventd_wq就OK了,畢竟系統創建太多內核線程也不是什麽好事情(消耗太多資源)。當然,如果有需要,驅動模塊可以創建自己的workqueue。因此,系統中存在一個workqueues的鏈表,管理了所有的workqueue實例。一個workqueue對應一組work thread(先不考慮single thread的場景),每個cpu一個,由cpu_workqueue_struct來抽象,這些cpu_workqueue_struct們共享一個workqueue,畢竟這些worker thread是同一種type。

從底層驅動的角度來看,我們只關心如何處理deferable task(由work_struct抽象)。驅動程序定義了work_struct,其func成員就是deferred work,然後掛入work list就OK了(當然要喚醒worker thread了),系統的調度器調度到worker thread的時候,該work自然會被處理了。當然,掛入哪一個workqueue的那一個worker thread呢?如何選擇workqueue是driver自己的事情,可以使用系統缺省的workqueue,簡單,實用。當然也可以自己創建一個workqueue,並把work掛入其中。選擇哪一個worker thread比較簡單:work在哪一個cpu上被調度,那麽就掛入哪一個worker thread。

四、接口以及內部實現

1、初始化一個work。我們可以靜態定義一個work,接口如下:

#define DECLARE_WORK(n, f) \
struct work_struct n = __WORK_INITIALIZER(n, f)

#define DECLARE_DELAYED_WORK(n, f) \
struct delayed_work n = __DELAYED_WORK_INITIALIZER(n, f)

一般而言,work都是推遲到worker thread被調度的時刻,但是有時候,我們希望在指定的時間過去之後再調度worker thread來處理該work,這種類型的work被稱作delayed work,DECLARE_DELAYED_WORK用來初始化delayed work,它的概念和普通work類似,本文不再描述。

動態創建也是OK的,不過初始化的時候需要把work的指針傳遞給INIT_WORK,定義如下:

#define INIT_WORK(_work, _func) \
do { \
(_work)->data = (atomic_long_t) WORK_DATA_INIT(); \
INIT_LIST_HEAD(&(_work)->entry); \
PREPARE_WORK((_work), (_func)); \
} while (0)

2、調度一個work執行。調度work執行有兩個接口,一個是schedule_work,將work掛入缺省的系統workqueue(keventd_wq),另外一個是queue_work,可以將work掛入指定的workqueue。具體代碼如下:

int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
int ret = 0;

if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
__queue_work(wq_per_cpu(wq, get_cpu()), work);---掛入work list並喚醒worker thread
put_cpu();
ret = 1;
}
return ret;
}

處於pending狀態的work不會重復掛入workqueue。我們假設A驅動模塊靜態定義了一個work,當中斷到來並分發給cpu0的時候,中斷handler會在cpu0上執行,我們在handler中會調用schedule_work將該work掛入cpu0的worker thread,也就是keventd 0的work list。在worker thread處理A驅動的work之前,中斷很可能再次觸發並分發給cpu1執行,這時候,在cpu1上執行的handler在調用schedule_work的時候實際上是沒有任何具體的動作的,也就是說該work不會掛入keventd 1的work list,因為該work還pending在keventd 0的work list中。

到底插入workqueue的哪一個worker thread呢?這是由wq_per_cpu定義的:

static struct cpu_workqueue_struct *wq_per_cpu(struct workqueue_struct *wq, int cpu)
{
if (unlikely(is_single_threaded(wq)))
cpu = singlethread_cpu;
return per_cpu_ptr(wq->cpu_wq, cpu);
}

普通情況下,都是根據當前的cpu id,通過per_cpu_ptr獲取cpu_workqueue_struct的數據結構,對於single thread而言,cpu是固定的。

3、創建workqueue,接口如下:

#define create_workqueue(name) __create_workqueue((name), 0, 0)
#define create_freezeable_workqueue(name) __create_workqueue((name), 1, 1)
#define create_singlethread_workqueue(name) __create_workqueue((name), 1, 0)

create_workqueue是創建普通workqueue,也就是每個cpu創建一個worker thread的那種。當然,作為“普通”的workqueue,在freezeable屬性上也是跟隨缺省的行為,即在suspend的時候不凍結該內核線程的worker thread。create_freezeable_workqueue和create_singlethread_workqueue都是創建single thread workqueue,只不過一個是freezeable的,另外一個是non-freezeable的。的代碼如下:

struct workqueue_struct *__create_workqueue(const char *name, int singlethread, int freezeable)
{
struct workqueue_struct *wq;
struct cpu_workqueue_struct *cwq;
int err = 0, cpu;

wq = kzalloc(sizeof(*wq), GFP_KERNEL);----分配workqueue的數據結構

wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);---分配worker thread的數據結構

wq->name = name;----------初始化workqueue
wq->singlethread = singlethread;
wq->freezeable = freezeable;
INIT_LIST_HEAD(&wq->list);

if (singlethread) {-----------------------(1)
cwq = init_cpu_workqueue(wq, singlethread_cpu); ---初始化cpu_workqueue_struct
err = create_workqueue_thread(cwq, singlethread_cpu); ---創建worker thread
start_workqueue_thread(cwq, -1); ----wakeup worker thread
} else { -----------------------------(2)
mutex_lock(&workqueue_mutex);
list_add(&wq->list, &workqueues);

for_each_possible_cpu(cpu) {
cwq = init_cpu_workqueue(wq, cpu);
if (err || !cpu_online(cpu)) ----沒有online的cpu就不需要創建worker thread了
continue;
err = create_workqueue_thread(cwq, cpu);
start_workqueue_thread(cwq, cpu);
}
mutex_unlock(&workqueue_mutex);
}
return wq;
}

(1)不管是否是single thread workqueue,worker thread(cpu_workqueue_struct)的數據結構總是per cpu分配的(稍顯浪費),不過實際上對於single thread workqueue而言,只會使用其中之一,那麽問題來了:使用哪一個processor的cpu_workqueue_struct呢?workqueue代碼定義了一個singlethread_cpu的變量,如下:

static int singlethread_cpu __read_mostly;

該變量會在init_workqueues函數中進行初始化。實際上,使用哪一個cpu的cpu_workqueue_struct是無所謂的,選擇其一就OK了。由於是single thread workqueue,因此創建的worker thread並不綁定在任何的cpu上,調度器可以自由的調度該內核線程在任何的cpu上運行。

(2)對於普通的workqueue,和single thread的處理有所有不同。一方面,single thread的workqueue沒有掛入workqueues的全局鏈表,另外一方面for_each_possible_cpu確保在每一個cpu上創建了一個worker thread並通過start_workqueue_thread啟動其運行,具體代碼如下:

static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
struct task_struct *p = cwq->thread;

if (p != NULL) {
if (cpu >= 0)
kthread_bind(p, cpu);
wake_up_process(p);
}
}

對於single thread,kthread_bind不會執行,對於普通的workqueue,我們必須調用kthread_bind以便讓worker thread在特定的cpu上執行。

4、work執行的時機

work執行的時機是和調度器相關的,當系統調度到worker thread這個內核線程後,該thread就會開始工作。每個cpu上執行的worker thread的內核線程的代碼邏輯都是一樣的,在worker_thread中實現:

static int worker_thread(void *__cwq)
{
struct cpu_workqueue_struct *cwq = __cwq;
DEFINE_WAIT(wait);

if (cwq->wq->freezeable)---如果是freezeable的內核線程,那麽需要清除task flag中的
set_freezable(); PF_NOFREEZE標記,以便在系統suspend的時候凍結該thread

set_user_nice(current, -5); ----提高進程優先級,呵呵,worker thread還是有些特權的哦

for (;;) {
prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
if (!freezing(current) && !kthread_should_stop() && list_empty(&cwq->worklist))
schedule();--------------(1)
finish_wait(&cwq->more_work, &wait);

try_to_freeze(); ------處理來自電源管理模塊的凍結請求

if (kthread_should_stop()) -----處理停止該thread的請求
break;

run_workqueue(cwq); ------依次處理work list上的各個work
}

return 0;
}

(1)導致worker thread進入sleep狀態有三個條件:(a)電源管理模塊沒有請求凍結該worker thread。(b)該thread沒有被其他模塊請求停掉。(c)work list為空,也就是說沒有work要處理

Concurrency Managed Workqueue(一)workqueue基本概念