1. 程式人生 > >死磕Synchronized底層實現--偏向鎖

死磕Synchronized底層實現--偏向鎖

本文為synchronized系列第二篇。主要內容為分析偏向鎖的實現。

偏向鎖的誕生背景和基本原理在上文中已經講過了,強烈建議在有看過上篇文章的基礎下閱讀本文

更多文章見個人部落格:github.com/farmerjohng…

本文將分為幾塊內容:

1.偏向鎖的入口

2.偏向鎖的獲取流程

3.偏向鎖的撤銷流程

4.偏向鎖的釋放流程

5.偏向鎖的批量重偏向和批量撤銷

本文分析的JVM版本是JVM8,具體版本號以及程式碼可以在這裡看到。

偏向鎖入口

目前網上的很多文章,關於偏向鎖原始碼入口都找錯地方了,導致我之前對於偏向鎖的很多邏輯一直想不通,走了很多彎路。

synchronized

分為synchronized程式碼塊和synchronized方法,其底層獲取鎖的邏輯都是一樣的,本文講解的是synchronized程式碼塊的實現。上篇文章也說過,synchronized程式碼塊是由monitorentermonitorexit兩個指令實現的。

關於HotSpot虛擬機器中獲取鎖的入口,網上很多文章要麼給出的方法入口為interpreterRuntime.cpp#monitorenter,要麼給出的入口為bytecodeInterpreter.cpp#1816。包括佔小狼的這篇文章關於鎖入口的位置說法也是有問題的(當然文章還是很好的,在我剛開始研究synchronized

的時候,小狼哥的這篇文章給了我很多幫助)。

要找鎖的入口,肯定是要在原始碼中找到對monitorenter指令解析的地方。在HotSpot的中有兩處地方對monitorenter指令進行解析:一個是在bytecodeInterpreter.cpp#1816 ,另一個是在templateTable_x86_64.cpp#3667

前者是JVM中的位元組碼直譯器(bytecodeInterpreter),用C++實現了每條JVM指令(如monitorenterinvokevirtual等),其優點是實現相對簡單且容易理解,缺點是執行慢。後者是模板直譯器(templateInterpreter

),其對每個指令都寫了一段對應的彙編程式碼,啟動時將每個指令與對應彙編程式碼入口繫結,可以說是效率做到了極致。模板直譯器的實現可以看這篇文章在研究的過程中也請教過文章作者‘汪先生’一些問題,這裡感謝一下。

在HotSpot中,只用到了模板直譯器,位元組碼直譯器根本就沒用到,R大的讀書筆記中說的很清楚了,大家可以看看,這裡不再贅述。

所以montorenter的解析入口在模板直譯器中,其程式碼位於templateTable_x86_64.cpp#3667。通過呼叫路徑:templateTable_x86_64#monitorenter->interp_masm_x86_64#lock_object進入到偏向鎖入口macroAssembler_x86#biased_locking_enter,在這裡大家可以看到會生成對應的彙編程式碼。需要注意的是,不是說每次解析monitorenter指令都會呼叫biased_locking_enter,而是隻會在JVM啟動的時候呼叫該方法生成彙編程式碼,之後對指令的解析是通過直接執行彙編程式碼。

其實bytecodeInterpreter的邏輯和templateInterpreter的邏輯是大同小異的,因為templateInterpreter中都是彙編程式碼,比較晦澀,所以看bytecodeInterpreter的實現會便於理解一點。但這裡有個坑,在jdk8u之前,bytecodeInterpreter並沒有實現偏向鎖的邏輯。我之前看的JDK8-87ee5ee27509這個版本就沒有實現偏向鎖的邏輯,導致我看了很久都沒看懂。在這個commit中對bytecodeInterpreter加入了偏向鎖的支援,我大致了看了下和templateInterpreter對比除了棧結構不同外,其他邏輯大致相同,所以下文就按bytecodeInterpreter中的程式碼對偏向鎖邏輯進行講解templateInterpreter的彙編程式碼講解可以看這篇文章,其實彙編原始碼中都有英文註釋,瞭解了彙編幾個基本指令的作用再結合註釋理解起來也不是很難。

偏向鎖獲取流程

下面開始偏向鎖獲取流程分析,程式碼在bytecodeInterpreter.cpp#1816注意本文程式碼都有所刪減

