【Java】使用Atomic變數實現鎖
Atomic原子操作
Java從JDK1.5開始提供了java.util.concurrent.atomic包,方便程式設計師在多執行緒環境下,無鎖的進行原子操作。原子變數的底層使用了處理器提供的原子指令,但是不同的CPU架構可能提供的原子指令不一樣,也有可能需要某種形式的內部鎖,所以該方法不能絕對保證執行緒不被阻塞。
在Atomic包裡一共有12個類,四種原子更新方式,分別是原子更新基本型別,原子更新陣列,原子更新引用和原子更新欄位。Atomic包裡的類基本都是使用Unsafe實現的包裝類。
- 原子更新基本型別類: AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference
- 原子更新陣列類:AtomicIntegerArray,AtomicLongArray
- 原子更新引用型別:AtomicMarkableReference,AtomicStampedReference,AtomicReferenceArray
- 原子更新欄位類:AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater
詳細介紹可以參考:Java中的Atomic包使用指南
Atomic的原理
下面通過AtomicInteger
的原始碼來看一下是怎麼在沒有鎖的情況下保證資料正確性。首先看一下incrementAndGet()
/**
* Atomically increments by one the current value.
* @return the updated value
*/
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
我們繼續看,unsafe.getAndAddInt()
的實現是什麼樣的。
/**
* Atomically adds the given value to the current value of a field
* or array element within the given object <code>o</code>
* at the given <code>offset</code>.
*
* @param o object/array to update the field/element in
* @param offset field/element offset
* @param delta the value to add
* @return the previous value
* @since 1.8
*/
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, v + delta));
return v;
}
public final native boolean compareAndSwapInt(Object o, long offset,
int expected,
int x);
這是一個迴圈,offset是變數v在記憶體中相對於物件o起始位置的偏移,傳給JNI層用來計算這個value的記憶體絕對地址。
然後找到JNI的實現程式碼,來看 native層的compareAndSwapInt()
方法的實現。這個方法的實現是這樣的:
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset); //計算變數的記憶體絕對地址
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
這個函式其實很簡單,就是去看一下obj 的 offset 上的那個位置上的值是多少,如果是 e,那就把它更新為 x,返回true,如果不是 e,那就什麼也不做,並且返回false。裡面的核心方法是Atomic::compxchg()
,這個方法所屬的類檔案是在os_cpu目錄下面,由此可以看出這個類是和CPU操作有關,進入程式碼如下:
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
// alternative for InterlockedCompareExchange
int mp = os::is_MP();
__asm {
mov edx, dest
mov ecx, exchange_value
mov eax, compare_value
LOCK_IF_MP(mp)
cmpxchg dword ptr [edx], ecx
}
}
這個方法裡面都是彙編指令,看到LOCK_IF_MP
也有鎖指令實現的原子操作,其實CAS也算是有鎖操作,只不過是由CPU來觸發,比synchronized效能好的多。
什麼是CAS
CAS,Compare and Swap即比較並交換。 java.util.concurrent包藉助CAS實現了區別於synchronized同步鎖的一種樂觀鎖。樂觀鎖就是每次去取資料的時候都樂觀的認為資料不會被修改,所以不會上鎖,但是在更新的時候會判斷一下在此期間資料有沒有更新。CAS有3個運算元:記憶體值V,舊的預期值A,要修改的新值B。當且僅當預期值A和記憶體值V相同時,將記憶體值V修改為B,否則什麼都不做。CAS的關鍵點在於,系統在硬體層面保證了比較並交換操作的原子性,處理器使用基於對快取加鎖或匯流排加鎖的方式來實現多處理器之間的原子操作。
CAS的優缺點
- CAS由於是在硬體層面保證的原子性,不會鎖住當前執行緒,它的效率是很高的。
- CAS雖然很高效的實現了原子操作,但是它依然存在三個問題。
1、ABA問題。CAS在操作值的時候檢查值是否已經變化,沒有變化的情況下才會進行更新。但是如果一個值原來是A,變成B,又變成A,那麼CAS進行檢查時會認為這個值沒有變化,但是實際上卻變化了。ABA問題的解決方法是使用版本號。在變數前面追加上版本號,每次變數更新的時候把版本號加一,那麼A-B-A 就變成1A-2B-3A。從Java1.5開始JDK的atomic包裡提供了一個類AtomicStampedReference來解決ABA問題。
2、併發越高,失敗的次數會越多,CAS如果長時間不成功,會極大的增加CPU的開銷。因此CAS不適合競爭十分頻繁的場景。
3、只能保證一個共享變數的原子操作。當對多個共享變數操作時,CAS就無法保證操作的原子性,這時就可以用鎖,或者把多個共享變數合併成一個共享變數來操作。比如有兩個共享變數i=2,j=a,合併一下ij=2a,然後用CAS來操作ij。從Java1.5開始JDK提供了AtomicReference類來保證引用物件的原子性,你可以把多個變數放在一個物件裡來進行CAS操作。
實現自旋鎖
/**
* 使用AtomicInteger實現自旋鎖
*/
public class SpinLock {
private AtomicInteger state = new AtomicInteger(0);
/**
* 自旋等待直到獲得許可
*/
public void lock(){
for (;;){
//CAS指令要鎖匯流排,效率很差。所以我們通過一個if判斷避免了多次使用CAS指令。
if (state.get() == 1) {
continue;
} else if(state.compareAndSet(0, 1)){
return;
}
}
}
public void unlock(){
state.set(0);
}
}
原理很簡單,就是一直CAS搶鎖,如果搶不到,就一直死迴圈,直到搶到了才退出這個迴圈。
自旋鎖實現起來非常簡單,如果關鍵區的執行時間很短,往往自旋等待會是一種比較高效的做法,它可以避免執行緒的頻繁切換和排程。但如果關鍵區的執行時間很長,那這種做法就會大量地浪費CPU資源。
針對關鍵區執行時間長的情況,該怎麼辦呢?
實現可等待的鎖
如果關鍵區的執行時間很長,自旋的鎖會大量地浪費CPU資源,我們可以這樣改進:當一個執行緒拿不到鎖的時候,就讓這個執行緒先休眠等待。這樣,CPU就不會白白地空轉了。大致步驟如下:
- 需要一個容器,如果執行緒搶不到鎖,就把執行緒掛起來,並記錄到這個容器裡。
- 當一個執行緒放棄了鎖,得從容器裡找出一個掛起的執行緒,把它恢復了。
/**
* 使用AtomicInteger實現可等待鎖
*/
public class BlockLock implements Lock {
private AtomicInteger state = new AtomicInteger(0);
private ConcurrentLinkedQueue<Thread> waiters = new ConcurrentLinkedQueue<>();
@Override
public void lock() {
if (state.compareAndSet(0, 1)) {
return;
}
//放到等待佇列
waiters.add(Thread.currentThread());
for (;;) {
if (state.get() == 0) {
if (state.compareAndSet(0, 1)) {
waiters.remove(Thread.currentThread());
return;
}
} else {
LockSupport.park(); //掛起執行緒
}
}
}
@Override
public void unlock() {
state.set(0);
//喚醒等待佇列的第一個執行緒
Thread waiterHead = waiters.peek();
if(waiterHead != null){
LockSupport.unpark(waiterHead); //喚醒執行緒
}
}
}
我們引入了一個 waitList,用於儲存搶不到鎖的執行緒,讓它掛起。這裡我們先借用一下JDK裡的ConcurrentLinkedQueue
,因為這個Queue也是使用CAS操作實現的無鎖佇列,所以並不會引入JDK裡的其他鎖機制。如果大家去看AbstractQueuedSynchronizer
的實現,就會發現,它的acquire()
方法的邏輯與上面的實現是一樣的。
不過上面的程式碼是不是沒問題了呢?如果一個執行緒在還未呼叫park掛起之前,是不是有可能被其他執行緒先呼叫一遍unpark?這就是喚醒發生在休眠之前。發生這樣的情況會不會帶來問題呢?