Java多執行緒學習筆記14之執行緒間通訊
詳細程式碼見:github程式碼地址
本節內容:
1) 實戰 等待/通知之交叉備份
2) 方法join的使用(Jdk文件翻譯及原始碼解析)
join()及join(long)的使用和實現原理
join(long)和sleep(long)的區別及原始碼分析
(3) 實戰 等待/通知之交叉備份
建立20個執行緒,其中10個執行緒是將資料備份到A資料庫中,另外10個執行緒將資料備份到B資料庫中,並且備份
A資料庫和B資料庫是交叉進行的。
我們使用等待/通知技術,讓20個執行緒的執行效果變成有序的。
package chapter03.section1.thread_3_1_14.project_1_wait_noify_insert_test; public class DBTools { /** * 變數prevIsA的主要作用就是確保備份"****"資料庫A首先執行, * 然後"@@@@"資料庫B交替進行備份 */ volatile private boolean prevIsA = false; synchronized public void backupA() { try { while(prevIsA == true) { wait(); //先前A備份,則A等待B備份 } for(int i = 0; i < 5; i++) { System.out.println("****"); } prevIsA = true; notifyAll(); } catch (InterruptedException e) { // TODO: handle exception e.printStackTrace(); } } synchronized public void backupB() { try { while(prevIsA == false) { wait(); } for(int i = 0; i < 5; i++) { System.out.println("@@@@"); } prevIsA = false; notifyAll(); } catch (InterruptedException e) { // TODO: handle exception e.printStackTrace(); } } } package chapter03.section1.thread_3_1_14.project_1_wait_noify_insert_test; public class BackupA extends Thread{ private DBTools dbtools; public BackupA(DBTools dbtools) { super(); this.dbtools = dbtools; } @Override public void run() { dbtools.backupA(); } } package chapter03.section1.thread_3_1_14.project_1_wait_noify_insert_test; public class BackupB extends Thread{ private DBTools dbtools; public BackupB(DBTools dbtools) { super(); this.dbtools = dbtools; } @Override public void run() { dbtools.backupB(); } } package chapter03.section1.thread_3_1_14.project_1_wait_noify_insert_test; public class Run { public static void main(String[] args) { DBTools dbtools = new DBTools(); for(int i = 0; i < 20; i++) { BackupB output = new BackupB(dbtools); output.start(); BackupA input = new BackupA(dbtools); input.start(); } } } /* result: @@@@ @@@@ @@@@ @@@@ @@@@ **** **** **** **** **** @@@@ @@@@ @@@@ @@@@ @@@@ 交替執行 */
結果分析:
這個程式並沒有利用好多執行緒優勢,只是一個示例
4. 方法join的使用
在很多情況下,主執行緒建立並啟動子執行緒,如果子執行緒中要進行大量的耗時運算(比如網路請
求,下載資料、讀寫檔案等)主執行緒往往將早於子執行緒結束之前結束。這時,如果主執行緒想等待
子執行緒執行完之後再結束,比如子執行緒處理一個數據,主執行緒要取得這個資料中的值,就要用
到join()方法了。方法join()的作用是等待執行緒物件銷燬。
方法join的作用是使所屬的執行緒物件x正常執行run()方法中的任務,而使當前執行緒z進行無限期
的阻塞,等待執行緒x銷燬後(即執行緒的狀態是TEMINATED)再繼續執行執行緒z後面的程式碼
join在內部使用wait()方法進行等待,而synchronized關鍵字使用的是"物件監視器"原理作為
同步
閱讀JDK9文件
join():
public final void join()
throws InterruptedException
Waits for this thread to die.
An invocation of this method behaves in exactly the same way as the invocation
等待此執行緒死亡
呼叫方法join()與呼叫join(0)作用完全相同
join(0)
Throws:
InterruptedException - if any thread has interrupted the current thread. The
interrupted status of the current thread is cleared when this exception is thrown
丟擲:
InterrupedException - 如果有任何執行緒中斷了當前的執行緒。當這個異常被丟擲,則當前執行緒
的中斷狀態被清除
join(long millis):
public final void join(long millis)
throws InterruptedException
Waits at most millis milliseconds for this thread to die. A timeout of 0 means
to wait forever.
This implementation uses a loop of this.wait calls conditioned on this.isAlive.
As a thread terminates the this.notifyAll method is invoked. It is recommended
that applications not use wait, notify, or notifyAll on Thread instances.
最多等待呼叫此方法的執行緒死亡millis毫秒,0意味著永遠等待。實現是使用一個呼叫wait(0)的迴圈
while(isAlive){this.wait(0);}.當這個執行緒被終止之後釋放鎖,this.notifyAll通知執行緒來
準備獲得這個鎖。建議應用程式不要線上程例項上使用等待、通知或notifyAll方法。
Parameters:
millis - the time to wait in milliseconds 最大等待的毫秒數
Throws:
IllegalArgumentException - if the value of millis is negative
如果millis是負數,那麼丟擲非法引數異常IllegalArgumentException
InterruptedException - if any thread has interrupted the current thread. The
interrupted status of the current thread is cleared when this exception is thrown.
如果任意執行緒中斷了當前執行緒。那麼丟擲這個異常,同時當前執行緒的中斷狀態被清除
(1) 學習方法join前的鋪墊
package chapter03.section2.thread_3_2_1.project_1_joinTest1;
public class MyTest extends Thread {
@Override
public void run() {
try {
int secondValue = (int)(Math.random() * 10000);
System.out.println(secondValue);
Thread.sleep(secondValue);
} catch (InterruptedException e) {
// TODO: handle exception
e.printStackTrace();
}
}
}
package chapter03.section2.thread_3_2_1.project_1_joinTest1;
public class Test {
public static void main(String[] args) {
MyTest threadTest = new MyTest();
threadTest.start();
// Thread.sleep(?)
System.out.println("我想當threadTest物件執行完畢後我再執行");
System.out.println("但上面程式碼中的sleep()中的值應該寫多少呢?");
System.out.println("答案是:根據不能確定:)");
}
}
結果分析:
只需要修改Test.java類就可以解決
package chapter03.section2.thread_3_2_2.project_1_joinTest2;
public class Test {
public static void main(String[] args) throws InterruptedException{
MyTest threadTest = new MyTest();
threadTest.start();
threadTest.join();
// Thread.sleep(?)
System.out.println("我想當threadTest物件執行完畢後我再執行");
System.out.println("但上面程式碼中的sleep()中的值應該寫多少呢?");
System.out.println("答案是:根據不能確定:)");
}
}
/*
result:
8370
我想當threadTest物件執行完畢後我再執行
但上面程式碼中的sleep()中的值應該寫多少呢?
答案是:根據不能確定:)
*/
結果分析:
join方法實現是通過wait.當main執行緒呼叫t.join時,main執行緒會獲得執行緒物件t鎖,
呼叫該物件的wait(等待時間),直到該物件喚醒main執行緒.這些具體程式碼設計到JVM
低層實現,由於本人對C++不太熟,也暫時沒有學習計劃,對於這塊感興趣的可以去看
OpenJdk 原始碼: http://openjdk.java.net/
(2) 方法join與異常
在join過程中,如果當前執行緒物件被中斷,則當前執行緒出現異常
package chapter03.section2.thread_3_2_3.project_1_joinException;
public class ThreadA extends Thread {
@Override
public void run() {
for(int i = 0; i < Integer.MAX_VALUE; i++) {
String newString = new String();
Math.random();
}
}
}
package chapter03.section2.thread_3_2_3.project_1_joinException;
public class ThreadB extends Thread{
@Override
public void run() {
try {
ThreadA a = new ThreadA();
a.start();
a.join();
System.out.println("執行緒B在run end處列印了");
} catch (InterruptedException e) {
// TODO: handle exception
System.out.println("執行緒B在catch處列印了");
e.printStackTrace();
}
}
}
package chapter03.section2.thread_3_2_3.project_1_joinException;
public class ThreadC extends Thread {
private ThreadB threadB;
public ThreadC(ThreadB threadB) {
super();
this.threadB = threadB;
}
@Override
public void run() {
threadB.interrupt();
}
}
package chapter03.section2.thread_3_2_3.project_1_joinException;
public class Run {
public static void main(String[] args) {
try {
ThreadB b = new ThreadB();
b.start();
Thread.sleep(500);
ThreadC c = new ThreadC(b);
c.start();
} catch (InterruptedException e) {
// TODO: handle exception
e.printStackTrace();
}
}
}
/*
result:
執行緒B在catch處列印了
java.lang.InterruptedException
at java.base/java.lang.Object.wait(Native Method)
at java.base/java.lang.Thread.join(Unknown Source)
at java.base/java.lang.Thread.join(Unknown Source)
at chapter03.section2.thread_3_2_3.project_1_joinException.ThreadB.run(ThreadB.java:10)
*/
結果分析:
可以看到執行緒B呼叫了join()方法,並且被C執行緒中斷,丟擲了InterruptedException
異常,但是A執行緒並沒有出現異常,是正常執行的狀態。
(3) 方法join(long)的使用
方法join(long)中的引數是設定等待的時間
package chapter03.section2.thread_3_2_4.project_1_joinLong;
public class MyThread extends Thread {
@Override
public void run() {
try {
System.out.println("begin Timer=" + System.currentTimeMillis());
Thread.sleep(5000);
} catch (InterruptedException e) {
// TODO: handle exception
e.printStackTrace();
}
}
}
package chapter03.section2.thread_3_2_4.project_1_joinLong;
public class Test extends Thread{
public static void main(String[] args) {
try {
MyThread threadTest = new MyThread();
threadTest.start();
threadTest.join(2000); //只等待2秒
// Thread.sleep(2000);
System.out.println(" end Timer=" + System.currentTimeMillis());
} catch (InterruptedException e) {
// TODO: handle exception
e.printStackTrace();
}
}
}
/*
result:
begin Timer=1540606857302
end Timer=1540606859304
*/
可以看到大約2秒後主執行緒開始執行,執行結束之後回到另一個執行緒繼續執行3秒
(4) 方法join(long)與sleep(long的區別)
方法join(long)的功能在內部是是使用wait(long)方法來實現的,所以join(long)方
法具有釋放鎖的特點。
下面是實現的原始碼(不包含JVM實現部分):
sleep(long millis):
/**
* Causes the currently executing thread to sleep (temporarily cease
* execution) for the specified number of milliseconds, subject to
* the precision and accuracy of system timers and schedulers. The thread
* does not lose ownership of any monitors.
*
* @param millis
* the length of time to sleep in milliseconds
*
* @throws IllegalArgumentException
* if the value of {@code millis} is negative
*
* @throws InterruptedException
* if any thread has interrupted the current thread. The
* <i>interrupted status</i> of the current thread is
* cleared when this exception is thrown.
*/
public static native void sleep(long millis) throws InterruptedException;
可以看到這個方法是native方法,大家可以自己去OpenJDK檢視C++程式碼,這裡有機會以後分析。
sleep(long millis)造成當前執行緒休眠(暫時停止執行)一段特定的時間,取決於當前系統時鐘
和排程器的精度和準確度。這個執行緒不釋放任何監視器的所有權。
wait(long millis):
/**
* Waits at most {@code millis} milliseconds for this thread to
* die. A timeout of {@code 0} means to wait forever.
*
* <p> This implementation uses a loop of {@code this.wait} calls
* conditioned on {@code this.isAlive}. As a thread terminates the
* {@code this.notifyAll} method is invoked. It is recommended that
* applications not use {@code wait}, {@code notify}, or
* {@code notifyAll} on {@code Thread} instances.
*
* @param millis
* the time to wait in milliseconds
*
* @throws IllegalArgumentException
* if the value of {@code millis} is negative
*
* @throws InterruptedException
* if any thread has interrupted the current thread. The
* <i>interrupted status</i> of the current thread is
* cleared when this exception is thrown.
*/
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
其中wait()與wait(long timeout)是native方法,可以看到
millis為負數丟擲非法引數異常
millis為0,永遠等待,直到其他執行緒isAlive()為false,呼叫join方法的當前執行緒拿到鎖,
然後就可以繼續執行了
millis為正,則呼叫wait(delay)方法延遲時間到了自動喚醒呼叫join方法執行緒,然後執行
當前執行緒。
驗證Thread.sleep(long)方法不釋放鎖
package chapter03.section2.thread_3_2_5.project_1_join_sleep_1;
public class ThreadA extends Thread {
private ThreadB b;
public ThreadA(ThreadB b) {
super();
this.b = b;
}
@Override
public void run() {
try {
synchronized(b) {
b.start();
Thread.sleep(6000);
}
} catch (InterruptedException e) {
// TODO: handle exception
e.printStackTrace();
}
}
}
package chapter03.section2.thread_3_2_5.project_1_join_sleep_1;
public class ThreadB extends Thread{
@Override
public void run() {
try {
System.out.println(" b run begin timer="
+ System.currentTimeMillis());
Thread.sleep(5000);
System.out.println(" b run end timer="
+ System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
synchronized public void bService() {
System.out.println("列印了bService timer=" + System.currentTimeMillis());
}
}
package chapter03.section2.thread_3_2_5.project_1_join_sleep_1;
public class ThreadC extends Thread {
private ThreadB threadB;
public ThreadC(ThreadB threadB) {
super();
this.threadB = threadB;
}
@Override
public void run() {
threadB.bService();
}
}
package chapter03.section2.thread_3_2_5.project_1_join_sleep_1;
public class Run {
public static void main(String[] args) {
try {
ThreadB b = new ThreadB();
ThreadA a = new ThreadA(b);
a.start();
Thread.sleep(1000);
ThreadC c = new ThreadC(b);
c.start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/*
result:
b run begin timer=1540608282186
b run end timer=1540608287194
列印了bService timer=1540608288190
*/
結果分析:
執行緒ThreadA一開始執行持有ThreadB物件的鎖,時間是6秒多,所以執行緒ThreadC只有在
ThreadA時間到達6秒後run方法執行結束釋放ThreadB的鎖時,才可以呼叫ThreadB中的
同步方法ysynchronized public void bService()方法。
驗證join()方法釋放鎖的特點
修改上面中的ThreadA.java
package chapter03.section2.thread_3_2_5.project_1_join_sleep_1;
public class ThreadA extends Thread {
private ThreadB b;
public ThreadA(ThreadB b) {
super();
this.b = b;
}
@Override
public void run() {
try {
synchronized(b) {
b.start();
b.join(); //說明join釋放鎖了
for (int i = 0; i < Integer.MAX_VALUE; i++) {
String newString = new String();
Math.random();
}
}
} catch (InterruptedException e) {
// TODO: handle exception
e.printStackTrace();
}
}
}
/*
result:
b run begin timer=1540608854468
列印了bService timer=1540608855473
b run
ThreadA呼叫了join,釋放了ThreadB物件鎖,所以執行緒ThreadC拿到鎖可以呼叫ThreadB
中的同步方法bService()
方法join()後面的程式碼提前執行: 出現以外
舉個例子:
package chapter03.section2.thread_3_2_6.project_1_joinMoreTest;
public class ThreadA extends Thread{
private ThreadB b;
public ThreadA(ThreadB b) {
super();
this.b = b;
}
@Override
public void run() {
try {
synchronized(b) {
System.out.println("begin A ThreadName="
+ Thread.currentThread().getName() + " "
+ System.currentTimeMillis());
Thread.sleep(5000);
System.out.println(" end A ThreadName="
+ Thread.currentThread().getName() + " "
+ System.currentTimeMillis());
}
} catch (InterruptedException e) {
// TODO: handle exception
e.printStackTrace();
}
}
}
package chapter03.section2.thread_3_2_6.project_1_joinMoreTest;
public class ThreadB extends Thread {
@Override
synchronized public void run() {
try {
System.out.println("begin B ThreadName="
+ Thread.currentThread().getName() + " "
+ System.currentTimeMillis());
Thread.sleep(5000);
System.out.println(" end B ThreadName="
+ Thread.currentThread().getName() + " "
+ System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
package chapter03.section2.thread_3_2_6.project_1_joinMoreTest;
public class Run1 {
public static void main(String[] args) {
ThreadB b = new ThreadB();
ThreadA a = new ThreadA(b);
a.start();
b.start();
System.out.println(" main end "
+ System.currentTimeMillis());
}
}
/*
result3:
main end 1540622542511
begin A ThreadName=Thread-1 1540622542511
end A ThreadName=Thread-1 1540622547512
begin B ThreadName=Thread-0 1540622547512
end B ThreadName=Thread-0 1540622552517
*/
package chapter03.section2.thread_3_2_6.project_1_joinMoreTest;
public class RunFirst {
public static void main(String[] args) throws InterruptedException {
ThreadB b = new ThreadB();
ThreadA a = new ThreadA(b);
a.start();
b.start();
b.join(2000);
System.out.println(" main end=" + System.currentTimeMillis());
}
}
/*
result1:
begin B ThreadName=Thread-0 1540621815361
end B ThreadName=Thread-0 1540621820366
main end=1540621820366
begin A ThreadName=Thread-1 1540621820366
end A ThreadName=Thread-1 1540621825376
result2:
begin A ThreadName=Thread-1 1540621971188
end A ThreadName=Thread-1 1540621976192
main end=1540621976192
begin B ThreadName=Thread-0 1540621976192
end B ThreadName=Thread-0 1540621981197
*/