CASE(_monitorenter): {
  // lockee 就是鎖物件
  oop lockee = STACK_OBJECT(-1);
  // derefing's lockee ought to provoke implicit null check
  CHECK_NULL(lockee);
  // code 1:找到一個空閒的Lock Record
  BasicObjectLock* limit = istate->monitor_base();
  BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
  BasicObjectLock* entry = NULL;
  while (most_recent != limit ) {
    if (most_recent->obj() == NULL) entry = most_recent;
    else if (most_recent->obj() == lockee) break;
    most_recent++;
  }
  //entry不為null,代表還有空閒的Lock Record
  if (entry != NULL) {
    // code 2:將Lock Record的obj指標指向鎖物件
    entry->set_obj(lockee);
    int success = false;
    uintptr_t epoch_mask_in_place = (uintptr_t)markOopDesc::epoch_mask_in_place;
	// markoop即物件頭的mark word
    markOop mark = lockee->mark();
    intptr_t hash = (intptr_t) markOopDesc::no_hash;
    // code 3:如果鎖物件的mark word的狀態是偏向模式
    if (mark->has_bias_pattern()) {
      uintptr_t thread_ident;
      uintptr_t anticipated_bias_locking_value;
      thread_ident = (uintptr_t)istate->thread();
     // code 4:這裡有幾步操作,下文分析
      anticipated_bias_locking_value =
        (((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) &
        ~((uintptr_t) markOopDesc::age_mask_in_place);
	 // code 5:如果偏向的執行緒是自己且epoch等於class的epoch
      if  (anticipated_bias_locking_value == 0) {
        // already biased towards this thread, nothing to do
        if (PrintBiasedLockingStatistics) {
          (* BiasedLocking::biased_lock_entry_count_addr())++;
        }
        success = true;
      }
       // code 6:如果偏向模式關閉,則嘗試撤銷偏向鎖
      else if ((anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0) {
        markOop header = lockee->klass()->prototype_header();
        if (hash != markOopDesc::no_hash) {
          header = header->copy_set_hash(hash);
        }
        // 利用CAS操作將mark word替換為class中的mark word
        if (Atomic::cmpxchg_ptr(header, lockee->mark_addr(), mark) == mark) {
          if (PrintBiasedLockingStatistics)
            (*BiasedLocking::revoked_lock_entry_count_addr())++;
        }
      }
         // code 7:如果epoch不等於class中的epoch,則嘗試重偏向
      else if ((anticipated_bias_locking_value & epoch_mask_in_place) !=0) {
        // 構造一個偏向當前執行緒的mark word
        markOop new_header = (markOop) ( (intptr_t) lockee->klass()->prototype_header() | thread_ident);
        if (hash != markOopDesc::no_hash) {
          new_header = new_header->copy_set_hash(hash);
        }
        // CAS替換物件頭的mark word  
        if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), mark) == mark) {
          if (PrintBiasedLockingStatistics)
            (* BiasedLocking::rebiased_lock_entry_count_addr())++;
        }
        else {
          // 重偏向失敗,代表存在多執行緒競爭,則呼叫monitorenter方法進行鎖升級
          CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
        }
        success = true;
      }
      else {
         // 走到這裡說明當前要麼偏向別的執行緒,要麼是匿名偏向(即沒有偏向任何執行緒)
       	// code 8:下面構建一個匿名偏向的mark word,嘗試用CAS指令替換掉鎖物件的mark word
        markOop header = (markOop) ((uintptr_t) mark & ((uintptr_t)markOopDesc::biased_lock_mask_in_place |(uintptr_t)markOopDesc::age_mask_in_place |epoch_mask_in_place));
        if (hash != markOopDesc::no_hash) {
          header = header->copy_set_hash(hash);
        }
        markOop new_header = (markOop) ((uintptr_t) header | thread_ident);
        // debugging hint
        DEBUG_ONLY(entry->lock()->set_displaced_header((markOop) (uintptr_t) 0xdeaddead);)
        if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), header) == header) {
           // CAS修改成功
          if (PrintBiasedLockingStatistics)
            (* BiasedLocking::anonymously_biased_lock_entry_count_addr())++;
        }
        else {
          // 如果修改失敗說明存在多執行緒競爭,所以進入monitorenter方法
          CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
        }
        success = true;
      }
    }

    // 如果偏向執行緒不是當前執行緒或沒有開啟偏向模式等原因都會導致success==false
    if (!success) {
      // 輕量級鎖的邏輯
      //code 9: 構造一個無鎖狀態的Displaced Mark Word,並將Lock Record的lock指向它
      markOop displaced = lockee->mark()->set_unlocked();
      entry->lock()->set_displaced_header(displaced);
      //如果指定了-XX:+UseHeavyMonitors,則call_vm=true,代表禁用偏向鎖和輕量級鎖
      bool call_vm = UseHeavyMonitors;
      // 利用CAS將物件頭的mark word替換為指向Lock Record的指標
      if (call_vm || Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
        // 判斷是不是鎖重入
        if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {		//code 10: 如果是鎖重入,則直接將Displaced Mark Word設定為null
          entry->lock()->set_displaced_header(NULL);
        } else {
          CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
        }
      }
    }
    UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
  } else {
    // lock record不夠,重新執行
    istate->set_msg(more_monitors);
    UPDATE_PC_AND_RETURN(0); // Re-execute
  }
}
複製程式碼

