1. 程式人生 > >Java併發程式設計:用AQS寫一把可重入鎖

Java併發程式設計:用AQS寫一把可重入鎖

前一篇部落格Java併發程式設計:自己動手寫一把可重入鎖詳述瞭如何用synchronized同步的方式來實現一把可重入鎖,今天我們來效仿ReentrantLock類用AQS來改寫一下這把鎖。要想使用AQS為我們服務,首先得弄懂三個問題:AQS是什麼?AQS已經做了什麼以及我們還需要做些什麼?

AQS簡介

AQS是J.U.C包下AbstractQueuedSynchronizer抽象的佇列式的同步器的簡稱,這是一個抽象類,它定義了一套多執行緒訪問共享資源的同步器框架,J.U.C包下的許多同步類實現都依賴於它,比如ReentrantLock/Semaphore/CountDownLatch,可以說這個抽象類是J.U.C併發包的基礎。

之所以把這一章節叫做AQS簡介而不是叫AQS詳解,是因為已經有大神寫過詳解的文章Java併發之AQS詳解,這篇文章對AQS的原始碼解析很透徹,博主讀了之後受益匪淺,鑑於對原作者的尊重,所以如上附上原文的連結。要想弄懂AQS還得從這一圖說起。
在這裡插入圖片描述
如上圖所述,AQS維護了一個state變數和一個FIFO先進先出佇列,這個state用來幹嘛的可以參考我前一篇部落格中的那個count計數器,就是用來計數執行緒的重入次數的。上一篇部落格還用了一個變數currentThread來記錄已經獲得這把鎖的執行緒。而我們的AQS用的是一個先進先出的等待佇列的完成這件事。當新的執行緒進來的時候,AQS呼叫tryAquice()方法試圖去獲得鎖,如果獲得的話,則呼叫interupt中斷方法;如果沒有獲得鎖,則把當前執行緒放入排隊的佇列,AQS佇列不斷的自旋嘗試去判斷已經佔用的執行緒是否已經放開,如果鎖依然被執行緒繼續佔用,則繼續新增進等待佇列。

原始碼如下:

public final void acquire(int arg) {
     if (!tryAcquire(arg) &&
         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
         selfInterrupt();
 }

那個addWaiter方法,此方法用於將當前執行緒加入到等待佇列的隊尾,並返回當前執行緒所在的結點。

private Node addWaiter(Node mode) {
    //以給定模式構造結點。mode有兩種:EXCLUSIVE(獨佔)和SHARED(共享)
    Node node = new Node(Thread.currentThread(), mode);
    
    //嘗試快速方式直接放到隊尾。
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    
    //上一步失敗則通過enq入隊。
    enq(node);
    return node;
}

我們以獨佔式的同步幫助器為例來看一下AQS的執行流程。
在這裡插入圖片描述
大致流程如下:

  1. 呼叫自定義同步器的tryAcquire()嘗試直接去獲取資源,如果成功則直接返回;
  2. 沒成功,則addWaiter()將該執行緒加入等待佇列的尾部,並標記為獨佔模式;
  3. acquireQueued()使執行緒在等待佇列中休息,有機會時(輪到自己,會被unpark())會去嘗試獲取資源。獲取到資源後才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。
  4. 如果執行緒在等待過程中被中斷過,它是不響應的。只是獲取資源後才再進行自我中斷selfInterrupt(),將中斷補上。

上述的流程和步驟已經是AQS幫我們實現了的功能,估計我講的也不太清楚,這裡再次推薦讀者閱讀這篇文章Java併發之AQS詳解,下面我們應該來看看如何使用AQS。

用AQS寫一把互斥鎖

互斥鎖是為了保證資料的安全,在任一時刻只能有一個執行緒訪問該物件。由上一個小節我們可知,AQS已經為我們實現所有排隊和阻塞機制,我們只需要呼叫getState()、setState(int) 和 compareAndSetState(int, int) 方發來維護state變數的數值和呼叫setExclusiveOwnerThread/getExclusiveOwnerThread來維護當前佔用的執行緒是誰就行了。翻越JDK提供的API,它建議我們:應該將子類定義為非公共內部幫助器類,可用它們來實現其封閉類的同步屬性。類 AbstractQueuedSynchronizer 沒有實現任何同步介面。而是定義了諸如 acquireInterruptibly(int) 之類的一些方法,在適當的時候可以通過具體的鎖和相關同步器來呼叫它們,以實現其公共方法。

什麼意思呢?意思就是建議我們:如果你想要使用AQS實現一把互斥鎖Mutex,就必須先用一個類去繼承AbstractQueuedSynchronizer這個抽象類,然而這個實現的子類(暫取名叫Sync)應該是作為Mutex的內部類來用的,提供給Mutex當作幫助器來使用。那麼Lock介面,Mutex互斥鎖,AbstractQueuedSynchronizer抽象類和Sync幫助器這四者存在什麼聯絡呢?為了避免你聽糊塗了,下面我整理他們的UML類圖如下。
Mutex圖類

由上圖可知:Mutex互斥鎖繼承了Lock鎖的介面,具有鎖的屬性,可以提供上鎖和釋放鎖的方法,他是對外提供服務的服務者,而Mutex類有個Sync型別的私有物件sync,這個私有物件繼承了AbstractQueuedSynchronizer抽象類,是Mutex鎖和AQS的橋樑,是加鎖和釋放鎖真正的服務者。如果你看明白了上面的UML類圖,那麼我們的Mutex互斥鎖的定義應該如下:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class Mutex implements Lock {
	private Sync sync = new Sync();
	
	private class Sync extends AbstractQueuedSynchronizer {

		@Override
		protected boolean tryAcquire(int arg) {
			// TODO Auto-generated method stub
			return super.tryAcquire(arg);
		}

		@Override
		protected boolean tryRelease(int arg) {
			// TODO Auto-generated method stub
			return super.tryRelease(arg);
		}
	}

	@Override
	public void lock() {
		// TODO Auto-generated method stub
		
	}

	@Override
	public void lockInterruptibly() throws InterruptedException {
		// TODO Auto-generated method stub
		
	}

	@Override
	public boolean tryLock() {
		// TODO Auto-generated method stub
		return false;
	}

	@Override
	public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
		// TODO Auto-generated method stub
		return false;
	}

	@Override
	public void unlock() {
		// TODO Auto-generated method stub
		
	}

	@Override
	public Condition newCondition() {
		// TODO Auto-generated method stub
		return null;
	}

}

