1. 程式人生 > >Linux等待佇列(Wait Queue)

Linux等待佇列(Wait Queue)

1. Linux等待佇列概述

Linux核心的等待佇列(Wait Queue)是重要的資料結構,與程序排程機制緊密相關聯,可以用來同步對系統資源的訪問、非同步事件通知、跨程序通訊等。
在Linux中,等待佇列以迴圈連結串列為基礎結構,包括兩種資料結構:等待佇列頭(wait queue head)和等待佇列元素(wait queue),整個等待佇列由等待佇列頭進行管理。下文將用核心原始碼(基於Linux kernel 5.2)對等待佇列進行介紹,詳細說明採用等待佇列實現程序阻塞和喚醒的方法。

 

2. 等待佇列頭和等待佇列元素

等待佇列以迴圈連結串列為基礎結構,連結串列頭和連結串列項分別為等待佇列頭和等待佇列元素,分別用結構體 wait_queue_head_t 和 wait_queue_entry_t 描述(定義在 linux/wait.h )。

2.1 基本概念

struct wait_queue_head {
    spinlock_t          lock;
    struct list_head    head;
};

typedef struct wait_queue_head wait_queue_head_t;
typedef int (*wait_queue_func_t)(struct wait_queue_entry *wq_entry, unsigned mode, int flags, void *key);
int default_wake_function(struct wait_queue_entry *wq_entry, unsigned mode, int flags, void *key);

/* wait_queue_entry::flags */
#define WQ_FLAG_EXCLUSIVE   0x01
#define WQ_FLAG_WOKEN       0x02
#define WQ_FLAG_BOOKMARK    0x04

/*
 * A single wait-queue entry structure:
 */
struct wait_queue_entry {
    unsigned int        flags;
    void                *private;
    wait_queue_func_t   func;
    struct list_head    entry;
};

typedef struct wait_queue_entry wait_queue_entry_t;

等待佇列頭結構包括一個自旋鎖和一個連結串列頭。等待佇列元素除了包括連結串列項,還包括:

  •  flags : 標識佇列元素狀態和屬性
  •  *private : 用於指向關聯程序 task_struct 結構體的指標
  •  func : 函式指標,用於指向等待佇列被喚醒時的回撥的喚醒函式

以程序阻塞和喚醒的過程為例,等待佇列的使用場景可以簡述為:
程序 A 因等待某些資源(依賴程序 B 的某些操作)而不得不進入阻塞狀態,便將當前程序加入到等待佇列 Q 中。程序 B 在一系列操作後,可通知程序 A 所需資源已到位,便呼叫喚醒函式 wake up 來喚醒等待佇列上 Q 的程序,注意此時所有等待在佇列 Q 上的程序均被置為可執行狀態。
藉助上述描述場景,說明等待佇列元素屬性 flags 標誌的作用,下文也將結合原始碼進行詳細解讀。

  (1) WQ_FLAG_EXCLUSIVE 

上述場景中看到,當某程序呼叫 wake up 函式喚醒等待佇列時,佇列上所有的程序均被喚醒,在某些場合會出現喚醒的所有程序中,只有某個程序獲得了期望的資源,而其他程序由於資源被佔用不得不再次進入休眠。如果等待佇列中程序數量龐大時,該行為將影響系統性能。
核心增加了"獨佔等待”(WQ_FLAG_EXCLUSIVE)來解決此類問題。一個獨佔等待的行為和通常的休眠類似,但有如下兩個重要的不同:

  • 等待佇列元素設定 WQ_FLAG_EXCLUSIVE 標誌時,會被新增到等待佇列的尾部,而非頭部。
  • 在某等待佇列上呼叫 wake up 時,執行獨佔等待的程序每次只會喚醒其中第一個(所有非獨佔等待程序仍會被同時喚醒)。

  (2) WQ_FLAG_WOKEN 
暫時還未理解,TODO

  (3) WQ_FLAG_BOOKMARK 
用於 wake_up() 喚醒等待佇列時實現分段遍歷,減少單次對自旋鎖的佔用時間。

 

2.2 等待佇列的建立和初始化

等待佇列頭的定義和初始化有兩種方式: init_waitqueue_head(&wq_head) 和巨集定義 DECLARE_WAITQUEUE(name, task) 。

