1. 程式人生 > >linux核心級同步機制--futex

linux核心級同步機制--futex

關於同步的一點思考-下一文中,我們知道glibc的pthread_cond_timedwait底層是用linux futex機制實現的。

理想的同步機制應該是沒有鎖衝突時在使用者態利用原子指令就解決問題,而需要掛起等待時再使用核心提供的系統呼叫進行睡眠與喚醒。換句話說,在使用者態的自旋失敗時,能不能讓程序掛起,由持有鎖的執行緒釋放鎖時將其喚醒? 如果你沒有較深入地考慮過這個問題,很可能想當然的認為類似於這樣就行了(虛擬碼):

void lock(int lockval) {
	//trylock是使用者級的自旋鎖
	while(!trylock(lockval)) {
		wait();//釋放cpu,並將當期執行緒加入等待佇列,是系統呼叫
	}
}

boolean trylock(int lockval){
	int i=0; 
	//localval=1代表上鎖成功
	while(!compareAndSet(lockval,0,1)){
		if(++i>10){
			return false;
		}
	}
	return true;
}

void unlock(int lockval) {
	 compareAndSet(lockval,1,0);
	 notify();
}
複製程式碼

上述程式碼的問題是trylock和wait兩個呼叫之間存在一個視窗: 如果一個執行緒trylock失敗,在呼叫wait時持有鎖的執行緒釋放了鎖,當前執行緒還是會呼叫wait進行等待,但之後就沒有人再喚醒該執行緒了。

為了解決上述問題,linux核心引入了futex機制,futex主要包括等待和喚醒兩個方法:futex_waitfutex_wake,其定義如下

//uaddr指向一個地址,val代表這個地址期待的值,當*uaddr==val時,才會進行wait
int futex_wait(int *uaddr, int val);
//喚醒n個在uaddr指向的鎖變數上掛起等待的程序
int futex_wake(int *uaddr, int n);
複製程式碼

futex在真正將程序掛起之前會檢查addr指向的地址的值是否等於val,如果不相等則會立即返回,由使用者態繼續trylock。否則將當期執行緒插入到一個佇列中去,並掛起。

關於同步的一點思考-上文章中對futex的背景與基本原理有介紹,對futex不熟悉的人可以先看下。

本文將深入分析futex的實現,讓讀者對於鎖的最底層實現方式有直觀認識,再結合之前的兩篇文章(關於同步的一點思考-上關於同步的一點思考-下)能對作業系統的同步機制有個全面的理解。

下文中的程序一詞包括常規程序與執行緒

futex_wait

在看下面的原始碼分析前,先思考一個問題:如何確保掛起程序時,val的值是沒有被其他程序修改過的?

程式碼在kernel/futex.c中

static int futex_wait(u32 __user *uaddr, int fshared,
		      u32 val, ktime_t *abs_time, u32 bitset, int clockrt)
{
	struct hrtimer_sleeper timeout, *to = NULL;
	struct restart_block *restart;
	struct futex_hash_bucket *hb;
	struct futex_q q;
	int ret;

	...

	//設定hrtimer定時任務:在一定時間(abs_time)後,如果程序還沒被喚醒則喚醒wait的程序
	if (abs_time) {
	    ...
		hrtimer_init_sleeper(to, current);
		...
	}

retry:
	//該函式中判斷uaddr指向的值是否等於val,以及一些初始化操作
	ret = futex_wait_setup(uaddr, val, fshared, &q, &hb);
	//如果val發生了改變,則直接返回
	if (ret)
		goto out;

	//將當前程序狀態改為TASK_INTERRUPTIBLE,並插入到futex等待佇列,然後重新排程。
	futex_wait_queue_me(hb, &q, to);

	/* If we were woken (and unqueued), we succeeded, whatever. */
	ret = 0;
	//如果unqueue_me成功,則說明是超時觸發(因為futex_wake喚醒時,會將該程序移出等待佇列,所以這裡會失敗)
	if (!unqueue_me(&q))
		goto out_put_key;
	ret = -ETIMEDOUT;
	if (to && !to->task)
		goto out_put_key;

	/*
	 * We expect signal_pending(current), but we might be the
	 * victim of a spurious wakeup as well.
	 */
	if (!signal_pending(current)) {
		put_futex_key(fshared, &q.key);
		goto retry;
	}

	ret = -ERESTARTSYS;
	if (!abs_time)
		goto out_put_key;

	...

out_put_key:
	put_futex_key(fshared, &q.key);
out:
	if (to) {
		//取消定時任務
		hrtimer_cancel(&to->timer);
		destroy_hrtimer_on_stack(&to->timer);
	}
	return ret;
}
複製程式碼

在將程序阻塞前會將當期程序插入到一個等待佇列中,需要注意的是這裡說的等待佇列其實是一個類似Java HashMap的結構,全域性唯一。

struct futex_hash_bucket {
	spinlock_t lock;
	//雙向連結串列
	struct plist_head chain;
};

static struct futex_hash_bucket futex_queues[1<<FUTEX_HASHBITS];
複製程式碼

著重看futex_wait_setup和兩個函式futex_wait_queue_me

