1. 程式人生 > >7.併發程式設計--多執行緒通訊-wait-notify

7.併發程式設計--多執行緒通訊-wait-notify

併發程式設計--多執行緒通訊-wait-notify

多執行緒通訊:執行緒通訊的目的是為了能夠讓執行緒之間相互發送訊號;

1. 多執行緒通訊:

執行緒通訊的目的是為了能夠讓執行緒之間相互發送訊號。另外,執行緒通訊還能夠使得執行緒等待其它執行緒的訊號,比如,執行緒B可以等待執行緒A的訊號,這個訊號可以是執行緒A已經處理完成的訊號;
Object提供了三個方法wait(), notify(), notifyAll()線上程之間進行通訊,以此來解決執行緒間執行順序等問題。

  • * wait():釋放當前執行緒的同步監視控制器,並讓當前執行緒進入阻塞狀態,直到別的執行緒發出notify將該執行緒喚醒。
  • * notify():喚醒在等待控制監視器的其中一個執行緒(隨機)。只有當前執行緒釋放了同步監視器鎖(呼叫wait)之後,被喚醒的執行緒才有機會執行。
  • * notifyAll():與上面notify的區別是同時喚醒多個等待執行緒。

值得注意的是這三個方法是屬於Object而不是屬於Thread的,但是呼叫的時候必須用同步監視器來呼叫,wait(), notify(), notifyAll() 必須和synchronized關鍵字聯合使用

模擬執行緒通訊:自定義實現的通訊模式
示例:ListAdd1.java

 1   public class ListAdd1 {
 2
private volatile static List list = new ArrayList(); 3 4 public void add(){ 5 list.add("bjsxt"); 6 } 7 public int size(){ 8 return list.size(); 9 } 10 11 public static void main(String[] args) { 12 13 final ListAdd1 list1 = new ListAdd1();
14 15 Thread t1 = new Thread(new Runnable() { 16 @Override 17 public void run() { 18 try { 19 for(int i = 0; i <10; i++){ 20 list1.add(); 21 System.out.println("當前執行緒:" + Thread.currentThread().getName() + "添加了一個元素.."); 22 Thread.sleep(500); 23 } 24 } catch (InterruptedException e) { 25 e.printStackTrace(); 26 } 27 } 28 }, "t1"); 29 30 Thread t2 = new Thread(new Runnable() { 31 @Override 32 public void run() { 33 while(true){ 34 if(list1.size() == 5){ 35 System.out.println("當前執行緒收到通知:" + Thread.currentThread().getName() + " list size = 5 執行緒停止.."); 36 throw new RuntimeException(); 37 } 38 } 39 } 40 }, "t2"); 41 42 t1.start(); 43 t2.start(); 44 } 45 }

2. 使用JDK的 Object提供了三個方法wait(), notify(), notifyAll()線上程之間進行通訊

