1. 程式人生 > >基於redis佇列實現的生產者消費者

基於redis佇列實現的生產者消費者

一.簡介

基於redis佇列的生產者消費者實現主要是利用redis的blpop或者brpop命令,以下是官方文件對這兩個命令的描述:

BLPOP 是列表的阻塞式(blocking)彈出原語。

它是 LPOP 命令的阻塞版本,當給定列表內沒有任何元素可供彈出的時候,連線將被 BLPOP 命令阻塞,直到等待超時或發現可彈出元素為止。

當給定多個 key 引數時,按引數 key 的先後順序依次檢查各個列表,彈出第一個非空列表的頭元素。

BRPOP的描述差不多,這裡就不重複了。

那麼有了這兩個命令,實現生產者消費者模式就有思路了,我們從外界資料來源不停的傳入資料到redis指定的list裡面,此時不管有沒有消費者,我們的資料是會儲存在list裡的。

然後消費者的程式只需要呼叫blpop命令,如果指定的list裡面有資料,就能從裡面取得list最左邊的資料;如果指定的list裡面沒有資料,那麼就會阻塞在那,直到list裡面來了新資料或者已經達到阻塞時間為止。

二.普通生產者消費者程式碼:

生產者我們就用自己生成的資料模仿。

public class RedisProducer {
    public static void main(String[] args) throws InterruptedException {
        Jedis jedis=JavaRedisUtils.getJedis();
        jedis.select(4);
        int count=0;
        while(count<100){
            Thread.sleep(300);
            jedis.lpush("mylist",String.valueOf(count));
            count++;
        }
        jedis.close();
    }
}
然後消費者得整合Thread類,重寫run方法,我們可以在run方法裡面寫一些對取出來的資料需要進行的業務操作,我這裡就是簡單的打印出來判斷是否取出資料。
public class Consumer extends Thread{
    String name;

    public Consumer(String name) {
        this.name = name;
    }

    @Override
    public void run(){
            Jedis jedis = JavaRedisUtils.getJedis();
        while(true) {

            jedis.select(4);
            //阻塞式brpop,List中無資料時阻塞
            //引數0表示一直阻塞下去,直到List出現數據
            List<String> list = jedis.blpop(0, "mylist");
            for(String s : list) {
                System.out.println(name+"   "+s);
            }
            jedis.close();

        }
    }
}

下面是程式的consumer執行類:

public class RedisConsumer {
    public static void main(String[] args) {
        Consumer mq1=new Consumer("mq1");
        Consumer mq2=new Consumer("mq2");
        mq1.start();
        mq2.start();

    }

下面是程式執行部分結果:


我們可以從結果中看到,我們的消費者是真的取到了資料並且在原始沒有資料的時候,我們的消費者是阻塞了的,直到新資料來臨才繼續取資料。

為了更加方便的觀看到生產者和消費者的程式執行情況,我們將從"mylist"中的消費資料利用redis的brpoplpush命令將資料從mylist消費到各個消費者自己名字的列表中。

下面是brpoplpush的解釋:

當列表 source 為空時, BRPOPLPUSH 命令將阻塞連線,直到等待超時,或有另一個客戶端對 source 執行 LPUSH 或 RPUSH 命令為止。

超時引數 timeout 接受一個以秒為單位的數字作為值。超時引數設為 0 表示阻塞時間可以無限期延長(block indefinitely) 。

返回值:
假如在指定時間內沒有任何元素被彈出,則返回一個 nil 和等待時長。反之,返回一個含有兩個元素的列表,第一個元素是被彈出元素的值,第二個元素是等待時長。
public class Consumer extends Thread{
    String name;

    public Consumer(String name) {
        this.name = name;
    }

    @Override
    public void run(){
        Jedis jedis = JavaRedisUtils.getJedis();
        jedis.select(4);
        while(true) { 
//呼叫brpoplpush方法 從mylist取出來然後放到對應name的list去
            String a=jedis.brpoplpush("mylist",name,0);
        }
    }
}

執行程式之後,redis庫中出現了mq1以及mq2的list,並且他們分別消費了mylist中的所有資料:


以及它們分別消費的數目:


三.在消費過程中新增加消費者

上面我們已經做過實驗了,它能夠做到生產者和消費者能做到的事情:當list沒有資料的時候,消費者會阻塞,當list新來資料的時候,它會接著進行消費。那麼當新來一個新的消費者的時候,它會有什麼變化呢?

新加入消費者的程式碼如下:

public class addconsumer {
    public static void main(String[] args) {
        Consumer mq3=new Consumer("mq3");
        mq3.start();
    }
}

下面我們先執行生產者,緊接著執行消費者mq1和mq2,等它們消費一段時間,並且生產者資料還在傳輸的時候,我們開啟消費者mq3。讓我們來看看結果會是怎麼樣。

redis資料庫中產生了三個列表。


它們分別的資料量為:


說明,當新加入消費者的時候,它會和其它兩個消費者內部競爭,然後一起消費沒有消費過的資料。

以上是redis佇列實現的消費者和生產者demo,希望可以給大家提供到幫助。