1. 程式人生 > >【Java併發程式設計】之二十:併發新特性—Lock鎖和條件變數(含程式碼)

【Java併發程式設計】之二十:併發新特性—Lock鎖和條件變數(含程式碼)

簡單使用Lock鎖

    Java 5中引入了新的鎖機制——java.util.concurrent.locks中的顯式的互斥鎖:Lock介面,它提供了比synchronized更加廣泛的鎖定操作。Lock介面有3個實現它的類:ReentrantLock、ReetrantReadWriteLock.ReadLock和ReetrantReadWriteLock.WriteLock,即重入鎖、讀鎖和寫鎖。lock必須被顯式地建立、鎖定和釋放,為了可以使用更多的功能,一般用ReentrantLock為其例項化。為了保證鎖最終一定會被釋放(可能會有異常發生),要把互斥區放在try語句塊內,並在finally語句塊中釋放鎖,尤其當有return語句時,return語句必須放在try字句中,以確保unlock()不會過早發生,從而將資料暴露給第二個任務。因此,採用lock加鎖和釋放鎖的一般形式如下:

  1. Lock lock = new ReentrantLock();//預設使用非公平鎖,如果要使用公平鎖,需要傳入引數true

  2. ........

  3. lock.lock();

  4. try {

  5. //更新物件的狀態

  6. //捕獲異常,必要時恢復到原來的不變約束

  7. //如果有return語句,放在這裡

  8. } finally {

  9. lock.unlock(); //鎖必須在finally塊中釋放

  10. }

