Java的阻塞和中斷機制( wait notify使用 wait和sleep區別 interrupt使用和其他中斷方法)
wait、notify和notifyAll
wait和notify(notifyAll)一般是成對搭配出現的,用來資源調控。wait用來將當然執行緒掛起,notify/notifyAll用來恢復執行緒。它是類Object的方法,也就是所有的物件都可以使用。一個簡單的例子
public class WaitClassDemo { private static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); public static void main(String[] args) { Object obj = new Object(); new AThread(obj).start(); new BThread(obj).start(); } static class AThread extends Thread { Object obj; public AThread(Object obj) { setName("AThread"); this.obj = obj; } @Override public void run() { synchronized (obj) { System.out.println(sdf.format(new Date()) + " AThread before wait()"); try { obj.wait(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(sdf.format(new Date()) + " AThread after wait()"); } } } static class BThread extends Thread { Object obj; public BThread(Object obj) { setName("BThread"); this.obj = obj; } @Override public void run() { synchronized (obj) { System.out.println(sdf.format(new Date()) + " BThread before notify()"); obj.notify(); try { Thread.sleep(5000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(sdf.format(new Date()) + " BThread after notify()"); } } } } //列印 //14:22:34 AThread before wait() //14:22:34 BThread before notify() //14:22:39 BThread after notify() //14:22:39 AThread after wait()
1、wait/notify是需要需要獲取物件鎖的,也就是需要寫在同步程式碼塊或同步方法內部,可以理解為用synchronize包裹的。如果不用,編譯會通過,但執行時會丟擲java.lang.IllegalMonitorStateException。
2、wait/notify是針對某個物件,是類Object的方法,並且注意要保證synchronize、waite和notify3者都是針對同一個具體物件。比如上面的synchronize鎖的是obj這個物件,wait和notify也是由的obj物件。
3、上面這個中wait()執行後該執行緒就處於阻塞階段,並且把當前的鎖給釋放了。BThread得以繼續。notify呼叫後,會立即輪轉到wait()方法那嗎?答案是不會,上面的例子顯示,notify()需要把這個程式碼塊的Thread.sleep(5000L)執行完,退出程式碼塊後才輪轉到wait()方法那。也很合理,畢竟同一時間裡,只有一個執行緒能拿到鎖執行synchronize包裹的程式碼裡。
wait方法也有帶引數版的,wait(long timeout)和wait(long timeout, int nanos),後者看了下原始碼,只是判斷如果如果nanos>0,讓timeout++。看來虛擬機器時間還是沒精確到納秒的地步。
帶引數的wait方法意思是等過了timeout毫秒後,就會獲得該鎖。
但是如果此時鎖在別的執行緒那裡,wait()處於的AThread是不能往下執行,下面例子中如果把BThread的註釋開啟,就是要等BThread走出synchronize塊後才可以。
public class WaitClassDemo { private static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); public static void main(String[] args) { Object obj = new Object(); new AThread(obj).start(); //new BThread(obj).start(); } static class AThread extends Thread { Object obj; public AThread(Object obj) { setName("AThread"); this.obj = obj; } @Override public void run() { synchronized (obj) { System.out.println(sdf.format(new Date()) + " AThread before wait()"); try { obj.wait(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(sdf.format(new Date()) + " AThread after wait()"); } } } static class BThread extends Thread { Object obj; public BThread(Object obj) { setName("BThread"); this.obj = obj; } @Override public void run() { synchronized (obj) { System.out.println(sdf.format(new Date()) + " BThread before"); try { Thread.sleep(5000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(sdf.format(new Date()) + " BThread after"); } } } } //註釋BThread //19:03:43 AThread before wait() //19:03:44 AThread after wait() //不註釋BThread //19:14:13 AThread before wait() //19:14:13 BThread before //19:14:18 BThread after //19:14:18 AThread after wait()
呼叫notify()會恢復第一個執行wait()的執行緒。其他的不動。
呼叫notifyAll()會按照後進先出(LIFO)的原則恢復執行緒。
public class WaitClassDemo {
private static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
public static void main(String[] args) {
Object obj = new Object();
for (int i = 0; i < 3; i++) {
new AThread(i + "", obj).start();
}
new BThread(obj).start();
}
static class AThread extends Thread {
Object obj;
public AThread(String name, Object obj) {
setName("AThread" + name);
this.obj = obj;
}
@Override
public void run() {
synchronized (obj) {
System.out.println(sdf.format(new Date()) + " " + getName() + " before wait()");
try {
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(sdf.format(new Date()) + " " + getName() + " after wait()");
}
}
}
static class BThread extends Thread {
Object obj;
public BThread(Object obj) {
setName("BThread");
this.obj = obj;
}
@Override
public void run() {
synchronized (obj) {
System.out.println(sdf.format(new Date()) + " BThread before notify()");
obj.notify();
// obj.notifyAll();
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(sdf.format(new Date()) + " BThread after notify()");
}
}
}
}
//呼叫notify
//19:35:51 AThread0 before wait()
//19:35:51 AThread1 before wait()
//19:35:51 AThread2 before wait()
//19:35:51 BThread before notify()
//19:35:56 BThread after notify()
//19:35:56 AThread0 after wait()
//呼叫notifyAll
//19:39:51 AThread0 before wait()
//19:39:51 AThread2 before wait()
//19:39:51 AThread1 before wait()
//19:39:51 BThread before notify()
//19:39:56 BThread after notify()
//19:39:56 AThread1 after wait()
//19:39:56 AThread2 after wait()
//19:39:56 AThread0 after wait()
下面是一個利用wait和notifyAll實現的生產者消費者佇列。因為即使notifyAll呼叫了,也需要退出synchronize程式碼才會真正去喚醒另一個執行緒,所以notifyAll可以寫在Queue的操作之前。
public class MainClass {
public static void main(String[] args) {
QueueBuffer q = new QueueBuffer(2);
for(int i=0; i<5; i++) {
Producer p = new Producer(q);
p.start();
}
for(int i=0; i<2; i++) {
Consumer c = new Consumer(q);
c.start();
}
}
static class QueueBuffer{
Queue<Integer> queue = new LinkedList<>();
int size;
AtomicInteger seq = new AtomicInteger();
public QueueBuffer(int size) {
this.size = size;
}
public synchronized void put() {
while (queue.size() == size) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
notifyAll();
int num = seq.getAndIncrement();
queue.offer(num);
System.out.println("producer --- " + num);
}
public synchronized int get() {
while (queue.size() == 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
notifyAll();
return queue.poll();
}
}
static class Producer extends Thread{
QueueBuffer q;
static AtomicInteger seq = new AtomicInteger();
public Producer(QueueBuffer q) {
this.q = q;
}
@Override
public void run() {
while (true) {
q.put();
}
}
}
static class Consumer extends Thread{
QueueBuffer q;
public Consumer(QueueBuffer q) {
this.q = q;
}
@Override
public void run() {
while (true) {
int num = q.get();
try {
sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("consumer --- " + num);
}
}
}
}
sleep
Thread.sleep(long)應該是我們最常用的,一般也知道sleep方法不會釋放鎖(如果寫在synchronize裡的話)。
所以跟wait的區別是
1、sleep是Thread類的方法,是「靜態方法」。wait是Object類的方法,呼叫需要具體的物件。
2、sleep是不釋放鎖的,解除方法要麼是timeout,或者interrupt一下讓它丟擲InterruptedException。wait是釋放鎖的,可以被notify/notifyAll恢復,同樣也可以timeout或者interrupt。
3、sleep在哪裡都可以呼叫,wait必須在同步方法或同步塊裡呼叫,並且同步的物件要跟wait的物件一樣。
4、sleep作用只是執行緒的操作,用於短時間暫停執行緒,wait/notify可以用作執行緒間通訊,達到資源排程的功能。
yield
yield方法也是Thread類的靜態方法,會把當前執行緒從可執行狀態變成就緒狀態,之後會cpu會從眾多就緒狀態的執行緒中選擇一個來執行。選執行緒是根據執行緒優先順序順序的,如果沒有比當前執行緒更高優先順序的就緒執行緒,完全有可能選回剛才執行yield方法的執行緒。
join
join也是Thread類方法,非靜態,表示等待該執行緒結束,當前執行緒才繼續執行。
public class JoinClassDemo {
private static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
public static void main(String[] args) {
System.out.println(sdf.format(new Date()) + " MainThread entry");
JoinThread t = new JoinThread();
t.start();
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(sdf.format(new Date()) + " MainThread exit");
}
static class JoinThread extends Thread {
@Override
public void run() {
System.out.println(sdf.format(new Date()) + " JoinThread entry");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(sdf.format(new Date()) + " JoinThread exit");
}
}
}
//17:24:18 MainThread entry
//17:24:18 JoinThread entry
//17:24:23 JoinThread exit
//17:24:23 MainThread exit
執行緒中斷
首先Thread有兩個一個暫停方法suspend()和一個停止方法stop()。兩個都已經已經@deprecated廢棄了。suspend()暫停和resume()繼續容易造成死鎖,stop()具有固有的不安全性。具體可以看Java API的文件註釋。
所以拋棄上面的方法後,一般我們會用以下幾個方法退出執行緒。
1.設計標記位法
public class InterruptDemo {
private static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
public static void main(String[] args) {
new InterruptThread().start();
}
static class InterruptThread extends Thread {
public boolean stopFlag = true;
@Override
public void run() {
while (stopFlag) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(sdf.format(new Date()));
}
}
}
}
一個變數作為標記位,判斷標記位以確定退出迴圈達到退出執行緒。
缺點就是如果程式碼並沒有這種迴圈語句,或者執行緒被其他語句阻塞了,執行緒可能一直不會去檢查標記位。
2.interrupt中斷
public class Thread implements Runnable {
//中斷目標執行緒
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(this);
return;
}
}
interrupt0();
}
//返回目標執行緒的中斷狀態 static在這裡理解為:只有當前執行緒才能中斷自己,不允許別的執行緒中斷自己
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}
//判斷目標執行緒是否中斷
public boolean isInterrupted() {
return isInterrupted(false);
}
private native boolean isInterrupted(boolean ClearInterrupted);
}
執行緒裡有一個boolean型別的中斷狀態,是一個標記位,是存在Native層的。當使用Thread的interrupt()方法時,執行緒的中斷狀態會被設定為true。一些阻塞方法就會丟擲一個異常InterruptedException。如果沒有這種阻塞方法?那就什麼都不會做。下面是兩種標準用法
public class InterruptDemo {
private static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss:SSS");
public static void main(String[] args) {
InterruptThread t = new InterruptThread();
// Interrupt2Thread t = new Interrupt2Thread();
t.start();
try {
Thread.sleep(3500);
} catch (InterruptedException e) {
e.printStackTrace();
}
t.interrupt();
}
static class InterruptThread extends Thread {
@Override
public void run() {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
//丟擲InterruptedException後中斷標誌被清除
System.out.println(sdf.format(new Date()) + " catch " + isInterrupted());
return;
}
System.out.println(sdf.format(new Date()) + " " + isInterrupted());
}
}
}
static class Interrupt2Thread extends Thread {
@Override
public void run() {
while (!isInterrupted()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
//丟擲InterruptedException後中斷標誌被清除
//可以再次呼叫interrupt恢復中斷
System.out.println(sdf.format(new Date()) + " catch " + isInterrupted());
interrupt();
}
System.out.println(sdf.format(new Date()) + " " + isInterrupted());
}
}
}
}
//15:08:01:249 false
//15:08:02:251 false
//15:08:03:252 false
//15:08:03:749 catch false
InterruptThread是在catch中直接return結束執行緒。Interrupt2Thread是catch中再次呼叫interrupt恢復中斷狀態,下次判斷isInterrupted()中結束執行緒。
需要注意的點如下
①、執行緒不應該交給別的執行緒中斷,應該由自己中斷自己,過程中保證資源和變數已合理的處理了(該關的關,該釋放的釋放)。
②、所謂的interrupt執行緒中斷,只是修改了一個標記位,需要我們判斷標記位做後續的處理。如果catch程式碼塊什麼都不處理,會繼續跑完剩下的程式碼。所以應該理解為『並不是中斷,而是通知你應該自行中斷了』
③、注意在Thread.sleep這些方法,丟擲InterruptedException異常後會清除標記位狀態。下圖為文件說明
類似的方法有
Thread.sleep
Thread.join
Object.wait
BlockingQueue.put(e)和take() 這可以用於實現生產者消費者佇列
3.使用FutureTask.cancel(true)或者使用執行緒池的shutdown()方法(比如ThreadPoolExecutor.shutdown)
《AsyncTask原始碼解析 從AsyncTask講到執行緒池》中講到了java1.5的java.util.concurrent包帶來新的執行緒處理方式。比如說FutureTask和ExecutorService。
看FutureTask.cancel原始碼可以知道,所謂的cancel(true),內部也只是呼叫了interrupt()
public class FutureTask<V> implements RunnableFuture<V>
...
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
...
}
ThreadPoolExecutor.shutdown方法也是一樣
public class ThreadPoolExecutor extends AbstractExecutorService {
...
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
...
}