再回顧下物件頭中mark word的格式:

image

JVM中的每個類也有一個類似mark word的prototype_header,用來標記該class的epoch和偏向開關等資訊。上面的程式碼中lockee->klass()->prototype_header()即獲取class的prototype_header。

code 1,從當前執行緒的棧中找到一個空閒的Lock Record即程式碼中的BasicObjectLock,下文都用Lock Record代指),判斷Lock Record是否空閒的依據是其obj欄位 是否為null。注意這裡是按記憶體地址從低往高找到最後一個可用的Lock Record,換而言之,就是找到記憶體地址最高的可用Lock Record

code 2,獲取到Lock Record後,首先要做的就是為其obj欄位賦值。

code 3,判斷鎖物件的mark word是否是偏向模式,即低3位是否為101。

code 4,這裡有幾步位運算的操作anticipated_bias_locking_value = (((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) & ​ ~((uintptr_t) markOopDesc::age_mask_in_place); 這個位運算可以分為3個部分。

第一部分((uintptr_t)lockee->klass()->prototype_header() | thread_ident) 將當前執行緒id和類的prototype_header相或,這樣得到的值為(當前執行緒id + prototype_header中的(epoch + 分代年齡 + 偏向鎖標誌 + 鎖標誌位)),注意prototype_header的分代年齡那4個位元組為0

第二部分 ^ (uintptr_t)mark 將上面計算得到的結果與鎖物件的markOop進行異或,相等的位全部被置為0,只剩下不相等的位。

第三部分 & ~((uintptr_t) markOopDesc::age_mask_in_place) markOopDesc::age_mask_in_place為...0001111000,取反後,變成了...1110000111,除了分代年齡那4位,其他位全為1;將取反後的結果再與上面的結果相與,將上面異或得到的結果中分代年齡給忽略掉。

code 5anticipated_bias_locking_value==0代表偏向的執行緒是當前執行緒且mark word的epoch等於class的epoch,這種情況下什麼都不用做。

code 6(anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0代表class的prototype_header或物件的mark word中偏向模式是關閉的,又因為能走到這已經通過了mark->has_bias_pattern()判斷,即物件的mark word中偏向模式是開啟的,那也就是說class的prototype_header不是偏向模式。

然後利用CAS指令Atomic::cmpxchg_ptr(header, lockee->mark_addr(), mark) == mark撤銷偏向鎖,我們知道CAS會有幾個引數,1是預期的原值,2是預期修改後的值 ,3是要修改的物件,與之對應,cmpxchg_ptr方法第一個引數是預期修改後的值,第2個引數是修改的物件,第3個引數是預期原值,方法返回實際原值,如果等於預期原值則說明修改成功。

code 7,如果epoch已過期,則需要重偏向,利用CAS指令將鎖物件的mark word替換為一個偏向當前執行緒且epoch為類的epoch的新的mark word

code 8,CAS將偏向執行緒改為當前執行緒,如果當前是匿名偏向則能修改成功,否則進入鎖升級的邏輯。

code 9,這一步已經是輕量級鎖的邏輯了。從上圖的mark word的格式可以看到,輕量級鎖中mark word存的是指向Lock Record的指標。這裡構造一個無鎖狀態的mark word,然後儲存到Lock RecordLock Record的格式可以看第一篇文章)。設定mark word是無鎖狀態的原因是:輕量級鎖解鎖時是將物件頭的mark word設定為Lock Record中的Displaced Mark Word,所以建立時設定為無鎖狀態,解鎖時直接用CAS替換就好了。

code 10, 如果是鎖重入,則將Lock RecordDisplaced Mark Word設定為null,起到一個鎖重入計數的作用。

以上是偏向鎖加鎖的流程(包括部分輕量級鎖的加鎖流程),如果當前鎖已偏向其他執行緒||epoch值過期||偏向模式關閉||獲取偏向鎖的過程中存在併發衝突,都會進入到InterpreterRuntime::monitorenter方法, 在該方法中會對偏向鎖撤銷和升級。

偏向鎖的撤銷

這裡說的撤銷是指在獲取偏向鎖的過程因為不滿足條件導致要將鎖物件改為非偏向鎖狀態;釋放是指退出同步塊時的過程,釋放鎖的邏輯會在下一小節闡述。請讀者注意本文中撤銷與釋放的區別

如果獲取偏向鎖失敗會進入到InterpreterRuntime::monitorenter方法

IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
  ...
  Handle h_obj(thread, elem->obj());
  assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
         "must be NULL or an object");
  if (UseBiasedLocking) {
    // Retry fast entry if bias is revoked to avoid unnecessary inflation
    ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
  } else {
    ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
  }
  ...
IRT_END
複製程式碼

可以看到如果開啟了JVM偏向鎖,那會進入到ObjectSynchronizer::fast_enter方法中。

void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
 if (UseBiasedLocking) {
    if (!SafepointSynchronize::is_at_safepoint()) {
      BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
      if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
        return;
      }
    } else {
      assert(!attempt_rebias, "can not rebias toward VM thread");
      BiasedLocking::revoke_at_safepoint(obj);
    }
    assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
 }

 slow_enter (obj, lock, THREAD) ;
}
複製程式碼

