1. 程式人生 > >ava多執行緒程式設計-(4)-執行緒間通訊機制的介紹與使用

ava多執行緒程式設計-(4)-執行緒間通訊機制的介紹與使用

原文出自 : https://blog.csdn.net/xlgen157387/article/details/78195817

執行緒間通訊簡介

我們知道執行緒是作業系統中獨立的個體,但是這個單獨的個體之間沒有一種特殊的處理方式使之成為一個整體,執行緒之間沒有任何交流和溝通的話,他就是一個個單獨的個體,不足以形成一個強大的互動性較強的整體。

為了提高CPU的利用率和各執行緒之間相互協作,Java的一種實現執行緒間通訊的機制是:wait/notify執行緒間通訊,下邊就一起學習一下這種執行緒間的通訊機制。

不使用等待/通知機制實現執行緒間通訊

假如,我們不使用下邊需要介紹的機制,那我們如何實現兩個執行緒之間的通訊哪,下邊看一段程式碼,實現的是兩個執行緒向一個List裡填充資料:

MyList程式碼:

public class MyList {

    private List list = new ArrayList();

    public void add() {
        list.add("我是元素");
    }

    public int size() {
        return list.size();
    }

}
  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

執行緒A:

public class ThreadA
extends Thread {
private MyList list; public ThreadA(MyList list) { super(); this.list = list; } @Override public void run() { try { for (int i = 0; i < 10; i++) { list.add(); System.out.println("添加了" + (i + 1
) + "個元素"); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

執行緒B:

public class ThreadB extends Thread {

    private MyList list;

    public ThreadB(MyList list) {
        super();
        this.list = list;
    }

    @Override
    public void run() {
        try {
            while (true) {
                if (list.size() == 5) {
                    System.out.println("==5了,執行緒b要退出了!");
                    throw new InterruptedException();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

測試類Test:

public class Test {

    public static void main(String[] args) {
        MyList myList = new MyList();

        ThreadA a = new ThreadA(myList);
        a.setName("A");
        a.start();

        ThreadB b = new ThreadB(myList);
        b.setName("B");
        b.start();
    }
}
  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

執行結果:

添加了1個元素
添加了2個元素
添加了3個元素
添加了4個元素
添加了5個元素
==5了,執行緒b要退出了!
java.lang.InterruptedException
    at text.ThreadB.run(ThreadB.java:20)
添加了6個元素
添加了7個元素
添加了8個元素
添加了9個元素
添加了10個元素
  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

可以看出,當List集合中的資料為5個的時候執行緒B退出,雖然兩個執行緒之間實現了通訊,但是程式碼中我們的執行緒B是一直執行著while(true) 迴圈的,直到長度為5才終止執行,顯然這種方式是很消耗資源的。所以,就需要一種機制能避免上述的操作又能實現多個執行緒之間的通訊,這就是接下來需要學習的“wait/notify執行緒間通訊”

什麼是等待/通知機制

道理很簡單,就像我們去銀行辦業務,進門之後取票號,等到達的時候會廣播通知我們辦業務一樣,這就是很實際的一個場景,我們取了票號就需要等待,等業務員輪到票號的時候就會廣播通知。

Java中等待/通知機制的實現

Java中對應等待/通知的方法是wait()/notify(),這兩個方法都是超類Object中的方法,如下圖所示:

這裡寫圖片描述

之所以會是超類Object中的方法,我們可以簡單的理解:上幾篇文章中我們知道任何物件都可以作為鎖,而wait()/notify()是由鎖呼叫的,想到這裡自然可以體會到這裡設計的巧妙之處。

一、wait方法

(1)方法wait()的作用是使當前執行程式碼的執行緒進行等待,該方法會將該執行緒放入”預執行佇列“中,並且在wait()所在的程式碼處停止執行,直到接到通知或被中斷為止。

(2)在呼叫wait()之前,執行緒必須獲得該物件級別鎖,這是一個很重要的地方,很多時候我們可能會忘記這一點,即只能在同步方法或同步塊中呼叫wait()方法。

(3)還需要注意的是wait()是釋放鎖的,即在執行到wait()方法之後,當前執行緒會釋放鎖,當從wait()方法返回前,執行緒與其他執行緒競爭重新獲得鎖。

二、notify方法

(1)和wait()方法一樣,notify()方法也要在同步塊或同步方法中呼叫,即在呼叫前,執行緒也必須獲得該物件的物件級別鎖。

(2)該方法是用來通知那些可能等待該物件的物件鎖的其他執行緒,如果有多個執行緒等待,則由執行緒規劃器隨機挑選出其中一個呈wait狀態的執行緒,對其發出通知notify,並使它等待獲取該物件的物件鎖。

(3)這裡需要注意的是,執行notify方法之後,當前執行緒不會立即釋放其擁有的該物件鎖,而是執行完之後才會釋放該物件鎖,被通知的執行緒也不會立即獲得物件鎖,而是等待notify方法執行完之後,釋放了該物件鎖,才可以獲得該物件鎖。

(3)notifyAll()通知所有等待同一共享資源的全部執行緒從等待狀態退出,進入可執行狀態,重新競爭獲得物件鎖。

三、wait()/notify()方法總結

(1)wait()/notify()要集合synchronized關鍵字一起使用,因為他們都需要首先獲取該物件的物件鎖;

(2)wait方法是釋放鎖,notify方法是不釋放鎖的;

(3)執行緒的四種狀態如下圖:

這裡寫圖片描述

wait/notify執行緒間通訊示例程式碼

根據上述不使用wait/notify的程式碼改造如下:

MyList程式碼:

public class MyList {

    private static List list = new ArrayList();

    public static void add() {
        list.add("我是元素");
    }

    public static int size() {
        return list.size();
    }
}
  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

執行緒A:

public class ThreadA extends Thread {

    private Object lock;

    public ThreadA(Object lock) {
        super();
        this.lock = lock;
    }

    @Override
    public void run() {
        try {
            synchronized (lock) {
                if (MyList.size() != 5) {
                    System.out.println("wait begin " + System.currentTimeMillis());
                    lock.wait();
                    System.out.println("wait end  " + System.currentTimeMillis());
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

執行緒B:

public class ThreadB extends Thread {

    private Object lock;

    public ThreadB(Object lock) {
        super();
        this.lock = lock;
    }

    @Override
    public void run() {
        try {
            synchronized (lock) {
                for (int i = 0; i < 10; i++) {
                    MyList.add();
                    if (MyList.size() == 5) {
                        lock.notify();
                        System.out.println("已發出通知!");
                    }
                    System.out.println("添加了" + (i + 1) + "個元素!");
                    Thread.sleep(1000);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

測試程式碼:

public class Run {

    public static void main(String[] args) {
        try {
            Object lock = new Object();
            ThreadA a = new ThreadA(lock);
            a.start();
            Thread.sleep(50);
            ThreadB b = new ThreadB(lock);
            b.start();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

執行結果:

wait begin 1507634541467
添加了1個元素!
添加了2個元素!
添加了3個元素!
添加了4個元素!
已發出通知!
添加了5個元素!
添加了6個元素!
添加了7個元素!
添加了8個元素!
添加了9個元素!
添加了10個元素!
wait end  1507634551563
  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

上述例項已經實現了簡單的等待通知機制,並且我們也可以看到,雖然執行緒B在第五個元素的時候發出通知,而執行緒A實現執行緒B執行完之後才獲得物件鎖,這也可以說明,wait方法是釋放鎖的而notify方法是不釋放鎖的。

另一個案例:使用wait/notify模擬BlockingQueue阻塞佇列

BlockingQueue是阻塞佇列,我們需要實現的是阻塞的放入和得到資料,設計思路如下:

(1)初始化佇列最大長度為5;
(2)需要新加入的時候,判斷是否長度為5,如果是5則等待插入;
(3)需要消費元素的時候,判斷是否為0,如果是0則等待消費;

實現程式碼如下:

public class MyQueue {

    //1、需要一個承裝元素的集合
    private final LinkedList<Object> list = new LinkedList<>();
    //2、需要一個計數器
    private final AtomicInteger count = new AtomicInteger(0);
    //3、需要指定上限和下限
    private final int maxSize = 5;
    private final int minSize = 0;

    //5、初始化鎖物件
    private final Object lock = new Object();

    /**
     * put方法
     */
    public void put(Object obj) {
        synchronized (lock) {
            //達到最大無法新增,進入等到
            while (count.get() == maxSize) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            list.add(obj); //加入元素
            count.getAndIncrement(); //計數器增加
            System.out.println(" 元素 " + obj + " 被新增 ");
            lock.notify(); //通知另外一個阻塞的執行緒方法
        }
    }

    /**
     * get方法
     */
    public Object get() {
        Object temp;
        synchronized (lock) {
            //達到最小,沒有元素無法消費,進入等到
            while (count.get() == minSize) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            count.getAndDecrement();
            temp = list.removeFirst();
            System.out.println(" 元素 " + temp + " 被消費 ");
            lock.notify();
        }
        return temp;
    }

    private int size() {
        return count.get();
    }

    public static void main(String[] args) throws Exception {

        final MyQueue myQueue = new MyQueue();
        initMyQueue(myQueue);

        Thread t1 = new Thread(() -> {
            myQueue.put("h");
            myQueue.put("i");
        }, "t1");

        Thread t2 = new Thread(() -> {
            try {
                Thread.sleep(2000);
                myQueue.get();
                Thread.sleep(2000);
                myQueue.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t2");

        t1.start();
        Thread.sleep(1000);
        t2.start();

    }

    private static void initMyQueue(MyQueue myQueue) {
        myQueue.put("a");
        myQueue.put("b");
        myQueue.put("c");
        myQueue.put("d");
        myQueue.put("e");
        System.out.println("當前元素個數:" + myQueue.size());
    }
}
  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95

執行結果:

 元素 a 被新增 
 元素 b 被新增 
 元素 c 被新增 
 元素 d 被新增 
 元素 e 被新增 
當前元素個數:5
 元素 a 被消費 
 元素 h 被新增 
 元素 b 被消費 
 元素 i 被新增 
  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

其他注意事項

(1)wait()和notify()方法要在同步塊或同步方法中呼叫,即在呼叫前,執行緒也必須獲得該物件的物件級別鎖。

(2)wait方法是釋放鎖,notify方法是不釋放鎖的;

(3)notify每次喚醒wait等待狀態的執行緒都是隨機的,且每次只喚醒一個;

(4)notifAll每次喚醒wait等待狀態的執行緒使之重新競爭獲取物件鎖,優先順序最高的那個執行緒會最先執行;

(5)當執行緒處於wait()狀態時,呼叫執行緒物件的interrupt()方法會出現InterruptedException異常;

其他知識點

(1)程序間的通訊方式:

管道(pipe)、有名管道(named pipe)、訊號量(semophore)、訊息佇列(message queue)、訊號(signal)、共享記憶體(shared memory)、套接字(socket);

(2)執行緒程間的通訊方式:

1、鎖機制
1.1 互斥鎖:提供了以排它方式阻止資料結構被併發修改的方法。
1.2 讀寫鎖:允許多個執行緒同時讀共享資料,而對寫操作互斥。
1.3 條件變數:可以以原子的方式阻塞程序,直到某個特定條件為真為止。

對條件測試是在互斥鎖的保護下進行的。條件變數始終與互斥鎖一起使用。

2、訊號量機制:包括無名執行緒訊號量與有名執行緒訊號量
3、訊號機制:類似於程序間的訊號處理。

執行緒間通訊的主要目的是用於執行緒同步,所以執行緒沒有象程序通訊中用於資料交換的通訊機制。