How Many Thread Synchronization Strategies Does Linux Have? - "Linux Source Code Analysis (Part 2)"
This post was supposed to go out last week, but procrastination struck again :see_no_evil:
The previous post mainly discussed the Linux thread scheduling algorithm.
This one talks about synchronization between threads; for now it does not cover IPC (Inter-Process Communication),
even though IPC is still very interesting.
Interesting things deserve to be savored slowly, right? Saving it for next time :new_moon_with_face: (mostly because I couldn't get it ready in time, haha, too real)

PS: the Linux kernel version analyzed below is 4.19.25
Thread synchronization
Motivation
Why do threads need to synchronize with each other?
One reason: all the threads under the same parent process share the same address space - the same globals and heap (the same sky) - even though each has its own program counter, registers, and stack.
So when multiple threads operate on the same variable at the same time, contention and even incorrect results can easily occur. This is the mutual-exclusion problem.
The other reason: thread execution often has an implicit order, and the next thread needs to know whether the previous one has finished its task.
Of course, a thread does not have that much authority itself - the scheduler takes care of all this - but a thread still needs a way to sense whether the previous one has completed. This is the synchronization (ordering) problem.
So, overall, thread synchronization is about solving the mutual-exclusion and ordering problems.
As for how, the common trick is to set up an atomic variable in shared memory and let the threads contend for it; whoever wins the variable gets to proceed. A toy sketch follows the list of mechanisms below.
Concretely, there are the mutex, the lock (Lock), the read-write lock (rwlock), the condition variable (Condition), the barrier (Barrier), etc.
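As a toy illustration of that idea (the flag, counter, and function names here are all invented for the example - this is not how the kernel does it), a single atomic flag that threads race to set is already enough to get mutual exclusion:

#include <stdatomic.h>

static atomic_flag busy = ATOMIC_FLAG_INIT; /* the shared atomic variable everyone contends for */
static long shared_counter;                 /* hypothetical shared state */

void increment(void)
{
        while (atomic_flag_test_and_set(&busy)) /* spin until we are the one that flips 0 -> 1 */
                ;                               /* another thread is inside the critical section */
        shared_counter++;                       /* only one thread at a time reaches this line */
        atomic_flag_clear(&busy);               /* release the flag so the next thread can win it */
}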
Source code
There is quite a lot of code in this part, some of it fairly obscure, and the code since kernel 4.x has changed quite a bit compared to the 2.x versions.
It is also of some use in day-to-day work - for example when writing a Redis-based lock, etc. Understanding this part should take your grasp of multithreading a step further.
Linux's thread synchronization mechanisms are essentially the same as the ones used in Nachos (semaphores, locks, condition variables): it uses mutexes, condition variables, semaphores, and read-write locks.
Mutex
In Linux the mutex is implemented by declaring a mutex struct. A ww_mutex (wound/wait) variant is also declared to avoid deadlock.
The Linux kernel code for the mutex struct lives in <include/linux/mutex.h>:
struct mutex {
        atomic_long_t           owner;      // owner of the mutex
                                            // == 0: the mutex is not held
                                            // != 0: held by this owner; only the current owner may unlock it
        spinlock_t              wait_lock;  // spinlock protecting the wait queue
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
        struct optimistic_spin_queue osq;   /* Spinner MCS lock */
#endif
        struct list_head        wait_list;  // wait queue
#ifdef CONFIG_DEBUG_MUTEXES
        void                    *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
        struct lockdep_map      dep_map;
#endif
};
The struct above uses an atomic variable owner to achieve the mutual exclusion of the mutex; this is already different from the kernel 2.x versions.
When owner is 0, the mutex is not yet held. When it is non-zero, only the thread whose id == owner may release it.
It also defines a wait_list to hold the threads that have been put to sleep.
This part of the code is essentially the same as the Semaphore design in Nachos.
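As a quick illustration of how this struct is consumed from ordinary kernel code (the lock name, counter, and function are invented for the example), the standard API is DEFINE_MUTEX / mutex_lock / mutex_unlock:

#include <linux/mutex.h>

static DEFINE_MUTEX(demo_lock);   /* a statically initialized struct mutex */
static int demo_counter;          /* hypothetical shared state */

static void demo_update(void)
{
        mutex_lock(&demo_lock);   /* fast path: one cmpxchg on ->owner; otherwise we sleep on ->wait_list */
        demo_counter++;           /* critical section */
        mutex_unlock(&demo_lock); /* hands the lock to the first waiter, if any */
}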
The code that actually implements the mutex is in <kernel/locking/mutex.c>.
The __mutex_init function mainly does the variable declaration and initialization work:
void __mutex_init(struct mutex *lock, const char *name, struct lock_class_key *key)
{
        atomic_long_set(&lock->owner, 0);   // init the atomic owner variable
        spin_lock_init(&lock->wait_lock);   // init the spinlock
        INIT_LIST_HEAD(&lock->wait_list);   // init the wait queue
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
        osq_lock_init(&lock->osq);
#endif
        debug_mutex_init(lock, name, key);
}

Taking locking as the example, the function being called is mutex_lock.

void __sched mutex_lock(struct mutex *lock)
{
        might_sleep();                      // debug: this function may sleep; dump the stack if that is illegal here
        if (!__mutex_trylock_fast(lock))    // try to grab the owner atomically
                __mutex_lock_slowpath(lock);   // otherwise fall back to the slow path
}
EXPORT_SYMBOL(mutex_lock);
#endif
Here, might_sleep() is a generic Linux API: it marks a function that may sleep, and in debug builds it prints the context stack if the caller is in a context that must not sleep (for example, inside an interrupt). This API shows up a lot in the code that follows.
__mutex_trylock_fast(lock) tries to grab the lock's owner; it returns true if it succeeds.
static __always_inline bool __mutex_trylock_fast(struct mutex *lock)
{
        unsigned long curr = (unsigned long)current;
        unsigned long zero = 0UL;

        if (atomic_long_try_cmpxchg_acquire(&lock->owner, &zero, curr))   // grab the owner: 0 -> current
                return true;

        return false;
}
If the fast path fails to acquire the owner, the slow path is taken:
static noinline void __sched __mutex_lock_slowpath(struct mutex *lock)
{
        __mutex_lock(lock, TASK_UNINTERRUPTIBLE, 0, NULL, _RET_IP_);   // call __mutex_lock
}
Then there is yet another nested call - I'm not sure why it is layered this deep (probably because other call sites reuse these layers).
static int __sched
__mutex_lock(struct mutex *lock, long state, unsigned int subclass,
             struct lockdep_map *nest_lock, unsigned long ip)
{
        // call __mutex_lock_common
        return __mutex_lock_common(lock, state, subclass, nest_lock, ip, NULL, false);
}
And then we finally reach the place where Linux really handles mutex_lock:
static __always_inline int __sched
__mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,   // from __mutex_lock: lock, TASK_UNINTERRUPTIBLE, 0
                    struct lockdep_map *nest_lock, unsigned long ip,          // NULL, _RET_IP_
                    struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)     // NULL, false
{
        struct mutex_waiter waiter;
        bool first = false;
        struct ww_mutex *ww;            // ww = wound/wait mutex, used for deadlock avoidance
        int ret;

        might_sleep();                  // same as before: may sleep, debug stack dump

        ww = container_of(lock, struct ww_mutex, base);   // get the enclosing ww_mutex
        if (use_ww_ctx && ww_ctx) {     // mutex_lock never enters this branch; ww_mutex_lock may
                if (unlikely(ww_ctx == READ_ONCE(ww->ctx)))   // compare the ww_mutex's current ctx with the requested one
                        return -EALREADY;

                /*
                 * Reset the wounded flag after a kill. No other process can
                 * race and wound us here since they can't have a valid owner
                 * pointer if we don't have any locks held.
                 */
                if (ww_ctx->acquired == 0)    // if this ctx holds no locks yet, reset the wounded flag
                        ww_ctx->wounded = 0;
        }

        preempt_disable();               // disable preemption
        mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);   // lockdep: check the acquire/nesting conditions

        if (__mutex_trylock(lock) ||                                      // try to take the lock
            mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, NULL)) {      // or spin optimistically on the owner
                /* got the lock, yay! */
                lock_acquired(&lock->dep_map, ip);                        // lockstat: record that we got the lock
                if (use_ww_ctx && ww_ctx)                                 // only in the ww_mutex_lock case
                        ww_mutex_set_context_fastpath(ww, ww_ctx);        // set the fast-path context
                preempt_enable();                                         // re-enable preemption
                return 0;
        }

        spin_lock(&lock->wait_lock);     // take the spinlock protecting the wait queue
        /*
         * After waiting to acquire the wait_lock, try again.
         */
        if (__mutex_trylock(lock)) {     // might as well try once more, haha
                if (use_ww_ctx && ww_ctx)
                        __ww_mutex_check_waiters(lock, ww_ctx);

                goto skip_wait;
        }

        debug_mutex_lock_common(lock, &waiter);   // debug-mode hook

        lock_contended(&lock->dep_map, ip);       // lockstat: we are going to wait for the lock

        if (!use_ww_ctx) {               // plain mutex_lock case
                /* add waiting tasks to the end of the waitqueue (FIFO): */
                __mutex_add_waiter(lock, &waiter, &lock->wait_list);     // append ourselves to the wait queue

#ifdef CONFIG_DEBUG_MUTEXES
                waiter.ww_ctx = MUTEX_POISON_WW_CTX;
#endif
        } else {
                /*
                 * Add in stamp order, waking up waiters that must kill
                 * themselves.
                 */
                ret = __ww_mutex_add_waiter(&waiter, lock, ww_ctx);      // add to the ww_mutex wait queue (stamp order)
                if (ret)
                        goto err_early_kill;

                waiter.ww_ctx = ww_ctx;
        }

        waiter.task = current;

        set_current_state(state);        // set the task state
        for (;;) {                       // loop: keep retrying the lock
                /*
                 * Once we hold wait_lock, we're serialized against
                 * mutex_unlock() handing the lock off to us, do a trylock
                 * before testing the error conditions to make sure we pick up
                 * the handoff.
                 */
                if (__mutex_trylock(lock))   // we got it
                        goto acquired;

                /*
                 * Check for signals and kill conditions while holding
                 * wait_lock. This ensures the lock cancellation is ordered
                 * against mutex_unlock() and wake-ups do not go missing.
                 */
                if (unlikely(signal_pending_state(state, current))) {   // a pending signal makes waiting in this state illegal
                        ret = -EINTR;
                        goto err;
                }

                if (use_ww_ctx && ww_ctx) {   // ww_mutex: check whether some waiters have to be killed
                        ret = __ww_mutex_check_kill(lock, &waiter, ww_ctx);
                        if (ret)
                                goto err;
                }

                spin_unlock(&lock->wait_lock);     // drop the wait-queue spinlock
                schedule_preempt_disabled();       // schedule (sleep) and come back with preemption disabled

                /*
                 * ww_mutex needs to always recheck its position since its waiter
                 * list is not FIFO ordered.
                 */
                if ((use_ww_ctx && ww_ctx) || !first) {
                        first = __mutex_waiter_is_first(lock, &waiter);
                        if (first)
                                __mutex_set_flag(lock, MUTEX_FLAG_HANDOFF);
                }

                set_current_state(state);          // update the task state
                /*
                 * Here we order against unlock; we must either see it change
                 * state back to RUNNING and fall through the next schedule(),
                 * or we must see its unlock and acquire.
                 */
                if (__mutex_trylock(lock) ||       // one more try
                    (first && mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, &waiter)))
                        break;

                spin_lock(&lock->wait_lock);
        }
        spin_lock(&lock->wait_lock);
acquired:
        __set_current_state(TASK_RUNNING);

        if (use_ww_ctx && ww_ctx) {
                /*
                 * Wound-Wait; we stole the lock (!first_waiter), check the
                 * waiters as anyone might want to wound us.
                 */
                if (!ww_ctx->is_wait_die &&
                    !__mutex_waiter_is_first(lock, &waiter))
                        __ww_mutex_check_waiters(lock, ww_ctx);
        }

        mutex_remove_waiter(lock, &waiter, current);     // remove ourselves from the wait queue
        if (likely(list_empty(&lock->wait_list)))
                __mutex_clear_flag(lock, MUTEX_FLAGS);   // clear the flags

        debug_mutex_free_waiter(&waiter);

skip_wait:
        /* got the lock - cleanup and rejoice! */
        lock_acquired(&lock->dep_map, ip);

        if (use_ww_ctx && ww_ctx)
                ww_mutex_lock_acquired(ww, ww_ctx);

        spin_unlock(&lock->wait_lock);                   // cleanup
        preempt_enable();
        return 0;

err:
        __set_current_state(TASK_RUNNING);
        mutex_remove_waiter(lock, &waiter, current);
err_early_kill:
        spin_unlock(&lock->wait_lock);
        debug_mutex_free_waiter(&waiter);
        mutex_release(&lock->dep_map, 1, ip);
        preempt_enable();
        return ret;
}
The __mutex_lock_common above is shared by the two functions mutex_lock and ww_mutex_lock; the variables use_ww_ctx && ww_ctx are what tell it which of the two callers it is serving.
A lot of the function's logic exists purely to cut down waiting time: it spins and retries several times, and only after repeated attempts still fail to take the lock does it actually go to sleep and wait.
This may make a single lock operation take longer, but compared to the cost of a context switch it is definitely a big saving.
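To make the fast-path / spin / sleep layering concrete, here is a minimal userspace sketch of the same idea built on the C11 atomics (the type, names, and spin count are all invented; the real kernel slow path is far more elaborate):

#include <stdatomic.h>
#include <stdbool.h>
#include <sched.h>

typedef struct { atomic_long owner; } demo_mutex_t;   /* hypothetical, mirrors mutex->owner */

static bool demo_trylock_fast(demo_mutex_t *m, long self)
{
        long zero = 0;
        /* one compare-and-swap: 0 -> our id, like __mutex_trylock_fast() */
        return atomic_compare_exchange_strong(&m->owner, &zero, self);
}

static void demo_lock(demo_mutex_t *m, long self)
{
        if (demo_trylock_fast(m, self))            /* fast path: uncontended case */
                return;
        for (int spins = 0; spins < 100; spins++)  /* "optimistic spin": the owner may release very soon */
                if (demo_trylock_fast(m, self))
                        return;
        while (!demo_trylock_fast(m, self))        /* slow path: give up the CPU instead of a real sleep */
                sched_yield();
}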
Spinlock
A spinlock is a lock that simply keeps retrying. In real production workloads it is very common that the mutex would be released if you just waited a tiny bit longer,
so spinlocks are very useful in practice - plenty of Java programs end up hand-rolling a spinlock too.
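For reference, a minimal sketch of typical kernel spinlock usage (the lock and counter names are invented); spin_lock_irqsave is the variant to use when the data is also touched from interrupt context:

#include <linux/spinlock.h>

static DEFINE_SPINLOCK(demo_slock);   /* statically initialized spinlock */
static unsigned long demo_events;     /* hypothetical shared counter */

static void demo_event_inc(void)
{
        unsigned long flags;

        spin_lock_irqsave(&demo_slock, flags);     /* disable local IRQs and spin until the lock is ours */
        demo_events++;                             /* keep the critical section short - never sleep here */
        spin_unlock_irqrestore(&demo_slock, flags);
}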
The spinlock-related code lives in <include/linux/spinlock_api_smp.h>:
static inline int __raw_spin_trylock(raw_spinlock_t *lock)
{
        preempt_disable();                        // disable preemption
        if (do_raw_spin_trylock(lock)) {          // try to take the raw spinlock
                spin_acquire(&lock->dep_map, 0, 1, _RET_IP_);   // lockdep: record the acquisition
                return 1;
        }
        preempt_enable();                         // re-enable preemption
        return 0;
}
Here spin_acquire is defined in <include/linux/lockdep.h>:
#define spin_acquire(l, s, t, i)                lock_acquire_exclusive(l, s, t, NULL, i)
#define lock_acquire_exclusive(l, s, t, n, i)   lock_acquire(l, s, t, 0, 1, n, i)
And the implementation of lock_acquire() is in <kernel/locking/lockdep.c>:
void lock_acquire(struct lockdep_map *lock, unsigned int subclass,
                  int trylock, int read, int check,
                  struct lockdep_map *nest_lock, unsigned long ip)
{
        unsigned long flags;

        if (unlikely(current->lockdep_recursion))   // bail out if the lockdep recursion flag is set
                return;

        raw_local_irq_save(flags);                  // save the IRQ flags and disable local interrupts
        check_flags(flags);                         // sanity-check the flags

        current->lockdep_recursion = 1;             // keep lockdep from recursing into itself
        trace_lock_acquire(lock, subclass, trylock, read, check, nest_lock, ip);   // tracepoint: log the lock acquisition
        __lock_acquire(lock, subclass, trylock, read, check,
                       irqs_disabled_flags(flags), nest_lock, ip, 0, 0);   // the real work
        current->lockdep_recursion = 0;             // allow lockdep again
        raw_local_irq_restore(flags);               // restore the IRQ flags
}
EXPORT_SYMBOL_GPL(lock_acquire);
The concrete work is then done in __lock_acquire():
static int __lock_acquire(struct lockdep_map *lock, unsigned int subclass,
                          int trylock, int read, int check, int hardirqs_off,
                          struct lockdep_map *nest_lock, unsigned long ip,
                          int references, int pin_count)
{
        struct task_struct *curr = current;
        struct lock_class *class = NULL;
        struct held_lock *hlock;
        unsigned int depth;
        int chain_head = 0;
        int class_idx;
        u64 chain_key;

        if (subclass < NR_LOCKDEP_CACHING_CLASSES)
                class = lock->class_cache[subclass];     // look up the class cache
        /*
         * Not cached?
         */
        if (unlikely(!class)) {
                class = register_lock_class(lock, subclass, 0);   // register the lock class
                if (!class)
                        return 0;
        }
        atomic_inc((atomic_t *)&class->ops);              // atomically bump the class's ops counter
        if (very_verbose(class)) {
                printk("\nacquire class [%px] %s", class->key, class->name);
                if (class->name_version > 1)
                        printk(KERN_CONT "#%d", class->name_version);
                printk(KERN_CONT "\n");
                dump_stack();
        }

        depth = curr->lockdep_depth;                      // current number of held locks
        if (DEBUG_LOCKS_WARN_ON(depth >= MAX_LOCK_DEPTH)) // held-lock stack would overflow
                return 0;

        class_idx = class - lock_classes + 1;

        if (depth) {
                hlock = curr->held_locks + depth - 1;
                if (hlock->class_idx == class_idx && nest_lock) {
                        if (hlock->references) {
                                /*
                                 * Check: unsigned int references:12, overflow.
                                 */
                                if (DEBUG_LOCKS_WARN_ON(hlock->references == (1 << 12)-1))   // 2^12 - 1
                                        return 0;

                                hlock->references++;
                        } else {
                                hlock->references = 2;
                        }

                        return 1;
                }
        }

        hlock = curr->held_locks + depth;
        if (DEBUG_LOCKS_WARN_ON(!class))
                return 0;
        hlock->class_idx = class_idx;                     // record the held_lock information
        hlock->acquire_ip = ip;
        hlock->instance = lock;
        hlock->nest_lock = nest_lock;
        hlock->irq_context = task_irq_context(curr);
        hlock->trylock = trylock;
        hlock->read = read;
        hlock->check = check;
        hlock->hardirqs_off = !!hardirqs_off;
        hlock->references = references;
#ifdef CONFIG_LOCK_STAT
        hlock->waittime_stamp = 0;
        hlock->holdtime_stamp = lockstat_clock();
#endif
        hlock->pin_count = pin_count;

        if (check && !mark_irqflags(curr, hlock))
                return 0;

        /* mark it as used: */
        if (!mark_lock(curr, hlock, LOCK_USED))
                return 0;

        if (DEBUG_LOCKS_WARN_ON(class_idx > MAX_LOCKDEP_KEYS))   // another overflow check
                return 0;

        chain_key = curr->curr_chain_key;
        if (!depth) {
                /*
                 * How can we have a chain hash when we ain't got no keys?!
                 */
                if (DEBUG_LOCKS_WARN_ON(chain_key != 0))
                        return 0;
                chain_head = 1;
        }

        hlock->prev_chain_key = chain_key;
        if (separate_irq_context(curr, hlock)) {
                chain_key = 0;
                chain_head = 1;
        }
        chain_key = iterate_chain_key(chain_key, class_idx);

        curr->curr_chain_key = chain_key;
        curr->lockdep_depth++;
        check_chain_key(curr);

        return 1;
}
__lock_acquire() is invoked on both the spin_lock and the mutex_lock paths.
What it actually operates on is not a single lock instance but a whole lock class.
To cut down lockdep's lookup cost it uses a cache (class_cache), which gives a noticeable performance boost for code that takes and releases the same locks over and over.
-
Read-write lock (rwlock)
The main purpose of a read-write lock is to allow concurrency for one kind of access: any number of readers may hold the lock at the same time, while a writer needs exclusive access.
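A minimal sketch of how the kernel's rwlock_t is typically used (the names are invented for the example):

#include <linux/spinlock.h>   /* pulls in rwlock_t and DEFINE_RWLOCK */

static DEFINE_RWLOCK(demo_rwlock);
static int demo_config;            /* hypothetical data: read often, written rarely */

static int demo_read_config(void)
{
        int val;

        read_lock(&demo_rwlock);   /* many readers may hold this at the same time */
        val = demo_config;
        read_unlock(&demo_rwlock);
        return val;
}

static void demo_write_config(int val)
{
        write_lock(&demo_rwlock);  /* exclusive: waits until no reader or writer holds the lock */
        demo_config = val;
        write_unlock(&demo_rwlock);
}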
-
Condition variable (Condition)
A condition variable lets threads wait until some condition holds and then proceed batch by batch; it defines two wake-up modes, waking a single waiter and broadcasting to all waiters.
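In userspace this corresponds to pthread_cond_t. A minimal sketch (the counter and function names are invented) showing both the single wake-up (pthread_cond_signal) and the broadcast (pthread_cond_broadcast):

#include <pthread.h>

static pthread_mutex_t q_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  q_cond  = PTHREAD_COND_INITIALIZER;
static int q_items;                               /* hypothetical amount of pending work */

static void producer_put(int n)
{
        pthread_mutex_lock(&q_mutex);
        q_items += n;
        if (n == 1)
                pthread_cond_signal(&q_cond);     /* wake a single waiter */
        else
                pthread_cond_broadcast(&q_cond);  /* wake the whole batch of waiters */
        pthread_mutex_unlock(&q_mutex);
}

static void consumer_get(void)
{
        pthread_mutex_lock(&q_mutex);
        while (q_items == 0)                      /* always re-check the condition after waking up */
                pthread_cond_wait(&q_cond, &q_mutex);
        q_items--;
        pthread_mutex_unlock(&q_mutex);
}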
-
Barrier
A barrier behaves much like a two-phase locking protocol: in the first phase threads can only wait, in the second they can only run.
While the barrier's agreed-upon threshold has not yet been reached, arriving threads are put onto the wait_queue via a condition variable.
Once the threshold is reached, a broadcast wakes them all at once.
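For completeness, a minimal userspace sketch with pthread_barrier_t (the thread count and names are invented): every thread blocks in pthread_barrier_wait until NUM_WORKERS threads have arrived, and then the whole batch is released together:

#include <pthread.h>
#include <stdio.h>

#define NUM_WORKERS 4                       /* hypothetical threshold agreed on by the barrier */

static pthread_barrier_t barrier;

static void *worker(void *arg)
{
        long id = (long)arg;

        printf("worker %ld: phase 1 done, waiting at the barrier\n", id);
        pthread_barrier_wait(&barrier);     /* sleep until NUM_WORKERS threads have arrived */
        printf("worker %ld: phase 2 begins\n", id);
        return NULL;
}

int main(void)
{
        pthread_t tid[NUM_WORKERS];

        pthread_barrier_init(&barrier, NULL, NUM_WORKERS);
        for (long i = 0; i < NUM_WORKERS; i++)
                pthread_create(&tid[i], NULL, worker, (void *)i);
        for (int i = 0; i < NUM_WORKERS; i++)
                pthread_join(tid[i], NULL);
        pthread_barrier_destroy(&barrier);
        return 0;
}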