這裡我們實現的是獨佔式的鎖,Sync幫助器只需要覆蓋父類的tryAcquire(),tryRelease()方法就行了,其他方法可以暫時刪掉,如共享式的tryAcquireShared(),tryReleaseShared(),已經Condition用到的isHeldExclusively()和toString()方法都可以暫時不用實現,因為我們只是想先用AQS來做一把可以保證資料安全的鎖,考慮的問題暫時沒有那麼多。

/**
 * 互斥鎖
 * @author 張仕宗
 * @date 2018.11.9
 */
public class Mutex implements Lock{
	//AQS子類的物件,Mutex互斥鎖用它來工作
	private Sync sync = new Sync();
	
	//Sync同步器類作為公共內部幫助器,可用它來實現其封閉類的同步屬性
	private class Sync extends AbstractQueuedSynchronizer {

		@Override
		protected boolean tryAcquire(int arg) {
			assert arg == 1; //這裡用到了斷言,互斥鎖,鎖只能被獲取一次,如果arg不等於1,則直接中斷
			if(this.compareAndSetState(0, 1)) { //這裡做一下判斷,如果state的值為等於0,立馬將state設定為1
				//返回true,告訴acqure方法,獲取鎖成功
				return true;
			}
			return false;
		}

		@Override
		protected boolean tryRelease(int arg) {
			//釋放鎖,由於這是一把互斥鎖,state不是0就是1,所以你需要做兩步:
			//1.直接將state置為0
			this.setState(0);
			
			//返回true,告訴aqs的release方法釋放鎖成功
			return true;
		}
	}

	/**
	 * 上鎖的方法
	 */
	@Override
	public void lock() {
		sync.acquire(1);
	}
	
	/**
	 * 釋放鎖的方法
	 */
	@Override
	public void unlock() {
		sync.release(1);
	}
	
	@Override
	public void lockInterruptibly() throws InterruptedException {
		// TODO Auto-generated method stub
	}

	@Override
	public boolean tryLock() {
		// TODO Auto-generated method stub
		return false;
	}

	@Override
	public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
		// TODO Auto-generated method stub
		return false;
	}

	@Override
	public Condition newCondition() {
		// TODO Auto-generated method stub
		return null;
	}
}

