1. 程式人生 > >Java多執行緒學習筆記14之執行緒間通訊

Java多執行緒學習筆記14之執行緒間通訊

詳細程式碼見:github程式碼地址

 

本節內容:

1) 實戰 等待/通知之交叉備份

2) 方法join的使用(Jdk文件翻譯及原始碼解析)

    join()及join(long)的使用和實現原理

    join(long)和sleep(long)的區別及原始碼分析

 


(3) 實戰 等待/通知之交叉備份
建立20個執行緒,其中10個執行緒是將資料備份到A資料庫中,另外10個執行緒將資料備份到B資料庫中,並且備份
A資料庫和B資料庫是交叉進行的。
我們使用等待/通知技術,讓20個執行緒的執行效果變成有序的。

package chapter03.section1.thread_3_1_14.project_1_wait_noify_insert_test;

public class DBTools {
	
	/**
	 * 變數prevIsA的主要作用就是確保備份"****"資料庫A首先執行,
	 * 然後"@@@@"資料庫B交替進行備份
	 */
	volatile private boolean prevIsA = false;
	
	synchronized public void backupA() {
		try {
			while(prevIsA == true) {
				wait(); //先前A備份,則A等待B備份
			}
			for(int i = 0; i < 5; i++) {
				System.out.println("****");
			}
			prevIsA = true;
			notifyAll();
		} catch (InterruptedException e) {
			// TODO: handle exception
			e.printStackTrace();
		}
	}
	
	synchronized public void backupB() {
		try {
			while(prevIsA == false) {
				wait();
			}
			for(int i = 0; i < 5; i++) {
				System.out.println("@@@@");
			}
			prevIsA = false;
			notifyAll();
		} catch (InterruptedException e) {
			// TODO: handle exception
			e.printStackTrace();
		}
	}
}


package chapter03.section1.thread_3_1_14.project_1_wait_noify_insert_test;

public class BackupA extends Thread{
	private DBTools dbtools;
	
	public BackupA(DBTools dbtools) {
		super();
		this.dbtools = dbtools;
	}
	
	@Override
	public void run() {
		dbtools.backupA();
	}
}


package chapter03.section1.thread_3_1_14.project_1_wait_noify_insert_test;

public class BackupB extends Thread{
	private DBTools dbtools;
	
	public BackupB(DBTools dbtools) {
		super();
		this.dbtools = dbtools;
	}
	
	@Override
	public void run() {
		dbtools.backupB();
	}
}


package chapter03.section1.thread_3_1_14.project_1_wait_noify_insert_test;

public class Run {

	public static void main(String[] args) {
		DBTools dbtools = new DBTools();
		for(int i = 0; i < 20; i++) {
			BackupB output = new BackupB(dbtools);
			output.start();
			BackupA input = new BackupA(dbtools);
			input.start();
		}
	}
}
/*
result:
@@@@
@@@@
@@@@
@@@@
@@@@
****
****
****
****
****
@@@@
@@@@
@@@@
@@@@
@@@@
交替執行
*/

結果分析:
這個程式並沒有利用好多執行緒優勢,只是一個示例

 

4. 方法join的使用
在很多情況下,主執行緒建立並啟動子執行緒,如果子執行緒中要進行大量的耗時運算(比如網路請
求,下載資料、讀寫檔案等)主執行緒往往將早於子執行緒結束之前結束。
這時,如果主執行緒想等待
子執行緒執行完之後再結束,比如子執行緒處理一個數據,主執行緒要取得這個資料中的值,就要用
到join()方法了。方法join()的作用是等待執行緒物件銷燬。

方法join的作用是使所屬的執行緒物件x正常執行run()方法中的任務,而使當前執行緒z進行無限期
的阻塞,等待執行緒x銷燬後(即執行緒的狀態是TEMINATED)再繼續執行執行緒z後面的程式碼
join在內部使用wait()方法進行等待,而synchronized關鍵字使用的是"物件監視器"原理作為
同步