示例:

 1   import java.util.ArrayList;
 2   import java.util.List;
 3   import java.util.Queue;
 4   import java.util.concurrent.CountDownLatch;
 5   import java.util.concurrent.LinkedBlockingDeque;
 6   import java.util.concurrent.LinkedBlockingQueue;
 7   /**
 8   * wait notfiy 方法,wait釋放鎖,notfiy不釋放鎖
 9   * @@author Maozw
10   *
11   */
12   public class ListAdd2 {
13       private volatile static List list = new ArrayList();
14 
15       public void add(){
16         list.add("bjsxt");
17       }
18       public int size(){
19         return list.size();
20       }
21 
22       public static void main(String[] args) {
23 
24         final ListAdd2 list2 = new ListAdd2();
25 
26         // 1 例項化出來一個 lock
27         // 當使用wait 和 notify 的時候 , 一定要配合著synchronized關鍵字去使用
28         //final Object lock = new Object();
29 
30         final CountDownLatch countDownLatch = new CountDownLatch(1);
31 
32         Thread t1 = new Thread(new Runnable() {
33           @Override
34           public void run() {
35             try {
36               //synchronized (lock) {
37                 for(int i = 0; i <10; i++){
38                   list2.add();
39                   System.out.println("當前執行緒:" + Thread.currentThread().getName() + "添加了一個元素..");
40                   Thread.sleep(500);
41                   if(list2.size() == 5){
42                     System.out.println("已經發出通知..");
43                     countDownLatch.countDown();
44                     //lock.notify();
45                   }
46                 }                        
47               //}
48             } catch (InterruptedException e) {
49               e.printStackTrace();
50             }
51 
52           }
53         }, "t1");
54 
55         Thread t2 = new Thread(new Runnable() {
56           @Override
57           public void run() {
58             //synchronized (lock) {
59               if(list2.size() != 5){
60                 try {
61                   //System.out.println("t2進入...");
62                   //lock.wait();
63                   countDownLatch.await();
64                 } catch (InterruptedException e) {
65                   e.printStackTrace();
66                 }
67               }
68               System.out.println("當前執行緒:" + Thread.currentThread().getName() + "收到通知執行緒停止..");
69               throw new RuntimeException();
70             //}
71           }
72         }, "t2");
73 
74         t2.start();
75         t1.start();
76 
77       }
78   }

問題1:wait()方法外面為什麼是while迴圈而不是if判斷?

問題2:notify()是喚醒一個執行緒,notifyAll()是喚醒全部執行緒,但是喚醒然後呢,不管是notify()還是notifyAll(),最終拿到鎖的只會有一個執行緒,那它們到底有什麼區別呢?

OK! 要回答上述兩個問題?我們首先需要明白java物件鎖的模型:
JVM 會為每一個使用內部鎖(synchronized)的物件維護兩個集合,Entry Set和Wait Set,也有人翻譯為鎖池和等待池,意思基本一致。
**Entry Set**

  • 如果執行緒A已經持有了物件鎖,此時如果有其他執行緒也想獲得該物件鎖的話,它只能進入Entry Set,並且處於執行緒的BLOCKED狀態。

**Wait Set**

  • 如果執行緒A呼叫了wait()方法,那麼執行緒A會釋放該物件的鎖,進入到Wait Set,並且處於執行緒的WAITING狀態。

sequenceDiagram
Entry Set(鎖池)->>Wait Set(等待池): wait()
Wait Set(等待池)->>Entry Set(鎖池): noitify()

 注意:某個執行緒B想要獲得物件鎖,一般情況下有兩個先決條件,

  • 一是物件鎖已經被釋放了(如曾經持有鎖的前任執行緒A執行完了synchronized程式碼塊或者呼叫了wait()方法等等)
  • 二是執行緒B已處於RUNNABLE狀態。

那麼這兩類集合中的執行緒都是在什麼條件下可以轉變為RUNNABLE呢?

  • 對於Entry Set中的執行緒,當物件鎖被釋放的時候,JVM會喚醒處於Entry Set中的某一個執行緒,這個執行緒的狀態就從BLOCKED轉變為RUNNABLE。
  • 對於Wait Set中的執行緒,當物件的notify()方法被呼叫時,JVM會喚醒處於Wait Set中的某一個執行緒,這個執行緒的狀態就從WAITING轉變為RUNNABLE;或者當notifyAll()方法被呼叫時,Wait Set中的全部執行緒會轉變為RUNNABLE狀態。所有Wait Set中被喚醒的執行緒會被轉移到Entry Set中,然後 每當物件的鎖被釋放後,那些所有處於RUNNABLE狀態的執行緒會共同去競爭獲取物件的鎖.

解答
 第一個問題 :wait()方法外面為什麼是while迴圈而不是if判斷?

  •  因為wait()的執行緒永遠不能確定其他執行緒會在什麼狀態下notify(),所以必須在被喚醒、搶佔到鎖並且從wait()方法退出的時候再次進行指定條件的判斷,以決定是滿足條件往下執行呢還是不滿足條件再次wait()呢。