上訴程式碼實現了一把最簡單的鎖,我們只實現其lock()和unlock()方法,其他方法請暫時忽略,而lock()方法和unlock()方法是如何實現的呢?lock()方法呼叫了Sync幫助器物件的sync.acquire(1)方法,由於我們的幫助器Sync並沒有實現這個方法,所以實際呼叫的是AQS的acquire()方法,而AQS這時候做了什麼時呢?再來一次該方法的原始碼:

     if (!tryAcquire(arg) &&
         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
         selfInterrupt();

次方法乾的第一件事就是去呼叫tryAcquire()方法,這個方法需要Sync來實現,如果自己的Sync沒有實現這個方法的話,父類會直接丟擲UnsupportedOperationException這個異常。

		@Override
		protected boolean tryAcquire(int arg) {
			assert arg == 1; //這裡用到了斷言,互斥鎖,鎖只能被獲取一次,如果arg不等於1,則直接中斷
			if(this.compareAndSetState(0, 1)) { //這裡做一下判斷,如果state的值為等於0,立馬將state設定為1
				//返回true,告訴acqure方法,獲取鎖成功
				return true;
			}
			return false;
		}

由於這是一把互斥鎖,所以只能有同一時刻只能獲得一次鎖。程式碼中用到了assert斷言,如果預獲得鎖的次數不是1,則中斷。接下來if中判斷state狀態是否為0,如果state狀態為0,則說明鎖還沒有被佔用,那麼我立刻佔用這把鎖,判斷state當前值和設定state為1這兩步用原子性操作的程式碼語句是this.compareAndSetState(0, 1),並立馬放回true,這時候AQS獲得返回值,獲得鎖成功。如果是第二個執行緒進來,if語句判斷得到的值非0,則直接返回false,這時候AQS將新進來的執行緒放進FIFO佇列排隊。

接下來看看Mutex的unlock()方法,該方法呼叫了sync.release(1),看看AQS這時候做了什麼!

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

此方法是獨佔模式下執行緒釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待佇列裡的其他執行緒來獲取資源。同樣的,我們的同步器Sync需要去實現這個tryRelease方法,不然同樣會丟擲UnsupportedOperationException異常。Sync的tryRelease方法比較簡單:

		@Override
		protected boolean tryRelease(int arg) {
			//釋放鎖,由於這是一把互斥鎖,state不是0就是1,所以你需要做兩步:
			//1.直接將state置為0
			this.setState(0);
			
			//返回true,告訴aqs的release方法釋放鎖成功
			return true;
		}

只需要設定state為0即可,由於這是一把互斥鎖,state不是0就是1所以直接呼叫this.setSate(0)。

用AQS寫一把重入鎖

上訴的Mutex並非一把可重入鎖,為了實現這把鎖能夠讓同一執行緒多次進來,回憶一下上一篇部落格中怎麼實現的?當時的做法是在鎖的lock()自旋方法中判斷新進來的是不是正在執行的執行緒,如果新進來的執行緒就是正在執行的執行緒,則獲取鎖成功,並讓計數器+1。而在釋放鎖的時候,如果釋放鎖的執行緒等於當前執行緒,讓計數器-1,只有當計數器count歸零的時候才真正的釋放鎖。同樣的,用AQS實現的鎖也是這個思路,那麼我們的tryAcquice方法如下:

		@Override
		protected boolean tryAcquire(int arg) {
			//如果第一個執行緒進來,直接獲得鎖,並設定當前獨佔的執行緒為當前執行緒
			int state = this.getState();
			if(state == 0) { //state為0,說明當前沒有執行緒佔用該執行緒
				if(this.compareAndSetState(0, arg)) { //判斷當前state值,第一個執行緒進來,立刻設定state為arg
					this.setExclusiveOwnerThread(Thread.currentThread()); //設定當前獨佔執行緒為當前執行緒
					return true; //告訴頂級aqs獲取鎖成功
				}
			} else { //如果是第二個執行緒進來
				Thread currentThread = Thread.currentThread();//當前進來的執行緒
				Thread ownerThread = this.getExclusiveOwnerThread();//已經儲存進去的獨佔式執行緒
				if(currentThread == ownerThread) { //判斷一下進來的執行緒和儲存進去的執行緒是同一執行緒麼?如果是,則獲取鎖成功,如果不是則獲取鎖失敗
					this.setState(state+arg); //設定state狀態
					return true;
				}
			}
			return false;
		}

tryAcquice()方法程式碼含義如註釋所示,與Mutex互斥鎖不同的是當state狀態不為0時我們的邏輯處理,如果第二次進來的執行緒currentThread和正在獨佔的執行緒ownerThread為統一執行緒,第一步設定state增加1,第二步返回true給AQS。

tryRelease()方法程式碼如下:

		@Override
		protected boolean tryRelease(int arg) {
			//鎖的獲取和鎖的釋放是一一對應的,獲取過多少次鎖就釋放多少次鎖
			if(Thread.currentThread() != this.getExclusiveOwnerThread()) {
				//如果釋放鎖的不是當前執行緒,則丟擲異常
				throw new RuntimeException();
			}
			int state = this.getState()-arg;
			//接下來判斷state是否已經歸零,只有state歸零的時候才真正的釋放鎖
			if(state == 0) {
				//state已經歸零,做掃尾工作
				this.setState(0);
				this.setExclusiveOwnerThread(null);
				return true;
			}
			this.setState(state);
			return false;
		}

tryRelease()首先是獲取當前state的值,並對這個值進行欲判:如果當前值state減去sync.release()傳來的引數歸零,則真正的釋放鎖,那麼我們要做的第一步是設定state為0,接著設定當前獨佔的執行緒為null,再然後返回true告訴AQS釋放鎖成功。如果如果當前值state減去sync.release()傳來的引數歸零,如果讓state的值為state-arg相減之後的值。

目前為此,我們以來了AQS框架來改寫的重入鎖程式碼如下:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * 用AQS實現的重入鎖
 * @author 張仕宗
 * @date 2018.11.9
 */
public class MyAqsLock implements Lock{
	//AQS子類的物件,用它來輔助MyAqsLock工作
	private Sync sync = new Sync();
	
	private class Sync extends AbstractQueuedSynchronizer {

		@Override
		protected boolean tryAcquire(int arg) {
			//如果第一個執行緒進來,直接獲得鎖,並設定當前獨佔的執行緒為當前執行緒
			int state = this.getState();
			if(state == 0) { //state為0,說明當前沒有執行緒佔用該執行緒
				if(this.compareAndSetState(0, arg)) { //判斷當前state值,第一個執行緒進來,立刻設定state為arg
					this.setExclusiveOwnerThread(Thread.currentThread()); //設定當前獨佔執行緒為當前執行緒
					return true; //告訴頂級aqs獲取鎖成功
				}
			} else { //如果是第二個執行緒進來
				Thread currentThread = Thread.currentThread();//當前進來的執行緒
				Thread ownerThread = this.getExclusiveOwnerThread();//已經儲存進去的獨佔式執行緒
				if(currentThread == ownerThread) { //判斷一下進來的執行緒和儲存進去的執行緒是同一執行緒麼?如果是,則獲取鎖成功,如果不是則獲取鎖失敗
					this.setState(state+arg); //設定state狀態
					return true;
				}
			}
			return false;
		}

		@Override
		protected boolean tryRelease(int arg) {
			//鎖的獲取和鎖的釋放是一一對應的,獲取過多少次鎖就釋放多少次鎖
			if(Thread.currentThread() != this.getExclusiveOwnerThread()) {
				//如果釋放鎖的不是當前執行緒,則丟擲異常
				throw new RuntimeException();
			}
			int state = this.getState()-arg;
			//接下來判斷state是否已經歸零,只有state歸零的時候才真正的釋放鎖
			if(state == 0) {
				//state已經歸零,做掃尾工作
				this.setState(0);
				this.setExclusiveOwnerThread(null);
				return true;
			}
			this.setState(state);
			return false;
		}
		
		public Condition newCondition() {
			return new ConditionObject();
		}
	}

	/**
	 * 上鎖的方法
	 */
	@Override
	public void lock() {
		sync.acquire(1);
	}
	
	/**
	 * 釋放鎖的方法
	 */
	@Override
	public void unlock() {
		sync.release(1);
	}
	
	@Override
	public void lockInterruptibly() throws InterruptedException {
		sync.acquireInterruptibly(1);
	}

	@Override
	public boolean tryLock() {
		//呼叫幫助器的tryAcquire方法,測試獲取鎖一次,不會自旋
		return sync.tryAcquire(1);
	}

	@Override
	public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
		//呼叫幫助器的tryRelease方法,測試釋放鎖一次,不會子旋
		return sync.tryRelease(1);
	}

	@Override
	public Condition newCondition() {
		//呼叫幫助類獲取Condition物件
		return sync.newCondition();
	}
}

關於測試的案例,則需要讀者你自己來驗證了,寫測試用例可以參考我前一篇文章Java併發程式設計:自己動手寫一把可重入鎖