1. 程式人生 > >面試官讓我手寫一個生產者消費者模式

面試官讓我手寫一個生產者消費者模式

不知道你是否遇到過面試官讓你手寫生產者消費者程式碼。別說,前段時間有小夥伴還真的遇到了這種情況。當時是一臉懵逼。

但是,俗話說,從哪裡跌倒就要從哪裡爬起來。既然這次被問到了,那就回去好好研究一下,爭取下一次不再被虐唄。

於是,今天我決定手敲一個生產者消費者模式壓壓驚。(因為我也不想以後被面試官血虐啊)

生產者消費者模式,其實很簡單。無非就是生產者不停的生產資料,消費者不停的消費資料。(這不廢話嗎,字面意思我也知道啊)

咳咳。其實,我們可以拿水池來舉例。

比如,現在要用多個注水管往水池裡邊注水,那這些注水管就認為是生產者。從水池裡邊抽水的抽水管就是消費者。水池本身就是一個緩衝區,用於生產者消費者之間的通訊。

好的,跟著我的思路。

既然生產者是生產資料的,那總得定義一個數據類吧(Data)

public class Data {
    private int id;
    private int num;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }

    public Data(int id, int num) {
        this.id = id;
        this.num = num;
    }

    public Data() {

    }
}

以上資料,假設注水管每次注水的id和注水容量num(單位是升)都是遞增的。並且,單次出水管的出水量和注水管的注水量是一一對應的。

生產者的類Producer和消費者類Consumer內部都需要維護一個阻塞佇列,來儲存緩衝區的資料。

public class Producer implements Runnable{
    //共享阻塞佇列
    private BlockingDeque<Data> queue;
    //是否還在執行
    private volatile boolean isRunning = true;
    //id生成器
    private static AtomicInteger count = new AtomicInteger();
    //生成隨機數
    private static Random random = new Random();

    public Producer(BlockingDeque<Data> queue){
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while(isRunning){
                //模擬注水耗時
                Thread.sleep(random.nextInt(1000));
                int num = count.incrementAndGet();
                Data data = new Data(num, num);
                System.out.println("當前>>注水管:"+Thread.currentThread().getName()+"注水容量(L):"+num);
                if(!queue.offer(data,2, TimeUnit.SECONDS)){
                    System.out.println("注水失敗...");
                }
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    public void stop(){
        isRunning = false;
    }
}

消費者:

public class Consumer implements Runnable{

    private BlockingDeque<Data> queue ;

    private static Random random = new Random();

    public Consumer(BlockingDeque<Data> queue){
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true){
            try {
                Data data = queue.take();
                //模擬抽水耗時
                Thread.sleep(random.nextInt(1000));
                if(data != null){
                    System.out.println("當前<<抽水管:"+Thread.currentThread().getName()+",抽取水容量(L):"+data.getNum());
                }
            }catch (Exception e){
                e.printStackTrace();
            }

        }
    }
}

測試類,假設有三個注水管和三個出水管(即六個執行緒)同時執行。等一定時間後,所有注水管停止注水,則當水池空(阻塞佇列為空)的時候,出水管也將不再出水。

public class TestProC {
    public static void main(String[] args) throws InterruptedException {

        BlockingDeque<Data> queue = new LinkedBlockingDeque<>(10);

        Producer producer1 = new Producer(queue);
        Producer producer2 = new Producer(queue);
        Producer producer3 = new Producer(queue);

        Consumer consumer1 = new Consumer(queue);
        Consumer consumer2 = new Consumer(queue);
        Consumer consumer3 = new Consumer(queue);

        ExecutorService service = Executors.newCachedThreadPool();
        service.execute(producer1);
        service.execute(producer2);
        service.execute(producer3);
        service.execute(consumer1);
        service.execute(consumer2);
        service.execute(consumer3);

        Thread.sleep(3000);
        producer1.stop();
        producer2.stop();
        producer3.stop();

        Thread.sleep(1000);
        service.shutdown();
    }
}

執行結果如下:

到最後一次注水20L的時候,所有注水管都停止注水了,但此時水池還沒空。於是,所有出水管繼續消費水資源,直到最後20L也被消費完。

以上,就是一個典型的生產者消費者模式。

可以看到,這種模式有很多優點:

1)可以解耦消費者和生產者,因為它們是兩個不同的類,互相之間不會產生影響。

2)支援併發。生產者只管生產資料就行了,生產完直接把資料丟到緩衝區,而不需要等消費者消費完資料才可以生產下一個資料。否則會造成阻塞,從而影響效率。

3)允許生產者和消費者有不同的處理速度。如,當生產者生產資料比較快的時候,會把消費者還沒來得及處理的資料先放到緩衝區。等有空閒的消費者了,再去緩衝區拿去資料。

另外,以上的緩衝區,我們一般會使用阻塞佇列。就像上邊用的LinkedBlockingDeque。

這樣,當佇列滿的時候,會阻塞生產者繼續往佇列新增資料,直到有消費者來消費了佇列中的資料。當佇列空的時候,也會阻塞消費者從佇列獲取資料,直到有生產者把資料放入到佇列中。

阻塞佇列最好使用有界佇列(程式碼中指定的容量為10)。因為,如果生產者的速度遠遠大於消費者時,就會有可能造成佇列的元素一直增加,直到記憶體耗盡。當然,這也需要看實際的業務情況。如果能保證生產者的數量在可控範圍內,不會給記憶體造成壓力,用無界佇列,也未嘗不可