ReetrankLock與synchronized比較

    效能比較

    在JDK1.5中,synchronized是效能低效的。因為這是一個重量級操作,它對效能最大的影響是阻塞的是實現,掛起執行緒和恢復執行緒的操作都需要轉入核心態中完成,這些操作給系統的併發性帶來了很大的壓力。相比之下使用Java提供的Lock物件,效能更高一些。Brian Goetz對這兩種鎖在JDK1.5、單核處理器及雙Xeon處理器環境下做了一組吞吐量對比的實驗,發現多執行緒環境下,synchronized的吞吐量下降的非常嚴重,而ReentrankLock則能基本保持在同一個比較穩定的水平上。但與其說ReetrantLock效能好,倒不如說synchronized還有非常大的優化餘地,於是到了JDK1.6,發生了變化,對synchronize加入了很多優化措施,有自適應自旋,鎖消除,鎖粗化,輕量級鎖,偏向鎖等等。導致在JDK1.6上synchronize的效能並不比Lock差。官方也表示,他們也更支援synchronize,在未來的版本中還有優化餘地,所以還是提倡在synchronized能實現需求的情況下,優先考慮使用synchronized來進行同步。


    下面淺析以下兩種鎖機制的底層的實現策略。

    互斥同步最主要的問題就是進行執行緒阻塞和喚醒所帶來的效能問題,因而這種同步又稱為阻塞同步,它屬於一種悲觀的併發策略,即執行緒獲得的是獨佔鎖。獨佔鎖意味著其他執行緒只能依靠阻塞來等待執行緒釋放鎖。而在CPU轉換執行緒阻塞時會引起執行緒上下文切換,當有很多執行緒競爭鎖的時候,會引起CPU頻繁的上下文切換導致效率很低。synchronized採用的便是這種併發策略。

    隨著指令集的發展,我們有了另一種選擇:基於衝突檢測的樂觀併發策略,通俗地講就是先進性操作,如果沒有其他執行緒爭用共享資料,那操作就成功了,如果共享資料被爭用,產生了衝突,那就再進行其他的補償措施(最常見的補償措施就是不斷地重拾,直到試成功為止),這種樂觀的併發策略的許多實現都不需要把執行緒掛起,因此這種同步被稱為非阻塞同步。ReetrantLock採用的便是這種併發策略。

    在樂觀的併發策略中,需要操作和衝突檢測這兩個步驟具備原子性,它靠硬體指令來保證,這裡用的是CAS操作(Compare and Swap)。JDK1.5之後,Java程式才可以使用CAS操作。我們可以進一步研究ReentrantLock的原始碼,會發現其中比較重要的獲得鎖的一個方法是compareAndSetState,這裡其實就是呼叫的CPU提供的特殊指令。現代的CPU提供了指令,可以自動更新共享資料,而且能夠檢測到其他執行緒的干擾,而compareAndSet() 就用這些代替了鎖定。這個演算法稱作非阻塞演算法,意思是一個執行緒的失敗或者掛起不應該影響其他執行緒的失敗或掛起。

    Java 5中引入了注入AutomicInteger、AutomicLong、AutomicReference等特殊的原子性變數類,它們提供的如:compareAndSet()、incrementAndSet()和getAndIncrement()等方法都使用了CAS操作。因此,它們都是由硬體指令來保證的原子方法。

   用途比較

    基本語法上,ReentrantLock與synchronized很相似,它們都具備一樣的執行緒重入特性,只是程式碼寫法上有點區別而已,一個表現為API層面的互斥鎖(Lock),一個表現為原生語法層面的互斥鎖(synchronized)。ReentrantLock相對synchronized而言還是增加了一些高階功能,主要有以下三項:

    1、等待可中斷:當持有鎖的執行緒長期不釋放鎖時,正在等待的執行緒可以選擇放棄等待,改為處理其他事情,它對處理執行時間非常上的同步塊很有幫助。而在等待由synchronized產生的互斥鎖時,會一直阻塞,是不能被中斷的。

    2、可實現公平鎖:多個執行緒在等待同一個鎖時,必須按照申請鎖的時間順序排隊等待,而非公平鎖則不保證這點,在鎖釋放時,任何一個等待鎖的執行緒都有機會獲得鎖。synchronized中的鎖時非公平鎖,ReentrantLock預設情況下也是非公平鎖,但可以通過構造方法ReentrantLock(ture)來要求使用公平鎖。

    3、鎖可以繫結多個條件:ReentrantLock物件可以同時繫結多個Condition物件(名曰:條件變數或條件佇列),而在synchronized中,鎖物件的wait()和notify()或notifyAll()方法可以實現一個隱含條件,但如果要和多於一個的條件關聯的時候,就不得不額外地新增一個鎖,而ReentrantLock則無需這麼做,只需要多次呼叫newCondition()方法即可。而且我們還可以通過繫結Condition物件來判斷當前執行緒通知的是哪些執行緒(即與Condition物件繫結在一起的其他執行緒)。