#define init_waitqueue_head(wq_head)                            \
    do {                                                        \
        static struct lock_class_key __key;                     \
        __init_waitqueue_head((wq_head), #wq_head, &__key);     \
    } while (0)

void __init_waitqueue_head(struct wait_queue_head *wq_head, const char *name, struct lock_class_key *key)
{
    spin_lock_init(&wq_head->lock);
    lockdep_set_class_and_name(&wq_head->lock, key, name);
    INIT_LIST_HEAD(&wq_head->head);
}
#define DECLARE_WAIT_QUEUE_HEAD(name)                       \
    struct wait_queue_head name = __WAIT_QUEUE_HEAD_INITIALIZER(name)

#define __WAIT_QUEUE_HEAD_INITIALIZER(name) {               \
    .lock       = __SPIN_LOCK_UNLOCKED(name.lock),          \
    .head       = { &(name).head, &(name).head } }

 

2.3 等待佇列元素的建立和初始化

建立等待佇列元素較為普遍的一種方式是呼叫巨集定義 DECLARE_WAITQUEUE(name, task) ,將定義一個名為 name 的等待佇列元素, private 資料指向給定的關聯程序結構體 task ,喚醒函式為 default_wake_function() 。後文介紹喚醒細節時詳細介紹喚醒函式的工作。

#define DECLARE_WAITQUEUE(name, tsk)                        \
    struct wait_queue_entry name = __WAITQUEUE_INITIALIZER(name, tsk)

#define __WAITQUEUE_INITIALIZER(name, tsk) {                \
    .private    = tsk,                                      \
    .func       = default_wake_function,                    \
    .entry      = { NULL, NULL } }

核心原始碼中還存在其他定義等待佇列元素的方式,呼叫巨集定義 DEFINE_WAIT(name) 和 init_wait(&wait_queue) 。
這兩種方式都將當前程序(current)關聯到所定義的等待佇列上,喚醒函式為 autoremove_wake_function() ,注意此函式與上述巨集定義方式時不同(上述定義中使用 default_wake_function() )。
下文也將介紹 DEFINE_WAIT() 和 DECLARE_WAITQUEUE() 在使用場合上的不同。

#define DEFINE_WAIT(name)   DEFINE_WAIT_FUNC(name, autoremove_wake_function)

#define DEFINE_WAIT_FUNC(name, function)                    \
    struct wait_queue_entry name = {                        \
        .private    = current,                              \
        .func       = function,                             \
        .entry      = LIST_HEAD_INIT((name).entry),         \
    }
#define init_wait(wait)                                     \
    do {                                                    \
        (wait)->private = current;                          \
        (wait)->func = autoremove_wake_function;            \
        INIT_LIST_HEAD(&(wait)->entry);                     \
        (wait)->flags = 0;                                  \
    } while (0)

2.4 新增和移除等待佇列

核心提供了兩個函式(定義在 kernel/sched/wait.c )用於將等待佇列元素 wq_entry 新增到等待佇列 wq_head 中: add_wait_queue() 和 add_wait_queue_exclusive() 。

  •  add_wait_queue() :在等待佇列頭部新增普通的等待佇列元素(非獨佔等待,清除 WQ_FLAG_EXCLUSIVE 標誌)。
  •  add_wait_queue_exclusive() :在等待佇列尾部新增獨佔等待佇列元素(設定了 WQ_FLAG_EXCLUSIVE 標誌)。
void add_wait_queue(struct wait_queue_head *wq_head, struct wait_queue_entry *wq_entry)
{
    unsigned long flags;

    // 清除WQ_FLAG_EXCLUSIVE標誌
    wq_entry->flags &= ~WQ_FLAG_EXCLUSIVE;
    spin_lock_irqsave(&wq_head->lock, flags);
    __add_wait_queue(wq_head, wq_entry);
    spin_unlock_irqrestore(&wq_head->lock, flags);
}   