閱讀JDK9文件

join():
public final void join​()
                throws InterruptedException
Waits for this thread to die.
An invocation of this method behaves in exactly the same way as the invocation
等待此執行緒死亡
呼叫方法join()與呼叫join(0)作用完全相同

join(0)
Throws:
InterruptedException - if any thread has interrupted the current thread. The 
interrupted status of the current thread is cleared when this exception is thrown
丟擲:
InterrupedException - 如果有任何執行緒中斷了當前的執行緒。當這個異常被丟擲,則當前執行緒
的中斷狀態被清除


join(long millis):
public final void join​(long millis)
                throws InterruptedException
Waits at most millis milliseconds for this thread to die. A timeout of 0 means 
to wait forever.
This implementation uses a loop of this.wait calls conditioned on this.isAlive. 
As a thread terminates the this.notifyAll method is invoked. It is recommended 
that applications not use wait, notify, or notifyAll on Thread instances.
最多等待呼叫此方法的執行緒死亡millis毫秒,0意味著永遠等待。實現是使用一個呼叫wait(0)的迴圈
while(isAlive){this.wait(0);}.當這個執行緒被終止之後釋放鎖,this.notifyAll通知執行緒來
準備獲得這個鎖。建議應用程式不要線上程例項上使用等待、通知或notifyAll方法。

Parameters:
millis - the time to wait in milliseconds 最大等待的毫秒數
Throws:
IllegalArgumentException - if the value of millis is negative 
如果millis是負數,那麼丟擲非法引數異常IllegalArgumentException
InterruptedException - if any thread has interrupted the current thread. The 
interrupted status of the current thread is cleared when this exception is thrown.
如果任意執行緒中斷了當前執行緒。那麼丟擲這個異常,同時當前執行緒的中斷狀態被清除


(1) 學習方法join前的鋪墊

package chapter03.section2.thread_3_2_1.project_1_joinTest1;

public class MyTest extends Thread {
	
	@Override
	public void run() {
		try {
			int secondValue = (int)(Math.random() * 10000);
			System.out.println(secondValue);
			Thread.sleep(secondValue);
		} catch (InterruptedException e) {
			// TODO: handle exception
			e.printStackTrace();
		}
	}
}


package chapter03.section2.thread_3_2_1.project_1_joinTest1;

public class Test {
	public static void main(String[] args) {
		MyTest threadTest = new MyTest();
		threadTest.start();
		
		// Thread.sleep(?)
		System.out.println("我想當threadTest物件執行完畢後我再執行");
		System.out.println("但上面程式碼中的sleep()中的值應該寫多少呢?");
		System.out.println("答案是:根據不能確定:)");	
	}
}
結果分析:
只需要修改Test.java類就可以解決
package chapter03.section2.thread_3_2_2.project_1_joinTest2;

public class Test {
	public static void main(String[] args) throws InterruptedException{
		MyTest threadTest = new MyTest();
		threadTest.start();
		
		threadTest.join();
		
		// Thread.sleep(?)
		System.out.println("我想當threadTest物件執行完畢後我再執行");
		System.out.println("但上面程式碼中的sleep()中的值應該寫多少呢?");
		System.out.println("答案是:根據不能確定:)");	
	}
}
/*
result:
8370
我想當threadTest物件執行完畢後我再執行
但上面程式碼中的sleep()中的值應該寫多少呢?
答案是:根據不能確定:)
*/

結果分析:
join方法實現是通過wait.當main執行緒呼叫t.join時,main執行緒會獲得執行緒物件t鎖,
呼叫該物件的wait(等待時間),直到該物件喚醒main執行緒.這些具體程式碼設計到JVM
低層實現,由於本人對C++不太熟,也暫時沒有學習計劃,對於這塊感興趣的可以去看
OpenJdk 原始碼: http://openjdk.java.net/

 

(2) 方法join與異常
在join過程中,如果當前執行緒物件被中斷,則當前執行緒出現異常

