1. 程式人生 > >Kernel常見鎖的原理和實現

Kernel常見鎖的原理和實現

    鎖是核心中使用最頻繁,最基礎的設施之一,在核心的各個模組中被大量使用。鎖的本質是在併發過程中保證資源的互斥使用。Linux核心提供了多種鎖,應用的場合也各不相同,主要包括:原子操作,訊號量,讀寫鎖,自旋鎖,以及RCU鎖機制等。

    RCU是比讀寫鎖更高效,並且同時支援多個讀者和多個寫者併發的機制,其實現非常複雜,涉及到軟中斷,completion機制等,將不再本篇分析,另起一篇RCU機制和實現。

原子操作(atomic)

原子操作是實現其他各種鎖的基礎。考慮如下程式碼語句:

i++;

編譯器在編譯的過程中,該語句有可能被編譯成如下三條CPU指令:

  1. 載入記憶體變數i的值到暫存器

  2. 暫存器值+1

  3. 將暫存器值寫回記憶體

我們知道,一條指令在單CPU上對記憶體的訪問是原子的(因為中斷只能是在當前指令執行完之後才去檢查處理),但多條指令之間並不是原子的,單條指令在多處理器系統上也不是原子的。因此,上面i++語句雖然在程式碼編寫上是一條語句,但在二進位制可執行指令上卻是三條指令,如果在兩個併發的例程中對同一個i變數同時執行i++操作,必然會導致資料錯亂。因此,每種CPU架構都應該提供一套指令,用於鎖定/解鎖記憶體匯流排,使得在鎖定區內執行的指令是一個原子操作。

以x86架構為例,提供了lock;字首指令,用於在指令執行前先鎖定特定記憶體,保證對特定記憶體的互斥訪問。以atomic_add()實現為例:

#ifdef CONFIG_SMP

#define LOCK_PREFIX_HERE \

".pushsection .smp_locks,\"a\"\n" \

".balign 4\n" \

".long 671f - .\n" /* offset */ \

".popsection\n" \

"671:"

 

#define LOCK_PREFIX LOCK_PREFIX_HERE "\n\tlock; "

 

#else /* ! CONFIG_SMP */

#define LOCK_PREFIX_HERE ""

#define LOCK_PREFIX ""

#endif

 

static __always_inline void atomic_add(int i, atomic_t *v)

{

asm volatile(LOCK_PREFIX "addl %1,%0"

: "+m" (v->counter)

: "ir" (i));

}

atomic_add()使用內聯彙編的方式實現整型變數的加法,核心是:addl %1,%0彙編指令。如果是在單CPU系統架構上呼叫該函式,那麼這條指令是原子的。但是如果在SMP(對稱多處理器)系統上,該單條指令就不是原子的。因此,該實現還增加了LOCK_PREFIX巨集,用於區分是否是SMP系統:如果是SMP系統,LOCK_PREFIX巨集定義中包含lock;指令字首,否則LOCK_PREFIX為空(單CPU系統上單條指令本身就是原子的)。

 

訊號量(semaphore/mutex)

訊號量semaphore是一種睡眠鎖,實現對多個同類資源的互斥訪問,如果資源個數降為1個,就是互斥鎖mutex。訊號量實現原理如下:初始有n個同類資源,當某個執行緒獲取(down操作)資源時,資源個數-1。當所有資源被分配完,此時當前執行緒被掛起在等待佇列上,直到某個執行緒釋放了(up操作)資源後,喚醒在等待佇列上的執行緒重新獲取資源。

訊號量的資料結構簡單清晰:

struct semaphore {

raw_spinlock_t lock; // 自旋鎖,保護count和 wait_list 的互斥訪問

unsigned int count; // 資源個數,在sema_init介面中初始化

struct list_head wait_list; // 等待佇列

};

基本介面定義也清晰明瞭:

void sema_init(struct semaphore *sem, int val);

void down(struct semaphore *sem);

void up(struct semaphore *sem);

簡單分析一下實現:

void down(struct semaphore *sem)

{

unsigned long flags;

 

raw_spin_lock_irqsave(&sem->lock, flags);

if (likely(sem->count > 0))

sem->count--; // 上鎖成功

else

__down(sem); // 資源已經用完,掛起當前執行緒

raw_spin_unlock_irqrestore(&sem->lock, flags);

}

 