如果是正常的Java執行緒,會走上面的邏輯進入到BiasedLocking::revoke_and_rebias方法,如果是VM執行緒則會走到下面的BiasedLocking::revoke_at_safepoint。我們主要看BiasedLocking::revoke_and_rebias方法。這個方法的主要作用像它的方法名:撤銷或者重偏向,第一個引數封裝了鎖物件和當前執行緒,第二個引數代表是否允許重偏向,這裡是true。

BiasedLocking::Condition BiasedLocking::revoke_and_rebias(Handle obj, bool attempt_rebias, TRAPS) {
  assert(!SafepointSynchronize::is_at_safepoint(), "must not be called while at safepoint");
    
  markOop mark = obj->mark();
  if (mark->is_biased_anonymously() && !attempt_rebias) {
     //如果是匿名偏向且attempt_rebias==false會走到這裡,如鎖物件的hashcode方法被呼叫會出現這種情況,需要撤銷偏向鎖。
    markOop biased_value       = mark;
    markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
    markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
    if (res_mark == biased_value) {
      return BIAS_REVOKED;
    }
  } else if (mark->has_bias_pattern()) {
    // 鎖物件開啟了偏向模式會走到這裡
    Klass* k = obj->klass();
    markOop prototype_header = k->prototype_header();
    //code 1: 如果對應class關閉了偏向模式
    if (!prototype_header->has_bias_pattern()) {
      markOop biased_value       = mark;
      markOop res_mark = (markOop) Atomic::cmpxchg_ptr(prototype_header, obj->mark_addr(), mark);
      assert(!(*(obj->mark_addr()))->has_bias_pattern(), "even if we raced, should still be revoked");
      return BIAS_REVOKED;
    //code2: 如果epoch過期
    } else if (prototype_header->bias_epoch() != mark->bias_epoch()) {
      if (attempt_rebias) {
        assert(THREAD->is_Java_thread(), "");
        markOop biased_value       = mark;
        markOop rebiased_prototype = markOopDesc::encode((JavaThread*) THREAD, mark->age(), prototype_header->bias_epoch());
        markOop res_mark = (markOop) Atomic::cmpxchg_ptr(rebiased_prototype, obj->mark_addr(), mark);
        if (res_mark == biased_value) {
          return BIAS_REVOKED_AND_REBIASED;
        }
      } else {
        markOop biased_value       = mark;
        markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
        markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
        if (res_mark == biased_value) {
          return BIAS_REVOKED;
        }
      }
    }
  }
  //code 3:批量重偏向與批量撤銷的邏輯
  HeuristicsResult heuristics = update_heuristics(obj(), attempt_rebias);
  if (heuristics == HR_NOT_BIASED) {
    return NOT_BIASED;
  } else if (heuristics == HR_SINGLE_REVOKE) {
    //code 4:撤銷單個執行緒
    Klass *k = obj->klass();
    markOop prototype_header = k->prototype_header();
    if (mark->biased_locker() == THREAD &&
        prototype_header->bias_epoch() == mark->bias_epoch()) {
      // 走到這裡說明需要撤銷的是偏向當前執行緒的鎖,當呼叫Object#hashcode方法時會走到這一步
      // 因為只要遍歷當前執行緒的棧就好了,所以不需要等到safepoint再撤銷。
      ResourceMark rm;
      if (TraceBiasedLocking) {
        tty->print_cr("Revoking bias by walking my own stack:");
      }
      BiasedLocking::Condition cond = revoke_bias(obj(), false, false, (JavaThread*) THREAD);
      ((JavaThread*) THREAD)->set_cached_monitor_info(NULL);
      assert(cond == BIAS_REVOKED, "why not?");
      return cond;
    } else {
      // 下面程式碼最終會在VM執行緒中的safepoint呼叫revoke_bias方法
      VM_RevokeBias revoke(&obj, (JavaThread*) THREAD);
      VMThread::execute(&revoke);
      return revoke.status_code();
    }
  }
	
  assert((heuristics == HR_BULK_REVOKE) ||
         (heuristics == HR_BULK_REBIAS), "?");
   //code5:批量撤銷、批量重偏向的邏輯
  VM_BulkRevokeBias bulk_revoke(&obj, (JavaThread*) THREAD,
                                (heuristics == HR_BULK_REBIAS),
                                attempt_rebias);
  VMThread::execute(&bulk_revoke);
  return bulk_revoke.status_code();
}