static inline void __add_wait_queue(struct wait_queue_head *wq_head, struct wait_queue_entry *wq_entry)
{
    list_add(&wq_entry->entry, &wq_head->head);
}
void add_wait_queue_exclusive(struct wait_queue_head *wq_head, struct wait_queue_entry *wq_entry)
{
    unsigned long flags;

    // 設定WQ_FLAG_EXCLUSIVE標誌
    wq_entry->flags |= WQ_FLAG_EXCLUSIVE;
    spin_lock_irqsave(&wq_head->lock, flags);
    __add_wait_queue_entry_tail(wq_head, wq_entry);
    spin_unlock_irqrestore(&wq_head->lock, flags);
}

static inline void __add_wait_queue_entry_tail(struct wait_queue_head *wq_head, struct wait_queue_entry *wq_entry)
{
    list_add_tail(&wq_entry->entry, &wq_head->head);
}

 remove_wait_queue() 函式用於將等待佇列元素 wq_entry 從等待佇列 wq_head 中移除。

void remove_wait_queue(struct wait_queue_head *wq_head, struct wait_queue_entry *wq_entry)
{
    unsigned long flags;

    spin_lock_irqsave(&wq_head->lock, flags);
    __remove_wait_queue(wq_head, wq_entry);
    spin_unlock_irqrestore(&wq_head->lock, flags);
}

static inline void
__remove_wait_queue(struct wait_queue_head *wq_head, struct wait_queue_entry *wq_entry)
{
    list_del(&wq_entry->entry);
}

新增和移除等待佇列的示意圖如下所示:

3. 等待事件

核心中提供了等待事件 wait_event() 巨集(以及它的幾個變種),可用於實現簡單的程序休眠,等待直至某個條件成立,主要包括如下幾個定義:

wait_event(wq_head, condition)
wait_event_timeout(wq_head, condition, timeout) 
wait_event_interruptible(wq_head, condition)
wait_event_interruptible_timeout(wq_head, condition, timeout)
io_wait_event(wq_head, condition)

上述所有形式函式中, wq_head 是等待佇列頭(採用”值傳遞“的方式傳輸函式), condition 是任意一個布林表示式。使用 wait_event ,程序將被置於非中斷休眠,而使用 wait_event_interruptible 時,程序可以被訊號中斷。
另外兩個版本 wait_event_timeout 和 wait_event_interruptible_timeout 會使程序只等待限定的時間(以jiffy表示,給定時間到期時,巨集均會返回0,而無論 condition 為何值)。

詳細介紹 wait_event() 函式的實現原理。

#define wait_event(wq_head, condition)                      \
    do {                                                    \
        might_sleep();                                      \
        // 如果condition滿足,提前返回                       \
        if (condition)                                      \
           break;                                           \
        __wait_event(wq_head, condition);                   \
    } while (0)
 
#define __wait_event(wq_head, condition)                    \
     (void)___wait_event(wq_head, condition, TASK_UNINTERRUPTIBLE, 0, 0, schedule())


/* 定義等待佇列元素,並將元素加入到等待佇列中
 * 迴圈判斷等待條件condition是否滿足,若條件滿足,或者接收到中斷訊號,等待結束,函式返回
 * 若condition滿足,返回0;否則返回-ERESTARTSYS
 */
#define ___wait_event(wq_head, condition, state, exclusive, ret, cmd)       \
({                                                          \
     __label__ __out;                                       \
     struct wait_queue_entry __wq_entry;                    \
     long __ret = ret;          /* explicit shadow */       \
                                                            \
     // 初始化等待佇列元素__wq_entry,關聯當前程序,根據exclusive引數初始化屬性標誌 \
     // 喚醒函式為autoremove_wake_function()                                        \
     init_wait_entry(&__wq_entry, exclusive ? WQ_FLAG_EXCLUSIVE : 0);    \
     // 等待事件迴圈                                        \
     for (;;) {                                             \
        // 如果程序可被訊號中斷並且剛好有訊號掛起,返回-ERESTARTSYS     \
        // 否則,將等待佇列元素加入等待佇列,並且設定程序狀態,返回0    \
        long __int = prepare_to_wait_event(&wq_head, &__wq_entry, state);\
                                                            \
        // 當前程序讓出排程器前,判斷condition是否成立。若成立,提前結束,後續將返回0   \
        if (condition)                                      \
            break;                                          \
                                                            \
        // 當前程序讓出排程器前,判斷當前程序是否接收到中斷訊號(或KILL訊號)       \
        // 如果成立,將提前返回-ERESTARTSYS                 \
        if (___wait_is_interruptible(state) && __int) {     \
            __ret = __int;                                  \
            goto __out;                                     \
        }                                                   \
                                                            \
        // 此處實際執行schedule(),當前程序讓出排程器       \
        cmd;                                                \
     }                                                      \
     // 設定程序為可執行狀態,並且將等待佇列元素從等待佇列中刪除    \
     finish_wait(&wq_head, &__wq_entry);                    \
     __out:  __ret;                                         \
})  

