1. 程式人生 > >手寫一個生產者--消費者模型例子

手寫一個生產者--消費者模型例子

在併發程式設計中,比較經典的程式設計例子就是生產者和消費者模型。下面就是一個例子來詮釋一下什麼是生產者和消費者以及他們的特點和注意點。

1、先定義一個數據物件,

public class Data {
    private String id;

    private String name;

    public Data(String id,String name){
        this.id = id;
        this.name = name;
    }
    public String getId() {
        return id;
    }

    public
void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public String toString() { return "Data [id=" + id + ", name=" + name + "]"; } }

2.定義一個生產者,實現Runnable介面。

public class Provider implements Runnable{
    //共享緩衝區
    private BlockingQueue<Data> queue;

    //多執行緒間是否啟動變數,有強制從主記憶體中重新整理的功能,及時返回執行緒狀態
    private volatile boolean isRunning = true;
    //id生成器
    private static AtomicInteger count = new AtomicInteger();

    //隨機物件
    private static Random r = new
Random(); public Provider(BlockingQueue queue){ this.queue = queue; } @Override public void run() { while(isRunning){ //隨機休眠0-1000毫秒 表示獲取資料 try { Thread.sleep(r.nextInt(1000)); //獲取的資料進行累計 int id = count.incrementAndGet(); //比如通過一個getData()方法獲取了 Data data = new Data(Integer.toString(id),"資料"+id); System.out.println("當前執行緒:"+ Thread.currentThread().getName() + ",獲取了資料,id為:"+ id+ ",進行裝載到公共緩衝區中。。。"); if(!this.queue.offer(data,2,TimeUnit.SECONDS)){ System.out.print("提交緩衝區資料失敗"); } } catch (InterruptedException e) { e.printStackTrace(); } System.out.print("aaa"); } } public void stop(){ this.isRunning = false; } }

這裡有幾個注意點,一個就是對共享緩衝區的選擇,作為生產者–消費者模型而言,共享緩衝區一定要具備阻塞的能力。所以這邊選擇的是阻塞佇列。還有一個就是在併發程式設計的時候,如果需要使用類似i++這種id自增長的功能,需要使用Atomic包下的併發類。因為這些類是採用CAS設計的,不會產生併發問題。

3.消費者

public class Consumer implements Runnable {

    private BlockingQueue<Data> queue;

    public Consumer(BlockingQueue queu){
        this.queue = queu;
    }

    //隨機物件
    private static Random r = new Random();

    @Override
    public void run() {
        while(true){
            try{
                //獲取資料
                Data data = this.queue.take();
                //進行資料處理,休眠 0-1000毫秒模擬耗時
                Thread.sleep(r.nextInt(1000));
                System.out.print("當前消費執行緒"+Thread.currentThread().getName() +",消費成功,消費id為"+data.getId());
            }catch(InterruptedException e){
                e.printStackTrace();
            }
        }
    }
}

消費者主要就是從阻塞佇列中獲取資料,如果佇列中沒有元素,則會釋放CPU,然後等待。(注意這裡使用的是take而不是poll,不同點在於take在沒有元素的時候會釋放CPU,而poll則是直接返回null)。

main函式:

public class Main {
    public static void main(String[] args){
        //記憶體緩衝區
        BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(10);
        //生產者
        Provider p1 = new Provider(queue);
        Provider p2 = new Provider(queue);
        Provider p3 = new Provider(queue);

        Consumer c1 = new Consumer(queue);
        Consumer c2 = new Consumer(queue);
        Consumer c3 = new Consumer(queue);

        //建立執行緒池,這是一個快取的執行緒池,可以建立無窮大的執行緒,沒有任務的時候不建立執行緒,空閒執行緒存活的時間為60s。
        ExecutorService cachepool = Executors.newCachedThreadPool();
        cachepool.execute(p1);
        cachepool.execute(p2);
        cachepool.execute(p3);
        cachepool.execute(c1);
        cachepool.execute(c2);
        cachepool.execute(c3);
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        p1.stop();
        p2.stop();
        p3.stop();
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}