1. 程式人生 > >Java利用Redis實現消息隊列

Java利用Redis實現消息隊列

.get keys rpo throws max del 鍵值 先進先出 instance

應用場景

  • 為什麽要用redis?
    二進制存儲、java序列化傳輸、IO連接數高、連接頻繁

一、序列化

  這裏編寫了一個java序列化的工具,主要是將對象轉化為byte數組,和根據byte數組反序列化成java對象; 主要是用到了ByteArrayOutputStream和ByteArrayInputStream; 註意:每個需要序列化的對象都要實現Serializable接口;
其代碼如下:

技術分享
 1 package Utils;
 2 import java.io.*;
 3 /**
 4  * Created by Kinglf on 2016/10/17.
 5  */
 6 public class ObjectUtil {
 7     /**
 8      * 對象轉byte[]
 9      * @param obj
10      * @return
11      * @throws IOException
12      */
13     public static byte[] object2Bytes(Object obj) throws IOException{
14         ByteArrayOutputStream bo=new ByteArrayOutputStream();
15         ObjectOutputStream oo=new ObjectOutputStream(bo);
16         oo.writeObject(obj);
17         byte[] bytes=bo.toByteArray();
18         bo.close();
19         oo.close();
20         return bytes;
21     }
22     /**
23      * byte[]轉對象
24      * @param bytes
25      * @return
26      * @throws Exception
27      */
28     public static Object bytes2Object(byte[] bytes) throws Exception{
29         ByteArrayInputStream in=new ByteArrayInputStream(bytes);
30         ObjectInputStream sIn=new ObjectInputStream(in);
31         return sIn.readObject();
32     }
33 }
技術分享

二、消息類(實現Serializable接口)

技術分享
package Model;

import java.io.Serializable;

/**
 * Created by Kinglf on 2016/10/17.
 */
public class Message implements Serializable {

    private static final long serialVersionUID = -389326121047047723L;
    private int id;
    private String content;
    public Message(int id, String content) {
        this.id = id;
        this.content = content;
    }
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getContent() {
        return content;
    }
    public void setContent(String content) {
        this.content = content;
    }
}
技術分享

三、Redis的操作

  利用redis做隊列,我們采用的是redis中list的push和pop操作;
結合隊列的特點:
只允許在一端插入新元素只能在隊列的尾部FIFO:先進先出原則 Redis中lpush頭入(rpop尾出)或rpush尾入(lpop頭出)可以滿足要求,而Redis中list藥push或 pop的對象僅需要轉換成byte[]即可
  java采用Jedis進行Redis的存儲和Redis的連接池設置
上代碼:

技術分享
package Utils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * Created by Kinglf on 2016/10/17.
 */
public class JedisUtil {
    private static String JEDIS_IP;
    private static int JEDIS_PORT;
    private static String JEDIS_PASSWORD;
    private static JedisPool jedisPool;
    static {
        //Configuration自行寫的配置文件解析類,繼承自Properties
        Configuration conf=Configuration.getInstance();
        JEDIS_IP=conf.getString("jedis.ip","127.0.0.1");
        JEDIS_PORT=conf.getInt("jedis.port",6379);
        JEDIS_PASSWORD=conf.getString("jedis.password",null);
        JedisPoolConfig config=new JedisPoolConfig();
        config.setMaxActive(5000);
        config.setMaxIdle(256);
        config.setMaxWait(5000L);
        config.setTestOnBorrow(true);
        config.setTestOnReturn(true);
        config.setTestWhileIdle(true);
        config.setMinEvictableIdleTimeMillis(60000L);
        config.setTimeBetweenEvictionRunsMillis(3000L);
        config.setNumTestsPerEvictionRun(-1);
        jedisPool=new JedisPool(config,JEDIS_IP,JEDIS_PORT,60000);
    }
    /**
     * 獲取數據
     * @param key
     * @return
     */
    public static String get(String key){
        String value=null;
        Jedis jedis=null;
        try{
            jedis=jedisPool.getResource();
            value=jedis.get(key);
        }catch (Exception e){
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        }finally {
            close(jedis);
        }
        return value;
    }

    private static void close(Jedis jedis) {
        try{
            jedisPool.returnResource(jedis);
        }catch (Exception e){
            if(jedis.isConnected()){
                jedis.quit();
                jedis.disconnect();
            }
        }
    }
    public static byte[] get(byte[] key){
        byte[] value = null;
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            value = jedis.get(key);
        } catch (Exception e) {
            //釋放redis對象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返還到連接池
            close(jedis);
        }

