1. 程式人生 > >執行緒之間的通訊方式:wait/notify

執行緒之間的通訊方式:wait/notify

1.什麼是執行緒間的通訊

通訊,顧名思義就是一種通知交通的方式,在多執行緒的環境下,如果各個執行緒之間可以互相通訊的話,可以很好地提高工作效率,提高CPU的利用率。

2.執行緒間常用的通訊方式

多執行緒間的通訊一般採取等待/通知機制進行實現,即Object類中的wait()和notify()方法實現的,一個是等待,一個是通知。其實就像我們平時去營業廳辦理業務一樣,我們要先取號,然後就開始等待,等到聽到叫我們號的時候,我們再過去辦理業務。

  1. Java中等待/通知機制的實現
    1. 如上面所說的,wait()和notify()這兩個方法都是Object類中的方法,之所以是超類的方法,其實是因為之前我們說過任何物件都可以作為鎖,而這兩個方法都是由鎖呼叫的,所以很自然地就可以理解為什麼這兩個方法是屬於超類的。
    2. wait方法:
      1. 作用是使當前執行程式碼的執行緒進行等待,該方法會將該執行緒放入”等待佇列“中,並且在wait()所在的程式碼處停止執行,直到接到通知或被中斷為止。
      2. 在呼叫 wait() 之前,執行緒必須獲得該物件級別鎖,即只能在同步方法或同步塊中呼叫 wait() 方法。
      3. wait() 是釋放鎖的,即在執行到 wait() 方法之後,當前執行緒會釋放鎖,當從 wait() 方法返回前,執行緒與其他執行緒競爭重新獲得鎖
      4. 此外,還有帶一個引數的wait(long),表示在等待一段時間內,如果沒有喚醒執行緒,則會自動喚醒。當然,在這段時間內,也可以由其他執行緒喚醒。
    3. notify方法:
      1. 和 wait() 方法一樣, notify() 方法也要在同步塊或同步方法中呼叫,即在呼叫前,執行緒也必須獲得該物件的物件級別鎖。
      2. 該方法用來通知那些可能等待該物件的物件鎖的其他執行緒,如果有多個執行緒等待,則由執行緒規劃器隨機挑選出其中一個是wait狀態的執行緒,對其發出通知notify,並使它等待獲取該物件的物件鎖。即notify方法一次只隨機喚醒一個wait狀態的執行緒。
      3. 這裡需要注意的是,執行notify方法之後,當前執行緒不會立即釋放其擁有的該物件鎖,而是執行完之後才會釋放該物件鎖,被通知的執行緒也不會立即獲得物件鎖,而是等待notify方法執行完之後,釋放了該物件鎖,才可以獲得該物件鎖。
      4.  notifyAll() 通知所有等待同一共享資源的全部執行緒從等待狀態退出,進入可執行狀態,重新競爭獲得物件鎖。即notifyAll方法可以喚醒所有wait狀態的執行緒。
    4. wait()/notify()總結:

      用一句話來說就是:wait使執行緒停止執行,notify使停止的執行緒繼續執行 。

      每個鎖物件都有兩個佇列,一個是就緒佇列,一個是阻塞佇列。就緒佇列儲存了將要獲得鎖的執行緒,阻塞佇列儲存了被阻塞的執行緒。一個執行緒被喚醒之後,才會進入就緒佇列,等待CPU的排程;反之,一個執行緒呼叫wait方法後,就會進入阻塞佇列,等待下一次被喚醒。 

      1. 要結合synchronized關鍵字一起使用,因為他們都需要首先獲取該物件的物件鎖;
      2. wait方法是釋放鎖,notify方法是不釋放鎖的;
      3. 執行緒的四種狀態如下圖:
  2. wait/notify執行緒間通訊示例程式碼
    1. Mylist程式碼:
      public class MyList {
          private static List list = new ArrayList();
      
          public static void add() {
              list.add("我是元素");
          }
      
          public static int size() {
              return list.size();
          }
      }
    2. 執行緒A:
      public class ThreadA extends Thread {
          private Object lock;
      
          public ThreadA(Object lock) {
              super();
              this.lock = lock;
          }
      
          @Override
          public void run() {
              try {
                  synchronized (lock) {
                      if (MyList.size() != 5) {
                          System.out.println("wait begin " +
                              System.currentTimeMillis());
                          lock.wait();
                          System.out.println("wait end " +
                              System.currentTimeMillis());
                      }
                  }
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          }
      }
      

       

    3. 執行緒B:
      public class ThreadB extends Thread {
          private Object lock;
      
          public ThreadB(Object lock) {
              super();
              this.lock = lock;
          }
      
          @Override
          public void run() {
              try {
                  synchronized (lock) {
                      for (int i = 0; i < 10; i++) {
                          MyList.add();
      
                          if (MyList.size() == 5) {
                              lock.notify();
                              System.out.println("已發出通知!");
                          }
      
                          System.out.println("添加了" + (i + 1) + "個元素!");
                          Thread.sleep(1000);
                      }
                  }
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          }
      }
      

       

    4. 測試程式碼:
      public class Run {
          public static void main(String[] args) {
              try {
                  Object lock = new Object();
                  ThreadA a = new ThreadA(lock);
                  a.start();
                  Thread.sleep(50);
      
                  ThreadB b = new ThreadB(lock);
                  b.start();
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          }
      }
      

       

    5. 執行結果:
       
      wait begin 1507634541467
      添加了1個元素!
      添加了2個元素!
      添加了3個元素!
      添加了4個元素!
      已發出通知!
      添加了5個元素!
      添加了6個元素!
      添加了7個元素!
      添加了8個元素!
      添加了9個元素!
      添加了10個元素!
      wait end 1507634551563

      由上面可以看出,雖然執行緒B在第五個元素的時候發出通知,而執行緒A實現執行緒B執行完之後才獲得物件鎖,這也可以明,wait方法是釋放鎖的而notify方法是不釋放鎖的。因為如果notify方法會釋放鎖的話,那麼應該在列印通知之前就執行執行緒A中的列印wait end。

  3. 使用wait/notify模擬BlockingQueue阻塞佇列

    1. BlockingQueue是阻塞佇列,我們需要實現的是阻塞的放入和得到資料,設計思路如下:
      (1)初始化佇列最大長度為5;
      (2)需要新加入的時候,判斷是否長度為5,如果是5則等待插入;
      (3)需要消費元素的時候,判斷是否為0,如果是0則等待消費;

    2. 實現程式碼如下:

      public class MyQueue {
      	//1、需要一個承裝元素的集合
      	private final LinkedList<Object> list = new LinkedList<>();
      	//2、需要一個計數器
      	private final AtomicInteger count = new AtomicInteger(0);
      	//3、需要指定上限和下限
      	private final int maxSize = 5;
      	private final int minSize = 0;
      	//5、初始化鎖物件
      	private final Object lock = new Object();
      	/**
      * put方法
      */
      	public void put(Object obj) {
      		synchronized (lock) {
      			//達到最大無法新增,進入等到
      			while (count.get() == maxSize) {
      				try {
      					lock.wait();
      				}catch (InterruptedException e) {
      					e.printStackTrace();
      				}
      			}
      			list.add(obj);
      			//加入元素
      			count.getAndIncrement();
      			//計數器增加
      			System.out.println(" 元素 " + obj + " 被新增 ");
      			lock.notify();
      			//通知另外一個阻塞的執行緒方法
      		}
      	}
      	/**
      * get方法
      */
      	public Object get() {
      		Object temp;
      		synchronized (lock) {
      			//達到最小,沒有元素無法消費,進入等待
      			while (count.get() == minSize) {
      				try {
      					lock.wait();
      				}catch (InterruptedException e) {
      					e.printStackTrace();
      				}
      			}
      			count.getAndDecrement();
      			temp = list.removeFirst();
      			System.out.println(" 元素 " + temp + " 被消費 ");
      			lock.notify();
      		}
      		return temp;
      	}
      	private int size() {
      		return count.get();
      	}
      	public static void main(String[] args) throws Exception {
      		final MyQueue myQueue = new MyQueue();
      		initMyQueue(myQueue);
      		Thread t1 = new Thread(() -> {
      			myQueue.put("h");
      			myQueue.put("i");
      		}, "t1");
      		Thread t2 = new Thread(() -> {
      			try {
      				Thread.sleep(2000);
      				myQueue.get();
      				Thread.sleep(2000);
      				myQueue.get();
      			}
      			catch (InterruptedException e) {
      				e.printStackTrace();
      			}
      		}, "t2");
      		t1.start();
      		Thread.sleep(1000);
      		t2.start();
      	}
      	private static void initMyQueue(MyQueue myQueue) {
      		myQueue.put("a");
      		myQueue.put("b");
      		myQueue.put("c");
      		myQueue.put("d");
      		myQueue.put("e");
      		System.out.println("當前元素個數:" + myQueue.size());
      	}
      }

       

    3. 執行結果:

      元素 a 被新增
      元素 b 被新增
      元素 c 被新增
      元素 d 被新增
      元素 e 被新增
      當前元素個數:5
      元素 a 被消費
      元素 h 被新增
      元素 b 被消費
      元素 i 被新增

      注意:在資料結構中,佇列是可以無長度限制的,就是可以無限擴充套件,但是對於阻塞佇列,他之所以稱之為阻塞佇列就是因為其有長度限制,也是上述例項中的maxSize,這也是常見的筆試面試題中比較容易忽略的一個地方,想當然的認為只要是佇列他就是無長度限制的,看到這裡你應該知道了Java中提供的阻塞佇列的類是有長度限制的!

      1. 當呼叫wait方法的時候,wait方法所在的程式碼塊停止執行,直到被notify喚醒才開始執行。所以這裡的get和put中都有wait和notify方法,可以理解為相互制約和喚醒。

  4. 注意事項:

    1. wait()和notify()方法要在同步塊或同步方法中呼叫,即在呼叫前,執行緒也必須獲得該物件的物件級別鎖。

    2. wait方法是釋放鎖,notify方法是不釋放鎖的;

    3. notify每次喚醒wait等待狀態的執行緒都是隨機的,且每次只喚醒一個;

    4. notifAll每次喚醒wait等待狀態的執行緒使之重新競爭獲取物件鎖,優先順序最高的那個執行緒會最先執行;

    5. 當執行緒處於wait()狀態時,呼叫執行緒物件的interrupt()方法會出現InterruptedException異常;
       

3.其他通訊方式

(1)程序間的通訊方式:
管道(pipe)、有名管道(named pipe)、訊號量(semophore)、訊息佇列(message
queue)、訊號(signal)、共享記憶體(shared memory)、套接字(socket);
(2)執行緒程間的通訊方式:
1、鎖機制:
1.1 互斥鎖:提供了以排它方式阻止資料結構被併發修改的方法。
1.2 讀寫鎖:允許多個執行緒同時讀共享資料,而對寫操作互斥。
1.3 條件變數:可以以原子的方式阻塞程序,直到某個特定條件為真為止。
對條件測試是在互斥鎖的保護下進行的。條件變數始終與互斥鎖一起使用。
2、訊號量機制:包括無名執行緒訊號量與有名執行緒訊號量
3、訊號機制:類似於程序間的訊號處理。
執行緒間通訊的主要目的是用於執行緒同步,所以執行緒沒有像程序通訊中用於資料交換的
通訊機制。

 

徐劉根大佬的多執行緒專欄:https://blog.csdn.net/column/details/17790.html