複製程式碼

會走到該方法的邏輯有很多,我們只分析最常見的情況:假設鎖已經偏向執行緒A,這時B執行緒嘗試獲得鎖。

上面的code 1code 2B執行緒都不會走到,最終會走到code 4處,如果要撤銷的鎖偏向的是當前執行緒則直接呼叫revoke_bias撤銷偏向鎖,否則會將該操作push到VM Thread中等到safepoint的時候再執行。

關於VM Thread這裡介紹下:在JVM中有個專門的VM Thread,該執行緒會源源不斷的從VMOperationQueue中取出請求,比如GC請求。對於需要safepoint的操作(VM_Operationevaluate_at_safepoint返回true)必須要等到所有的Java執行緒進入到safepoint才開始執行。 關於safepoint可以參考下這篇文章

接下來我們著重分析下revoke_bias方法。第一個引數為鎖物件,第2、3個引數為都為false

static BiasedLocking::Condition revoke_bias(oop obj, bool allow_rebias, bool is_bulk, JavaThread* requesting_thread) {
  markOop mark = obj->mark();
  // 如果沒有開啟偏向模式,則直接返回NOT_BIASED
  if (!mark->has_bias_pattern()) {
    ...
    return BiasedLocking::NOT_BIASED;
  }

  uint age = mark->age();
  // 構建兩個mark word,一個是匿名偏向模式(101),一個是無鎖模式(001)
  markOop   biased_prototype = markOopDesc::biased_locking_prototype()->set_age(age);
  markOop unbiased_prototype = markOopDesc::prototype()->set_age(age);

  ...

  JavaThread* biased_thread = mark->biased_locker();
  if (biased_thread == NULL) {
     // 匿名偏向。當呼叫鎖物件的hashcode()方法可能會導致走到這個邏輯
     // 如果不允許重偏向,則將物件的mark word設定為無鎖模式
    if (!allow_rebias) {
      obj->set_mark(unbiased_prototype);
    }
    ...
    return BiasedLocking::BIAS_REVOKED;
  }

  // code 1:判斷偏向執行緒是否還存活
  bool thread_is_alive = false;
  // 如果當前執行緒就是偏向執行緒 
  if (requesting_thread == biased_thread) {
    thread_is_alive = true;
  } else {
     // 遍歷當前jvm的所有執行緒,如果能找到,則說明偏向的執行緒還存活
    for (JavaThread* cur_thread = Threads::first(); cur_thread != NULL; cur_thread = cur_thread->next()) {
      if (cur_thread == biased_thread) {
        thread_is_alive = true;
        break;
      }
    }
  }
  // 如果偏向的執行緒已經不存活了
  if (!thread_is_alive) {
    // 允許重偏向則將物件mark word設定為匿名偏向狀態,否則設定為無鎖狀態
    if (allow_rebias) {
      obj->set_mark(biased_prototype);
    } else {
      obj->set_mark(unbiased_prototype);
    }
    ...
    return BiasedLocking::BIAS_REVOKED;
  }

  // 執行緒還存活則遍歷執行緒棧中所有的Lock Record
  GrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(biased_thread);
  BasicLock* highest_lock = NULL;
  for (int i = 0; i < cached_monitor_info->length(); i++) {
    MonitorInfo* mon_info = cached_monitor_info->at(i);
    // 如果能找到對應的Lock Record說明偏向的執行緒還在執行同步程式碼塊中的程式碼
    if (mon_info->owner() == obj) {
      ...
      // 需要升級為輕量級鎖,直接修改偏向執行緒棧中的Lock Record。為了處理鎖重入的case,在這裡將Lock Record的Displaced Mark Word設定為null,第一個Lock Record會在下面的程式碼中再處理
      markOop mark = markOopDesc::encode((BasicLock*) NULL);
      highest_lock = mon_info->lock();
      highest_lock->set_displaced_header(mark);
    } else {
      ...
    }
  }
  if (highest_lock != NULL) {
    // 修改第一個Lock Record為無鎖狀態,然後將obj的mark word設定為執行該Lock Record的指標
    highest_lock->set_displaced_header(unbiased_prototype);
    obj->release_set_mark(markOopDesc::encode(highest_lock));
    ...
  } else {
    // 走到這裡說明偏向執行緒已經不在同步塊中了
    ...
    if (allow_rebias) {
       //設定為匿名偏向狀態
      obj->set_mark(biased_prototype);
    } else {
      // 將mark word設定為無鎖狀態
      obj->set_mark(unbiased_prototype);
    }
  }

  return BiasedLocking::BIAS_REVOKED;
}
複製程式碼