可中斷鎖

    ReetrantLock有兩種鎖:忽略中斷鎖和響應中斷鎖。忽略中斷鎖與synchronized實現的互斥鎖一樣,不能響應中斷,而響應中斷鎖可以響應中斷。

    如果某一執行緒A正在執行鎖中的程式碼,另一執行緒B正在等待獲取該鎖,可能由於等待時間過長,執行緒B不想等待了,想先處理其他事情,我們可以讓它中斷自己或者在別的執行緒中中斷它,如果此時ReetrantLock提供的是忽略中斷鎖,則它不會去理會該中斷,而是讓執行緒B繼續等待,而如果此時ReetrantLock提供的是響應中斷鎖,那麼它便會處理中斷,讓執行緒B放棄等待,轉而去處理其他事情。

  獲得響應中斷鎖的一般形式如下:

  1. ReentrantLock lock = new ReentrantLock();

  2. ...........

  3. lock.lockInterruptibly();//獲取響應中斷鎖

  4. try {

  5. //更新物件的狀態

  6. //捕獲異常,必要時恢復到原來的不變約束

  7. //如果有return語句,放在這裡

  8. }finally{

  9. lock.unlock(); //鎖必須在finally塊中釋放

  10. }

    這裡有一個不錯的分析中斷的示例程式碼(摘自網上)

    當用synchronized中斷對互斥鎖的等待時,並不起作用,該執行緒依然會一直等待,如下面的例項:

  1. public class Buffer {

  2. private Object lock;

  3. public Buffer() {

  4. lock = this;

  5. }

  6. public void write() {

  7. synchronized (lock) {

  8. long startTime = System.currentTimeMillis();

  9. System.out.println("開始往這個buff寫入資料…");

  10. for (;;)// 模擬要處理很長時間

  11. {

  12. if (System.currentTimeMillis()

  13. - startTime > Integer.MAX_VALUE) {

  14. break;

  15. }

  16. }

  17. System.out.println("終於寫完了");

  18. }

  19. }

  20. public void read() {

  21. synchronized (lock) {

  22. System.out.println("從這個buff讀資料");

  23. }

  24. }

  25. public static void main(String[] args) {

  26. Buffer buff = new Buffer();

  27. final Writer writer = new Writer(buff);

  28. final Reader reader = new Reader(buff);

  29. writer.start();

  30. reader.start();

  31. new Thread(new Runnable() {

  32. @Override

  33. public void run() {

  34. long start = System.currentTimeMillis();

  35. for (;;) {

  36. //等5秒鐘去中斷讀

  37. if (System.currentTimeMillis()

  38. - start > 5000) {

  39. System.out.println("不等了,嘗試中斷");

  40. reader.interrupt(); //嘗試中斷讀執行緒

  41. break;

  42. }

  43. }

  44. }

  45. }).start();

  46. // 我們期待“讀”這個執行緒能退出等待鎖,可是事與願違,一旦讀這個執行緒發現自己得不到鎖,

  47. // 就一直開始等待了,就算它等死,也得不到鎖,因為寫執行緒要21億秒才能完成 T_T ,即使我們中斷它,

  48. // 它都不來響應下,看來真的要等死了。這個時候,ReentrantLock給了一種機制讓我們來響應中斷,

  49. // 讓“讀”能伸能屈,勇敢放棄對這個鎖的等待。我們來改寫Buffer這個類,就叫BufferInterruptibly吧,可中斷快取。

  50. }

  51. }

  52. class Writer extends Thread {

  53. private Buffer buff;

  54. public Writer(Buffer buff) {

  55. this.buff = buff;

  56. }

  57. @Override

  58. public void run() {

  59. buff.write();

  60. }

  61. }

  62. class Reader extends Thread {

  63. private Buffer buff;

  64. public Reader(Buffer buff) {

  65. this.buff = buff;

  66. }

  67. @Override

  68. public void run() {

  69. buff.read();//這裡估計會一直阻塞

  70. System.out.println("讀結束");

  71. }

  72. }

    執行結果如下:

    我們等待了很久,後面依然沒有輸出,說明讀執行緒對互斥鎖的等待並沒有被中斷,也就是該戶吃鎖沒有響應對讀執行緒的中斷。

    我們再將上面程式碼中synchronized的互斥鎖改為ReentrantLock的響應中斷鎖,即改為如下程式碼: 

  1. import java.util.concurrent.locks.ReentrantLock;

  2. public class BufferInterruptibly {

  3. private ReentrantLock lock = new ReentrantLock();

  4. public void write() {

  5. lock.lock();

  6. try {

  7. long startTime = System.currentTimeMillis();

  8. System.out.println("開始往這個buff寫入資料…");

  9. for (;;)// 模擬要處理很長時間

  10. {

  11. if (System.currentTimeMillis()

  12. - startTime > Integer.MAX_VALUE) {

  13. break;

  14. }

  15. }

  16. System.out.println("終於寫完了");

  17. } finally {

  18. lock.unlock();

  19. }

  20. }

  21. public void read() throws InterruptedException {

  22. lock.lockInterruptibly();// 注意這裡,可以響應中斷

  23. try {

  24. System.out.println("從這個buff讀資料");

  25. } finally {

  26. lock.unlock();

  27. }

  28. }

  29. public static void main(String args[]) {

  30. BufferInterruptibly buff = new BufferInterruptibly();

  31. final Writer2 writer = new Writer2(buff);

  32. final Reader2 reader = new Reader2(buff);

  33. writer.start();

  34. reader.start();

  35. new Thread(new Runnable() {

  36. @Override

  37. public void run() {

  38. long start = System.currentTimeMillis();

  39. for (;;) {

  40. if (System.currentTimeMillis()

  41. - start > 5000) {

  42. System.out.println("不等了,嘗試中斷");

  43. reader.interrupt(); //此處中斷讀操作

  44. break;

  45. }

  46. }

  47. }

  48. }).start();

  49. }

  50. }

  51. class Reader2 extends Thread {

  52. private BufferInterruptibly buff;

  53. public Reader2(BufferInterruptibly buff) {

  54. this.buff = buff;

  55. }

  56. @Override

  57. public void run() {

  58. try {

  59. buff.read();//可以收到中斷的異常,從而有效退出

  60. } catch (InterruptedException e) {

  61. System.out.println("我不讀了");

  62. }

  63. System.out.println("讀結束");

  64. }

  65. }

  66. class Writer2 extends Thread {

  67. private BufferInterruptibly buff;

  68. public Writer2(BufferInterruptibly buff) {

  69. this.buff = buff;

  70. }

  71. @Override

  72. public void run() {

  73. buff.write();

  74. }

  75. }

    執行結果如下:

    從結果中可以看出,嘗試中斷後輸出了catch語句塊中的內容,也輸出了後面的“讀結束”,說明執行緒對互斥鎖的等待被中斷了,也就是該互斥鎖響應了對讀執行緒的中斷。

