1. 程式人生 > >使用Redis實現輕量級延時佇列

使用Redis實現輕量級延時佇列

A:需求說明:

  1. 如果系統中需要用到定時執行計劃的,又不想用到中介軟體,如果輪詢資料庫的話,會導致大量資源消耗,這樣我們就可以使用Redis來實現類似功(需要使用rabbitMQ的請看這裡:https://blog.csdn.net/u010096717/article/details/82148681
  2. 業務型別,如訂單一些評論,如果48h使用者未對商家評論,系統會自動產生一條預設評論,還有排隊到時提醒等

B:實現思路:

  1. 將整個Redis當做訊息池,以kv形式儲存訊息,key為id,value為具體的訊息body
  2. 使用ZSET做優先佇列,按照score維持優先順序(用當前時間+需要延時的時間作為score)
  3. 輪詢ZSET,拿出score比當前時間戳大的資料(已過期的)
  4. 根據id拿到訊息池的具體訊息進行消費
  5. 消費成功,刪除改佇列和訊息
  6. 消費失敗,讓該訊息重新回到佇列

C:程式碼實現

  1. Message訊息封裝類
    @Data
    public class Message {
    
        /**
         * 訊息id
         */
        private String id;
        /**
         * 訊息延遲/毫秒
         */
        private long delay;
    
        /**
         * 訊息存活時間
         */
        private int ttl;
        /**
         * 訊息體,對應業務內容
         */
        private String body;
        /**
         * 建立時間,如果只有優先順序沒有延遲,可以設定建立時間為0
         * 用來消除時間的影響
         */
        private long createTime;
    
    }

2.基於redis的訊息佇列

@Component
public class RedisMQ {

    /**
     * 訊息池字首,以此字首加上傳遞的訊息id作為key,以訊息{@link Message}
     * 的訊息體body作為值儲存
     */
    public static final String MSG_POOL = "Message:Pool:";
    /**
     * zset佇列 名稱 queue
     */
    public static final String QUEUE_NAME = "Message:Queue:";

    private static final int SEMIH = 30*60;



    @Autowired
    private RedisService redisService;

    /**
     * 存入訊息池
     * @param message
     * @return
     */
    public boolean addMsgPool(Message message) {

        if (null != message) {
            return redisService.setExp(MSG_POOL + message.getId(), message.getBody(), Long.valueOf(message.getTtl() + SEMIH));
        }
        return false;
    }

    /**
     * 從訊息池中刪除訊息
     * @param id
     * @return
     */
    public void deMsgPool(String id) {
        redisService.remove(MSG_POOL + id);
    }

    /**
     * 向佇列中新增訊息
     * @param key
     * @param score 優先順序
     * @param val
     * @return 返回訊息id
     */
    public void enMessage(String key, long score, String val) {
        redisService.zsset(key,val,score);
    }

    /**
     * 從佇列刪除訊息
     * @param id
     * @return
     */
    public boolean deMessage(String key, String id) {
        return redisService.zdel(key, id);
    }
    
}

4.編寫訊息傳送(生產者)

@Component
public class MessageProvider {

    static Logger logger = LoggerFactory.getLogger(MessageProvider.class);


    private static int delay = 30;//30秒,可自己動態傳入

    @Resource
    private RedisMQ redisMQ;

    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    //改造成redis
    public void sendMessage(String messageContent) {
        try {
            if (messageContent != null){
                String seqId = UUID.randomUUID().toString();
                // 將有效資訊放入訊息佇列和訊息池中
                Message message = new Message();
                // 可以新增延遲配置
                message.setDelay(delay*1000);
                message.setCreateTime(System.currentTimeMillis());
                message.setBody(messageContent);
                message.setId(seqId);
                // 設定訊息池ttl,防止長期佔用
                message.setTtl(delay + 360);
                redisMQ.addMsgPool(message);
                //當前時間加上延時的時間,作為score
                Long delayTime = message.getCreateTime() + message.getDelay();
                String d = sdf.format(message.getCreateTime());
                System.out.println("當前時間:" + d+",消費的時間:" + sdf.format(delayTime));
                redisMQ.enMessage(RedisMQ.QUEUE_NAME,delayTime, message.getId());
            }else {
                logger.warn("訊息內容為空!!!!!");
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

5.訊息消費者

@Component
public class RedisMQConsumer {

    @Resource
    private RedisMQ redisMQ;

    @Autowired
    private RedisService redisService;

    @Autowired
    private MessageProvider provider;

    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");


    /**
     * 訊息佇列監聽器<br>
     *
     */
    @Scheduled(cron = "*/1 * * * * *")
    public void monitor() {
        Set<String> set = redisService.rangeByScore(RedisMQ.QUEUE_NAME, 0, System.currentTimeMillis());
        if (null != set) {
            long current = System.currentTimeMillis();
            for (String id : set) {
                long  score = redisService.getScore(RedisMQ.QUEUE_NAME, id).longValue();
                if (current >= score) {
                    // 已超時的訊息拿出來消費
                    String str = "";
                    try {
                        str = redisService.get(RedisMQ.MSG_POOL + id);
                        System.out.println("消費了:" + str+ ",消費的時間:" + sdf.format(System.currentTimeMillis()));
                    } catch (Exception e) {
                        e.printStackTrace();
                        //如果出了異常,則重新放回佇列
                        System.out.println("消費異常,重新回到佇列");
                        provider.sendMessage(str);
                    } finally {
                        redisMQ.deMessage(RedisMQ.QUEUE_NAME, id);
                        redisMQ.deMsgPool(id);
                    }
                }
            }
        }
    }
}

6.配置資訊

<!--1依賴引入-->
<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>


2yml配置
spring:
  redis:
    database: 1
    host: 127.0.0.1
    port: 6379

以上程式碼已經實現了延遲消費功能,現在來測試一下,呼叫MessageProvider的sendMessage方法,我設定了30秒

可以看到結果

因為我們是用定時器去輪詢的,會出現誤差