1. 程式人生 > >Java並發之Condition

Java並發之Condition

exceptio 圖片 back use 時也 als sig thread ice

在使用Lock之前,我們使用的最多的同步方式應該是synchronized關鍵字來實現同步方式了。配合Object的wait()、notify()系列方法可以實現等待/通知模式。Condition接口也提供了類似Object的監視器方法,與Lock配合可以實現等待/通知模式,但是這兩者在使用方式以及功能特性上還是有差別的。Object和Condition接口的一些對比。摘自《Java並發編程的藝術》

技術分享圖片

一、Condition接口介紹和示例

首先我們需要明白condition對象是依賴於lock對象的,意思就是說condition對象需要通過lock對象進行創建出來(調用Lock對象的newCondition()方法)。consition的使用方式非常的簡單。但是需要註意在調用方法前獲取鎖。

  1 package com.ydl.test.juc;
  2 
  3 import java.util.concurrent.ExecutorService;
  4 import java.util.concurrent.Executors;
  5 import java.util.concurrent.locks.Condition;
  6 import java.util.concurrent.locks.Lock;
  7 import java.util.concurrent.locks.ReentrantLock;
  8 
  9 public class ConditionUseCase {
 10
11 public Lock lock = new ReentrantLock(); 12 public Condition condition = lock.newCondition(); 13 14 public static void main(String[] args) { 15 ConditionUseCase useCase = new ConditionUseCase(); 16 ExecutorService executorService = Executors.newFixedThreadPool (2); 17
executorService.execute(new Runnable() { 18 @Override 19 public void run() { 20 useCase.conditionWait(); 21 } 22 }); 23 executorService.execute(new Runnable() { 24 @Override 25 public void run() { 26 useCase.conditionSignal(); 27 } 28 }); 29 } 30 31 public void conditionWait() { 32 lock.lock(); 33 try { 34 System.out.println(Thread.currentThread().getName() + "拿到鎖了"); 35 System.out.println(Thread.currentThread().getName() + "等待信號"); 36 condition.await(); 37 System.out.println(Thread.currentThread().getName() + "拿到信號"); 38 }catch (Exception e){ 39 40 }finally { 41 lock.unlock(); 42 } 43 } 44 public void conditionSignal() { 45 lock.lock(); 46 try { 47 Thread.sleep(5000); 48 System.out.println(Thread.currentThread().getName() + "拿到鎖了"); 49 condition.signal(); 50 System.out.println(Thread.currentThread().getName() + "發出信號"); 51 }catch (Exception e){ 52 53 }finally { 54 lock.unlock(); 55 } 56 } 57 58 } 59 60
  1 pool-1-thread-1拿到鎖了
  2 pool-1-thread-1等待信號
  3 pool-1-thread-2拿到鎖了
  4 pool-1-thread-2發出信號
  5 pool-1-thread-1拿到信號

如示例所示,一般都會將Condition對象作為成員變量。當調用await()方法後,當前線程會釋放鎖並在此等待,而其他線程調用Condition對象的signal()方法,通知當前線程後,當前線程才從await()方法返回,並且在返回前已經獲取了鎖。

二、Condition接口常用方法

condition可以通俗的理解為條件隊列。當一個線程在調用了await方法以後,直到線程等待的某個條件為真的時候才會被喚醒。這種方式為線程提供了更加簡單的等待/通知模式。Condition必須要配合鎖一起使用,因為對共享狀態變量的訪問發生在多線程環境下。一個Condition的實例必須與一個Lock綁定,因此Condition一般都是作為Lock的內部實現。

  1. await() :造成當前線程在接到信號或被中斷之前一直處於等待狀態。

  2. await(long time, TimeUnit unit) :造成當前線程在接到信號、被中斷或到達指定等待時間之前一直處於等待狀態。

  3. awaitNanos(long nanosTimeout) :造成當前線程在接到信號、被中斷或到達指定等待時間之前一直處於等待狀態。返回值表示剩余時間,如果在nanosTimesout之前喚醒,那麽返回值 = nanosTimeout - 消耗時間,如果返回值 <= 0 ,則可以認定它已經超時了。

  4. awaitUninterruptibly() :造成當前線程在接到信號之前一直處於等待狀態。【註意:該方法對中斷不敏感】。

  5. awaitUntil(Date deadline) :造成當前線程在接到信號、被中斷或到達指定最後期限之前一直處於等待狀態。如果沒有到指定時間就被通知,則返回true,否則表示到了指定時間,返回返回false。

  6. signal() :喚醒一個等待線程。該線程從等待方法返回前必須獲得與Condition相關的鎖。

  7. signal()All :喚醒所有等待線程。能夠從等待方法返回的線程必須獲得與Condition相關的鎖。

