1. 程式人生 > >轉: 【Java並發編程】之二十:並發新特性—Lock鎖和條件變量(含代碼)

轉: 【Java並發編程】之二十:並發新特性—Lock鎖和條件變量(含代碼)

ets exc n) 否則 max 長時間 info trace space


簡單使用Lock鎖

Java 5中引入了新的鎖機制——Java.util.concurrent.locks中的顯式的互斥鎖:Lock接口,它提供了比synchronized更加廣泛的鎖定操作。Lock接口有3個實現它的類:ReentrantLock、ReetrantReadWriteLock.ReadLock和ReetrantReadWriteLock.WriteLock,即重入鎖、讀鎖和寫鎖。lock必須被顯式地創建、鎖定和釋放,為了可以使用更多的功能,一般用ReentrantLock為其實例化。為了保證鎖最終一定會被釋放(可能會有異常發生),要把互斥區放在try語句塊內,並在finally語句塊中釋放鎖,尤其當有return語句時,return語句必須放在try字句中,以確保unlock()不會過早發生,從而將數據暴露給第二個任務。因此,采用lock加鎖和釋放鎖的一般形式如下:

[java] view plain copy
  1. Lock lock = new ReentrantLock();//默認使用非公平鎖,如果要使用公平鎖,需要傳入參數true
  2. ........
  3. lock.lock();
  4. try {
  5. //更新對象的狀態
  6. //捕獲異常,必要時恢復到原來的不變約束
  7. //如果有return語句,放在這裏
  8. finally {
  9. lock.unlock(); //鎖必須在finally塊中釋放


ReetrankLock與synchronized比較


性能比較

在JDK1.5中,synchronized是性能低效的。因為這是一個重量級操作,它對性能最大的影響是阻塞的是實現,掛起線程和恢復線程的操作都需要轉入內核態中完成,這些操作給系統的並發性帶來了很大的壓力。相比之下使用Java提供的Lock對象,性能更高一些。Brian Goetz對這兩種鎖在JDK1.5、單核處理器及雙Xeon處理器環境下做了一組吞吐量對比的實驗,發現多線程環境下,synchronized的吞吐量下降的非常嚴重,而ReentrankLock則能基本保持在同一個比較穩定的水平上。但與其說ReetrantLock性能好,倒不如說synchronized還有非常大的優化余地,於是到了JDK1.6,發生了變化,對synchronize加入了很多優化措施,有自適應自旋,鎖消除,鎖粗化,輕量級鎖,偏向鎖等等。導致在JDK1.6上synchronize的性能並不比Lock差。官方也表示,他們也更支持synchronize,在未來的版本中還有優化余地,所以還是提倡在synchronized能實現需求的情況下,優先考慮使用synchronized來進行同步。


下面淺析以下兩種鎖機制的底層的實現策略。

互斥同步最主要的問題就是進行線程阻塞和喚醒所帶來的性能問題,因而這種同步又稱為阻塞同步,它屬於一種悲觀的並發策略,即線程獲得的是獨占鎖。獨占鎖意味著其他線程只能依靠阻塞來等待線程釋放鎖。而在CPU轉換線程阻塞時會引起線程上下文切換,當有很多線程競爭鎖的時候,會引起CPU頻繁的上下文切換導致效率很低。synchronized采用的便是這種並發策略。

隨著指令集的發展,我們有了另一種選擇:基於沖突檢測的樂觀並發策略,通俗地講就是先進性操作,如果沒有其他線程爭用共享數據,那操作就成功了,如果共享數據被爭用,產生了沖突,那就再進行其他的補償措施(最常見的補償措施就是不斷地重拾,直到試成功為止),這種樂觀的並發策略的許多實現都不需要把線程掛起,因此這種同步被稱為非阻塞同步。ReetrantLock采用的便是這種並發策略。

在樂觀的並發策略中,需要操作和沖突檢測這兩個步驟具備原子性,它靠硬件指令來保證,這裏用的是CAS操作(Compare and Swap)。JDK1.5之後,Java程序才可以使用CAS操作。我們可以進一步研究ReentrantLock的源代碼,會發現其中比較重要的獲得鎖的一個方法是compareAndSetState,這裏其實就是調用的CPU提供的特殊指令。現代的CPU提供了指令,可以自動更新共享數據,而且能夠檢測到其他線程的幹擾,而compareAndSet() 就用這些代替了鎖定。這個算法稱作非阻塞算法,意思是一個線程的失敗或者掛起不應該影響其他線程的失敗或掛起。

Java 5中引入了註入AutomicInteger、AutomicLong、AutomicReference等特殊的原子性變量類,它們提供的如:compareAndSet()、incrementAndSet()和getAndIncrement()等方法都使用了CAS操作。因此,它們都是由硬件指令來保證的原子方法。


用途比較

基本語法上,ReentrantLock與synchronized很相似,它們都具備一樣的線程重入特性,只是代碼寫法上有點區別而已,一個表現為API層面的互斥鎖(Lock),一個表現為原生語法層面的互斥鎖(synchronized)。ReentrantLock相對synchronized而言還是增加了一些高級功能,主要有以下三項:

1、等待可中斷:當持有鎖的線程長期不釋放鎖時,正在等待的線程可以選擇放棄等待,改為處理其他事情,它對處理執行時間非常上的同步塊很有幫助。而在等待由synchronized產生的互斥鎖時,會一直阻塞,是不能被中斷的。

2、可實現公平鎖:多個線程在等待同一個鎖時,必須按照申請鎖的時間順序排隊等待,而非公平鎖則不保證這點,在鎖釋放時,任何一個等待鎖的線程都有機會獲得鎖。synchronized中的鎖時非公平鎖,ReentrantLock默認情況下也是非公平鎖,但可以通過構造方法ReentrantLock(ture)來要求使用公平鎖。

3、鎖可以綁定多個條件:ReentrantLock對象可以同時綁定多個Condition對象(名曰:條件變量或條件隊列),而在synchronized中,鎖對象的wait()和notify()或notifyAll()方法可以實現一個隱含條件,但如果要和多於一個的條件關聯的時候,就不得不額外地添加一個鎖,而ReentrantLock則無需這麽做,只需要多次調用newCondition()方法即可。而且我們還可以通過綁定Condition對象來判斷當前線程通知的是哪些線程(即與Condition對象綁定在一起的其他線程)。


可中斷鎖

ReetrantLock有兩種鎖:忽略中斷鎖和響應中斷鎖。忽略中斷鎖與synchronized實現的互斥鎖一樣,不能響應中斷,而響應中斷鎖可以響應中斷。

如果某一線程A正在執行鎖中的代碼,另一線程B正在等待獲取該鎖,可能由於等待時間過長,線程B不想等待了,想先處理其他事情,我們可以讓它中斷自己或者在別的線程中中斷它,如果此時ReetrantLock提供的是忽略中斷鎖,則它不會去理會該中斷,而是讓線程B繼續等待,而如果此時ReetrantLock提供的是響應中斷鎖,那麽它便會處理中斷,讓線程B放棄等待,轉而去處理其他事情。

獲得響應中斷鎖的一般形式如下:

[java] view plain copy
  1. ReentrantLock lock = new ReentrantLock();
  2. ...........
  3. lock.lockInterruptibly();//獲取響應中斷鎖
  4. try {
  5. //更新對象的狀態
  6. //捕獲異常,必要時恢復到原來的不變約束
  7. //如果有return語句,放在這裏
  8. }finally{
  9. lock.unlock(); //鎖必須在finally塊中釋放
  10. }

這裏有一個不錯的分析中斷的示例代碼(摘自網上)

當用synchronized中斷對互斥鎖的等待時,並不起作用,該線程依然會一直等待,如下面的實例:

[java] view plain copy
  1. public class Buffer {
  2. private Object lock;
  3. public Buffer() {
  4. lock = this;
  5. }
  6. public void write() {
  7. synchronized (lock) {
  8. long startTime = System.currentTimeMillis();
  9. System.out.println("開始往這個buff寫入數據…");
  10. for (;;)// 模擬要處理很長時間
  11. {
  12. if (System.currentTimeMillis()
  13. - startTime > Integer.MAX_VALUE) {
  14. break;
  15. }
  16. }
  17. System.out.println("終於寫完了");
  18. }
  19. }
  20. public void read() {
  21. synchronized (lock) {
  22. System.out.println("從這個buff讀數據");
  23. }
  24. }
  25. public static void main(String[] args) {
  26. Buffer buff = new Buffer();
  27. final Writer writer = new Writer(buff);
  28. final Reader reader = new Reader(buff);
  29. writer.start();
  30. reader.start();
  31. new Thread(new Runnable() {
  32. @Override
  33. public void run() {
  34. long start = System.currentTimeMillis();
  35. for (;;) {
  36. //等5秒鐘去中斷讀
  37. if (System.currentTimeMillis()
  38. - start > 5000) {
  39. System.out.println("不等了,嘗試中斷");
  40. reader.interrupt(); //嘗試中斷讀線程
  41. break;
  42. }
  43. }
  44. }
  45. }).start();
  46. // 我們期待“讀”這個線程能退出等待鎖,可是事與願違,一旦讀這個線程發現自己得不到鎖,
  47. // 就一直開始等待了,就算它等死,也得不到鎖,因為寫線程要21億秒才能完成 T_T ,即使我們中斷它,
  48. // 它都不來響應下,看來真的要等死了。這個時候,ReentrantLock給了一種機制讓我們來響應中斷,
  49. // 讓“讀”能伸能屈,勇敢放棄對這個鎖的等待。我們來改寫Buffer這個類,就叫BufferInterruptibly吧,可中斷緩存。
  50. }
  51. }
  52. class Writer extends Thread {
  53. private Buffer buff;
  54. public Writer(Buffer buff) {
  55. this.buff = buff;
  56. }
  57. @Override
  58. public void run() {
  59. buff.write();
  60. }
  61. }
  62. class Reader extends Thread {
  63. private Buffer buff;
  64. public Reader(Buffer buff) {
  65. this.buff = buff;
  66. }
  67. @Override
  68. public void run() {
  69. buff.read();//這裏估計會一直阻塞
  70. System.out.println("讀結束");
  71. }
  72. }

執行結果如下:

技術分享 我們等待了很久,後面依然沒有輸出,說明讀線程對互斥鎖的等待並沒有被中斷,也就是該戶吃鎖沒有響應對讀線程的中斷。 我們再將上面代碼中synchronized的互斥鎖改為ReentrantLock的響應中斷鎖,即改為如下代碼:

[java] view plain copy
  1. import java.util.concurrent.locks.ReentrantLock;
  2. public class BufferInterruptibly {
  3. private ReentrantLock lock = new ReentrantLock();
  4. public void write() {
  5. lock.lock();
  6. try {
  7. long startTime = System.currentTimeMillis();
  8. System.out.println("開始往這個buff寫入數據…");
  9. for (;;)// 模擬要處理很長時間
  10. {
  11. if (System.currentTimeMillis()
  12. - startTime > Integer.MAX_VALUE) {
  13. break;
  14. }
  15. }
  16. System.out.println("終於寫完了");
  17. } finally {
  18. lock.unlock();
  19. }
  20. }
  21. public void read() throws InterruptedException {
  22. lock.lockInterruptibly();// 註意這裏,可以響應中斷
  23. try {
  24. System.out.println("從這個buff讀數據");
  25. } finally {
  26. lock.unlock();
  27. }
  28. }
  29. public static void main(String args[]) {
  30. BufferInterruptibly buff = new BufferInterruptibly();
  31. final Writer2 writer = new Writer2(buff);
  32. final Reader2 reader = new Reader2(buff);
  33. writer.start();
  34. reader.start();
  35. new Thread(new Runnable() {
  36. @Override
  37. public void run() {
  38. long start = System.currentTimeMillis();
  39. for (;;) {
  40. if (System.currentTimeMillis()
  41. - start > 5000) {
  42. System.out.println("不等了,嘗試中斷");
  43. reader.interrupt(); //此處中斷讀操作
  44. break;
  45. }
  46. }
  47. }
  48. }).start();
  49. }
  50. }
  51. class Reader2 extends Thread {
  52. private BufferInterruptibly buff;
  53. public Reader2(BufferInterruptibly buff) {
  54. this.buff = buff;
  55. }
  56. @Override
  57. public void run() {
  58. try {
  59. buff.read();//可以收到中斷的異常,從而有效退出
  60. } catch (InterruptedException e) {
  61. System.out.println("我不讀了");
  62. }
  63. System.out.println("讀結束");
  64. }
  65. }
  66. class Writer2 extends Thread {
  67. private BufferInterruptibly buff;
  68. public Writer2(BufferInterruptibly buff) {
  69. this.buff = buff;
  70. }
  71. @Override
  72. public void run() {
  73. buff.write();
  74. }
  75. }

執行結果如下:

技術分享 從結果中可以看出,嘗試中斷後輸出了catch語句塊中的內容,也輸出了後面的“讀結束”,說明線程對互斥鎖的等待被中斷了,也就是該互斥鎖響應了對讀線程的中斷。

條件變量實現線程間協作



下面將一文中的代碼改為用條件變量實現,如下: [java] view plain copy
  1. import java.util.concurrent.*;
  2. import java.util.concurrent.locks.*;
  3. class Info{ // 定義信息類
  4. private String name = "name";//定義name屬性,為了與下面set的name屬性區別開
  5. private String content = "content" ;// 定義content屬性,為了與下面set的content屬性區別開
  6. private boolean flag = true ; // 設置標誌位,初始時先生產
  7. private Lock lock = new ReentrantLock();
  8. private Condition condition = lock.newCondition(); //產生一個Condition對象
  9. public void set(String name,String content){
  10. lock.lock();
  11. try{
  12. while(!flag){
  13. condition.await() ;
  14. }
  15. this.setName(name) ; // 設置名稱
  16. Thread.sleep(300) ;
  17. this.setContent(content) ; // 設置內容
  18. flag = false ; // 改變標誌位,表示可以取走
  19. condition.signal();
  20. }catch(InterruptedException e){
  21. e.printStackTrace() ;
  22. }finally{
  23. lock.unlock();
  24. }
  25. }
  26. public void get(){
  27. lock.lock();
  28. try{
  29. while(flag){
  30. condition.await() ;
  31. }
  32. Thread.sleep(300) ;
  33. System.out.println(this.getName() +
  34. " --> " + this.getContent()) ;
  35. flag = true ; // 改變標誌位,表示可以生產
  36. condition.signal();
  37. }catch(InterruptedException e){
  38. e.printStackTrace() ;
  39. }finally{
  40. lock.unlock();
  41. }
  42. }
  43. public void setName(String name){
  44. this.name = name ;
  45. }
  46. public void setContent(String content){
  47. this.content = content ;
  48. }
  49. public String getName(){
  50. return this.name ;
  51. }
  52. public String getContent(){
  53. return this.content ;
  54. }
  55. }
  56. class Producer implements Runnable{ // 通過Runnable實現多線程
  57. private Info info = null ; // 保存Info引用
  58. public Producer(Info info){
  59. this.info = info ;
  60. }
  61. public void run(){
  62. boolean flag = true ; // 定義標記位
  63. for(int i=0;i<10;i++){
  64. if(flag){
  65. this.info.set("姓名--1","內容--1") ; // 設置名稱
  66. flag = false ;
  67. }else{
  68. this.info.set("姓名--2","內容--2") ; // 設置名稱
  69. flag = true ;
  70. }
  71. }
  72. }
  73. }
  74. class Consumer implements Runnable{
  75. private Info info = null ;
  76. public Consumer(Info info){
  77. this.info = info ;
  78. }
  79. public void run(){
  80. for(int i=0;i<10;i++){
  81. this.info.get() ;
  82. }
  83. }
  84. }
  85. public class ThreadCaseDemo{
  86. public static void main(String args[]){
  87. Info info = new Info(); // 實例化Info對象
  88. Producer pro = new Producer(info) ; // 生產者
  89. Consumer con = new Consumer(info) ; // 消費者
  90. new Thread(pro).start() ;
  91. //啟動了生產者線程後,再啟動消費者線程
  92. try{
  93. Thread.sleep(500) ;
  94. }catch(InterruptedException e){
  95. e.printStackTrace() ;
  96. }
  97. new Thread(con).start() ;
  98. }
  99. }
執行後,同樣可以得到如下的結果: 姓名--1 --> 內容--1
姓名--2 --> 內容--2
姓名--1 --> 內容--1
姓名--2 --> 內容--2
姓名--1 --> 內容--1
姓名--2 --> 內容--2
姓名--1 --> 內容--1
姓名--2 --> 內容--2
姓名--1 --> 內容--1
姓名--2 --> 內容--2
從以上並不能看出用條件變量的await()、signal()、signalAll()方法比用Object對象的wait()、notify()、notifyAll()方法實現線程間協作有多少優點,但它在處理更復雜的多線程問題時,會有明顯的優勢。所以,Lock和Condition對象只有在更加困難的多線程問題中才是必須的。

讀寫鎖

另外,synchronized獲取的互斥鎖不僅互斥讀寫操作、寫寫操作,還互斥讀讀操作,而讀讀操作時不會帶來數據競爭的,因此對對讀讀操作也互斥的話,會降低性能。Java 5中提供了讀寫鎖,它將讀鎖和寫鎖分離,使得讀讀操作不互斥,獲取讀鎖和寫鎖的一般形式如下:

[java] view plain copy
  1. ReadWriteLock rwl = new ReentrantReadWriteLock();
  2. rwl.writeLock().lock() //獲取寫鎖
  3. rwl.readLock().lock() //獲取讀鎖

用讀鎖來鎖定讀操作,用寫鎖來鎖定寫操作,這樣寫操作和寫操作之間會互斥,讀操作和寫操作之間會互斥,但讀操作和讀操作就不會互斥。

《Java並發編程實踐》一書給出了使用 ReentrantLock的最佳時機:

當你需要以下高級特性時,才應該使用:可定時的、可輪詢的與可中斷的鎖獲取操作,公平隊列,或者非塊結構的鎖。否則,請使用synchronized。

轉: 【Java並發編程】之二十:並發新特性—Lock鎖和條件變量(含代碼)