        return value;
    }

    public static void set(byte[] key, byte[] value) {

        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.set(key, value);
        } catch (Exception e) {
            //釋放redis對象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返還到連接池
            close(jedis);
        }
    }

    public static void set(byte[] key, byte[] value, int time) {

        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.set(key, value);
            jedis.expire(key, time);
        } catch (Exception e) {
            //釋放redis對象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返還到連接池
            close(jedis);
        }
    }

    public static void hset(byte[] key, byte[] field, byte[] value) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.hset(key, field, value);
        } catch (Exception e) {
            //釋放redis對象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返還到連接池
            close(jedis);
        }
    }

    public static void hset(String key, String field, String value) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.hset(key, field, value);
        } catch (Exception e) {
            //釋放redis對象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返還到連接池
            close(jedis);
        }
    }

    /**
     * 獲取數據
     *
     * @param key
     * @return
     */
    public static String hget(String key, String field) {

        String value = null;
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            value = jedis.hget(key, field);
        } catch (Exception e) {
            //釋放redis對象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返還到連接池
            close(jedis);
        }

        return value;
    }
    /**
     * 獲取數據
     *
     * @param key
     * @return
     */
    public static byte[] hget(byte[] key, byte[] field) {

        byte[] value = null;
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            value = jedis.hget(key, field);
        } catch (Exception e) {
            //釋放redis對象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返還到連接池
            close(jedis);
        }

        return value;
    }
    public static void hdel(byte[] key, byte[] field) {

        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.hdel(key, field);
        } catch (Exception e) {
            //釋放redis對象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返還到連接池
            close(jedis);
        }
    }
    /**
     * 存儲REDIS隊列 順序存儲
     * @param  key reids鍵名
     * @param  value 鍵值
     */
    public static void lpush(byte[] key, byte[] value) {

        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.lpush(key, value);
        } catch (Exception e) {
            //釋放redis對象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返還到連接池
            close(jedis);
        }
    }

    /**
     * 存儲REDIS隊列 反向存儲
     * @param  key reids鍵名
     * @param  value 鍵值
     */
    public static void rpush(byte[] key, byte[] value) {

        Jedis jedis = null;
        try {

            jedis = jedisPool.getResource();
            jedis.rpush(key, value);

        } catch (Exception e) {

            //釋放redis對象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();

        } finally {

            //返還到連接池
            close(jedis);

        }
    }

    /**
     * 將列表 source 中的最後一個元素(尾元素)彈出,並返回給客戶端
     * @param  key reids鍵名
     * @param  destination 鍵值
     */
    public static void rpoplpush(byte[] key, byte[] destination) {

        Jedis jedis = null;
        try {

            jedis = jedisPool.getResource();
            jedis.rpoplpush(key, destination);

        } catch (Exception e) {

            //釋放redis對象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();

        } finally {

            //返還到連接池
            close(jedis);

        }
    }

    /**
     * 獲取隊列數據
     * @param  key 鍵名
     * @return
     */
    public static List lpopList(byte[] key) {

        List list = null;
        Jedis jedis = null;
        try {

            jedis = jedisPool.getResource();
            list = jedis.lrange(key, 0, -1);

        } catch (Exception e) {

            //釋放redis對象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();

        } finally {

            //返還到連接池
            close(jedis);

        }
        return list;
    }
    /**
     * 獲取隊列數據
     * @param  key 鍵名
     * @return
     */
    public static byte[] rpop(byte[] key) {

        byte[] bytes = null;
        Jedis jedis = null;
        try {

            jedis = jedisPool.getResource();
            bytes = jedis.rpop(key);

        } catch (Exception e) {

            //釋放redis對象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();

        } finally {

            //返還到連接池
            close(jedis);

        }
        return bytes;
    }
    public static void hmset(Object key, Map hash) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.hmset(key.toString(), hash);
        } catch (Exception e) {
            //釋放redis對象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();

        } finally {
            //返還到連接池
            close(jedis);

        }
    }
    public static void hmset(Object key, Map hash, int time) {
        Jedis jedis = null;
        try {

            jedis = jedisPool.getResource();
            jedis.hmset(key.toString(), hash);
            jedis.expire(key.toString(), time);
        } catch (Exception e) {
            //釋放redis對象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();

        } finally {
            //返還到連接池
            close(jedis);

        }
    }
    public static List hmget(Object key, String... fields) {
        List result = null;
        Jedis jedis = null;
        try {

            jedis = jedisPool.getResource();
            result = jedis.hmget(key.toString(), fields);

        } catch (Exception e) {
            //釋放redis對象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();

        } finally {
            //返還到連接池
            close(jedis);

        }
        return result;
    }

    public static Set hkeys(String key) {
        Set result = null;
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            result = jedis.hkeys(key);

        } catch (Exception e) {
            //釋放redis對象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();

        } finally {
            //返還到連接池
            close(jedis);

        }
        return result;
    }
    public static List lrange(byte[] key, int from, int to) {
        List result = null;
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            result = jedis.lrange(key, from, to);

        } catch (Exception e) {
            //釋放redis對象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();

        } finally {
            //返還到連接池
            close(jedis);

        }
        return result;
    }
    public static Map hgetAll(byte[] key) {
        Map result = null;
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            result = jedis.hgetAll(key);
        } catch (Exception e) {
            //釋放redis對象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();

        } finally {
            //返還到連接池
            close(jedis);
        }
        return result;
    }

    public static void del(byte[] key) {

        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.del(key);
        } catch (Exception e) {
            //釋放redis對象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返還到連接池
            close(jedis);
        }
    }

    public static long llen(byte[] key) {

        long len = 0;
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.llen(key);
        } catch (Exception e) {
            //釋放redis對象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返還到連接池
            close(jedis);
        }
        return len;
    }
}
技術分享