需要注意下,當呼叫鎖物件的Object#hashSystem.identityHashCode()方法會導致該物件的偏向鎖或輕量級鎖升級。這是因為在Java中一個物件的hashcode是在呼叫這兩個方法時才生成的,如果是無鎖狀態則存放在mark word中,如果是重量級鎖則存放在對應的monitor中,而偏向鎖是沒有地方能存放該資訊的,所以必須升級。具體可以看這篇文章hashcode()方法對偏向鎖的影響小節(注意:該文中對於偏向鎖的加鎖描述有些錯誤),另外我也向該文章作者請教過一些問題,他很熱心的回答了我,在此感謝一下!

言歸正傳,revoke_bias方法邏輯:

  1. 檢視偏向的執行緒是否存活,如果已經不存活了,則直接撤銷偏向鎖。JVM維護了一個集合存放所有存活的執行緒,通過遍歷該集合判斷某個執行緒是否存活。
  2. 偏向的執行緒是否還在同步塊中,如果不在了,則撤銷偏向鎖。我們回顧一下偏向鎖的加鎖流程:每次進入同步塊(即執行monitorenter)的時候都會以從高往低的順序在棧中找到第一個可用的Lock Record,將其obj欄位指向鎖物件。每次解鎖(即執行monitorexit)的時候都會將最低的一個相關Lock Record移除掉。所以可以通過遍歷執行緒棧中的Lock Record來判斷執行緒是否還在同步塊中。
  3. 將偏向執行緒所有相關Lock RecordDisplaced Mark Word設定為null,然後將最高位的Lock RecordDisplaced Mark Word 設定為無鎖狀態,最高位的Lock Record也就是第一次獲得鎖時的Lock Record(這裡的第一次是指重入獲取鎖時的第一次),然後將物件頭指向最高位的Lock Record,這裡不需要用CAS指令,因為是在safepoint。 執行完後,就升級成了輕量級鎖。原偏向執行緒的所有Lock Record都已經變成輕量級鎖的狀態。這裡如果看不明白,請再回顧遊戲啊上篇文章的輕量級鎖加鎖過程。

偏向鎖的釋放

偏向鎖的釋放入口在bytecodeInterpreter.cpp#1923

CASE(_monitorexit): {
  oop lockee = STACK_OBJECT(-1);
  CHECK_NULL(lockee);
  // derefing's lockee ought to provoke implicit null check
  // find our monitor slot
  BasicObjectLock* limit = istate->monitor_base();
  BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
  // 從低往高遍歷棧的Lock Record
  while (most_recent != limit ) {
    // 如果Lock Record關聯的是該鎖物件
    if ((most_recent)->obj() == lockee) {
      BasicLock* lock = most_recent->lock();
      markOop header = lock->displaced_header();
      // 釋放Lock Record
      most_recent->set_obj(NULL);
      // 如果是偏向模式,僅僅釋放Lock Record就好了。否則要走輕量級鎖or重量級鎖的釋放流程
      if (!lockee->mark()->has_bias_pattern()) {
        bool call_vm = UseHeavyMonitors;
        // header!=NULL說明不是重入,則需要將Displaced Mark Word CAS到物件頭的Mark Word
        if (header != NULL || call_vm) {
          if (call_vm || Atomic::cmpxchg_ptr(header, lockee->mark_addr(), lock) != lock) {
            // CAS失敗或者是重量級鎖則會走到這裡,先將obj還原,然後呼叫monitorexit方法
            most_recent->set_obj(lockee);
            CALL_VM(InterpreterRuntime::monitorexit(THREAD, most_recent), handle_exception);
          }
        }
      }
      //執行下一條命令
      UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
    }
    //處理下一條Lock Record
    most_recent++;
  }
  // Need to throw illegal monitor state exception
  CALL_VM(InterpreterRuntime::throw_illegal_monitor_state_exception(THREAD), handle_exception);
  ShouldNotReachHere();
}
複製程式碼