void init_wait_entry(struct wait_queue_entry *wq_entry, int flags) 
{
    wq_entry->flags = flags;
    wq_entry->private = current;
    wq_entry->func = autoremove_wake_function;
    INIT_LIST_HEAD(&wq_entry->entry);
}

long prepare_to_wait_event(struct wait_queue_head *wq_head, struct wait_queue_entry *wq_entry, int state)
{
    unsigned long flags;
    long ret = 0;

    spin_lock_irqsave(&wq_head->lock, flags);
    // 返回非0值條件:可被訊號中斷並且確實有訊號掛起
    if (signal_pending_state(state, current)) {
        // 將等待佇列元素從等待佇列中刪除,返回-ERESTARTSYS
        list_del_init(&wq_entry->entry);
        ret = -ERESTARTSYS;
    } else {
        // 判斷wq_entry->entry是否為空,即等待佇列元素是否已經被新增到等待佇列中
        if (list_empty(&wq_entry->entry)) {
            // WQ_FLAG_EXCLUSIVE標誌被設定時,將等待佇列元素新增到等待佇列尾部(獨佔等待)
            // 否則,將等待佇列元素新增到等待佇列頭部。同2.1中對WQ_FLAG_EXCLUSIVE標誌介紹。
            if (wq_entry->flags & WQ_FLAG_EXCLUSIVE)
                __add_wait_queue_entry_tail(wq_head, wq_entry);
            else
                __add_wait_queue(wq_head, wq_entry);
        }
        // 改變當前程序的狀態
        set_current_state(state);
    }
    spin_unlock_irqrestore(&wq_head->lock, flags);

    return ret;
}

// 用state_value改變當前的程序狀態,並且執行了一次記憶體屏障
// 注意,只是改變了排程器處理該程序的方式,但尚未使該程序讓出處理器
#define set_current_state(state_value)              \
    do {                            \
        WARN_ON_ONCE(is_special_task_state(state_value));\
        current->task_state_change = _THIS_IP_;     \
        smp_store_mb(current->state, (state_value));    \
    } while (0)

/*  設定程序為可執行狀態,並且將等待佇列元素從等待佇列中刪除  */
void finish_wait(struct wait_queue_head *wq_head, struct wait_queue_entry *wq_entry)
{
    unsigned long flags;
    // 將當前程序狀態改為可執行狀態(TASK_RUNNING)
    // 類似set_current_state(),差別在於未進行記憶體屏障
    __set_current_state(TASK_RUNNING);

    // 等待佇列元素若仍在等待佇列中,則將其刪除
    if (!list_empty_careful(&wq_entry->entry)) {
        spin_lock_irqsave(&wq_head->lock, flags);
        list_del_init(&wq_entry->entry);
        spin_unlock_irqrestore(&wq_head->lock, flags);
    }
}

 經過原始碼分析可以看到, wait_event 使程序進入非中斷休眠狀態,迴圈等待直至特定條件滿足,否則程序繼續保持休眠狀態。

可以簡單總結出使用等待佇列使程序休眠的一般步驟:

  • 將當前程序關聯的等待佇列元素加入到等待佇列中。 __add_wait_queue()/__add_wait_queue_entry_tail() 
  • 設定當前程序狀態(可中斷 TASK_INTERRUPTIBLE 或不可中斷 TASK_UNINTERRUPTIBLE)。 set_current_state() 
  • 判斷資源是否得到,或是否捕獲中斷訊號。
  • 程序讓出排程器,進入休眠狀態。 schedule() 
  • 資源得到滿足時,將等待佇列元素從等待佇列中移除。

 