void up(struct semaphore *sem)

{

unsigned long flags;

 

raw_spin_lock_irqsave(&sem->lock, flags);

if (likely(list_empty(&sem->wait_list)))

sem->count++; // 等待佇列為空,count++後直接退出

else

__up(sem); // 喚醒等待佇列執行緒

raw_spin_unlock_irqrestore(&sem->lock, flags);

}

 

讀寫訊號量(rw_sem)

    讀寫訊號量是訊號量的更細粒度實現。我們知道,對於單個資源,無論讀寫,資源都是互斥的。也就是說,同時只能有一個讀執行緒或者寫執行緒獨佔資源,這種情況不是最優的。考慮如下情形:有多個讀執行緒和一個寫執行緒訪問同一個資源。訊號量實現的是所有執行緒的獨佔訪問,無法實現某一時刻多個讀執行緒同時訪問資源。而讀寫訊號量可以實現多個讀執行緒的併發。對同一個資源訪問,讀寫訊號量實現如下機制:

    (1)同時只能有一個寫者獨佔資源

    (2)同時可以有多個讀者訪問資源

    (3)讀者和寫者不能同時訪問資源

    因此,讀寫訊號量適用於有讀者很多,寫者很少的情形。kernel中提供兩種實現方式:一種是平臺相關的實現,使用內聯彙編實現,效率很高;另外一種是通用的實現方式,與平臺無關,效率較低。我們只關注基本的通用實現方式:

    struct rw_semaphore {
    __s32            count;                    //  引用計數
    raw_spinlock_t        wait_lock;    //  自旋鎖,保護wait_list的互斥訪問
    struct list_head    wait_list;         //  等待佇列
#ifdef CONFIG_DEBUG_LOCK_ALLOC
    struct lockdep_map dep_map;
#endif
};

void __sched down_read(struct rw_semaphore *sem);

void __sched down_write(struct rw_semaphore *sem);

void up_read(struct rw_semaphore *sem);

void up_write(struct rw_semaphore *sem);

    說明:int型別的count設計得很巧妙,count > 0表示當前佔用資源的讀執行緒個數,count = -1表示當前有個寫執行緒佔用資源,count = 0表示當前資源空閒。

    下面分析核心實現:

    (1)讀者上鎖:

void __sched __down_read(struct rw_semaphore *sem)
{
    struct rwsem_waiter waiter;
    struct task_struct *tsk;
    unsigned long flags;

    raw_spin_lock_irqsave(&sem->wait_lock, flags);    // 自旋鎖保護等待佇列

    if (sem->count >= 0 && list_empty(&sem->wait_list)) {    // 資源空閒或者被其他讀者佔用,並且等待佇列為空,當前讀者可以獲取鎖。增加讀者引用計數,釋放自旋鎖並退出,上鎖成功。
        /* granted */
        sem->count++; 
        raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
        goto out;
    }

    // 要麼資源被寫者佔用(count == -1),要麼等待佇列不空,那麼當前讀者都要入佇列。判斷等待佇列不空是為了防止讀者飢餓。

    tsk = current;
    set_task_state(tsk, TASK_UNINTERRUPTIBLE);    // 優化措施:只能被其他讀者/寫者主動喚醒,防止被系統喚醒,增加系統開銷

    /* set up my own style of waitqueue */
    waiter.task = tsk;
    waiter.type = RWSEM_WAITING_FOR_READ;
    get_task_struct(tsk);

    list_add_tail(&waiter.list, &sem->wait_list);    // 當前讀者加入等待佇列

    /* we don't need to touch the semaphore struct anymore */
    raw_spin_unlock_irqrestore(&sem->wait_lock, flags);    // 釋放自旋鎖

    /* wait to be given the lock */
    for (;;) {
        if (!waiter.task)
            break;
        schedule();    // 切換執行緒,排程其他讀者/寫者執行
        set_task_state(tsk, TASK_UNINTERRUPTIBLE);     // for 迴圈是為了判斷本次喚醒是否真的輪到自己執行。存在如下情形:等待佇列上有多個讀寫執行緒,當寫執行緒完成之後,會喚醒等待佇列上所有執行緒。為了防止讀寫飢餓,先入佇列的執行緒先得到執行。如果當前執行緒前面有個寫執行緒得到機會執行,那麼當前讀執行緒需要再次掛起在等待佇列,等待下次排程。
    }

    __set_task_state(tsk, TASK_RUNNING);    // 當前讀者可以獲取鎖,上鎖成功
 out:
    ;
}

    (2)讀者解鎖:

void __up_read(struct rw_semaphore *sem)
{
    unsigned long flags;

    raw_spin_lock_irqsave(&sem->wait_lock, flags);    // 自旋鎖上鎖

    if (--sem->count == 0 && !list_empty(&sem->wait_list))    // 資源空閒並且等待佇列不空,此時等待佇列的首個物件一定是一個寫者
        sem = __rwsem_wake_one_writer(sem);    // 喚醒等待佇列上的寫者

    raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
}
    (3)寫者上鎖:

void __sched __down_write_nested(struct rw_semaphore *sem, int subclass)
{
    struct rwsem_waiter waiter;
    struct task_struct *tsk;
    unsigned long flags;

    raw_spin_lock_irqsave(&sem->wait_lock, flags);    // 自旋鎖上鎖

    /* set up my own style of waitqueue */
    tsk = current;
    waiter.task = tsk;
    waiter.type = RWSEM_WAITING_FOR_WRITE;
    list_add_tail(&waiter.list, &sem->wait_list);    // 寫者先入佇列

    /* wait for someone to release the lock */
    for (;;) {
        /*
         * That is the key to support write lock stealing: allows the
         * task already on CPU to get the lock soon rather than put
         * itself into sleep and waiting for system woke it or someone
         * else in the head of the wait list up.
         */
        if (sem->count == 0)    // 如果資源沒被佔用,跳出for迴圈後,退出佇列,上鎖成功。
            break;
        set_task_state(tsk, TASK_UNINTERRUPTIBLE);
        raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
        schedule();        //  資源被佔用,排程其他執行緒
        raw_spin_lock_irqsave(&sem->wait_lock, flags);
    }

    // 上鎖成功,退出等待佇列
    /* got the lock */
    sem->count = -1;
    list_del(&waiter.list);

    raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
}

void __sched __down_write(struct rw_semaphore *sem)
{
    __down_write_nested(sem, 0);
}
    (4)寫者解鎖:

void __up_write(struct rw_semaphore *sem)
{
    unsigned long flags;

    raw_spin_lock_irqsave(&sem->wait_lock, flags);    // 自旋鎖上鎖

    sem->count = 0;    // 讀者解鎖成功,資源空閒
    if (!list_empty(&sem->wait_list))    // 等待佇列不空
        sem = __rwsem_do_wake(sem, 1);    // 排程等待佇列上的讀者/寫者執行

    raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
}
 

自旋鎖(spin lock)

自旋鎖設計可用於SMP系統中中斷上下文的臨界區保護。我們知道,中斷上下文中是不可以睡眠的(1,中斷上下文要求及時處理,2中斷上下文不是可排程實體 等種種原因),因此訊號量/讀寫鎖等睡眠鎖不能用於中斷上下文。自旋鎖設計原理是:當鎖被其他核心執行緒CPU或其他CPU中斷上下文持有時,當前CPU不是去睡眠,而是不停的空轉並輪詢該鎖的狀態,直到該鎖被其他CPU釋放,當前CPU獲取該鎖並進入臨界區,執行完之後釋放該鎖。這也是自旋鎖的名稱來源:當獲取不到鎖時,CPU空等,不停的自旋。

根據自旋鎖的設計原理,我們知道:

(1)自旋鎖保護的臨界區程式碼執行時間要儘可能短,否則其他CPU會一直忙等,浪費CPU資源。

(2)中斷上下文中的自旋鎖保護的臨界區一定不能睡眠,否則會死鎖。執行緒上下文中可以睡眠,只要保證能被喚醒即可。

自旋鎖可以用在核心執行緒中,也可以用在中斷上下文中。如果某個自旋鎖只用於核心執行緒中,那麼該自旋鎖的實現只需要關閉核心搶佔即可(對應spin_lock/spin_unlock版本)。如果該自旋鎖不僅用於核心執行緒,也會在中斷上下文使用,那麼該自旋鎖的實現不僅要關閉核心搶佔,而且要禁止中斷(對應spin_lock_irqsave/spin_unlock_ir