static int futex_wait_setup(u32 __user *uaddr, u32 val, int fshared,
			   struct futex_q *q, struct futex_hash_bucket **hb)
{
	u32 uval;
	int ret;
retry:
	q->key = FUTEX_KEY_INIT;
	//初始化futex_q
	ret = get_futex_key(uaddr, fshared, &q->key, VERIFY_READ);
	if (unlikely(ret != 0))
		return ret;

retry_private:
	//獲得自旋鎖
	*hb = queue_lock(q);
	//原子的將uaddr的值設定到uval中
	ret = get_futex_value_locked(&uval, uaddr);

   ... 
	//如果當期uaddr指向的值不等於val,即說明其他程序修改了
	//uaddr指向的值,等待條件不再成立,不用阻塞直接返回。
	if (uval != val) {
		//釋放鎖
		queue_unlock(q, *hb);
		ret = -EWOULDBLOCK;
	}

   ...
	return ret;
}
複製程式碼

函式futex_wait_setup中主要做了兩件事,一是獲得自旋鎖,二是判斷*uaddr是否為預期值。

static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q,
				struct hrtimer_sleeper *timeout)
{
	//設定程序狀態為TASK_INTERRUPTIBLE,cpu排程時只會選擇
	//狀態為TASK_RUNNING的程序
	set_current_state(TASK_INTERRUPTIBLE);
	//將當期程序(q封裝)插入到等待佇列中去,然後釋放自旋鎖
	queue_me(q, hb);

	//啟動定時任務
	if (timeout) {
		hrtimer_start_expires(&timeout->timer, HRTIMER_MODE_ABS);
		if (!hrtimer_active(&timeout->timer))
			timeout->task = NULL;
	}

	/*
	 * If we have been removed from the hash list, then another task
	 * has tried to wake us, and we can skip the call to schedule().
	 */
	if (likely(!plist_node_empty(&q->list))) {
		 
		 //如果沒有設定過期時間 || 設定了過期時間且還沒過期
		if (!timeout || timeout->task)
			//系統重新進行程序排程,這個時候cpu會去執行其他程序,該程序會阻塞在這裡
			schedule();
	}
	//走到這裡說明又被cpu選中運行了
	__set_current_state(TASK_RUNNING);
}
複製程式碼

futex_wait_queue_me中主要做幾件事:

  1. 將當期程序插入到等待佇列
  2. 啟動定時任務
  3. 重新排程程序

如何保證條件與等待之間的原子性

futex_wait_setup方法中會加自旋鎖;在futex_wait_queue_me中將狀態設定為TASK_INTERRUPTIBLE,呼叫queue_me將當期執行緒插入到等待佇列中,然後才釋放自旋鎖。也就是說檢查uaddr的值的過程跟程序掛起的過程放在同一個臨界區中。當釋放自旋鎖後,這時再更改addr地址的值已經沒有關係了,因為當期程序已經加入到等待佇列中,能被wake喚醒,不會出現本文開頭提到的沒人喚醒的問題。

futex_wait小結

總結下futex_wait流程:

  1. 加自旋鎖
  2. 檢測*uaddr是否等於val,如果不相等則會立即返回
  3. 將程序狀態設定為TASK_INTERRUPTIBLE
  4. 將當期程序插入到等待佇列中
  5. 釋放自旋鎖
  6. 建立定時任務:當超過一定時間還沒被喚醒時,將程序喚醒
  7. 掛起當前程序

futex_wake

futex_wake

static int futex_wake(u32 __user *uaddr, int fshared, int nr_wake, u32 bitset)
{
	struct futex_hash_bucket *hb;
	struct futex_q *this, *next;
	struct plist_head *head;
	union futex_key key = FUTEX_KEY_INIT;
	int ret;

	...
	//根據uaddr的值填充&key的內容
	ret = get_futex_key(uaddr, fshared, &key, VERIFY_READ);
	if (unlikely(ret != 0))
		goto out;
	//根據&key獲得對應uaddr所在的futex_hash_bucket
	hb = hash_futex(&key);
	//對該hb加自旋鎖
	spin_lock(&hb->lock);
	head = &hb->chain;
	//遍歷該hb的連結串列,注意連結串列中儲存的節點是plist_node型別,而而這裡的this卻是futex_q型別,這種型別轉換是通過c中的container_of機制實現的
	plist_for_each_entry_safe(this, next, head, list) {
		if (match_futex (&this->key, &key)) {
			...
			//喚醒對應程序
			wake_futex(this);
			if (++ret >= nr_wake)
				break;
		}
	}
	//釋放自旋鎖
	spin_unlock(&hb->lock);
	put_futex_key(fshared, &key);
out:
	return ret;
}
複製程式碼

futex_wake流程如下:

  1. 找到uaddr對應的futex_hash_bucket,即程式碼中的hb
  2. 對hb加自旋鎖
  3. 遍歷fb的連結串列,找到uaddr對應的節點
  4. 呼叫wake_futex喚起等待的程序
  5. 釋放自旋鎖

wake_futex中將制定程序狀態設定為TASK_RUNNING並加入到系統排程列表中,同時將程序從futex的等待佇列中移除掉,具體程式碼就不分析了,有興趣的可以自行研究。

End

Java中的ReentrantLock,Object.wait和Thread.sleep等等底層都是用futex進行執行緒同步,理解futex的實現能幫助你更好的理解與使用這些上層的同步機制。另外因篇幅與精力有限,涉及到程序排程的相關內容沒有具體分析,不過並不妨礙理解文章內容