package chapter03.section2.thread_3_2_3.project_1_joinException;

public class ThreadA extends Thread {
	
	@Override
	public void run() {
		for(int i = 0; i < Integer.MAX_VALUE; i++) {
			String newString = new String();
			Math.random();
		}
	}
}


package chapter03.section2.thread_3_2_3.project_1_joinException;

public class ThreadB extends Thread{
	
	@Override
	public void run() {
		try {
			ThreadA a = new ThreadA();
			a.start();
			a.join();
			
			System.out.println("執行緒B在run end處列印了");
		} catch (InterruptedException e) {
			// TODO: handle exception
			System.out.println("執行緒B在catch處列印了");
			e.printStackTrace();
		}
	}

}


package chapter03.section2.thread_3_2_3.project_1_joinException;

public class ThreadC extends Thread {
	
	private ThreadB threadB;
	
	public ThreadC(ThreadB threadB) {
		super();
		this.threadB = threadB;
	}
	
	@Override
	public void run() {
		threadB.interrupt();
	}
}


package chapter03.section2.thread_3_2_3.project_1_joinException;

public class Run {
	
	public static void main(String[] args) {
		try {
			ThreadB b = new ThreadB();
			b.start();
			
			Thread.sleep(500);
			
			ThreadC c = new ThreadC(b);
			c.start();
		} catch (InterruptedException e) {
			// TODO: handle exception
			e.printStackTrace();
		}
	}
}
/*
result:
執行緒B在catch處列印了
java.lang.InterruptedException
	at java.base/java.lang.Object.wait(Native Method)
	at java.base/java.lang.Thread.join(Unknown Source)
	at java.base/java.lang.Thread.join(Unknown Source)
	at chapter03.section2.thread_3_2_3.project_1_joinException.ThreadB.run(ThreadB.java:10)
*/

結果分析:
可以看到執行緒B呼叫了join()方法,並且被C執行緒中斷,丟擲了InterruptedException
異常,但是A執行緒並沒有出現異常,是正常執行的狀態。

 

(3) 方法join(long)的使用
方法join(long)中的引數是設定等待的時間

package chapter03.section2.thread_3_2_4.project_1_joinLong;

public class MyThread extends Thread {
	@Override
	public void run() {
		try {
			System.out.println("begin Timer=" + System.currentTimeMillis());
			Thread.sleep(5000);
		} catch (InterruptedException e) {
			// TODO: handle exception
			e.printStackTrace();
		}
	}
}


package chapter03.section2.thread_3_2_4.project_1_joinLong;

public class Test extends Thread{
	
	public static void main(String[] args) {
		try {
			MyThread threadTest = new MyThread();
			threadTest.start();
			
			threadTest.join(2000); //只等待2秒
//			Thread.sleep(2000);
			
			System.out.println("  end Timer=" + System.currentTimeMillis());
		} catch (InterruptedException e) {
			// TODO: handle exception
			e.printStackTrace();
		}
	}
}
/*
result:
begin Timer=1540606857302
  end Timer=1540606859304
*/

可以看到大約2秒後主執行緒開始執行,執行結束之後回到另一個執行緒繼續執行3秒

 

(4) 方法join(long)與sleep(long的區別)
方法join(long)的功能在內部是是使用wait(long)方法來實現的,所以join(long)方
法具有釋放鎖的特點。


下面是實現的原始碼(不包含JVM實現部分):

sleep(long millis):
/**
 * Causes the currently executing thread to sleep (temporarily cease
 * execution) for the specified number of milliseconds, subject to
 * the precision and accuracy of system timers and schedulers. The thread
 * does not lose ownership of any monitors.
 *
 * @param  millis
 *         the length of time to sleep in milliseconds
 *
 * @throws  IllegalArgumentException
 *          if the value of {@code millis} is negative
 *
 * @throws  InterruptedException
 *          if any thread has interrupted the current thread. The
 *          <i>interrupted status</i> of the current thread is
 *          cleared when this exception is thrown.
 */