四、Configuration主要用於讀取Redis的配置信息

技術分享
package Utils;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * Created by Kinglf on 2016/10/17.
 */
public class Configuration extends Properties {

    private static final long serialVersionUID = -2296275030489943706L;
    private static Configuration instance = null;

    public static synchronized Configuration getInstance() {
        if (instance == null) {
            instance = new Configuration();
        }
        return instance;
    }


    public String getProperty(String key, String defaultValue) {
        String val = getProperty(key);
        return (val == null || val.isEmpty()) ? defaultValue : val;
    }

    public String getString(String name, String defaultValue) {
        return this.getProperty(name, defaultValue);
    }

    public int getInt(String name, int defaultValue) {
        String val = this.getProperty(name);
        return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val);
    }

    public long getLong(String name, long defaultValue) {
        String val = this.getProperty(name);
        return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val);
    }

    public float getFloat(String name, float defaultValue) {
        String val = this.getProperty(name);
        return (val == null || val.isEmpty()) ? defaultValue : Float.parseFloat(val);
    }

    public double getDouble(String name, double defaultValue) {
        String val = this.getProperty(name);
        return (val == null || val.isEmpty()) ? defaultValue : Double.parseDouble(val);
    }

    public byte getByte(String name, byte defaultValue) {
        String val = this.getProperty(name);
        return (val == null || val.isEmpty()) ? defaultValue : Byte.parseByte(val);
    }

    public Configuration() {
        InputStream in = ClassLoader.getSystemClassLoader().getResourceAsStream("config.xml");
        try {
            this.loadFromXML(in);
            in.close();
        } catch (IOException ioe) {

        }
    }
}
技術分享

五、測試

技術分享
import Model.Message;
import Utils.JedisUtil;
import Utils.ObjectUtil;
import redis.clients.jedis.Jedis;

import java.io.IOException;

/**
 * Created by Kinglf on 2016/10/17.
 */
public class TestRedisQueue {
    public static byte[] redisKey = "key".getBytes();
    static {
        try {
            init();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static void init() throws IOException {
        for (int i = 0; i < 1000000; i++) {
            Message message = new Message(i, "這是第" + i + "個內容");
            JedisUtil.lpush(redisKey, ObjectUtil.object2Bytes(message));
        }

    }

    public static void main(String[] args) {
        try {
            pop();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void pop() throws Exception {
        byte[] bytes = JedisUtil.rpop(redisKey);
        Message msg = (Message) ObjectUtil.bytes2Object(bytes);
        if (msg != null) {
            System.out.println(msg.getId() + "----" + msg.getContent());
        }
    }
}
技術分享
每執行一次pop()方法,結果如下:
<br>1----這是第1個內容
<br>2----這是第2個內容
<br>3----這是第3個內容
<br>4----這是第4個內容

總結

至此,整個Redis消息隊列的生產者和消費者代碼已經完成

  1. Message 需要傳送的實體類(需實現Serializable接口)
  2. Configuration Redis的配置讀取類,繼承自Properties
  3. ObjectUtil 將對象和byte數組雙向轉換的工具類
  4. Jedis 通過消息隊列的先進先出(FIFO)的特點結合Redis的list中的push和pop操作進行封裝的工具類

Java利用Redis實現消息隊列