五、JAVA多執行緒:執行緒間通訊(wait、notify、notifyAll、wait set、自定義鎖 BooleanLock )
我們在開發多執行緒程式的時候,往往不會只存在一個獨立的執行緒,相反大多數情況下是需要多個執行緒之間進行協同工作的,如何在多個執行緒之間進行通訊,是本章學習的重點。另外,本章的最後部分將會分析synchronized關鍵字的缺陷,我們手動實現了一個顯式鎖(BooleanLock)可以解決synchronized所不具備的功能,其中也需要用到執行緒間通訊的知識。
同步阻塞與非同步非阻塞
非同步非阻塞訊息處理
單執行緒間通訊
初識wait和notify
程式碼樣例:
import java.util.LinkedList ; import static java.lang.Thread.currentThread ; public class EventQueue { private final int max ; static class Event { int i ; public Event(int i){ this.i = i ; } } private final LinkedList<Event> eventQueue = new LinkedList<>(); private final static int DEFAULT_MAX_EVENT = 10 ; public EventQueue(){ this(DEFAULT_MAX_EVENT); } public EventQueue (int max){ this.max = max; } public void offer(Event event){ synchronized (eventQueue){ if(eventQueue.size() >= max ){ try{ console("the queue is full ..."); eventQueue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } console("the new event is submitted "); eventQueue.addLast(event); eventQueue.notify(); } } public Event take(){ synchronized (eventQueue){ if(eventQueue.isEmpty()){ try { console("the queue is empty . "); eventQueue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Event event = eventQueue.removeFirst(); this.eventQueue.notify(); console("the event " + event.i + " is handled . "); return event ; } } private void console(String message){ System.out.printf("%s:%s \n", currentThread().getName() , message); } }
import java.util.concurrent.TimeUnit; public class EventClient { public static void main(String[] args) { final EventQueue eventQueue = new EventQueue() ; new Thread(() -> { for (int i = 0 ; i < 100000; i++){ eventQueue.offer(new EventQueue.Event(i)); } },"Producer" ).start(); new Thread(() -> { for (;;){ eventQueue.take(); try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } },"Consumer " ).start(); ; } }
執行截圖:
wait和notify方法詳解
wait和notify方法並不是Thread特有的方法,而是Object中的方法。也就是說JDK中每一個類都擁有這兩個方法。
先看wait的三個過載方法:
public final void wait() throws InterruptedException
public final void wait(long timeout) throws InterruptedException
public final void wait(long timeout, int nanos) throws InterruptedException
說明:
1.wait方法這三個過載方法都將呼叫wait(long timeout)方法 , wait()方法 等價於 wait(0), 0 代表永不超時。
2.Object的wait(long timeout)方法會導致當前執行緒進入阻塞,知道有其他執行緒呼叫了Object的nofify或者notifyAll方法才能將其喚醒,或者阻塞時間到達了timeout時間而自動喚醒。
3.wait方法必須擁有該物件的monitor,也就是waite方法必須在同步方法中使用。
4.當前執行緒執行了該物件的wai方法之後,將會放棄對該monitor的所有權,並進進入到與該物件關聯的wait set中,也就是說一旦執行緒執行了某個object的wait方法之後,他就會釋放對該物件的monitor的所有權, 其他執行緒也會有機會繼續爭搶該monitor的所有權。
notify解析
public final native void notify();
喚醒單個正在執行該物件的wait方法的執行緒
如果有某個執行緒由於執行該物件的wait方法而進入阻塞則會被喚醒,如果沒有則會忽略。
被喚醒的執行緒需要重新獲取對該物件所關聯monitor的lock才能繼續執行
關於wait和notify的注意事項
1.wait方法是可中斷方法,一旦呼叫了wait方法案進入了阻塞狀態, 其他執行緒可以使用interrupt方法將其打斷。
可中斷方法被打斷後會受到中斷異常InterruptedException,同時interrupt標識也會被擦除。
2.執行緒執行了某個物件的wait方法之後,會加入與之對應的wait set中, 每一個物件的monitor都會有一個與之關聯的wait set
3.當前程進入wait set之後, notify方法可以將其進行喚醒, 也就是從wait set中彈出,同時,中斷wait中的執行緒,也就將其喚醒。
4.必須在同步方法中使用wait和notify方法,因為執行wait和notify的前提條件是必須持有同步方法monitor的所有權。
wait和sleep
1.wait和sleep方法都可以使執行緒進入阻塞狀態。
2.wait和sleep方法都是可中斷方法,被中斷之後,都會受到中斷異常
3.wait是Object的方法, sleep是Thread中的特有方法
4.wait方法的執行必須要同步方法中進行,則sleep不需要
5.執行緒在同步方法中執行sleep方法是,並不釋放monitor鎖,而wait方法則會釋放
6.sleep方法短暫休眠後會主動退出阻塞,而wait方法(沒有指定wait時間)則需要被其他執行緒中斷之後才能退出阻塞
多執行緒間通訊
生產者和消費者
1.notifyAll方法
多執行緒間通訊需要用到Object的notifyAll方法,該方法與notify比較相似,都可以喚醒由於呼叫了wait方法而阻塞的執行緒,但是notify方法只能喚醒其中的一個執行緒,而notifyAll方法則可以同時喚醒全部的阻塞執行緒,同時被喚醒的執行緒仍然需要繼續爭搶monitor的鎖。
程式碼樣例:
package com.zl.step5.eventQueue2;
import java.util.LinkedList;
import static java.lang.Thread.currentThread;
public class EventQueue {
private final int max ;
static class Event {
}
private final LinkedList<Event> eventQueue = new LinkedList<>();
private final static int DEFAULT_MAX_EVENT = 10 ;
public EventQueue(){
this(DEFAULT_MAX_EVENT);
}
public EventQueue (int max){
this.max = max;
}
public void offer(Event event){
synchronized (eventQueue){
while(eventQueue.size() >= max ){
try{
console("the queue is full ...");
eventQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
console("the new event is submitted :" +event);
eventQueue.addLast(event);
eventQueue.notifyAll();
}
}
public Event take(){
synchronized (eventQueue){
while(eventQueue.isEmpty()){
try {
console("the queue is empty . ");
eventQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Event event = eventQueue.removeFirst();
this.eventQueue.notifyAll();
console("the event " + event + " is handled . ");
return event ;
}
}
private void console(String message){
System.out.printf("%s:%s \n", currentThread().getName() , message);
}
}
package com.zl.step5.eventQueue2;
import java.util.concurrent.TimeUnit;
public class EventClient {
public static void main(String[] args) {
final EventQueue eventQueue = new EventQueue() ;
new Thread(() -> {
for (;;){
eventQueue.offer(new EventQueue.Event());
}
},"Producer-1" ).start();
new Thread(() -> {
for (;;){
eventQueue.offer(new EventQueue.Event());
}
},"Producer-2" ).start();
new Thread(() -> {
for (;;){
eventQueue.take();
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"Consumer " ).start(); ;
}
}
執行緒休息室wait set
執行緒呼叫了某個物件的wait方法之後,都會被加入該物件的monitor關聯的wait set中, 並且釋放monitor的所有權
nofify方法:
notifyAll方法
自定義顯示鎖BooleanLock
synchronized關鍵字的缺陷
synchronized關鍵字給提供了一種排他式的資料同步機制,某個執行緒在獲取monitor lock的時候可能會被阻塞,
有兩個明顯的缺陷:
1.無法控制阻塞時長
2.阻塞不可被中斷
測試程式碼:
package com.zl.step5;
import java.util.concurrent.TimeUnit;
public class SynchronizedDefect {
public synchronized void syncMethod(){
try {
System.out.printf("Thread [ %s ] is running .....\n ", Thread.currentThread().getName());
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
SynchronizedDefect defect = new SynchronizedDefect();
Thread t1 = new Thread(defect::syncMethod,"T1");
t1.start();
TimeUnit.MILLISECONDS.sleep(2);
Thread t2 = new Thread(defect::syncMethod,"T2");
t2.start();
}
}
package com.zl.step5;
import java.util.concurrent.TimeUnit;
public class SynchronizedDefect2 {
public synchronized void syncMethod(){
try {
System.out.printf("Thread [ %s ] is running .....\n ", Thread.currentThread().getName());
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
SynchronizedDefect2 defect = new SynchronizedDefect2();
Thread t1 = new Thread(defect::syncMethod,"T1");
t1.start();
TimeUnit.MILLISECONDS.sleep(2);
Thread t2 = new Thread(defect::syncMethod,"T2");
t2.start();
TimeUnit.MILLISECONDS.sleep(2);
t2.interrupt();
System.out.printf("t2 interrupted : %s \n ", t2.isInterrupted());
System.out.printf("t2 state : %s \n ", t2.getState());
}
}
t2 在競爭資源的時候,處於阻塞狀態 ,是無法被打斷的。
顯示鎖BooleanLock
使其具備synchronized關鍵字所有的功能,同時又具備可中斷和lock超時的功能
1.定義Lock介面
import java.util.List;
import java.util.concurrent.TimeoutException;
/**
* 1.lock()方法永遠阻塞,除非獲取到了鎖,這一點和synchronized非常類似
* 但是該方法是可以被中斷的,中斷的時候,丟擲InterruptedException
* 2.lock(long mills) 方法除了可以被中斷以外,還增加了對應的超時功能
* 3.unlock() 方法可用來釋放鎖
* 4.getBlockedThreads() 用於獲取當前有哪些執行緒被阻塞
*
*
*/
public interface Lock {
void lock() throws InterruptedException;
void lock(long mills) throws InterruptedException, TimeoutException ;
void unlock();
List<Thread> getBlockedThreads();
}
* 1.lock()方法永遠阻塞,除非獲取到了鎖,這一點和synchronized非常類似
* 但是該方法是可以被中斷的,中斷的時候,丟擲InterruptedException
* 2.lock(long mills) 方法除了可以被中斷以外,還增加了對應的超時功能
* 3.unlock() 方法可用來釋放鎖
* 4.getBlockedThreads() 用於獲取當前有哪些執行緒被阻塞
2.實現BooleanLock
BooleanLock是Lock的一個boolean實現,通過控制一個Boolean變數的開關來決定是否允許當前的執行緒獲取該鎖。
程式碼:
package com.zl.step5.booleanlock;
import java.util.List;
import java.util.concurrent.TimeoutException;
/**
* 1.lock()方法永遠阻塞,除非獲取到了鎖,這一點和synchronized非常類似
* 但是該方法是可以被中斷的,中斷的時候,丟擲InterruptedException
* 2.lock(long mills) 方法除了可以被中斷以外,還增加了對應的超時功能
* 3.unlock() 方法可用來釋放鎖
* 4.getBlockedThreads() 用於獲取當前有哪些執行緒被阻塞
*
*
*/
public interface Lock {
void lock() ;
void lock(long mills) throws InterruptedException, TimeoutException ;
void unlock();
List<Thread> getBlockedThreads();
}
package com.zl.step5.booleanlock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.currentThread;
public class BooleanLock implements Lock {
// 當前擁有鎖的執行緒
private Thread lockThread ;
// ture 該鎖已經被某個執行緒所獲得
// false代表該鎖沒有被任何執行緒獲得或者已經被釋放
private boolean locked = false ;
// 用來儲存哪些執行緒在獲取當前執行緒時計入阻塞狀態
private final List<Thread> blockedList = new ArrayList<>();
@Override
public synchronized void lock() {
// System.out.println("lock 當前執行緒:"+currentThread() );
// System.out.println("lock 持有鎖執行緒:"+lockThread );
// 使用lock方法使用同步程式碼塊的方式進行方法同步
synchronized (this){
// 如果當前鎖已經被某個執行緒所獲得,則該執行緒加入阻塞佇列
// 並且使當前執行緒wait釋放對this monitor的所有權
while (locked){
final Thread tempThread = currentThread();
try{
if(!blockedList.contains(currentThread())){
blockedList.add(currentThread());
}
this.wait();
}catch (InterruptedException e){
blockedList.remove(tempThread) ;
}
}
// 如果當前鎖沒有被其他執行緒所獲得,則該執行緒嘗試從阻塞佇列中刪除自己
blockedList.remove(currentThread());
// locked 開關指定為true
this.locked = true ;
// 記錄獲取鎖的執行緒
this.lockThread = currentThread() ;
}
}
@Override
public void lock(long mills) throws InterruptedException, TimeoutException {
synchronized (this){
// 如果mills 不合法, 則預設呼叫lock方法 ,當然也可以丟擲引數非法異常
if(mills <= 0){
this.lock();
}else{
long remainingMills = mills ;
long endMills = currentTimeMillis() + remainingMills ;
while(locked){
//如果remainingMills 小於等於0 ,
// 則意味著當前執行緒被其他執行緒喚醒或者在指定的wait時間到了之後,
// 還沒有獲得鎖,則丟擲超時異常
if(remainingMills <= 0){
throw new TimeoutException("can not get the lock during " + mills) ;
}
if(!blockedList.contains(currentThread())){
blockedList.add(currentThread());
}
// 呼叫wait
this.wait(remainingMills);
// 重新計算remainingMills 時間,因為該執行緒有可能會被喚醒
remainingMills = endMills - currentTimeMillis() ;
}
blockedList.remove(currentThread()) ;
this.locked = true ;
this.lockThread = currentThread() ;
}
}
}
@Override
public void unlock() {
// System.out.println("unlock當前執行緒:"+currentThread() );
// System.out.println("unlock持有鎖執行緒:"+lockThread );
synchronized (this){
//解鎖, 那個執行緒加的鎖,只能由該執行緒進行解鎖
if (lockThread == currentThread()){
this.locked = false;
Optional.of(currentThread().getName()+" release the lock." ).ifPresent(System.out::println);
this.notifyAll();
}else{
System.out.println(currentThread() + " unlock fail ... ");
}
}
}
@Override
public List<Thread> getBlockedThreads() {
return Collections.unmodifiableList(blockedList);
}
}
package com.zl.step5.booleanlock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;
import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.currentThread;
import static java.util.concurrent.ThreadLocalRandom.current;
public class BooleanLockTest {
private final Lock lock = new BooleanLock();
public void syncMethod() {
try {
lock.lock(10 * 1000L);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
try {
int randomInt = current().nextInt(3) ;
System.out.println(currentThread() + " get the lock , is running .... ");
TimeUnit.SECONDS.sleep(randomInt);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
System.out.println("mian Thread : " + Thread.currentThread());
BooleanLockTest blt = new BooleanLockTest();
IntStream.range(0,10)
.mapToObj(i -> new Thread(blt::syncMethod))
.forEach(Thread::start);
// new Thread(blt::syncMethod,"T1").start();
//
// TimeUnit.MILLISECONDS.sleep(2);
//
// Thread t2 = new Thread(blt::syncMethod,"T2") ;
//
// t2.start();;
// TimeUnit.MILLISECONDS.sleep(2);
// t2.interrupt();
}
}
本文整理來源於:
《Java高併發程式設計詳解:多執行緒與架構設計》 --汪文君