public static native void sleep(long millis) throws InterruptedException;
可以看到這個方法是native方法,大家可以自己去OpenJDK檢視C++程式碼,這裡有機會以後分析。
sleep(long millis)造成當前執行緒休眠(暫時停止執行)一段特定的時間,取決於當前系統時鐘
和排程器的精度和準確度。這個執行緒不釋放任何監視器的所有權。


wait(long millis):
/**
 * Waits at most {@code millis} milliseconds for this thread to
 * die. A timeout of {@code 0} means to wait forever.
 *
 * <p> This implementation uses a loop of {@code this.wait} calls
 * conditioned on {@code this.isAlive}. As a thread terminates the
 * {@code this.notifyAll} method is invoked. It is recommended that
 * applications not use {@code wait}, {@code notify}, or
 * {@code notifyAll} on {@code Thread} instances.
 *
 * @param  millis
 *         the time to wait in milliseconds
 *
 * @throws  IllegalArgumentException
 *          if the value of {@code millis} is negative
 *
 * @throws  InterruptedException
 *          if any thread has interrupted the current thread. The
 *          <i>interrupted status</i> of the current thread is
 *          cleared when this exception is thrown.
 */
public final synchronized void join(long millis)
throws InterruptedException {
    long base = System.currentTimeMillis();
    long now = 0;

    if (millis < 0) {
        throw new IllegalArgumentException("timeout value is negative");
    }

    if (millis == 0) {
        while (isAlive()) {
            wait(0); 
        }
    } else {
        while (isAlive()) {
            long delay = millis - now;
            if (delay <= 0) {
                break;
            }
            wait(delay); 
            now = System.currentTimeMillis() - base;
        }
    }
}
其中wait()與wait(long timeout)是native方法,可以看到
millis為負數丟擲非法引數異常
millis為0,永遠等待,直到其他執行緒isAlive()為false,呼叫join方法的當前執行緒拿到鎖,
然後就可以繼續執行了
millis為正,則呼叫wait(delay)方法延遲時間到了自動喚醒呼叫join方法執行緒,然後執行
當前執行緒。

 


驗證Thread.sleep(long)方法不釋放鎖

package chapter03.section2.thread_3_2_5.project_1_join_sleep_1;

public class ThreadA extends Thread {
	
	private ThreadB b;
	
	public ThreadA(ThreadB b) {
		super();
		this.b = b;
	}
	
	@Override
	public void run() {
		try {
			synchronized(b) {
				b.start();
				Thread.sleep(6000);
			}
		} catch (InterruptedException e) {
			// TODO: handle exception
			e.printStackTrace();
		}
	}
}


package chapter03.section2.thread_3_2_5.project_1_join_sleep_1;

public class ThreadB extends Thread{

