1. 程式人生 > >生產者/消費者模式實現

生產者/消費者模式實現

  wait/notify最經典的案例就是"生產者/消費者"模式。但是此模式有一些需要注意的地方。

 

1. 一個簡單的生產者消費者

  一個執行緒向集合中新增元素,兩個集合從集合中刪除元素,與之前等待/通知部落格的最後一個案例類似。

package cn.qlq.thread.seven;

import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 利用之前的等待/通知實現一個簡單的生產者消費者
 * 
 * 
@author Administrator * */ public class Demo1 { private static final Logger LOGGER = LoggerFactory.getLogger(Demo1.class); public static void main(String[] args) throws InterruptedException { final List<String> list = new ArrayList<String>(); // 刪除元素執行緒1
Thread sub1 = new Thread(new Runnable() { @Override public void run() { try { synchronized (list) { while (true) { while (list.size() == 0) { list.wait(); } LOGGER.info(
"list.remove ->{}, threadName->{}", list.get(0), Thread.currentThread().getName()); list.remove(0); list.notifyAll(); } } } catch (InterruptedException e) { e.printStackTrace(); } } }, "sub1"); sub1.start(); // 刪除元素執行緒2 Thread sub2 = new Thread(new Runnable() { @Override public void run() { try { synchronized (list) { while (true) { while (list.size() == 0) { list.wait(); } LOGGER.info("list.remove ->{}, threadName->{}", list.get(0), Thread.currentThread().getName()); list.remove(0); list.notifyAll(); } } } catch (InterruptedException e) { e.printStackTrace(); } } }, "sub2"); sub2.start(); // 增加元素執行緒 Thread.sleep(1 * 1000); Thread addThread = new Thread(new Runnable() { @Override public void run() { try { for (int i = 0; i < 5; i++) { synchronized (list) { list.add(i + ""); LOGGER.info("新增元素->{},threadName->{}", i, Thread.currentThread().getName()); list.notifyAll(); list.wait(); } } } catch (InterruptedException e) { LOGGER.error("InterruptedException error", e); } } }, "B"); addThread.start(); } }

 

結果:

18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] 新增元素->0,threadName->B
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] list.remove ->0, threadName->sub2
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] 新增元素->1,threadName->B
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] list.remove ->1, threadName->sub1
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] 新增元素->2,threadName->B
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] list.remove ->2, threadName->sub2
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] 新增元素->3,threadName->B
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] list.remove ->3, threadName->sub1
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] 新增元素->4,threadName->B
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] list.remove ->4, threadName->sub2

 

2. 多生產與多消費:操作值-假死

  假死的現象其實就是進入waiting狀態。如果全部執行緒都進入waiting狀態,則程式就不再執行任何業務功能了,整個專案呈停止狀態。

  例如兩個生產者兩個消費者最後處於假死狀態的例子:

package cn.qlq.thread.seven;

import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import sun.util.logging.resources.logging;

/**
 * 多生產與多消費:操作值-假死( 多生產與多消費保證只有一個元素生產與消費)
 * 
 * @author Administrator
 *
 */
public class Demo2 {
    private static final Logger LOGGER = LoggerFactory.getLogger(Demo2.class);