三、Condition接口原理簡單解析

Condition是AQS的內部類。每個Condition對象都包含一個隊列(等待隊列)。等待隊列是一個FIFO的隊列,在隊列中的每個節點都包含了一個線程引用,該線程就是在Condition對象上等待的線程,如果一個線程調用了Condition.await()方法,那麽該線程將會釋放鎖、構造成節點加入等待隊列並進入等待狀態。等待隊列的基本結構如下所示。

技術分享圖片

等待分為首節點和尾節點。當一個線程調用Condition.await()方法,將會以當前線程構造節點,並將節點從尾部加入等待隊列。新增節點就是將尾部節點指向新增的節點。節點引用更新本來就是在獲取鎖以後的操作,所以不需要CAS保證。同時也是線程安全的操作。

3.2、等待

當線程調用了await方法以後。線程就作為隊列中的一個節點被加入到等待隊列中去了。同時會釋放鎖的擁有。當從await方法返回的時候。一定會獲取condition相關聯的鎖。當等待隊列中的節點被喚醒的時候,則喚醒節點的線程開始嘗試獲取同步狀態。如果不是通過 其他線程調用Condition.signal()方法喚醒,而是對等待線程進行中斷,則會拋出InterruptedException異常信息。

3.3、通知

調用Condition的signal()方法,將會喚醒在等待隊列中等待最長時間的節點(條件隊列裏的首節點),在喚醒節點前,會將節點移到同步隊列中。當前線程加入到等待隊列中如圖所示:

技術分享圖片

在調用signal()方法之前必須先判斷是否獲取到了鎖。接著獲取等待隊列的首節點,將其移動到同步隊列並且利用LockSupport喚醒節點中的線程。節點從等待隊列移動到同步隊列如下圖所示:

技術分享圖片

被喚醒的線程將從await方法中的while循環中退出。隨後加入到同步狀態的競爭當中去。成功獲取到競爭的線程則會返回到await方法之前的狀態。

四、總結

調用await方法後,將當前線程加入Condition等待隊列中。當前線程釋放鎖。否則別的線程就無法拿到鎖而發生死鎖。自旋(while)掛起,不斷檢測節點是否在同步隊列中了,如果是則嘗試獲取鎖,否則掛起。當線程被signal方法喚醒,被喚醒的線程將從await()方法中的while循環中退出來,然後調用acquireQueued()方法競爭同步狀態。

五、利用Condition實現生產者消費者模式

  1 
  2 import java.util.LinkedList;
  3 import java.util.concurrent.locks.Condition;
  4 import java.util.concurrent.locks.Lock;
  5 import java.util.concurrent.locks.ReentrantLock;
  6 
  7 public class BoundedQueue {
  8 
  9     private LinkedList<Object> buffer;    //生產者容器
 10     private int maxSize ;           //容器最大值是多少
 11     private Lock lock;
 12     private Condition fullCondition;
 13     private Condition notFullCondition;
 14     BoundedQueue(int maxSize){
 15         this.maxSize = maxSize;
 16         buffer = new LinkedList<Object>();
 17         lock = new ReentrantLock();
 18         fullCondition = lock.newCondition();
 19         notFullCondition = lock.newCondition();
 20     }
 21 
 22     /**
 23      * 生產者
 24      * @param obj
 25      * @throws InterruptedException
 26      */
 27     public void put(Object obj) throws InterruptedException {
 28         lock.lock();    //獲取鎖
 29         try {
 30             while (maxSize == buffer.size()){
 31                 notFullCondition.await();       //滿了,添加的線程進入等待狀態
 32             }
 33             buffer.add(obj);
 34             fullCondition.signal(); //通知
 35         } finally {
 36             lock.unlock();
 37         }
 38     }
 39 
 40     /**
 41      * 消費者
 42      * @return
 43      * @throws InterruptedException
 44      */
 45     public Object get() throws InterruptedException {
 46         Object obj;
 47         lock.lock();
 48         try {
 49             while (buffer.size() == 0){ //隊列中沒有數據了 線程進入等待狀態
 50                 fullCondition.await();
 51             }
 52             obj = buffer.poll();
 53             notFullCondition.signal(); //通知
 54         } finally {
 55             lock.unlock();
 56         }
 57         return obj;
 58     }
 59 
 60 }
 61 

參考:

Java並發編程的藝術

Java並發編程網

Java並發之Condition