上面的程式碼結合註釋理解起來應該不難,偏向鎖的釋放很簡單,只要將對應Lock Record釋放就好了,而輕量級鎖則需要將Displaced Mark Word替換到物件頭的mark word中。如果CAS失敗或者是重量級鎖則進入到InterpreterRuntime::monitorexit方法中。該方法會在輕量級與重量級鎖的文章中講解。

批量重偏向和批量撤銷

批量重偏向和批量撤銷的背景可以看上篇文章,相關實現在BiasedLocking::revoke_and_rebias中:

BiasedLocking::Condition BiasedLocking::revoke_and_rebias(Handle obj, bool attempt_rebias, TRAPS) {
  ...
  //code 1:重偏向的邏輯
  HeuristicsResult heuristics = update_heuristics(obj(), attempt_rebias);
  // 非重偏向的邏輯
  ...
      
  assert((heuristics == HR_BULK_REVOKE) ||
         (heuristics == HR_BULK_REBIAS), "?");	
   //code 2:批量撤銷、批量重偏向的邏輯
  VM_BulkRevokeBias bulk_revoke(&obj, (JavaThread*) THREAD,
                                (heuristics == HR_BULK_REBIAS),
                                attempt_rebias);
  VMThread::execute(&bulk_revoke);
  return bulk_revoke.status_code();
}
複製程式碼

在每次撤銷偏向鎖的時候都通過update_heuristics方法記錄下來,以類為單位,當某個類的物件撤銷偏向次數達到一定閾值的時候JVM就認為該類不適合偏向模式或者需要重新偏向另一個物件,update_heuristics就會返回HR_BULK_REVOKEHR_BULK_REBIAS。進行批量撤銷或批量重偏向。

先看update_heuristics方法。

static HeuristicsResult update_heuristics(oop o, bool allow_rebias) {
  markOop mark = o->mark();
  //如果不是偏向模式直接返回
  if (!mark->has_bias_pattern()) {
    return HR_NOT_BIASED;
  }
 
  // 鎖物件的類
  Klass* k = o->klass();
  // 當前時間
  jlong cur_time = os::javaTimeMillis();
  // 該類上一次批量撤銷的時間
  jlong last_bulk_revocation_time = k->last_biased_lock_bulk_revocation_time();
  // 該類偏向鎖撤銷的次數
  int revocation_count = k->biased_lock_revocation_count();
  // BiasedLockingBulkRebiasThreshold是重偏向閾值(預設20),BiasedLockingBulkRevokeThreshold是批量撤銷閾值(預設40),BiasedLockingDecayTime是開啟一次新的批量重偏向距離上次批量重偏向的後的延遲時間,預設25000。也就是開啟批量重偏向後,經過了一段較長的時間(>=BiasedLockingDecayTime),撤銷計數器才超過閾值,那我們會重置計數器。
  if ((revocation_count >= BiasedLockingBulkRebiasThreshold) &&
      (revocation_count <  BiasedLockingBulkRevokeThreshold) &&
      (last_bulk_revocation_time != 0) &&
      (cur_time - last_bulk_revocation_time >= BiasedLockingDecayTime)) {
    // This is the first revocation we've seen in a while of an
    // object of this type since the last time we performed a bulk
    // rebiasing operation. The application is allocating objects in
    // bulk which are biased toward a thread and then handing them
    // off to another thread. We can cope with this allocation
    // pattern via the bulk rebiasing mechanism so we reset the
    // klass's revocation count rather than allow it to increase
    // monotonically. If we see the need to perform another bulk
    // rebias operation later, we will, and if subsequently we see
    // many more revocation operations in a short period of time we
    // will completely disable biasing for this type.
    k->set_biased_lock_revocation_count(0);
    revocation_count = 0;
  }

  // 自增撤銷計數器
  if (revocation_count <= BiasedLockingBulkRevokeThreshold) {
    revocation_count = k->atomic_incr_biased_lock_revocation_count();
  }
  // 如果達到批量撤銷閾值則返回HR_BULK_REVOKE
  if (revocation_count == BiasedLockingBulkRevokeThreshold) {
    return HR_BULK_REVOKE;
  }
  // 如果達到批量重偏向閾值則返回HR_BULK_REBIAS
  if (revocation_count == BiasedLockingBulkRebiasThreshold) {
    return HR_BULK_REBIAS;
  }
  // 沒有達到閾值則撤銷單個物件的鎖
  return HR_SINGLE_REVOKE;
}
複製程式碼

