1. 程式人生 > >生產者消費者和虛假喚醒

生產者消費者和虛假喚醒

正常 而不是 string lan log cond 問題 釋放 UC

1 定義

虛假喚醒,即spurious wakeups。wait需要在while循環內使用,原因就是因為存在虛假喚醒。

2 Monitor

還是放上這個神圖來復習下線程間通信
技術分享圖片

  • 線程在競爭鎖失敗的情況下會放到Entry Set中,圖中2表示線程可以獲取鎖
  • 獲取到鎖的線程可以調用wait方法,讓線程阻塞,此時線程被放到了Wait Set中,如圖中3所示;Wait Set中的線程在時間到或者被notify後可以競爭鎖,如圖中4所示
  • Wait Set中的線程在獲取到鎖後才可以繼續執行。
  • notify會喚醒Wait Set中的一個線程來競爭鎖,notifyAll會喚醒Wait Set中全部的線程,但是只有一個線程能獲取到鎖,每個線程會依次去獲取鎖,運行一次。

3 虛假喚醒

我們以生產者消費者問題來舉例幾種情況,假設在wait在if中而不是在while中

1)情況1 稍微復雜點

  1. 有一個生產者p,兩個消費者c1、c2,一個隊列queue
  2. c1先執行,由於queue中為0,所以c1調用wait線程阻塞,線程放到了Wait Set中
  3. p生產了一個消費,放到queue中
  4. 在p調用notify之前,c2開始執行,需要競爭queue的鎖,所以c2在Entry Set等待競爭鎖
  5. p生產完成,調用notify,c1收到被喚醒後,從Wait Set競爭鎖,註意此時c2也在競爭鎖。
  6. c2從Entry Set先競爭到鎖,然後消費了queue中的消息,此時queue大小為0
  7. c2執行完後,釋放鎖,此時c1競爭到鎖,從queue中消費消息,由於queue目前大小為0,所以從queue為0的隊列中訪問是非法的。

2)情況2 稍微簡單點

  1. 有一個生產者p,兩個消費者c1、c2,一個隊列queue
  2. c1、c2先啟動,由於queue是空,所以分別調用wait,c1、c2都進入Wait Set
  3. 之後p生產了一個消息到queue中,然後調用notifyall,c1和c2都被喚醒。
  4. c1競爭到鎖,消費一個消息,queue大小為0,完成後釋放鎖
  5. c2競爭到鎖,消費一條消息,queue大小是-1,拋出異常。

3)情況3

  1. 假設有兩種情況會引起消費者阻塞
  2. c1是由於條件1,調用了wait;c2是由於條件2調用了wait;
  3. p生產了一條消息,然後滿足了條件1,調用notify,卻喚醒了c2
  4. 由於是使用的if,c2沒有再判斷是不是條件2被滿足了,所以就直接獲取到鎖,造成錯誤

4)情況4

  1. wait可能被interrupt等不被喚醒就繼續執行

4 舉例

我們以情況2來舉例

public class ProducerConsumer {
    public static void main(String[] args) throws Exception{
        new ProducerConsumer().test();
    }

    public void test() throws InterruptedException {
        Queue<Integer> queue = new LinkedList<>();
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue, 0);
        Consumer consumer1 = new Consumer(queue, 1);


