Java多執行緒學習筆記11之執行緒間通訊
本文是我學習Java多執行緒以及高併發知識的第一本書的學習筆記,
書名是<<Java多執行緒程式設計核心技術>>,作者是大佬企業高階專案經理
高洪巖前輩,在此向他致敬。我將配合開發文件以及本書和其他的部落格
奉獻著的文章來學習,同時做一些簡單的總結。有些基礎的東西我就不
細分析了,建議以前接觸過其他語言多執行緒或者沒有系統學習過多執行緒
的開發者來看。另外需要注意的是,部落格中給出的一些英文文件我就簡單
翻譯了,重要的部分會詳細翻譯,要不太浪費時間了,這個是我想提高
自己的英文閱讀水平和文件檢視能力,想要積攢內功的人可以用有谷歌
翻譯自己看文件細讀。(中文文件建議只參考,畢竟你懂得...)
詳細程式碼見
本節內容:
1) 等待/通知機制概念
2) 中斷和輪詢兩種基本的方式
3) 等待/通知機制的實現
4) wait()/notify()/wait(long)/notifyAll()文件翻譯
執行緒間通訊
執行緒是作業系統中獨立的個體,使執行緒間進行通訊後,系統之間的互動性會更強大,在大大提高 CPU利用率的同時還會使程式設計師對各執行緒任務在處理的過程中進行有效的把控和監督。重點掌握以下幾個內容:
(1) 使用wait/notify機制實現執行緒間的通訊 (2) 生產者/消費者模式的實現 (3) 方法join的使用 (4) ThreadLocal類的使用
1.等待/通知機制
(1) 不使用等待/通知機制實現執行緒間通訊 在實驗中使用sleep()結合while(true)死迴圈法來實現多個執行緒間通訊。其實就是拿一個佇列 來實現
package chapter03.section1.thread_3_1_1.project_1_TwoThreadTransData; import java.util.ArrayList; import java.util.List; public class MyList { private List<String> list = new ArrayList<>(); public void add() { list.add("Clarence"); } public int size() { return list.size(); } } package chapter03.section1.thread_3_1_1.project_1_TwoThreadTransData; public class ThreadA extends Thread{ private MyList list; public ThreadA(MyList list) { super(); this.list = list; } @Override public void run() { try { for(int i = 0; i < 10; i++) { list.add(); System.out.println("添加了" + (i + 1) + "個元素"); Thread.sleep(1000); } } catch (InterruptedException e) { // TODO: handle exception e.printStackTrace(); } } } package chapter03.section1.thread_3_1_1.project_1_TwoThreadTransData; public class ThreadB extends Thread{ private MyList list; public ThreadB(MyList list) { super(); this.list = list; } @Override public void run() { try { while (true) { System.out.println(list.size()); //疑問?為什麼沒有上面的語句就輪詢檢測不到list.size大小了 if (list.size() == 5) { System.out.println("==5了,執行緒b要退出了!"); throw new InterruptedException(); } } } catch (InterruptedException e) { e.printStackTrace(); } } } package chapter03.section1.thread_3_1_1.project_1_TwoThreadTransData; public class Test { public static void main(String[] args) { MyList service = new MyList(); ThreadA a = new ThreadA(service); a.setName("A"); a.start(); ThreadB b = new ThreadB(service); b.setName("B"); b.start(); } } /* result: ............. 添加了5個元素 ==5了,執行緒b要退出了! java.lang.InterruptedException at chapter03.section1.thread_3_1_1.project_1_TwoThreadTransData.ThreadB.run(ThreadB.java:20) 添加了6個元素 添加了7個元素 添加了8個元素 添加了9個元素 添加了10個元素 */
可以看到是通過執行緒B不斷輪詢佇列的大小來達到通訊的目的。
注:雖然兩個執行緒間實現了通訊,但有一個弊端就是,執行緒ThreadB.java不停地通過while語句輪 詢機制來檢測某一個條件,這樣會浪費CPU資源。如果輪詢的時間間隔很小,更浪費CPU資源; 如果輪詢的時間間隔很大,有可能會取不到想要得到的資料。所以就需要有一種機制來講減少 CPU的資源浪費,而且還可以實現在多個執行緒間通訊,它就是"wait/notify"機制
大家在學習硬體方面的知識應該知道CPU輪詢檢測引腳高低電平狀態或者收到中斷請求來轉到 中斷服務程式,在這裡簡單介紹一下.中斷的基本概念:
程式中斷通常簡稱中斷,是指CPU在正常執行程式的過程中,由於預選安排或發生了各種 隨機的內部或外部事件(根據中斷源的不同,所以把中斷分為硬體中斷和軟體中斷兩大類,而 如硬體中斷又可分為外部中斷和內部中斷,外部中斷一般是指計算機外設發出的中斷請求;內部 中斷是指因硬體出錯或運算出錯所引起的中斷),使CPU中斷正在執行的程式,而轉到為相應 的服務程式去處理(有一箇中斷程式表可以查詢),這個過程稱為程式中斷。
輪詢的基本概念:
輪詢(Polling)I/O方式或程式控制I/O方式,是讓CPU以一定的週期按次序查詢每一個外設, 看它是否有資料輸入或輸出的要求,若有,則進行相應的輸入/輸出服務;若無,或I/O處理完畢, CPU就接著查詢下一個外設。
(2) wait/notify機制概述 java的wait/nofity的通知機制可以用來實現執行緒間通訊,wait表示執行緒的等待,呼叫該方法 會導致執行緒阻塞,直至另一執行緒呼叫notify或notifyAll方法才可令其繼續執行。 多個執行緒共同訪問同一個變數起到通訊作用,但那種通訊機制不是"等待/通知",兩個執行緒完全是 主動式地讀取一個共享變數,在花費讀取時間的基礎上,讀到的值不是想要的,並不能完全確定。
(3) 等待/通知機制的實現wait方法: wait()的作用是使當前執行程式碼的執行緒進行等待,wait()方法是Object類的方法,該方法 用來將當前執行緒置入"預執行佇列"中,並且在wait()所在的程式碼行處停止執行,直到接到通知 或被中斷為止。在呼叫wait()之前,執行緒必須獲得該物件的物件級別鎖,即只能在同步方法或 同步塊中呼叫wait()方法。在執行wait()方法後,當前執行緒釋放鎖。在從wait()返回前,執行緒 與其他執行緒競爭重新獲得鎖。如果呼叫wait()時,沒有持有適當的鎖,則丟擲IllegalMonitor StateException異常
notify方法: 方法notify()也要在同步方法或同步塊中呼叫,即在呼叫前,執行緒也必須獲得該物件的對 象級別鎖。如果沒有呼叫notify()時沒有持有適當的鎖,也會丟擲IllegalMonitorStateExcep tion.該方法用來通知那些可能等待該物件的物件鎖的其他執行緒,如果有多個執行緒等待,則由線 程規劃器隨機挑選出其中一個呈wait狀態的執行緒,對其發出nofity通知,並使它等待獲取該對 象的物件鎖。需要說明的是,在執行notify()方法後,當前執行緒不會馬上釋放該物件鎖,呈wait 狀態的執行緒也並不能馬上獲取該物件鎖,要等待執行notify()方法的執行緒將程式執行完,也就是 說出synchronized程式碼塊後,當前執行緒才會釋放鎖,而呈wait狀態所在的執行緒才可以獲取該物件 鎖。當第一個獲得了該物件鎖的wait執行緒執行完畢以後,它會釋放掉該物件鎖,此時如果該物件 沒有再次使用notify語句,則即便該物件已經空閒,其他wait狀態等待的執行緒由於沒有得到該 物件的通知,還會繼續阻塞在wait狀態,直到這個物件發出一個notify或notifyAll. 總之: wait使執行緒停止執行,而notify使停止的執行緒繼續執行。
舉個例子:
package chapter03.section1.thread_3_1_3.project_1_test1;
public class Test1 {
public static void main(String[] args) {
try {
String newString = new String("");
newString.wait();
} catch (InterruptedException e) {
// TODO: handle exception
e.printStackTrace();
}
}
}
/*
result:
Exception in thread "main" java.lang.IllegalMonitorStateException
at java.base/java.lang.Object.wait(Native Method)
at java.base/java.lang.Object.wait(Unknown Source)
at chapter03.section1.thread_3_1_3.project_1_test1.Test1.main(Test1.java:7)
*/
結果分析: 可以看到沒有在同步方法或同步塊中呼叫wait()方法,丟擲IllegalMonitorStateException異常
package chapter03.section1.thread_3_1_3.project_1_test1;
public class Test2 {
public static void main(String[] args) {
try {
String lock = new String();
System.out.println("syn上面");
synchronized(lock) {
System.out.println("syn第一行");
lock.wait();
System.out.println("wait下的程式碼!");
}
} catch (InterruptedException e) {
// TODO: handle exception
e.printStackTrace();
}
}
}
/*
result:
syn上面
syn第一行
*/
結果分析: 可以看到main執行緒始終得不到喚醒,阻塞在wait()方法
下面我們看wait/notify如何配合起來使用 舉個例子:
package chapter03.section1.thread_3_1_3.project_2_test2;
public class MyThread1 extends Thread{
private Object lock;
public MyThread1(Object lock) {
super();
this.lock = lock;
}
@Override
public void run() {
try {
synchronized (lock) {
System.out.println("開始 wait time=" + System.currentTimeMillis());
lock.wait();
System.out.println("結束 wait time=" + System.currentTimeMillis());
}
} catch (InterruptedException e) {
// TODO: handle exception
e.printStackTrace();
}
}
}
package chapter03.section1.thread_3_1_3.project_2_test2;
public class MyThread2 extends Thread{
private Object lock;
public MyThread2(Object lock) {
super();
this.lock = lock;
}
@Override
public void run() {
synchronized(lock) {
System.out.println("開始notify time=" + System.currentTimeMillis());
lock.notify();
System.out.println("結束notify time=" + System.currentTimeMillis());
}
}
}
package chapter03.section1.thread_3_1_3.project_2_test2;
public class Test {
public static void main(String[] args) {
try {
Object lock = new Object();
MyThread1 t1 = new MyThread1(lock);
t1.start();
Thread.sleep(3000);
MyThread2 t2 = new MyThread2(lock);
t2.start();
} catch (InterruptedException e) {
// TODO: handle exception
e.printStackTrace();
}
}
}
/*
result:
開始 wait time=1540272811455
開始notify time=1540272814462
結束notify time=1540272814462
結束 wait time=1540272814462
*/
結果分析: 可以看到3秒後執行緒被notify通知喚醒,執行緒B執行run結束釋放lock物件鎖,執行緒A獲得鎖繼續 執行。
利用等待通知實現前面執行緒之間的通訊 舉個例子:
package chapter03.section1.thread_3_1_3.project_3_wait_notify_size5;
import java.util.ArrayList;
import java.util.List;
public class MyList {
private static List<String> list = new ArrayList<>();
public static void add() {
list.add("anyString");
}
public static int size() {
return list.size();
}
}
package chapter03.section1.thread_3_1_3.project_3_wait_notify_size5;
public class ThreadA extends Thread {
private Object lock;
public ThreadA(Object lock) {
super();
this.lock = lock;
}
@Override
public void run() {
try {
synchronized(lock) {
if(MyList.size() != 5) {
System.out.println("wait begin "
+ System.currentTimeMillis());
lock.wait();
System.out.println("wait end "
+ System.currentTimeMillis());
}
}
} catch (InterruptedException e) {
// TODO: handle exception
e.printStackTrace();
}
}
}
package chapter03.section1.thread_3_1_3.project_3_wait_notify_size5;
public class ThreadB extends Thread {
private Object lock;
public ThreadB(Object lock) {
super();
this.lock = lock;
}
@Override
public void run() {
try {
synchronized(lock) {
for(int i = 0; i < 10; i++) {
MyList.add();
if(MyList.size() == 5) {
lock.notify();
System.out.println("已發出通知!");
}
System.out.println("添加了" + (i + 1) + "個元素!");
Thread.sleep(1000);
}
}
} catch (InterruptedException e) {
// TODO: handle exception
e.printStackTrace();
}
}
}
package chapter03.section1.thread_3_1_3.project_3_wait_notify_size5;
public class Run {
public static void main(String[] args) {
try {
Object lock = new Object();
ThreadA a = new ThreadA(lock);
a.start();
Thread.sleep(50);
ThreadB b = new ThreadB(lock);
b.start();
} catch (InterruptedException e) {
// TODO: handle exception
e.printStackTrace();
}
}
}
/*
result:
wait begin 1540274044982
添加了1個元素!
添加了2個元素!
添加了3個元素!
添加了4個元素!
已發出通知!
添加了5個元素!
添加了6個元素!
添加了7個元素!
添加了8個元素!
添加了9個元素!
添加了10個元素!
wait end 1540274055097
*/
結果分析: 日誌資訊wait end在最後輸出,這也說明notify()方法執行後並不立即釋放鎖。
(4) wait/notify方法文件閱讀 我們將簡單閱讀Thread類中的wait()、notify()、wait(long)、notifyAll()文件1) wait()
public final void wait()
throws InterruptedException
Causes the current thread to wait until another thread invokes the notify()
method or the notifyAll() method for this object. In other words, this method
behaves exactly as if it simply performs the call wait(0).
The current thread must own this object's monitor. The thread releases ownership
of this monitor and waits until another thread notifies threads waiting on this
object's monitor to wake up either through a call to the notify method or the
notifyAll method. The thread then waits until it can re-obtain ownership of
the monitor and resumes execution.
wait方法將導致當前執行緒等待,直到另一個執行緒呼叫notify()方法或者此物件的notifyAll()方法
換言之,這個方法就與簡單呼叫wait(0)一樣的作用。
當前執行緒必須擁有此物件的監視器,當前執行緒釋放監視器的所有權並且等待直到另一個執行緒通過呼叫
notify方法或notifyAll方法通知正在等待這個物件監視器的執行緒。然後執行緒等待,直到它可以重新獲得
監視器的所有權並恢復執行。
在一個引數版本中,中斷和虛假喚醒是有可能的,並且這個方法wait應該總被使用在迴圈中:
As in the one argument version, interrupts and spurious wakeups are possible,
and this method should always be used in a loop:
在一個引數版本中,中斷和虛假喚醒是有可能的,並且這個方法wait應該總被使用在迴圈中:
synchronized (obj) {
while (<condition does not hold>)
obj.wait();
... // Perform action appropriate to condition
}
This method should only be called by a thread that is the owner of this object's
monitor. See the notify method for a description of the ways in which a thread
can become the owner of a monitor.
這個方法應該由擁有物件監視器的執行緒呼叫.有關執行緒成為監視器所有者的方式的描述,請參閱notify
方法。
Throws:
IllegalMonitorStateException - if the current thread is not the owner of the object's
monitor.
InterruptedException - if any thread interrupted the current thread before or while
the current thread was waiting for a notification. The interrupted status of the
current thread is cleared when this exception is thrown.
丟擲的異常:
IllegalMonitorStateException - 如果當前執行緒沒有擁有物件監視器,呼叫wait方法丟擲
InterruptedException - 如果有任何執行緒在之前或噹噹前執行緒正在等待通知的時候中斷當前執行緒。當
這個異常被丟擲的時候當前執行緒的中斷標記被清除
2) wait(long)
public final void wait(long timeout)
throws InterruptedException
Causes the current thread to wait until either another thread invokes the notify()
method or the notifyAll() method for this object, or a specified amount of time
has elapsed.
The current thread must own this object's monitor.
此方法造成當前執行緒等待阻塞,直到另一個執行緒在同一物件監視器上呼叫notify()或notifyAll()方法,
或者已經過了指定的時間timeout.當前執行緒必須擁有這個物件的監視器才能呼叫wait(long)
This method causes the current thread (call it T) to place itself in the wait set
for this object and then to relinquish any and all synchronization claims on this
object. Thread T becomes disabled for thread scheduling purposes and lies dormant
until one of four things happens:
此方法使當前執行緒(T)將自己放置在該物件的阻塞佇列中,然後放棄對該物件的所有的同步宣告。執行緒T
為執行緒排程器目的被禁用,並且處於休眠狀態,直到發生以下四種情況之一:
Some other thread invokes the notify method for this object and thread T happens to
be arbitrarily chosen as the thread to be awakened.
Some other thread invokes the notifyAll method for this object.
Some other thread interrupts thread T.
The specified amount of real time has elapsed, more or less. If timeout is zero, however,
then real time is not taken into consideration and the thread simply waits until notified.
The thread T is then removed from the wait set for this object and re-enabled for thread
scheduling. It then competes in the usual manner with other threads for the right to
synchronize on the object; once it has gained control of the object, all its synchronization
claims on the object are restored to the status quo ante - that is, to the situation as
of the time that the wait method was invoked. Thread T then returns from the invocation
of the wait method. Thus, on return from the wait method, the synchronization state of
the object and of thread T is exactly as it was when the wait method was invoked.
A thread can also wake up without being notified, interrupted, or timing out, a so-called
spurious wakeup. While this will rarely occur in practice, applications must guard against
it by testing for the condition that should have caused the thread to be awakened, and
continuing to wait if the condition is not satisfied. In other words, waits should always
occur in loops, like this one:
其他一些執行緒呼叫此物件的notify方法,而執行緒T碰巧被執行緒規劃器任意選擇為要喚醒的執行緒。
其他一些執行緒呼叫這個物件的notifyAll方法
其他一些執行緒中斷了執行緒T
指定的實時時間已經超過了,或多或少。如果timeout是0,然而,則不考慮實時,執行緒只是等待被通知。
然後,執行緒T從此物件的阻塞序列中移出,並重新能夠參與到執行緒排程中。然後,它以通常的方式與其他執行緒
競爭物件的同步權;一旦它獲得了對物件的控制,它對物件的所有同步宣告都恢復到以前的狀態--也就是說
,恢復到呼叫wait方法時的狀態。然後,執行緒T從wait方法的呼叫中返回。因此,從wait方法返回時,物件
和執行緒T的同步狀態與呼叫wait方法時完全相同。
執行緒也可以在沒有被通知、中斷或超時的情況下醒來,這就是所謂的虛假喚醒。雖然在實踐中很少發生這種情況,
但應用程式必須通過測試可能造成執行緒被喚醒的條件來監控它,並在條件不滿足時繼續等待來防止這種情況的
發生。換言之,等待應該總是以迴圈的形式出現,比如下面這個:
synchronized (obj) {
while (<condition does not hold>)
obj.wait(timeout);
... // Perform action appropriate to condition
}
(For more information on this topic, see section 14.2, Condition Queues, in Brian Goetz
and others' "Java Concurrency in Practice" (Addison-Wesley, 2006) or Item 69 in Joshua
Bloch's "Effective Java (Second Edition)" (Addison-Wesley, 2008).
If the current thread is interrupted by any thread before or while it is waiting, then
an InterruptedException is thrown. This exception is not thrown until the lock status
of this object has been restored as described above.
如果當前執行緒在等待之前或等待期間被任何執行緒中斷,則丟擲InterruptedException異常。直到按照上面
描述的那樣恢復了該物件的鎖狀態,否則不會引發此異常。
Note that the wait method, as it places the current thread into the wait set for this
object, unlocks only this object; any other objects on which the current thread may be
synchronized remain locked while the thread waits.
注意,當wait方法將當前執行緒放入這個物件的阻塞佇列中時,它只會解鎖這個物件。當執行緒等待時,當前執行緒
同步持有的其他物件鎖保持鎖定
This method should only be called by a thread that is the owner of this object's monitor.
See the notify method for a description of the ways in which a thread can become the
owner of a monitor.
該方法應該由持有此物件的監視器的執行緒來呼叫,有關執行緒成為監視器所有者的方式的描述,請參閱notify方法
Parameters:
timeout - the maximum time to wait in milliseconds. 最大等待時間,單位為毫秒
Throws:
IllegalArgumentException - if the value of timeout is negative.
如果引數是負數丟擲此異常
IllegalMonitorStateException - if the current thread is not the owner of the object's monitor.
如果當前執行緒不是擁有此物件的監視器
InterruptedException - if any thread interrupted the current thread before or while the
current thread was waiting for a notification. The interrupted status of the current
thread is cleared when this exception is thrown.
如果任何執行緒在當前執行緒等待通知之前或之中中斷它。當此異常被丟擲後,當前執行緒的中斷狀態被清除
3) notify()
public final void notify()
Wakes up a single thread that is waiting on this object's monitor. If any threads are
waitingon this object, one of them is chosen to be awakened. The choice is arbitrary
and occurs at the discretion of the implementation. A thread waits on an object's
monitor by calling one of the wait methods.
The awakened thread will not be able to proceed until the current thread relinquishes
the lock on this object. The awakened thread will compete in the usual manner with
any other threads that might be actively competing to synchronize on this object;
for example, the awakened thread enjoys no reliable privilege or disadvantage in
being the next thread to lock this object.
notify方法喚醒正在等待此物件監視器的一個執行緒。如果有任何執行緒正在等待此物件,那麼將任意選擇其中
之一。這個選擇是任意的,由JVM實現決定。一個執行緒通過呼叫wait方法來等待這個物件監視器.在當前執行緒
放棄此物件上的鎖之前,喚醒的執行緒將無法繼續執行。喚醒的執行緒將以通常的方式與其他正在積極競爭此同步
此物件的執行緒競爭;例如,喚醒的執行緒在成為下一個鎖定該物件的執行緒時不享有可靠的特權或劣勢。
This method should only be called by a thread that is the owner of this object's
monitor. A thread becomes the owner of the object's monitor in one of three ways:
該方法只能由該物件的監視器的所有者執行緒呼叫。一個執行緒通過以下三種方式之一成為物件監視器的
所有者:
By executing a synchronized instance method of that object.
通過執行該物件的synchronized的同步例項方法
By executing the body of a synchronized statement that synchronizes on the object.
通過執行sychronized(lock)同步語句塊
For objects of type Class, by executing a synchronized static method of that class.
Only one thread at a time can own an object's monitor.
對於類物件,通過執行該類的synchronized靜態方法,同一時間只有一個執行緒可以擁有此物件監視器(類鎖)
Throws:
IllegalMonitorStateException - if the current thread is not the owner of this object's monitor.
4) notifyAll
和notify一樣,但是是喚醒所有等待此物件監視器的執行緒
總結: wait()方法可以使呼叫該方法的執行緒釋放共享資源的鎖,然後從執行狀態退出,進入等待佇列, 直到被再次喚醒 notify()方法可以隨機喚醒等待佇列中等待同一共享資源的"一個"執行緒,並使執行緒退出等待隊 列,進入可執行狀態,也就是notify()方法僅通知"一個"執行緒notifyAll()方法可以使所有正在等待佇列中等待統一共享資源的"全部"執行緒從等待狀態退出, 進入可執行狀態。此時,優先順序最高的那個執行緒最先執行,但也有可能是隨機執行,因為這取決於 JVM虛擬機器的實現。每個鎖物件都有兩個佇列,一個是就緒佇列,一個是阻塞佇列。就緒佇列儲存了將要獲得鎖的執行緒, 阻塞佇列儲存了被阻塞的執行緒。一個執行緒被喚醒後,才會進入就緒佇列,等待CPU的排程;反之, 一個執行緒被wait後,就會進入阻塞佇列,等待下一次被喚醒。