Java多執行緒之執行緒間通訊--等待(wait)/通知(notify)機制,等待/通知之交叉備份例項
1、什麼是等待/通知機制
等待/通知機制在生活中比比皆是,比如在就餐時就會出現,如圖所示。
廚師和服務員之間的互動要在“菜品傳遞臺”上,在這期間會有幾個問題:
1).廚師做完一道菜的時間不確定,所以廚師將菜品放到‘菜品傳遞言,上的時間也不確定。
2).服務員取到菜的時間取決於廚師,所以服務員就有“等待”(wait)的狀態。
3).服務員如何能取到菜呢?這又得取決於廚師,廚師將菜放在“菜品傳遞臺”上,其實就相當於一種通知(notify),這時服務員才可以拿到菜並交給就餐者。
4).在這個過程中出現了“等待/通知”機制。
需要說明的是,前面章節中多個執行緒之間也可以實現通訊,原因就是多個執行緒共同訪問同一個變數,但那種通訊機制不是“等待/通知”,兩個執行緒完全是主動式地讀取一個共享變數,在花費讀取時間的基礎上,讀到的值是不是想要的,並不能完全確定。所以現在迫切需要一種“等待/通知”機制來滿足上面的需求。
2、等待/通知機制的實現
方法wait()的作用是使當前執行程式碼的執行緒進行等待,wait()方法是Object類的方法,該方法用來將當前執行緒置入“預執行佇列”中,並且在wait()所在的程式碼行處停止執行,直到接到通知或被中斷為止。在呼叫wait()之前,執行緒必須獲得該物件的物件級別鎖,即只能在同步方法或同步塊中呼叫wait方法。在執行wait()方法後,當前執行緒釋放鎖。在從wait()返回前,執行緒與其他執行緒競爭重新獲得鎖。如果呼叫wait()時沒有持有適當的鎖,則丟擲IllegalMonitorStateException,它是RuntimeException的一個子類,因此,不需要try-catch語句進行捕捉異常。
方法notify()也要在同步方法或同步塊中呼叫,即在呼叫前,執行緒也必須獲得該物件的物件級別鎖。如果呼叫notify()時沒有持有適當的鎖,一也會丟擲IllegalMonitorStateException。該方法用來通知那些可能等待該物件的物件鎖的其他執行緒,如果有多個執行緒等待,則由執行緒規劃器隨機挑選出其中一個呈wait狀態的執行緒,對其發出通知notify,並使它等待獲取該物件的物件鎖。需要說明的是,在執行notify方法後,當前執行緒不會馬上釋放該物件鎖,呈wait狀態的執行緒也並不能馬上獲取該物件鎖,要等到執行notify()方法的執行緒將程式執行完,也就是退出synchronized程式碼塊後,當前執行緒才會釋放鎖,而呈wait狀態所在的執行緒才可以獲取該物件鎖
用一句話來總結一下wait和notify: wait使執行緒停止執行,而notify使停止的執行緒繼續執行。
public class Test1 {
public static void main(String[] args) {
try {
String newString = new String("");
newString.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Exception in thread “main” java.lang.IllegalMonitorStateException
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:503)
出現的異常的原因是沒有“物件監視器”,也就是沒有同步加鎖。
public class Test1 {
public static void main(String[] args) {
try {
String lock = new String("");
System.out.println("syn上面");
synchronized (lock) {
System.out.println("syn第一行");
lock.wait();
System.out.println("wait下的程式碼");
}
System.out.println("syn下面的程式碼");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
syn上面
syn第一行
wait方法下面的程式碼不執行了。必須使用notify才行。
關鍵字synchronized可以將任何一個Object物件作為同步物件來看待,而Java為每個Object都實現了wait()和notify()方法,它們必須用在被synchronized同步的Object的臨界區內。通過呼叫wait()方法可以使處於臨界區內的執行緒進人等待狀態,同時釋放被同步物件的鎖。而notify操作可以喚醒一個因呼叫了wait操作而處於阻塞狀態中的執行緒,使其進人就緒狀態。被重新換醒的執行緒會試圖重新獲得臨界區的控制權,也就是鎖,並繼續執行臨界區內wait之後的程式碼。如果發出noti勿操作時沒有處於阻塞狀態中的執行緒,那麼該命令會被忽略。
3、執行緒狀態
執行緒的狀態轉換是執行緒控制的基礎。執行緒狀態總的可分為五大狀態:分別是生、死、可執行、執行、等待/阻塞。用一個圖來描述如下:
1).新建狀態:執行緒物件已經建立,還沒有在其上呼叫start()方法。
2).可執行狀態:當執行緒有資格執行,但排程程式還沒有把它選定為執行執行緒時執行緒所處的狀態。當start()方法呼叫時,執行緒首先進入可執行狀態。線上程執行之後或者從阻塞、等待或睡眠狀態回來後,也返回到可執行狀態。
3).執行狀態:執行緒排程程式從可執行池中選擇一個執行緒作為當前執行緒時執行緒所處的狀態。這也是執行緒進入執行狀態的唯一一種方式。
4).等待/阻塞/睡眠狀態:這是執行緒有資格執行時它所處的狀態。實際上這個三狀態組合為一種,其共同點是:執行緒仍舊是活的,但是當前沒有條件執行。換句話說,它是可執行的,但是如果某件事件出現,他可能返回到可執行狀態。
5).死亡狀態:當執行緒的run()方法完成時就認為它死去。這個執行緒物件也許是活的,但是,它已經不是一個單獨執行的執行緒。執行緒一旦死亡,就不能復生。如果在一個死去的執行緒上呼叫start()方法,會丟擲java.lang.IllegalThreadStateException異常。
4、方法wait()鎖釋放與notify()鎖不釋放
當方法wait()被執行後,鎖被自動釋放,但執行完notify()方法,鎖卻不自動釋放。
public class Service {
public void testMethod(Object lock) {
try {
synchronized (lock) {
System.out.println("begin wait()");
lock.wait();
System.out.println("end wait()");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class MyThread1 extends Thread {
private Object lock;
public MyThread1(Object lock) {
super();
this.lock = lock;
}
public void run() {
Service service = new Service();
service.testMethod(lock);
}
}
public class MyThread2 extends Thread{
private Object lock;
public MyThread2(Object lock) {
super();
this.lock = lock;
}
public void run() {
Service service = new Service();
service.testMethod(lock);
}
}
public class Run {
public static void main(String[] args) throws InterruptedException {
Object lock = new Object();
MyThread1 a = new MyThread1(lock);
a.start();
MyThread2 b = new MyThread2(lock);
b.start();
}
}
begin wait()
begin wait()
方法wait()自動釋放鎖
如果將Service方法裡的lock.wait()修改為Thread.sleep(2000);,就成了同步效果,輸出結果為:
begin wait()
原因是sleep方法不釋放鎖。
證明:方法notify()被執行後,不釋放鎖
將上面的service類修改為:
public class Service {
public void testMethod(Object lock) {
try {
synchronized (lock) {
System.out.println("begin wait() ThreadName="
+ Thread.currentThread().getName());
lock.wait();
System.out.println(" end wait()ThreadName="
+ Thread.currentThread().getName());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void synNotifyMethod(Object lock) {
try {
synchronized (lock) {
System.out.println("begin wait() ThreadName="
+ Thread.currentThread().getName());
lock.notify();
Thread.sleep(3000);
System.out.println(" end wait()ThreadName="
+ Thread.currentThread().getName());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
MyThread2.java裡的run 方法修改為:
public void run() {
Service service = new Service();
service.synNotifyMethod(lock);
}
begin wait() ThreadName=Thread-0
begin wait() ThreadName=Thread-1
end wait()ThreadName=Thread-1
end wait()ThreadName=Thread-0
此實驗說明:必須執行完notify()方法所在的同步synchronized程式碼塊後才釋放鎖。
public class Service {
public void testMethod(Object lock) {
try {
synchronized (lock) {
System.out.println("begin wait() ThreadName="
+ Thread.currentThread().getName());
lock.wait();
System.out.println(" end wait()ThreadName="
+ Thread.currentThread().getName());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
MyThread1.java同上不變
public class Run {
public static void main(String[] args) throws InterruptedException {
Object lock = new Object();
MyThread1 a = new MyThread1(lock);
a.start();
Thread.sleep(3000);
a.interrupt();
}
}
begin wait() ThreadName=Thread-0
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:503)
出現異常:因為wait狀態的執行緒被interrupt了
通過上面的幾個實驗可以總結如下3點:
1)執行完同步程式碼塊就會釋放物件的鎖。
2)在執行同步程式碼塊的過程中,遇到異常而導致執行緒終止,鎖也會被釋放。
3)在執行同步程式碼塊的過程中,執行了鎖所屬物件的wait()方法,這個執行緒會釋放物件鎖,而此執行緒物件會進人執行緒等待池中,等待被喚醒。
notify()一次只能喚醒一個執行緒,notify()可以喚醒全部執行緒。
5、方法wait(long)的使用
帶一個引數的wait(long)方法的功能是等待某一時間內是否有執行緒對鎖進行喚醒,如果超過這個時間則自動喚醒。這裡就不寫例子了。
6、通過過早
如果通過過早,則會打亂程式正常的執行邏輯。 下面是正常的
public class MyRunnable {
private String lock = new String("");
private Runnable runnableA = new Runnable() {
public void run() {
try {
synchronized (lock) {
System.out.println("begin wait");
lock.wait();
System.out.println("end wait");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
private Runnable runnableB = new Runnable() {
public void run() {
synchronized (lock) {
System.out.println("begin notify");
lock.notify();
System.out.println("end notify");
}
}
};
public static void main(String[] args) {
MyRunnable run = new MyRunnable();
Thread a = new Thread(run.runnableA);
a.start();
Thread b = new Thread(run.runnableB);
b.start();
}
}
begin wait
begin notify
end notify
end wait
將上面的main方法修改(修改執行緒的啟動順序)為如下
public static void main(String[] args) throws InterruptedException {
MyRunnable run = new MyRunnable();
Thread b = new Thread(run.runnableA);
b.start();
Thread a = new Thread(run.runnableB);
a.start();
}
begin notify
end notify
begin wait
方法wait永遠不能被通知。如果先通知,則wait方法也就沒必要執行了。
7、等待wait的條件發生變化
在使用wait/notify模式時,還需要注意另外一種情況,也就是wait等待的條件發生了變化,也容易造成程式邏輯的混亂。
public class ValueObjects {
public static List list = new ArrayList();
}
//這個類新增資料
public class Add {
private String lock;
public Add(String lock) {
this.lock = lock;
}
public void add() {
synchronized (lock) {
ValueObjects.list.add("anyString");
lock.notifyAll();
}
}
}
//這個類減去資料
public class Subtract {
private String lock;
public Subtract(String lock) {
this.lock = lock;
}
public void subtract() {
try {
synchronized (lock) {
if (ValueObjects.list.size() == 0){
System.out.println("wait begin ThreadName="
+ Thread.currentThread().getName());
lock.wait();
System.out.println("wait end ThreadName="
+ Thread.currentThread().getName());
}
ValueObjects.list.remove(0);
System.out.println("list size=" + ValueObjects.list.size());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 兩個執行緒類程式碼
public class MyThread1 extends Thread {
private Add p;
public MyThread1(Add p) {
super();
this.p = p;
}
public void run() {
p.add();
}
}
public class MyThread2 extends Thread{
private Subtract r;
public MyThread2(Subtract r) {
super();
this.r = r;
}
public void run() {
r.subtract();
}
}
public class Run {
public static void main(String[] args) throws InterruptedException {
String lock = new String("");
Add add = new Add(lock);
Subtract subtract = new Subtract(lock);
MyThread2 subThread1 = new MyThread2(subtract);
subThread1.setName("subtract1-Thread");
subThread1.start();
MyThread2 subThread12 = new MyThread2(subtract);
subThread12.setName("subtract2-Thread");
subThread12.start();
Thread.sleep(1000);
MyThread1 addThread = new MyThread1(add);
addThread.setName("add-Thread");
addThread.start();
}
}
wait begin ThreadName=subtract2-Thread
wait begin ThreadName=subtract1-Thread
wait end ThreadName=subtract1-Thread
list size=0
wait end ThreadName=subtract2-Thread
Exception in thread “subtract2-Thread” java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
at java.util.ArrayList.rangeCheck(ArrayList.java:635)
at java.util.ArrayList.remove(ArrayList.java:474)
出現這樣異常的原因是因為有兩個實現刪除removed操作的執行緒,它們在Thread.sleep(1000);之前都執行了wait()方法,呈等待狀態,當加操作的執行緒在1秒之後被執行時,通知了所有呈wait等待狀態的減操作的執行緒,那麼第一個實現減操作的執行緒能正確地刪除list中索引為0的資料,但第二個實現減操作的執行緒則出現索引溢位的異常,因為list中僅僅添加了一個數據,也只能刪除一個數據,所以沒有第二個資料可供刪除。如何解決這樣的問題呢?
很簡單,把Subtract.java中的subtract方法裡的 if 代換為 while就行了,結果如下:
wait begin ThreadName=subtract2-Thread
wait begin ThreadName=subtract1-Thread
wait end ThreadName=subtract1-Thread
list size=0
wait end ThreadName=subtract2-Thread
wait begin ThreadName=subtract2-Thread
二、等待/通知之交叉備份例項
題目:建立20個執行緒,其中10個執行緒是將資料備份到A資料庫中,另外10個執行緒將資料備份到B資料庫中,並且備份A資料庫和B資料庫是交叉進行的。
首先創建出20個執行緒,效果如圖3-41所示。
通過一些手段將這20個執行緒的執行效果變成有序的,如圖3-42所示。
使用的技術還是等待/通知。
public class DBTools {
//確保備份資料庫A首先執行
private volatile boolean prevIsA = false;
public synchronized void backupA() {
try {
while (prevIsA == true) {
wait();
}
for (int i=0; i<5; i++) {
System.out.println("+++++");
}
prevIsA = true;
notifyAll();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public synchronized void backupB() {
try {
while (prevIsA == false) {
wait();
}
for (int i=0; i<5; i++) {
System.out.println("^^^^^");
}
prevIsA = false;
notifyAll();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
兩個自定義執行緒
public class BackupA extends Thread {
private DBTools dbTools;
public BackupA(DBTools dbTools) {
this.dbTools = dbTools;
}
public void run() {
dbTools.backupA();
}
}
public class BackupB extends Thread {
private DBTools dbTools;
public BackupB(DBTools dbTools) {
this.dbTools = dbTools;
}
public void run() {
dbTools.backupB();
}
}
public class Run {
public static void main(String[] args) throws InterruptedException {
DBTools dbTools = new DBTools();
for (int i=0; i<20; i++) {
BackupB output = new BackupB(dbTools);
output.start();
BackupA input = new BackupA(dbTools);
input.start();
}
}
}
+++++
+++++
+++++
+++++
+++++
^^^^^
^^^^^
^^^^^
^^^^^
^^^^^
+++++
+++++
+++++
+++++
+++++
^^^^^
^^^^^
^^^^^
^^^^^
^^^^^
……
列印的效果是交替執行的。
交替列印的原理就是使用如下程式碼作為標記:
private boolean volatile prevIsA = false;
實現了A和B執行緒交替備份的效果