當達到閾值的時候就會通過VM 執行緒在safepoint呼叫bulk_revoke_or_rebias_at_safepoint, 引數bulk_rebias如果是true代表是批量重偏向否則為批量撤銷。attempt_rebias_of_object代表對操作的鎖物件o是否執行重偏向,這裡是true

static BiasedLocking::Condition bulk_revoke_or_rebias_at_safepoint(oop o,
                                                                   bool bulk_rebias,
                                                                   bool attempt_rebias_of_object,
                                                                   JavaThread* requesting_thread) {
  ...
  jlong cur_time = os::javaTimeMillis();
  o->klass()->set_last_biased_lock_bulk_revocation_time(cur_time);


  Klass* k_o = o->klass();
  Klass* klass = k_o;

  if (bulk_rebias) {
    // 批量重偏向的邏輯
    if (klass->prototype_header()->has_bias_pattern()) {
      // 自增前類中的的epoch
      int prev_epoch = klass->prototype_header()->bias_epoch();
      // code 1:類中的epoch自增
      klass->set_prototype_header(klass->prototype_header()->incr_bias_epoch());
      int cur_epoch = klass->prototype_header()->bias_epoch();

      // code 2:遍歷所有執行緒的棧,更新型別為該klass的所有鎖例項的epoch
      for (JavaThread* thr = Threads::first(); thr != NULL; thr = thr->next()) {
        GrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(thr);
        for (int i = 0; i < cached_monitor_info->length(); i++) {
          MonitorInfo* mon_info = cached_monitor_info->at(i);
          oop owner = mon_info->owner();
          markOop mark = owner->mark();
          if ((owner->klass() == k_o) && mark->has_bias_pattern()) {
            // We might have encountered this object already in the case of recursive locking
            assert(mark->bias_epoch() == prev_epoch || mark->bias_epoch() == cur_epoch, "error in bias epoch adjustment");
            owner->set_mark(mark->set_bias_epoch(cur_epoch));
          }
        }
      }
    }

    // 接下來對當前鎖物件進行重偏向
    revoke_bias(o, attempt_rebias_of_object && klass->prototype_header()->has_bias_pattern(), true, requesting_thread);
  } else {
    ...

    // code 3:批量撤銷的邏輯,將類中的偏向標記關閉,markOopDesc::prototype()返回的是一個關閉偏向模式的prototype
    klass->set_prototype_header(markOopDesc::prototype());

    // code 4:遍歷所有執行緒的棧,撤銷該類所有鎖的偏向
    for (JavaThread* thr = Threads::first(); thr != NULL; thr = thr->next()) {
      GrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(thr);
      for (int i = 0; i < cached_monitor_info->length(); i++) {
        MonitorInfo* mon_info = cached_monitor_info->at(i);
        oop owner = mon_info->owner();
        markOop mark = owner->mark();
        if ((owner->klass() == k_o) && mark->has_bias_pattern()) {
          revoke_bias(owner, false, true, requesting_thread);
        }
      }
    }

    // 撤銷當前鎖物件的偏向模式
    revoke_bias(o, false, true, requesting_thread);
  }

  ...
  
  BiasedLocking::Condition status_code = BiasedLocking::BIAS_REVOKED;

  if (attempt_rebias_of_object &&
      o->mark()->has_bias_pattern() &&
      klass->prototype_header()->has_bias_pattern()) {
    // 構造一個偏向請求執行緒的mark word
    markOop new_mark = markOopDesc::encode(requesting_thread, o->mark()->age(),
                                           klass->prototype_header()->bias_epoch());
    // 更新當前鎖物件的mark word
    o->set_mark(new_mark);
    status_code = BiasedLocking::BIAS_REVOKED_AND_REBIASED;
    ...
  }

  ...

  return status_code;
}

複製程式碼

該方法分為兩個邏輯:批量重偏向和批量撤銷。

先看批量重偏向,分為兩步:

code 1 將類中的撤銷計數器自增1,之後當該類已存在的例項獲得鎖時,就會嘗試重偏向,相關邏輯在偏向鎖獲取流程小節中。

code 2 處理當前正在被使用的鎖物件,通過遍歷所有存活執行緒的棧,找到所有正在使用的偏向鎖物件,然後更新它們的epoch值。也就是說不會重偏向正在使用的鎖,否則會破壞鎖的執行緒安全性。

批量撤銷邏輯如下:

code 3將類的偏向標記關閉,之後當該類已存在的例項獲得鎖時,就會升級為輕量級鎖;該類新分配的物件的mark word則是無鎖模式。

code 4處理當前正在被使用的鎖物件,通過遍歷所有存活執行緒的棧,找到所有正在使用的偏向鎖物件,然後撤銷偏向鎖。