1. 程式人生 > >Java執行緒實現Redis任務佇列(生產者消費者)

Java執行緒實現Redis任務佇列(生產者消費者)

注:接上篇IDEA整合Redis,本篇實現Redis的任務佇列,Redis連線池具體配置看上篇。

一:寫一個Jedis的工具類JedisUtil,將Jedis中的部分方法實現,程式碼如下:

package com.wq.Util;
import com.wq.RedisPool.RedisPool;
import redis.clients.jedis.Jedis;
import java.util.List;
public class JedisUtil {
    private static Jedis jedis = null;
/**
     * 儲存REDIS佇列 順序儲存
     * 
@param key 位元組型別 * @param value 位元組型別 */ public static void lpush(byte[] key,byte[] value){ try { jedis = RedisPool.getJedis(); jedis.lpush(key,value); } catch (Exception e) { e.printStackTrace(); }finally { RedisPool.returnResource(jedis); } } /**
* 儲存REDIS佇列 反序儲存 * @param key 位元組型別 * @param value 位元組型別 */ public static void rpush(byte[] key,byte[] value){ try { jedis = RedisPool.getJedis(); jedis.rpush(key,value); } catch (Exception e) { e.printStackTrace(); }finally { RedisPool.returnResource
(jedis); } } /** * 移除列表的最後一個元素,並將該元素新增到另一個列表並返回,就可以實現任務佇列 * @param srckey 原佇列的key * @param dstkey 目標佇列的key */ public static byte[] rpoplpush(byte[] srckey,byte[] dstkey){ byte[] value = null; try { jedis = RedisPool.getJedis(); value= jedis.rpoplpush(srckey,dstkey); } catch (Exception e) { e.printStackTrace(); }finally { RedisPool.returnResource(jedis); } return value; } /** * 從列表中彈出一個值,將彈出的元素插入到另外一個列表中並返回它; 如果列表沒有元素會阻塞列表直到等待超時或發現可彈出元素為止。 * @param srckey * @param dstkey * @param timout * @return */ public static byte[] brpoplpush(byte[] srckey,byte[] dstkey,int timout){ byte[] value = null; try { jedis = RedisPool.getJedis(); value = jedis.brpoplpush(srckey,dstkey,timout); } catch (Exception e) { e.printStackTrace(); } finally { RedisPool.returnResource(jedis); } return value; } /** * 設定實現任務佇列的鍵和過期時間 * @param key * @param timeout */ public static List<byte[]> brpop(byte[] key, int timeout){ List<byte[]> result = null; try { jedis = RedisPool.getJedis(); result=jedis.brpop(0,key); } catch (Exception e) { e.printStackTrace(); } finally { RedisPool.returnResource(jedis); } return result; } /** * 移除佇列中的最後一個元素並顯示最後一個元素 * @param key * @return */ public static byte[] rpop(byte[] key) { byte[] bytes = null; try { jedis = RedisPool.getJedis(); bytes = jedis.rpop(key); } catch (Exception e) { e.printStackTrace(); } finally { RedisPool.returnResource(jedis); } return bytes; } }
二:寫一個實體類MessageUtil,實現存入Redis中的是物件,不是單單的基本型別,存入Redis中的物件需要實現序列化介面,程式碼如下:
package com.wq.Util;
import java.io.Serializable;
public class MessageUtil implements Serializable{
    private static final long serialVersionUID = -8785806144878640550L;
    private int id;
    private String content;
    public int getId() {
        return id;
}
    public void setId(int id) {
        this.id = id;
}
    public String getContent() {
        return content;
}
    public void setContent(String content) {
        this.content = content;
}
}
三:使用兩個Redis列表,一個佇列作為生成者,一個佇列作為消費者,加上執行緒實現兩個列表,一個列表產生任務,通過任務佇列,另一個列表處理任務,程式碼如下:

1.使用jedis中的flushAll方法,將Redis資料庫中的所有key清空:


控制檯資訊說明,兩個佇列都不存在了;

2.再使用initList類,建立一個列表,並將物件序列化過後存入列表,並有執行緒持續產生物件並插入列表,程式碼如下:

package com.wq.Util;
import com.wq.RedisPool.RedisPool;
import redis.clients.jedis.Jedis;
public class initList {
    public static byte[] rediskey = "key".getBytes();
    public static byte[] dstkey = "dstkey".getBytes();
    public static long time=0;
    public static int i=0;
    public static void main(String args[]) {
        Jedis jedis = RedisPool.getJedis();
      while (true){
          try {
              MessageUtil msg1 = new MessageUtil();
msg1.setId(i);
msg1.setContent("wq"+i);
JedisUtil.lpush(rediskey,SerialoizebleUtil.serialize(msg1));
time=2000;
System.out.println("success"+i);
System.out.println(jedis.lrange(rediskey,0,100));
i++;
Thread.sleep(time);
} catch (InterruptedException e) {
              e.printStackTrace();
}
      }
    }
}
控制檯輸出如下,說明一直在向列表中插入新產生的物件,這裡插入了6個物件後,停止執行緒:

2.再開啟一個執行緒,使用Redis中的brpoplpush方法,實現任務佇列原理,程式碼如下:

package com.wq.Util;
import com.wq.RedisPool.RedisPool;
import junit.framework.TestCase;
import redis.clients.jedis.Jedis;
public class JedisUtilTest extends TestCase {
    public static byte[] rediskey = "key".getBytes();
    public static byte[] dstkey = "dstkey".getBytes();
    public static long time=0;
    public static void main(String args[]){
        Jedis jedis = RedisPool.getJedis();
        while (true) {
            try {
                byte[] bytes = JedisUtil.brpoplpush(rediskey, dstkey, 0);
MessageUtil msg = (MessageUtil) SerialoizebleUtil.unSerialize(bytes);
                if (msg != null) {
                    System.out.println(msg.getId() + " " + msg.getContent());
}
                time=3000;
System.out.println(jedis.lrange(rediskey,0,100));
System.out.println(jedis.lrange(dstkey,0,100));
Thread.sleep(time);
} catch (InterruptedException e) {
                e.printStackTrace();
}
        }
    }
}
控制檯輸出如下:


上面的列表存的是剛才產生的6個物件,下面圈出來的是新的列表,可以看出新的列表的物件在遞增,說明成功實現了任務佇列原理,下面是全部完成了的圖片


這裡將兩個執行緒同時啟動,更容易理解任務佇列以及生產者消費者概念。

To  be continue..........