1. 程式人生 > >併發實戰——原子類AtomicReference及底層原始碼CompareAndSwapObject分析

併發實戰——原子類AtomicReference及底層原始碼CompareAndSwapObject分析

本文內容:

  • 分析原子類AtomicReference
  • 分析原始碼

AtomicReference中

public final boolean compareAndSet(V expect, V update) {
        return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
    }
public final V getAndSet(V newValue) {
        return (V)unsafe.getAndSetObject(this, valueOffset, newValue);
    }


    public
final Object getAndSetObject(Object var1, long var2, Object var4) { Object var5; do { var5 = this.getObjectVolatile(var1, var2); } while(!this.compareAndSwapObject(var1, var2, var5, var4)); return var5; }

compareAndSwapObject 原始碼探析

public final
native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

可以看到宣告的是native方法,意味著該方法使用更底層的程式碼(cpp)實現的,需要下載openjdk原始碼才可以看到。下面開始簡要地介紹下尋找原始碼的過程。

下載好openjdk原始碼後,將其匯入到idea裡該project的lib中,方便全域性查詢。

參照native實現規則,可以猜測javah工具生成的cpp標頭檔案。

這裡寫圖片描述

UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapObject(JNIEnv *
env, jobject unsafe, jobject obj, jlong offset, jobject e_h, jobject x_h)) UnsafeWrapper("Unsafe_CompareAndSwapObject"); oop x = JNIHandles::resolve(x_h); // 新值 oop e = JNIHandles::resolve(e_h); // 預期值 oop p = JNIHandles::resolve(obj); HeapWord* addr = (HeapWord *)index_oop_from_field_offset_long(p, offset);// 在記憶體中的具體位置 oop res = oopDesc::atomic_compare_exchange_oop(x, addr, e, true);// 呼叫了另一個方法 jboolean success = (res == e); // 如果返回的res等於e,則判定滿足compare條件(說明res應該為記憶體中的當前值),但實際上會有ABA的問題 if (success) // success為true時,說明此時已經交換成功(呼叫的是最底層的cmpxchg指令) update_barrier_set((void*)addr, x); // 每次Reference型別資料寫操作時,都會產生一個Write Barrier暫時中斷操作,配合垃圾收集器 return success; UNSAFE_END

我們直接全域性搜尋(搜尋範圍是openjdk)這個被呼叫的方法,可以看到該方法的定義和實現

定義:
在opp.hpp檔案中

  static oop atomic_compare_exchange_oop(oop exchange_value,
                                         volatile HeapWord *dest,
                                         oop compare_value,
                                         bool prebarrier = false);
inline oop oopDesc::atomic_compare_exchange_oop(oop exchange_value,
                                                volatile HeapWord *dest,
                                                oop compare_value,
                                                bool prebarrier) {
  if (UseCompressedOops) { // 如果使用了壓縮普通物件指標(CompressedOops),有一個重新編解碼的過程
    if (prebarrier) {
      update_barrier_set_pre((narrowOop*)dest, exchange_value);
    }
    // encode exchange and compare value from oop to T
    narrowOop val = encode_heap_oop(exchange_value); // 新值
    narrowOop cmp = encode_heap_oop(compare_value); // 預期值

    narrowOop old = (narrowOop) Atomic::cmpxchg(val, (narrowOop*)dest, cmp); // 這裡呼叫的方法見文章最後,因為被壓縮到了32位,所以可以用int操作  
    // decode old from T to oop
    return decode_heap_oop(old);
  } else {
    if (prebarrier) {
      update_barrier_set_pre((oop*)dest, exchange_value);
    }
    return (oop)Atomic::cmpxchg_ptr(exchange_value, (oop*)dest, compare_value); // 可以看到這裡繼續呼叫了其他方法
  }
}

針對64位地址的操作

inline jlong    Atomic::cmpxchg    (jlong    exchange_value, volatile jlong*    dest, jlong    compare_value) {
  int mp = os::is_MP();
  jint ex_lo  = (jint)exchange_value;
  jint ex_hi  = *( ((jint*)&exchange_value) + 1 );
  jint cmp_lo = (jint)compare_value;
  jint cmp_hi = *( ((jint*)&compare_value) + 1 );
  __asm {
    push ebx
    push edi
    mov eax, cmp_lo
    mov edx, cmp_hi
    mov edi, dest
    mov ebx, ex_lo
    mov ecx, ex_hi
    LOCK_IF_MP(mp)
    cmpxchg8b qword ptr [edi]
    pop edi
    pop ebx
  }
}