4. 等待佇列喚醒

前文已經簡單提到, wake_up 函式可用於將等待佇列上的所有程序喚醒,和 wait_event 相對應, wake_up 函式也包括多個變體。主要包括:

wake_up(&wq_head)
wake_up_interruptible(&wq_head)
wake_up_nr(&wq_head, nr)
wake_up_interruptible_nr(&wq_head, nr)
wake_up_interruptible_all(&wq_head)

4.1 wake_up()

 wake_up() 可以用來喚醒等待佇列上的所有程序,而 wake_up_interruptible() 只會喚醒那些執行可中斷休眠的程序。因此約定, wait_event() 和 wake_up() 搭配使用,而 wait_event_interruptible() 和 wake_up_interruptible() 搭配使用。
前文提到,對於獨佔等待的程序, wake_up() 只會喚醒第一個獨佔等待程序。 wake_up_nr() 函式提供功能,它能喚醒給定數目nr個獨佔等待程序,而不是隻有一個。

 wake_up() 函式的實現如下:

#define TASK_NORMAL         (TASK_INTERRUPTIBLE | TASK_UNINTERRUPTIBLE)
// 可以看出wake_up函式將喚醒TASK_INTERRUPTIBLE和TASK_UNINTERRUPTIBLE的所有程序
#define wake_up(x)          __wake_up(x, TASK_NORMAL, 1, NULL)

void __wake_up(struct wait_queue_head *wq_head, unsigned int mode, int nr_exclusive, void *key)
{
    __wake_up_common_lock(wq_head, mode, nr_exclusive, 0, key);
}

static void __wake_up_common_lock(struct wait_queue_head *wq_head, unsigned int mode,
        int nr_exclusive, int wake_flags, void *key)
{
    unsigned long flags;
    wait_queue_entry_t bookmark;

    bookmark.flags = 0;
    bookmark.private = NULL;
    bookmark.func = NULL;
    INIT_LIST_HEAD(&bookmark.entry);

    // 第一次嘗試呼叫__wake_up_common(),如果需要進行BOOKMARK過程,bookmark.flags會被置為WQ_FLAG_BOOKMARK
    spin_lock_irqsave(&wq_head->lock, flags);
    nr_exclusive = __wake_up_common(wq_head, mode, nr_exclusive, wake_flags, key, &bookmark);
    spin_unlock_irqrestore(&wq_head->lock, flags);

    // 如果還有需要處理的元素,那麼bookmark.flags肯定置上WQ_FLAG_BOOKMARK;否則,在一個loop內便處理完成
    while (bookmark.flags & WQ_FLAG_BOOKMARK) {
        spin_lock_irqsave(&wq_head->lock, flags);
        nr_exclusive = __wake_up_common(wq_head, mode, nr_exclusive, wake_flags, key, &bookmark);
        spin_unlock_irqrestore(&wq_head->lock, flags);
    }
}

#define WAITQUEUE_WALK_BREAK_CNT 64