        ExecutorService executor = Executors.newFixedThreadPool(6);
        executor.submit(consumer);
        executor.submit(consumer1);
        Thread.sleep(5000);
        executor.submit(producer);


    }

    class Producer implements Runnable {
        private Queue<Integer> queue;

        public Producer(Queue<Integer> queue) {
            this.queue = queue;
        }
        @Override
        public void run(){
            try {
                synchronized (queue) {
                    Integer time = new Random().nextInt(100);
                    queue.add(time);
                    System.out.println("producer notifyall");
                    queue.notifyAll();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    class Consumer implements Runnable {
        private Queue<Integer> queue;
        private int index;

        public Consumer(Queue<Integer> queue, int i) {
            this.queue = queue;
            this.index = i;
        }

        @Override
        public void run() {
            try {
                Thread.currentThread().setName("consumerThread_" + index);
                while (true) {
                    synchronized (queue) {
                        if (queue.size() <= 0) {
                            try {
                                System.out.println("consumer wait:" + Thread.currentThread().getName());
                                queue.wait();
                                System.out.println("consumer wait2:" + Thread.currentThread().getName());
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        System.out.println("gogogo:" + Thread.currentThread().getName());
                        Integer time = new Random().nextInt(100);
                        try {
                            Thread.sleep(time);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("consumer remove: " + queue.remove() + ": " + Thread.currentThread().getName());
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }
}

運行結果

consumer wait:consumerThread_0
consumer wait:consumerThread_1
producer notifyall
consumer wait2:consumerThread_1
gogogo:consumerThread_1
consumer remove: 99: consumerThread_1
consumer wait:consumerThread_1
consumer wait2:consumerThread_0
gogogo:consumerThread_0
java.util.NoSuchElementException
    at java.util.LinkedList.removeFirst(LinkedList.java:270)
    at java.util.LinkedList.remove(LinkedList.java:685)
    at com.hxy.ProducerConsumer$Consumer.run(ProducerConsumer.java:85)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

上述代碼中,兩個消費者先啟動,由於queue.size 小於等於0 ,所以兩個消費者都調用wait,放到了Wait Set中;5s後生產者往隊列裏發送了一條消息,然後調用notifyAll,兩個消費者都被喚醒,由於沒有再判斷是否滿足條件,所以分別獲取鎖去消費,造成第二個消費者拋出NoSuchElementException異常。

將上述代碼中消費者的if判斷修改為while便會正常,運行結果如下:

consumer wait:consumerThread_0
consumer wait:consumerThread_1
producer notifyall
consumer wait2:consumerThread_1
gogogo:consumerThread_1
consumer remove: 53: consumerThread_1
consumer wait:consumerThread_1
consumer wait2:consumerThread_0
consumer wait:consumerThread_0

consumerThread_0競爭到鎖後,會從while判斷queue.size大小,由於queue大小為0,所以繼續wait。

另外,可能會想到,在while使用wait,對於情況3這種情況,如果調用notifyall,會喚醒其他不相關的線程,而這些線程需要重新判斷,然後再調用wait,這顯然是一種資源浪費。針對這種情況,我們可以使用Condition,只喚醒相關的線程。

5 生產者消費者

下面附上正確的消費者生產者

public class ProducerConsumer {
    public static void main(String[] args) throws Exception{
        new ProducerConsumer().test();
    }

    public void test() throws InterruptedException {
        Queue<Integer> queue = new LinkedList<>();
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue, 0);
        Consumer consumer1 = new Consumer(queue, 1);

        ExecutorService executor = Executors.newFixedThreadPool(6);
        executor.submit(consumer);
        executor.submit(consumer1);
        executor.submit(producer);


    }

    class Producer implements Runnable {
        private Queue<Integer> queue;

        public Producer(Queue<Integer> queue) {
            this.queue = queue;
        }
        @Override
        public void run(){
            while (true) {
                try {
                    synchronized (queue) {
                        while (queue.size() >= 10) { // 防止虛假喚醒
                            queue.wait();
                        }
                        Integer time = new Random().nextInt(100);

                        Thread.sleep(time);

                        System.out.println("producer add:" + time);
                        queue.add(time);
                        queue.notifyAll();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class Consumer implements Runnable {
        private Queue<Integer> queue;
        private int index;

        public Consumer(Queue<Integer> queue, int i) {
            this.queue = queue;
            this.index = i;
        }

        @Override
        public void run() {
            try {
                Thread.currentThread().setName("consumerThread_" + index);
                while (true) {
                    synchronized (queue) {
                        while (queue.size() <= 0) { // 防止虛假喚醒
                            queue.wait();
                        }
                        Integer time = new Random().nextInt(100);

                        Thread.sleep(time);

                        System.out.println("consumer remove: " + queue.remove() + ": " + Thread.currentThread().getName());
                        queue.notifyAll(); // 也有可能喚醒另一個consumer中的queue.wait
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }
}

生產者消費者和虛假喚醒