    public static void main(String[] args) throws InterruptedException {
        final List<String> list = new ArrayList<String>();

        // 刪除元素執行緒1
        Thread sub1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    synchronized (list) {
                        while (true) {
                            while (list.size() == 0) {
                                LOGGER.info("進入等待***,threadName->{}", Thread.currentThread().getName());
                                list.wait();
                                LOGGER.info("退出等待***,threadName->{}", Thread.currentThread().getName());
                            }

                            LOGGER.info("list.remove ->{}, threadName->{}", list.get(0),
                                    Thread.currentThread().getName());
                            list.remove(0);
                            list.notify();
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "sub1");
        sub1.start();

        // 刪除元素執行緒2
        Thread sub2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    synchronized (list) {
                        while (true) {
                            while (list.size() == 0) {
                                LOGGER.info("進入等待***,threadName->{}", Thread.currentThread().getName());
                                list.wait();
                                LOGGER.info("退出等待***,threadName->{}", Thread.currentThread().getName());
                            }

                            LOGGER.info("list.remove ->{}, threadName->{}", list.get(0),
                                    Thread.currentThread().getName());
                            list.remove(0);
                            list.notify();
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "sub2");
        sub2.start();

        // 增加元素執行緒
        Thread.sleep(1 * 1000);
        Thread addThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < 50000; i++) {
                        synchronized (list) {
                            while (list.size() != 0) {
                                LOGGER.info("進入等待***,threadName->{}", Thread.currentThread().getName());
                                list.wait();
                                LOGGER.info("退出等待***,threadName->{}", Thread.currentThread().getName());
                            }
                            list.add(i + "");
                            LOGGER.info("新增元素->{},threadName->{}", i, Thread.currentThread().getName());
                            list.notify();
                        }
                    }
                } catch (InterruptedException e) {
                    LOGGER.error("InterruptedException error", e);
                }
            }
        }, "add1");
        addThread.start();

        // 增加元素執行緒
        Thread.sleep(1 * 1000);
        Thread addThread2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < 50000; i++) {
                        synchronized (list) {
                            while (list.size() != 0) {
                                LOGGER.info("進入等待***,threadName->{}", Thread.currentThread().getName());
                                list.wait();
                                LOGGER.info("退出等待***,threadName->{}", Thread.currentThread().getName());
                            }
                            list.add(i + "");
                            LOGGER.info("新增元素->{},threadName->{}", Thread.currentThread().getName());
                            list.notify();
                        }
                    }
                } catch (InterruptedException e) {
                    LOGGER.error("InterruptedException error", e);
                }
            }
        }, "add2");
        addThread2.start();

        Thread.sleep(10 * 1000);
        LOGGER.info("sub1 state->{}", sub1.getState());
        LOGGER.info("sub2 state->{}", sub2.getState());
        LOGGER.info("add1 state->{}", addThread.getState());
        LOGGER.info("add2 state->{}", addThread2.getState());
    }
}

 

結果:

18:49:23 [cn.qlq.thread.seven.Demo2]-[INFO] 進入等待***,threadName->sub1
18:49:23 [cn.qlq.thread.seven.Demo2]-[INFO] 進入等待***,threadName->sub2
18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] 新增元素->0,threadName->add1
18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] 進入等待***,threadName->add1
18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] 退出等待***,threadName->sub1
18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] list.remove ->0, threadName->sub1
18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] 進入等待***,threadName->sub1
18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] 退出等待***,threadName->sub2
18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] 進入等待***,threadName->sub2
18:49:25 [cn.qlq.thread.seven.Demo2]-[INFO] 新增元素->add2,threadName->{}
18:49:25 [cn.qlq.thread.seven.Demo2]-[INFO] 退出等待***,threadName->add1
18:49:25 [cn.qlq.thread.seven.Demo2]-[INFO] 進入等待***,threadName->add1
18:49:25 [cn.qlq.thread.seven.Demo2]-[INFO] 進入等待***,threadName->add2
18:49:35 [cn.qlq.thread.seven.Demo2]-[INFO] sub1 state->WAITING
18:49:35 [cn.qlq.thread.seven.Demo2]-[INFO] sub2 state->WAITING
18:49:35 [cn.qlq.thread.seven.Demo2]-[INFO] add1 state->WAITING
18:49:35 [cn.qlq.thread.seven.Demo2]-[INFO] add2 state->WAITING

  

解釋一下上面的執行緒假死的原因:

  由於喚醒執行緒呼叫的是notify()喚醒單個執行緒,所以有可能喚醒的是同類的執行緒,也就是生產者喚醒的是生產者,消費者喚醒的是消費者。導致最後四個執行緒都處於waiting狀態。

 

解決辦法:

  喚醒的時候採用notifyAll()喚醒所有的執行緒喚醒所有的執行緒,避免喚醒同類執行緒。