/**
CMPXCHG8B - 比較並交換 8 位元組
說明
比較 EDX:EAX 中的 64 位值與運算元(目標運算元)。如果這兩個值相等,則將 ECX:EBX 中的 64 位值儲存到目標運算元。否則,將目標運算元的值載入到 EDX:EAX。目標運算元是 8 位元組記憶體位置。對於一對 EDX:EAX 與 ECX:EBX 暫存器,EDX 與 ECX 包含 64 位值的 32 個高位,EAX 與 EBX 包含 32 個低位。
此指令可以配合 LOCK 字首使用,此時指令將以原子方式執行。為了簡化處理器的匯流排介面,目標運算元可以不考慮比較結果而接收一個寫入週期。如果比較失敗,則寫回目標運算元;否則,將源運算元寫入目標。(處理器永遠不會只產生鎖定讀取而不產生鎖定寫入)。
*/

CompareAndSwapInt

再來看一個基本型別的CAS原始碼

UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
  UnsafeWrapper("Unsafe_CompareAndSwapInt");
  oop p = JNIHandles::resolve(obj);
  jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
  return (jint)(Atomic::cmpxchg(x, addr, e)) == e; // 這裡呼叫的就是上面的CPU指令cmpxchg
UNSAFE_END

atomic_windows_x86.inline.hpp中:
MP表示multiprocessor,即多處理器。最終根據具體的處理器架構轉換成彙編指令來實現CAS。
當多處理器時需要在前面加上lock指令。

inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {
  // alternative for InterlockedCompareExchange
  int mp = os::is_MP();
  __asm {
    mov edx, dest  // eax,ecx,edx都是32位暫存器
    mov ecx, exchange_value
    mov eax, compare_value
    LOCK_IF_MP(mp)
    cmpxchg dword ptr [edx], ecx
  }
}

上述是cpu層面的CAS,介紹如下:

Compares the value in the AL, AX, or EAX register (depending on the size of the operand) with the first operand (destination operand). If the two values are equal, the second operand (source operand) is loaded into the destination operand. Otherwise, the destination operand is loaded into the AL, AX, or EAX register.(若期望值等於物件地址儲存的值,則用新值來替換物件地址儲存的值,否則,把期望值變為當前物件地址儲存的值)

This instruction can be used with a LOCK prefix to allow the instruction to be executed atomi-cally. To simplify the interface to the processor’s bus, the destination operand receives a write cycle without regard to the result of the comparison. The destination operand is written back if the comparison fails; otherwise, the source operand is written into the destination. (The processor never produces a locked read without also producing a locked write.)

CAS是所有原子變數的原子性的基礎,為什麼一個看起來如此不自然的操作卻如此重要呢?其原因就在於這個native操作會最終演化為一條CPU指令cmpxchg,而不是多條CPU指令。由於CAS僅僅是一條指令,因此它不會被多執行緒的排程所打斷,所以能夠保證CAS操作是一個原子操作。補充一點,當代的很多CPU種類都支援cmpxchg操作,但不是所有CPU都支援,對於不支援的CPU,會自動加鎖來保證其操作不會被打斷。
由此可知,原子變數提供的原子性來自CAS操作,CAS來自Unsafe,然後由CPU的cmpxchg指令來保證。

(以下應該是針對byte型資料的CAS,但是具體的操作有點奇怪,主要是賦值那一塊)

不使用全域性搜尋,也可以用另一種方式尋找函式。
首先看之前檔案中#inclued裡出現的檔案
找到如下(因為之前呼叫的方法也是Atomic開頭的)

#include "runtime/atomic.inline.hpp"

然後進入該hpp檔案,找到

#include "runtime/atomic.hpp"
jbyte Atomic::cmpxchg(jbyte exchange_value, volatile jbyte* dest, jbyte compare_value) {
  assert(sizeof(jbyte) == 1, "assumption.");
  uintptr_t dest_addr = (uintptr_t)dest;
  uintptr_t offset = dest_addr % sizeof(jint);
  volatile jint* dest_int = (volatile jint*)(dest_addr - offset);
  jint cur = *dest_int; // 當前值
  jbyte* cur_as_bytes = (jbyte*)(&cur); // cur的地址
  jint new_val = cur;
  jbyte* new_val_as_bytes = (jbyte*)(&new_val); // new_val的地址
  // 將新值先寫在new_val的地址上(因為最後取值也是從jbyte* dest地址上取值?)
  new_val_as_bytes[offset] = exchange_value;
  //當前值和期望值進行比較,假設不滿足,直接返回cur_as_bytes[offset],因為與compare_value值不同,上層方法會判定交換失敗
  while (cur_as_bytes[offset] == compare_value) {
    // 嘗試交換,呼叫機器指令,返回的是當前該地址上的值
    jint res = cmpxchg(new_val, dest_int, cur);

    if (res == cur) break;
    //否則,修改cur
    cur = res;
    new_val = cur;
    new_val_as_bytes[offset] = exchange_value;
  }
  return cur_as_bytes[offset];
}