條件變數實現執行緒間協作

    在一文中,我們用synchronized實現互斥,並配合使用Object物件的wait()和notify()或notifyAll()方法來實現執行緒間協作。Java 5之後,我們可以用Reentrantlock鎖配合Condition物件上的await()和signal()或signalAll()方法來實現執行緒間協作。在ReentrantLock物件上newCondition()可以得到一個Condition物件,可以通過在Condition上呼叫await()方法來掛起一個任務(執行緒),通過在Condition上呼叫signal()來通知任務,從而喚醒一個任務,或者呼叫signalAll()來喚醒所有在這個Condition上被其自身掛起的任務。另外,如果使用了公平鎖,signalAll()的與Condition關聯的所有任務將以FIFO佇列的形式獲取鎖,如果沒有使用公平鎖,則獲取鎖的任務是隨機的,這樣我們便可以更好地控制處在await狀態的任務獲取鎖的順序。與notifyAll()相比,signalAll()是更安全的方式。另外,它可以指定喚醒與自身Condition物件繫結在一起的任務。

    下面將生產者——消費者模型一文中的程式碼改為用條件變數實現,如下:

  1. import java.util.concurrent.*;

  2. import java.util.concurrent.locks.*;

  3. class Info{ // 定義資訊類

  4. private String name = "name";//定義name屬性,為了與下面set的name屬性區別開

  5. private String content = "content" ;// 定義content屬性,為了與下面set的content屬性區別開

  6. private boolean flag = true ; // 設定標誌位,初始時先生產

  7. private Lock lock = new ReentrantLock();

  8. private Condition condition = lock.newCondition(); //產生一個Condition物件

  9. public void set(String name,String content){

  10. lock.lock();

  11. try{

  12. while(!flag){

  13. condition.await() ;

  14. }

  15. this.setName(name) ; // 設定名稱

  16. Thread.sleep(300) ;

  17. this.setContent(content) ; // 設定內容

  18. flag = false ; // 改變標誌位,表示可以取走

  19. condition.signal();

  20. }catch(InterruptedException e){

  21. e.printStackTrace() ;

  22. }finally{

  23. lock.unlock();

  24. }

  25. }

  26. public void get(){

  27. lock.lock();

  28. try{

  29. while(flag){

  30. condition.await() ;

  31. }

  32. Thread.sleep(300) ;

  33. System.out.println(this.getName() +

  34. " --> " + this.getContent()) ;

  35. flag = true ; // 改變標誌位,表示可以生產

  36. condition.signal();

  37. }catch(InterruptedException e){

  38. e.printStackTrace() ;

  39. }finally{

  40. lock.unlock();

  41. }

  42. }

  43. public void setName(String name){

  44. this.name = name ;

  45. }

  46. public void setContent(String content){

  47. this.content = content ;

  48. }

  49. public String getName(){

  50. return this.name ;

  51. }

  52. public String getContent(){

  53. return this.content ;

  54. }

  55. }

  56. class Producer implements Runnable{ // 通過Runnable實現多執行緒

  57. private Info info = null ; // 儲存Info引用

  58. public Producer(Info info){

  59. this.info = info ;

  60. }

  61. public void run(){

  62. boolean flag = true ; // 定義標記位

  63. for(int i=0;i<10;i++){

  64. if(flag){

  65. this.info.set("姓名--1","內容--1") ; // 設定名稱

  66. flag = false ;

  67. }else{

  68. this.info.set("姓名--2","內容--2") ; // 設定名稱

  69. flag = true ;

  70. }

  71. }

  72. }

  73. }

  74. class Consumer implements Runnable{

  75. private Info info = null ;

  76. public Consumer(Info info){

  77. this.info = info ;

  78. }

  79. public void run(){

  80. for(int i=0;i<10;i++){

  81. this.info.get() ;

  82. }

  83. }

  84. }

  85. public class ThreadCaseDemo{

  86. public static void main(String args[]){

  87. Info info = new Info(); // 例項化Info物件

  88. Producer pro = new Producer(info) ; // 生產者

  89. Consumer con = new Consumer(info) ; // 消費者

  90. new Thread(pro).start() ;

  91. //啟動了生產者執行緒後,再啟動消費者執行緒

  92. try{

  93. Thread.sleep(500) ;

  94. }catch(InterruptedException e){

  95. e.printStackTrace() ;

  96. }

  97. new Thread(con).start() ;

  98. }

  99. }

    執行後,同樣可以得到如下的結果:

姓名--1 --> 內容--1
姓名--2 --> 內容--2
姓名--1 --> 內容--1
姓名--2 --> 內容--2
姓名--1 --> 內容--1
姓名--2 --> 內容--2
姓名--1 --> 內容--1
姓名--2 --> 內容--2
姓名--1 --> 內容--1
姓名--2 --> 內容--2
    從以上並不能看出用條件變數的await()、signal()、signalAll()方法比用Object物件的wait()、notify()、notifyAll()方法實現執行緒間協作有多少優點,但它在處理更復雜的多執行緒問題時,會有明顯的優勢。所以,Lock和Condition物件只有在更加困難的多執行緒問題中才是必須的。

讀寫鎖

    另外,synchronized獲取的互斥鎖不僅互斥讀寫操作、寫寫操作,還互斥讀讀操作,而讀讀操作時不會帶來資料競爭的,因此對對讀讀操作也互斥的話,會降低效能。Java 5中提供了讀寫鎖,它將讀鎖和寫鎖分離,使得讀讀操作不互斥,獲取讀鎖和寫鎖的一般形式如下:

  1. ReadWriteLock rwl = new ReentrantReadWriteLock();

  2. rwl.writeLock().lock() //獲取寫鎖

  3. rwl.readLock().lock() //獲取讀鎖

   用讀鎖來鎖定讀操作,用寫鎖來鎖定寫操作,這樣寫操作和寫操作之間會互斥,讀操作和寫操作之間會互斥,但讀操作和讀操作就不會互斥。

   《Java併發程式設計實踐》一書給出了使用 ReentrantLock的最佳時機:

    當你需要以下高階特性時,才應該使用:可定時的、可輪詢的與可中斷的鎖獲取操作,公平佇列,或者非塊結構的鎖。否則,請使用synchronized。