第二個問題:既然notify()和notifyAll()最終的結果都是隻有一個執行緒能拿到鎖,那喚醒一個和喚醒多個有什麼區別呢?

  • 通過下面這個例子可以非常好的說明;是這樣一個場景:兩個生產者兩個消費者的場景,我們都使用notify()而非notifyAll(),假設消費者執行緒1拿到了鎖,判斷buffer為空,那麼wait(),釋放鎖;然後消費者2拿到了鎖,同樣buffer為空,wait(),也就是說此時Wait Set中有兩個執行緒;然後生產者1拿到鎖,生產,buffer滿,notify()了, 那麼可能消費者1被喚醒了,但是此時還有另一個執行緒生產者2在Entry Set中盼望著鎖,並且最終搶佔到了鎖, 但因為此時buffer是滿的,因此它要wait();然後消費者1拿到了鎖,消費,notify();這時就有問題了,此時生產者2和消費者2都在Wait Set中,buffer為空,如果喚醒生產者2,沒毛病;但如果喚醒了消費者2,因為buffer為空,它會再次wait(),這就尷尬了,萬一生產者1已經退出不再生產了,沒有其他執行緒在競爭鎖了,只有生產者2和消費者2在Wait Set中互相等待,那傳說中的死鎖就發生了。
  • notify()換成notifyAll(),這樣的情況就不會再出現了,因為每次notifyAll()都會使其他等待的執行緒從Wait Set進入Entry Set,從而有機會獲得鎖。
 1 import java.util.ArrayList;
 2 import java.util.List;
 3 
 4 public class Something {
 5     private Buffer mBuf = new Buffer();
 6 
 7     public void produce() {
 8         synchronized (this) {
 9             while (mBuf.isFull()) {
10                 try {
11                     wait();
12                 } catch (InterruptedException e) {
13                     e.printStackTrace();
14                 }
15             }
16             mBuf.add();
17             notifyAll();
18         }
19     }
20 
21     public void consume() {
22         synchronized (this) {
23             while (mBuf.isEmpty()) {
24                 try {
25                     wait();
26                 } catch (InterruptedException e) {
27                     e.printStackTrace();
28                 }
29             }
30             mBuf.remove();
31             notifyAll();
32         }
33     }
34 
35     private class Buffer {
36         private static final int MAX_CAPACITY = 1;
37         private List innerList = new ArrayList<>(MAX_CAPACITY);
38 
39         void add() {
40             if (isFull()) {
41                 throw new IndexOutOfBoundsException();
42             } else {
43                 innerList.add(new Object());
44             }
45             System.out.println(Thread.currentThread().toString() + " add");
46 
47         }
48 
49         void remove() {
50             if (isEmpty()) {
51                 throw new IndexOutOfBoundsException();
52             } else {
53                 innerList.remove(MAX_CAPACITY - 1);
54             }
55             System.out.println(Thread.currentThread().toString() + " remove");
56         }
57 
58         boolean isEmpty() {
59             return innerList.isEmpty();
60         }
61 
62         boolean isFull() {
63             return innerList.size() == MAX_CAPACITY;
64         }
65     }
66 
67     public static void main(String[] args) {
68         Something sth = new Something();
69         Runnable runProduce = new Runnable() {
70             int count = 4;
71 
72             @Override
73             public void run() {
74                 while (count-- > 0) {
75                     sth.produce();
76                 }
77             }
78         };
79         Runnable runConsume = new Runnable() {
80             int count = 4;
81 
82             @Override
83             public void run() {
84                 while (count-- > 0) {
85                     sth.consume();
86                 }
87             }
88         };
89         for (int i = 0; i < 2; i++) {
90             new Thread(runConsume).start();
91         }
92         for (int i = 0; i < 2; i++) {
93             new Thread(runProduce).start();
94         }
95     }
96 }

join

首先,join()是Thread類的一個方法,而不是object的方法;
JDK中是這樣描述的:

//join()方法的作用,是等待這個執行緒結束
public final void join()throws InterruptedException: Waits for this thread to die.
在Java 7 Concurrency Cookbook"的定義為:
join() method suspends the execution of the calling thread until the object called finishes its execution.
也就是說,t.join()方法阻塞呼叫此方法的執行緒(calling thread),直到執行緒t完成,此執行緒再繼續;
舉個例子:通常用於在main()主執行緒內,等待其它執行緒完成再結束main()主執行緒。例如:
 1 package com.maozw.springmvc.controller;
 2 
 3 import java.util.Date;
 4 import java.util.concurrent.TimeUnit;
 5 
 6 public class JoinTest implements Runnable {
 7 
 8     private String name;
 9 
10     public JoinTest(String name) {
11         this.name = name;
12     }
13 
14     public void run() {
15         System.out.printf("%s begins: %s\n", name, new Date());
16         try {
17             TimeUnit.SECONDS.sleep(4);
18         } catch (InterruptedException e) {
19             e.printStackTrace();
20         }
21         System.out.printf("%s has finished: %s\n", name, new Date());
22     }
23 
24     public static void main(String[] args) {
25         Thread thread1 = new Thread(new JoinTest("One"));
26         Thread thread2 = new Thread(new JoinTest("Two"));
27         thread1.start();
28         thread2.start();
29 
30         try {
31             thread1.join();
32             thread2.join();
33         } catch (InterruptedException e) {
34             e.printStackTrace();
35         }
36         System.out.println("Main thread is finished");
37     }
38 }

輸出結果

1 One begins: Mon Jul 23 22:41:21 CST 2018
2 Two begins: Mon Jul 23 22:41:21 CST 2018
3 Two has finished: Mon Jul 23 22:41:25 CST 2018
4 One has finished: Mon Jul 23 22:41:25 CST 2018
5 Main thread is finished

解說join原理:

我們嘗試去開啟的起原始碼:

  • 通過原始碼可以看出,Join方法實現是通過wait。 當main執行緒呼叫t.join時候,main執行緒會獲得執行緒物件t的鎖(wait 意味著拿到該物件的鎖),呼叫該物件的wait(等待時間),直到該物件喚醒main執行緒 ,比如退出後。這就意味著main 執行緒呼叫t.join時,必須能夠拿到執行緒t物件的鎖。
/**
 * Waits for this thread to die.
 *
 * <p> An invocation of this method behaves in exactly the same
 * way as the invocation
 *
 * <blockquote>
 * {@linkplain #join(long) join}{@code (0)}
 * </blockquote>
 *
 * @throws  InterruptedException
 *          if any thread has interrupted the current thread. The
 *          <i>interrupted status</i> of the current thread is
 *          cleared when this exception is thrown.
 */
public final void join() throws InterruptedException {
        join(0);
}
/**
 * Waits at most {@code millis} milliseconds for this thread to
 * die. A timeout of {@code 0} means to wait forever.
 *
 * <p> This implementation uses a loop of {@code this.wait} calls
 * conditioned on {@code this.isAlive}. As a thread terminates the
 * {@code this.notifyAll} method is invoked. It is recommended that
 * applications not use {@code wait}, {@code notify}, or
 * {@code notifyAll} on {@code Thread} instances.
 *
 * @param  millis
 *         the time to wait in milliseconds
 *
 * @throws  IllegalArgumentException
 *          if the value of {@code millis} is negative
 *
 * @throws  InterruptedException
 *          if any thread has interrupted the current thread. The
 *          <i>interrupted status</i> of the current thread is
 *          cleared when this exception is thrown.
 */
public final synchronized void join(long millis) throws InterruptedException {
    long base = System.currentTimeMillis();
    long now = 0;

    if (millis < 0) {
        throw new IllegalArgumentException("timeout value is negative");
    }

    if (millis == 0) {
        while (isAlive()) {
            wait(0);
        }
    } else {
        while (isAlive()) {
            long delay = millis - now;
            if (delay <= 0) {
                break;
            }
            wait(delay);
            now = System.currentTimeMillis() - base;
        }
    }
}