static int __wake_up_common(struct wait_queue_head *wq_head, unsigned int mode,
        int nr_exclusive, int wake_flags, void *key, wait_queue_entry_t *bookmark)
{
    wait_queue_entry_t *curr, *next;
    int cnt = 0;
    
    // 判斷自旋鎖已經被持有
    lockdep_assert_held(&wq_head->lock);

    // 如果bookmark元素中標誌`WQ_FLAG_BOOKMARK`已被設定,則curr被設定為bookmark下一個元素
    // 同時將bookmark從等待佇列中刪除,bookmark->flags清零
    // 否則,curr設定為等待佇列wq_head的第一個元素(實際上為第一次呼叫__wake_up_common)
    if (bookmark && (bookmark->flags & WQ_FLAG_BOOKMARK)) {
        curr = list_next_entry(bookmark, entry);

        list_del(&bookmark->entry);
        bookmark->flags = 0;
    } else
        curr = list_first_entry(&wq_head->head, wait_queue_entry_t, entry);

    if (&curr->entry == &wq_head->head)
        return nr_exclusive;

    // 在等待佇列頭指向的連結串列上,從curr指向的元素開始依次遍歷元素
    list_for_each_entry_safe_from(curr, next, &wq_head->head, entry) {
        unsigned flags = curr->flags;
        int ret;

        // 跳過標記為WQ_FLAG_BOOKMARK的元素,等待佇列元素被置上WQ_FLAG_BOOKMARK?
        if (flags & WQ_FLAG_BOOKMARK)
            continue;

        // 呼叫等待佇列元素繫結的喚醒回撥函式
        // 注意,具體喚醒何種程序(TASK_INTERRUPTIBLE/TASK_UNINTERRUPTIBLE),作為引數傳遞給喚醒函式處理
        // 當程序不符合喚醒條件時,ret為0,詳見try_to_wake_up()
        ret = curr->func(curr, mode, wake_flags, key);
        if (ret < 0)
            break;

        // 如果當前等待佇列元素為獨佔等待,並且獨佔等待個數已經等於nr_exclusive,提前退出迴圈
        // 如2.1所述,獨佔等待程序被加入到等待佇列的尾部,因此此時表明所有喚醒工作已經完成
        if (ret && (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
            break;
 
        // 連續喚醒的程序數目達到指定數目WAITQUEUE_WALK_BREAK_CNT(仍有程序元素需要處理),
        // 標記bookmark->flags為WQ_FLAG_BOOKMARK,同時將下一個要處理的元素新增到bookmark作為頭節點的連結串列尾部,並退出遍歷迴圈
        // 通過這種機制,實現了程序分批次喚醒,避免了等待佇列中自旋鎖被持有時間過長
        if (bookmark && (++cnt > WAITQUEUE_WALK_BREAK_CNT) &&
                (&next->entry != &wq_head->head)) {
            bookmark->flags = WQ_FLAG_BOOKMARK;
            list_add_tail(&bookmark->entry, &next->entry);
            break;
        }
    }

    return nr_exclusive;
}

 wake_up() 函式會遍歷等待佇列上的所有元素(包括TASK_INTERRUPTIBLE和TASK_UNINTERRUPTIBLE)),根據 nr_exclusive 引數的要求喚醒程序,同時實現了分批次喚醒工作。最終會回撥等待佇列元素所繫結的喚醒函式。

前文已經提到,定義等待佇列元素時主要涉及到兩種喚醒回撥函式:

  •  default_wake_function() :巨集定義 DECLARE_WAITQUEUE(name, tsk) 使用的喚醒函式。
  •  autoremove_wake_function() : DEFINE_WAIT(name) , init_wait(wait) 和 wait_event() 中呼叫的 init_wait_entry() 使用此喚醒函式。

4.2 default_wake_function()

int default_wake_function(wait_queue_entry_t *curr, unsigned mode, int wake_flags, void *key)
{
    return try_to_wake_up(curr->private, mode, wake_flags);
}

static int
try_to_wake_up(struct task_struct *p, unsigned int state, int wake_flags)
{
    unsigned long flags;
    int cpu, success = 0;

    raw_spin_lock_irqsave(&p->pi_lock, flags);
    smp_mb__after_spinlock();
    // 此處對程序的狀態進行篩選,跳過不符合狀態的程序(TASK_INTERRUPTIBLE/TASK_UNINTERRUPTIBLE)
    if (!(p->state & state))
        goto out;

    trace_sched_waking(p);

    /* We're going to change ->state: */
    success = 1;
    cpu = task_cpu(p);

    smp_rmb();
    if (p->on_rq && ttwu_remote(p, wake_flags))
        goto stat;

    ... ...

    // Try-To-Wake-Up
    ttwu_queue(p, cpu, wake_flags);
stat:
    ttwu_stat(p, cpu, wake_flags);
out:
    raw_spin_unlock_irqrestore(&p->pi_lock, flags);

    return success;
}

static void ttwu_queue(struct task_struct *p, int cpu, int wake_flags)
{
    struct rq *rq = cpu_rq(cpu);
    struct rq_flags rf;

    ... ...
    rq_lock(rq, &rf);
    update_rq_clock(rq);
    ttwu_do_activate(rq, p, wake_flags, &rf);
    rq_unlock(rq, &rf);
}

static void
ttwu_do_activate(struct rq *rq, struct task_struct *p, int wake_flags,
        struct rq_flags *rf)
{
    int en_flags = ENQUEUE_WAKEUP | ENQUEUE_NOCLOCK;

    lockdep_assert_held(&rq->lock);

    ... ...
    activate_task(rq, p, en_flags);
    ttwu_do_wakeup(rq, p, wake_flags, rf);
}

/*
 * Mark the task runnable and perform wakeup-preemption.
 */
static void ttwu_do_wakeup(struct rq *rq, struct task_struct *p, int wake_flags,
        struct rq_flags *rf)
{
    check_preempt_curr(rq, p, wake_flags);
    p->state = TASK_RUNNING;
    trace_sched_wakeup(p);
    ... ...
}

從函式呼叫過程中可以看到, default_wake_function() 實現喚醒程序的過程為:

default_wake_function() --> try_to_wake_up() --> ttwu_queue() --> ttwu_do_activate() --> ttwu_do_wakeup()

值得一提的是, default_wake_function() 的實現中並未將等待佇列元素從等待佇列中刪除。因此,編寫程式時不能忘記新增步驟將等待佇列元素從等待佇列元素中刪除。

4.3 autoremove_wake_function()

int autoremove_wake_function(struct wait_queue_entry *wq_entry, unsigned mode, int sync, void *key)
{
    int ret = default_wake_function(wq_entry, mode, sync, key);

    if (ret)
        list_del_init(&wq_entry->entry);

    return ret;
}

 autoremove_wake_function() 相比於 default_wake_function() ,在成功執行程序喚醒工作後,會自動將等待佇列元素從等待佇列中移除。

 

5. 原始碼例項

等待佇列在核心中有著廣泛的運用,此處以 MMC 驅動子系統中 mmc_claim_host() 和 mmc_release_host() 來說明等待佇列的運用例項。
 mmc_claim_host() 的功能為:藉助等待佇列申請獲得 MMC 主控制器 (host) 的使用權,相對應, mmc_release_host() 則是放棄 host 使用權,並喚醒所有等待佇列上的程序。

static inline void mmc_claim_host(struct mmc_host *host)
{
    __mmc_claim_host(host, NULL, NULL);
}

int __mmc_claim_host(struct mmc_host *host, struct mmc_ctx *ctx, atomic_t *abort)
{
    struct task_struct *task = ctx ? NULL : current;

    // 定義等待佇列元素,關聯當前程序,喚醒回撥函式為default_wake_function()
    DECLARE_WAITQUEUE(wait, current);
    unsigned long flags;
    int stop;
    bool pm = false;

    might_sleep();

    // 將當前等待佇列元素加入到等待佇列host->wq中
    add_wait_queue(&host->wq, &wait);
    spin_lock_irqsave(&host->lock, flags);
    while (1) {
        // 當前程序狀態設定為 TASK_UPINTERRUPTIBLE,此時仍未讓出CPU
        set_current_state(TASK_UNINTERRUPTIBLE);
        stop = abort ? atomic_read(abort) : 0;
        // 真正讓出CPU前判斷等待的資源是否已經得到
        if (stop || !host->claimed || mmc_ctx_matches(host, ctx, task))
            break;
        spin_unlock_irqrestore(&host->lock, flags);
        // 呼叫排程器,讓出CPU,當前程序可進入休眠
        schedule();
        spin_lock_irqsave(&host->lock, flags);
    }
    // 從休眠中恢復,設定當前程序狀態為可執行(TASK_RUNNING)
    set_current_state(TASK_RUNNING);
    if (!stop) {
        host->claimed = 1;
        mmc_ctx_set_claimer(host, ctx, task);
        host->claim_cnt += 1;
        if (host->claim_cnt == 1)
            pm = true;
    } else
        // 可利用abort引數執行一次等待佇列喚醒工作
        wake_up(&host->wq);
    spin_unlock_irqrestore(&host->lock, flags);

    // 等待佇列結束,將等待佇列元素從等待佇列中移除
    remove_wait_queue(&host->wq, &wait);

    if (pm)
        pm_runtime_get_sync(mmc_dev(host));

    return stop;
}

void mmc_release_host(struct mmc_host *host)
{
    unsigned long flags;

    WARN_ON(!host->claimed);

    spin_lock_irqsave(&host->lock, flags);
    if (--host->claim_cnt) {
        /* Release for nested claim */
        spin_unlock_irqrestore(&host->lock, flags);
    } else {
        host->claimed = 0;
        host->claimer->task = NULL;
        host->claimer = NULL;
        spin_unlock_irqrestore(&host->lock, flags);

        // 喚醒等待佇列host->wq上的所有程序
        wake_up(&host->wq);
        pm_runtime_mark_last_busy(mmc_dev(host));
        if (host->caps & MMC_CAP_SYNC_RUNTIME_PM)
            pm_runtime_put_sync_suspend(mmc_dev(host));
        else
            pm_runtime_put_autosuspend(mmc_dev(host));
    }
}

從原始碼實現過程可以看到,此例項中等待佇列的使用和第3節中總結得基本過程一致,使用到的函式依次為:

  •  DECLARE_WAITQUEUE(wait, current) 
  •  add_wait_queue(&host->wq, &wait) 
  •  set_current_state(TASK_UNINTERRUPTIBLE) 
  •  schedule() 
  •  set_current_state(TASK_RUNNING) 
  •  remove_wait_queue(&host->wq, &wait) 

 

6. 另一種休眠方式

回顧上文的介紹,2.3節中介紹了另外一種初始化等待佇列元素的方式 DEFINE_WAIT() ,而至目前仍未見使用。實際上此巨集定義和另一個函式搭配使用: prepare_to_wait() 。

void
prepare_to_wait(struct wait_queue_head *wq_head, struct wait_queue_entry *wq_entry, int state)
{
    unsigned long flags;

    wq_entry->flags &= ~WQ_FLAG_EXCLUSIVE;
    spin_lock_irqsave(&wq_head->lock, flags);
    if (list_empty(&wq_entry->entry))
        __add_wait_queue(wq_head, wq_entry);
    set_current_state(state);
    spin_unlock_irqrestore(&wq_head->lock, flags);
}

可以看到 prepare_to_wait() 實際做的事情也就是將等待佇列元素加入到等待佇列中,然後更新當前程序狀態。可以看出此過程依舊符合之前介紹的等待佇列一般使用流程,只是核心原始碼將部分流程封裝成為此函式。

 prepare_to_wait() 配合 finish_wait() 函式可實現等待佇列。

 

7. 總結

綜上文分析,等待佇列的使用主要有三種方式:
(1) 等待事件方式
 wait_event() 和 wake_up() 函式配合,實現程序阻塞睡眠和喚醒。


(2) 手動休眠方式1

DECLARE_WAIT_QUEUE_HEAD(queue);
DECLARE_WAITQUEUE(wait, current);

for (;;) {
    add_wait_queue(&queue, &wait);
    set_current_state(TASK_INTERRUPTIBLE);
    if (condition)
        break;
    schedule();
    remove_wait_queue(&queue, &wait);
    if (signal_pending(current))
        return -ERESTARTSYS;
}
set_current_state(TASK_RUNNING);
remove_wait_queue(&queue, &wait);

 

(3) 手動休眠方式2(藉助核心封裝函式)

DELARE_WAIT_QUEUE_HEAD(queue);
DEFINE_WAIT(wait);

while (! condition) {
    prepare_to_wait(&queue, &wait, TASK_INTERRUPTIBLE);
    if (! condition)
        schedule();
    finish_wait(&queue, &wait)
}

 

 

 

參考資料

[1] LINUX 裝置驅動程式(LDD3),2012年
[2] Linux裝置驅動開發詳解(基於最新的Linux4.0核心),宋寶華編著,2016年
[3] linux裝置驅動模型:https://blog.csdn.net/qq_40732350/article/details/82992904
[4] Linux 等待佇列 (wait queue):https://xyfu.me/posts/236f51d8/
[5] Linux Wait Queue 等待佇列:https://www.cnblogs.com/gctech/p/6872301.html
[6] 原始碼解讀Linux等待佇列:http://gityuan.com/2018/12/02/linux-wait-queue/
[7] Driver porting: sleeping and waking up:https://lwn.net/Articles/22913/