	@Override
	public void run() {
		try {
			System.out.println("   b run begin timer="
					+ System.currentTimeMillis());
			Thread.sleep(5000);
			System.out.println("   b run   end timer="
					+ System.currentTimeMillis());
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	synchronized public void bService() {
		System.out.println("列印了bService timer=" + System.currentTimeMillis());
	}
}


package chapter03.section2.thread_3_2_5.project_1_join_sleep_1;

public class ThreadC extends Thread {
	
	private ThreadB threadB;

	public ThreadC(ThreadB threadB) {
		super();
		this.threadB = threadB;
	}

	@Override
	public void run() {
		threadB.bService();
	}
}


package chapter03.section2.thread_3_2_5.project_1_join_sleep_1;

public class Run {
	
	public static void main(String[] args) {

		try {
			ThreadB b = new ThreadB();

			ThreadA a = new ThreadA(b);
			a.start();

			Thread.sleep(1000);

			ThreadC c = new ThreadC(b);
			c.start();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}
/*
result:
   b run begin timer=1540608282186
   b run   end timer=1540608287194
列印了bService timer=1540608288190
*/

結果分析:
執行緒ThreadA一開始執行持有ThreadB物件的鎖,時間是6秒多,所以執行緒ThreadC只有在
ThreadA時間到達6秒後run方法執行結束釋放ThreadB的鎖時,才可以呼叫ThreadB中的
同步方法ysynchronized public void bService()方法。

 

驗證join()方法釋放鎖的特點
修改上面中的ThreadA.java

package chapter03.section2.thread_3_2_5.project_1_join_sleep_1;

public class ThreadA extends Thread {
	
	private ThreadB b;
	
	public ThreadA(ThreadB b) {
		super();
		this.b = b;
	}
	
	@Override
	public void run() {
		try {
			synchronized(b) {
				b.start();
				b.join(); //說明join釋放鎖了
				for (int i = 0; i < Integer.MAX_VALUE; i++) {
					String newString = new String();
					Math.random();
				}
			}
		} catch (InterruptedException e) {
			// TODO: handle exception
			e.printStackTrace();
		}
	}
}
/*
result:
   b run begin timer=1540608854468
列印了bService timer=1540608855473
   b run 

ThreadA呼叫了join,釋放了ThreadB物件鎖,所以執行緒ThreadC拿到鎖可以呼叫ThreadB
中的同步方法bService()

方法join()後面的程式碼提前執行: 出現以外
舉個例子:

package chapter03.section2.thread_3_2_6.project_1_joinMoreTest;

public class ThreadA extends Thread{
	private ThreadB b;
	
	public ThreadA(ThreadB b) {
		super();
		this.b = b;
	}
	
	@Override
	public void run() {
		try {
			synchronized(b) {
				System.out.println("begin A ThreadName="
						+ Thread.currentThread().getName() + "  "
						+ System.currentTimeMillis());
				Thread.sleep(5000);
				System.out.println("  end A ThreadName="
						+ Thread.currentThread().getName() + "  "
						+ System.currentTimeMillis());
			}
		} catch (InterruptedException e) {
			// TODO: handle exception
			e.printStackTrace();
		}
	}
}


package chapter03.section2.thread_3_2_6.project_1_joinMoreTest;

public class ThreadB extends Thread {
	@Override
	synchronized public void run() {
		try {
			System.out.println("begin B ThreadName="
					+ Thread.currentThread().getName() + "  "
					+ System.currentTimeMillis());
			Thread.sleep(5000);
			System.out.println("  end B ThreadName="
					+ Thread.currentThread().getName() + "  "
					+ System.currentTimeMillis());
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}


package chapter03.section2.thread_3_2_6.project_1_joinMoreTest;

public class Run1 {
	public static void main(String[] args) {
			ThreadB b = new ThreadB();
			ThreadA a = new ThreadA(b);
			a.start();
			b.start();
			System.out.println("                    main end "
					+ System.currentTimeMillis());
	}
}
/*
result3:
                    main end 1540622542511
begin A ThreadName=Thread-1  1540622542511
  end A ThreadName=Thread-1  1540622547512
begin B ThreadName=Thread-0  1540622547512
  end B ThreadName=Thread-0  1540622552517
 */


package chapter03.section2.thread_3_2_6.project_1_joinMoreTest;

public class RunFirst {

	public static void main(String[] args) throws InterruptedException {
		ThreadB b = new ThreadB();
		ThreadA a = new ThreadA(b);
		a.start();
		b.start();
		b.join(2000);
		System.out.println("   main end=" + System.currentTimeMillis());
	}
}
/*
result1:
begin B ThreadName=Thread-0  1540621815361
  end B ThreadName=Thread-0  1540621820366
   main end=1540621820366
begin A ThreadName=Thread-1  1540621820366
  end A ThreadName=Thread-1  1540621825376
result2:
begin A ThreadName=Thread-1  1540621971188
  end A ThreadName=Thread-1  1540621976192
   main end=1540621976192
begin B ThreadName=Thread-0  1540621976192
  end B ThreadName=Thread-0  1540621981197
*/