Java中的執行緒通訊、執行緒組、未處理的執行緒異常
執行緒通訊
傳統的執行緒通訊
假設現在系統中有兩個執行緒,這兩個執行緒分別代表存款者和取錢者,而系統有一種特殊的要求,系統要求存款者和取錢者不斷地重複存款、取錢的動作,而且要求每當存款者將錢存入指定賬戶後,取錢者就立即取錢。不允許存款者和取錢者操作連續超過兩次。
為了實現這種功能,可以藉助於Object類提供的wait()、notify()和notifyAll()三個方法,這三個方法必須由同步監視器物件來呼叫。
wait()
:導致當前執行緒等待,直到其他執行緒呼叫該同步監視器的notify()方法或notifyAll方法來喚醒該執行緒。呼叫該方法,當前執行緒會釋放對該同步監視器的鎖定。notify()
:喚醒再次同步監視器上等待的的單個執行緒,如果有多個執行緒都在等待,則喚醒任意一個。只有當前執行緒放棄對同步監視器的鎖定後,才可以執行被喚醒的執行緒。notifyAll()
:喚醒再次同步監視器上等待的所有執行緒,只有當前執行緒放棄對同步監視器的鎖定後,才可以執行被喚醒的執行緒。
案例思路
程式中可以通過一個標記來標識賬戶中是否已有存款,false表示沒有存款,存款者執行緒可向下執行。
當存款者把錢存入賬戶之後,將標識設為true,並且呼叫notify或notifyAll來通知其他執行緒;當存款折者進入執行緒體之後,如果標記為true,就呼叫wait方法讓該執行緒等待。
當標記為true時,表明賬戶已經存入了存款,則取錢者執行緒可以向下執行,當取錢者把錢從賬戶中取出來之後,則標記設為false,並呼叫notify()或notifyAll來喚醒其他執行緒,當取款折者進入執行緒體之後,如果標記為false,就呼叫wait方法讓該執行緒等待。
package org.westos.demo7; public class Account { //封裝賬戶編號,賬戶餘額的兩個成員變數 private String accountNO; private double balance; //表示賬戶中是否有存款的標記 private boolean flag=false; public Account(String accountNO, double balance) { this.accountNO = accountNO; this.balance = balance; } public String getAccountNO() { return accountNO; } public void setAccountNO(String accountNO) { this.accountNO = accountNO; } //賬戶餘額不能隨便修改,所以只提供getBalance方法 public double getBalance() { return balance; } public synchronized void drow(double drawAmount){ try { //如果flag為false,表示賬戶中沒有人存錢進去,取錢方法為阻塞 if(!flag){ wait(); }else { //執行取錢操作 System.out.println(Thread.currentThread().getName() +"取錢:"+drawAmount); balance-=drawAmount; System.out.println("賬戶餘額為:"+balance); //將賬戶是否有存款的標記設為false flag=false; //喚醒其他執行緒 notifyAll(); } } catch (InterruptedException e) { e.printStackTrace(); } } public synchronized void deposit(Double depositAmount){ //如果flag為真,說明賬戶裡有錢,存錢方法阻塞 try{ if(flag){ wait(); }else { //存款操作 System.out.println(Thread.currentThread().getName() +"存款:"+depositAmount); balance+=depositAmount; System.out.println("賬戶餘額:"+balance); //標記設為true flag=true; //喚醒其他執行緒 notifyAll(); } } catch (InterruptedException e) { e.printStackTrace(); } } }
取款操作
package org.westos.demo7;
public class DrawThread extends Thread {
//模擬賬戶使用者
private Account account;
//當前使用者取錢數
private double drawAmount;
public DrawThread(String name,Account account, double drawAmount) {
super(name);
this.account = account;
this.drawAmount = drawAmount;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
account.drow(drawAmount);
}
}
}
存款操作
package org.westos.demo7;
public class DepositThread extends Thread {
//模擬使用者賬戶
private Account account;
//當前使用者縣城額所希望存的錢數
private double depositAmount;
public DepositThread(String name,Account account, double depositAmount) {
super(name);
this.account = account;
this.depositAmount = depositAmount;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
account.deposit(depositAmount);
}
}
}
執行操作
package org.westos.demo7;
public class DrawTest {
public static void main(String[] args) {
//建立一個賬戶
Account acct = new Account("123456", 0);
new DrawThread("取錢者",acct,800).start();
new DepositThread("存錢者A",acct,800).start();
new DepositThread("存錢者B",acct,800).start();
new DepositThread("存錢者C",acct,800).start();
}
}
結論
從執行結果可以看出,存款者執行緒和取款者執行緒交替執行,只要賬戶裡沒錢就存,只要賬戶裡有錢就取,(這就實現了存款者執行緒與取款者執行緒之間的通訊)而三個存款者執行緒則是隨機向賬戶裡面存款,一次存款只有一個存款者操作。
使用Condition控制執行緒通訊
如果程式不使用synchronized關鍵字來保證同步,而是直接使用Lock物件來保證同步,則系統中不存在隱式的同步監視器,所以不能使用wait()、notify()、notifyAll()來進行執行緒通訊了。
當使用Lock物件來保證同步時,Java提供了一個Contidition類來保持協調。Condition為每個物件提供了多個等待集(wait-set)。
Condition例項被繫結在一個Lock物件上,要獲得特定Lock例項的Condition例項,呼叫newCondition()方法即可。Condition提供瞭如下三個方法:
await()
:類似於隱式同步監視器的wait方法;signal()
:喚醒在此Lock物件上等待的單個執行緒。signalAll()
:喚醒在此Lock物件上等待的所有執行緒。
將上述案例更改為使用Lock來保證同步:
package org.westos.demo7;
import java.time.LocalDate;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Account {
//顯式定義Lock物件
private final Lock lock=new ReentrantLock();
//獲得指定Lock物件對應的Condition
private final Condition cond=lock.newCondition();
//封裝賬戶編號,賬戶餘額的兩個成員變數
private String accountNO;
private double balance;
//表示賬戶中是否有存款的標記
private boolean flag=false;
public Account(String accountNO, double balance) {
this.accountNO = accountNO;
this.balance = balance;
}
public String getAccountNO() {
return accountNO;
}
public void setAccountNO(String accountNO) {
this.accountNO = accountNO;
}
//賬戶餘額不能隨便修改,所以只提供getBalance方法
public double getBalance() {
return balance;
}
public void drow(double drawAmount){
lock.lock();
try {
//如果flag為false,表示賬戶中沒有人存錢進去,取錢方法為阻塞
if(!flag){
cond.await();
}else {
//執行取錢操作
System.out.println(Thread.currentThread().getName()
+"取錢:"+drawAmount);
balance-=drawAmount;
System.out.println("賬戶餘額為:"+balance);
//將賬戶是否有存款的標記設為false
flag=false;
//喚醒其他執行緒
cond.signalAll();
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void deposit(Double depositAmount){
//如果flag為真,說明賬戶裡有錢,存錢方法阻塞
lock.lock();
try{
if(flag){
cond.await();
}else {
//存款操作
System.out.println(Thread.currentThread().getName()
+"存款:"+depositAmount);
balance+=depositAmount;
System.out.println("賬戶餘額:"+balance);
//標記設為true
flag=true;
//喚醒其他執行緒
cond.signalAll();
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
與上一個案例相比,程式邏輯基本相似,只是Lock要顯式定義同步監視器。
使用阻塞佇列(BlockingQueue)控制執行緒通訊
Java5提供了一個BlockingQueue介面,雖然BlockingQueue也是Queue的子介面,但是它不作為容器,而是作為線性同步的工具。BlockingQueue有一個特徵:當生產者執行緒試圖向佇列中放入元素時,如果該佇列已滿,則該執行緒阻塞;當消費者從佇列中拿出元素時,如果該佇列為空,則該執行緒被阻塞。
BlockingQueue提供如下兩個方法支援阻塞:
put(E e)
:嘗試把E元素放入佇列中,如果佇列已滿則阻塞該執行緒;take()
:嘗試從佇列的頭部取出元素,如果該佇列已空則阻塞該執行緒;
當然BlockingQueue繼承了Queue介面,所以可以使用Queue介面中的方法,大概分為三種:
- 在佇列尾部插元素,包括add、offer、put的方法;
- 在佇列頭部刪除元素,並返回刪除元素,包括remove、poll、take方法;
- 在佇列頭部取出但不刪除元素,包括element()和peek方法;
丟擲異常 | 不同返回值 | 阻塞執行緒 | 指定超時間時長 | |
---|---|---|---|---|
隊尾插入元素 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
隊頭插入元素 | remove() | poll() | take() | poll(time,unit) |
獲取,但不插入元素 | element() | peek() | 無 | 無 |
BlockingQueue包含如下5個實現類:
- ArrayBlockingQueue:基於陣列實現的BlockingQueue佇列;
- LinkedBlockingQueue:基於連結串列實現的BlockingQueue佇列;
- PriorityBlockingQueue:判斷元素大小,取出佇列中最小的元素;
- SynchronousQueue:同步佇列,對該佇列的存取必須交替執行;
- DelayQueue:特殊的BlockingQueue,底層基於PriorityBlockingQueue實現;
案例:以ArrayBlockingQueue為例介紹阻塞佇列:
import java.util.concurrent.ArrayBlockingQueue;
public class BlockingQueueTest {
public static void main(String[] args) throws InterruptedException {
//定義一個長度為2的阻塞佇列
ArrayBlockingQueue<Object> bq = new ArrayBlockingQueue<>(2);
bq.put("Java");
bq.put("Java");
bq.put("Java");//阻塞執行緒
}
}
案例:使用BlockingQueue來實現執行緒通訊:
package org.westos.demo7;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
class Produer extends Thread {
private BlockingQueue<String> bq;
public Produer(String name,BlockingQueue<String> bq) {
super(name);
this.bq = bq;
}
@Override
public void run() {
String[] strArr = new String[]
{
"Java",
"Struts",
"Spring"
};
for (int i = 0; i < 999999; i++) {
System.out.println(getName()+"生產者準備生產集合元素");
try {
Thread.sleep(200);
//嘗試放入元素,如果佇列已滿則執行緒阻塞
bq.put(strArr[i%3]);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(getName()+"生產完成"+bq);
}
}
}
class Consumer extends Thread{
private BlockingQueue<String> bq;
public Consumer(String name,BlockingQueue<String> bq) {
super(name);
this.bq = bq;
}
@Override
public void run() {
while (true){
System.out.println(getName()+"消費者準備消費集合元素");
try {
Thread.sleep(200);
//嘗試取出元素,如果佇列已空則執行緒阻塞
bq.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(getName()+"消費完成"+bq);
}
}
}
public class BlockingQueueTest2{
public static void main(String[] args) {
//建立一個容量為1的BlockingQueue
ArrayBlockingQueue<String> bq = new ArrayBlockingQueue<>(1);
//啟動三個生產執行緒
new Produer("生產者者1",bq).start();
new Produer("生產者2",bq).start();
new Produer("生產者3",bq).start();
//啟用一個消費執行緒
new Consumer("消費者C",bq).start();
}
}
由於佇列容量為1,所以無法連續放入元素,只能等消費者去除佇列中的元素,三個生產者才能隨機的放入元素。
執行緒組和未處理的異常
Java使用ThreadGroup來表示執行緒組,他可以對一批執行緒進行分類管理,Java允許程式設計師直接對執行緒組進行管控制。相當於同時控制著一批執行緒,使用者建立的所有執行緒都屬於指定執行緒組,如果沒有顯式指定執行緒的執行緒組,則該執行緒屬於預設執行緒組。預設情況下子執行緒和建立它的父執行緒處於同一執行緒組內。
一旦某個執行緒加入了指定執行緒組之後,該執行緒將一直屬於該執行緒組,直到執行緒死亡,執行緒執行中途不能改變它所屬的執行緒組。
Thread類提供的構造器來設定新建立的執行緒屬於哪個執行緒組;
Thread(ThreadGroup group,Runnable target)
:以target的run方法作為執行緒執行體建立新執行緒,屬於group執行緒組;Thread(ThreadGroup group,Runnable target,String name)
:以target的run方法作為執行緒執行體建立新執行緒,屬於group執行緒組,執行緒名為name;Thread(ThreadGroup group,String name)
:建立新執行緒,名為name,屬於group執行緒組。
由於執行緒所屬組不能改變,所以只提供了一個getThreadGroup方法來返回執行緒所屬的執行緒組,getThreadGroup方法返回的是ThreadGroup物件,表示一個執行緒組,ThreadGroup提供了兩個構造器來建立例項:
ThreadGroup(String name)
:以指定的執行緒組名來建立新的執行緒組;ThreadGroup(ThreadGroup parent,String name)
:以指定的執行緒組名,指定的父執行緒組,來建立新的執行緒組;
ThreadGroup類提供了幾個方法來操作整個執行緒組裡的所有執行緒:
int activeCount()
:返回此執行緒組中活動執行緒的數目;interrupt()
:中斷此執行緒組中的所有執行緒;isDaemon()
:判斷該執行緒組是否為後臺執行緒組;setDaemon(Boolean daemon)
;把該執行緒組設定為後臺執行緒組,當後臺執行緒組的最後一個執行緒執行結束或最後一個執行緒被銷燬後,後臺執行緒組將自動銷燬。setMaxPriotity(int pri)
:設定執行緒組的最高優先順序;
package org.westos.demo8;
class MyThread extends Thread{
//提供指定執行緒名的構造器
public MyThread(String name) {
super(name);
}
//提供指定執行緒名,指定執行緒組的構造器
public MyThread(ThreadGroup group,String name) {
super(group,name);
}
@Override
public void run() {
for (int i = 0; i < 20; i++) {
System.out.println(getName()+"執行緒的i變數"+i);
}
}
}
public class ThreadGroupTest {
public static void main(String[] args) {
//獲取主執行緒所線上程組(預設執行緒組)
ThreadGroup mainGroup = Thread.currentThread().getThreadGroup();
System.out.println("主執行緒組的名字:"+mainGroup);
//主執行緒組的名字:java.lang.ThreadGroup[name=main,maxpri=10]
System.out.println("主執行緒組是否為後臺執行緒組:"+mainGroup.isDaemon());
ThreadGroup newGroup = new ThreadGroup("新執行緒組");
newGroup.setDaemon(true);
MyThread tt = new MyThread(newGroup, "新執行緒組中的執行緒-1");
tt.start();
new MyThread("主執行緒組的執行緒").start();
new MyThread(newGroup,"新執行緒組中的執行緒-2").start();
}
}
未處理的異常
ThreadGroup內還定義了可以處理該執行緒組內任意執行緒所丟擲的未處理異常的方法:
void uncaughtException(Thread t,Throwable e)
。
Thread類提供瞭如下兩個方法來設定異常處理器:
static setDefaultUncaughtExceptionHandler(Thread UncaughtExceptionHandler eh)
:為該執行緒例項設定異常處理器;setUncaughtExceptionHandler(Thread UncaughtExceptionHandler eh)
:為指定的執行緒例項設定異常處理器。
執行緒處理異常的預設流程如下:
- 如果該執行緒組有父執行緒組,則呼叫父執行緒組的UncaughtException方法來處理異常;
- 如果有預設異常處理器,就呼叫預設的;
- 如果該異常物件是ThreadDeath物件,則不做任何處理;否則將異常資訊列印到錯誤輸出流,並結束執行緒。
package org.westos.demo8;
class MyExHandler implements Thread.UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println(t+"執行緒出現了異常:"+e);
//Thread[main,5,main]執行緒出現了異常:java.lang.ArithmeticException: / by zero
}
}
public class ExHandler{
public static void main(String[] args) {
//設定主執行緒的異常處理器
Thread.currentThread().setUncaughtExceptionHandler(new MyExHandler());
int a=5/0;
System.out.println("程式正常結束");
}
}