1. 程式人生 > >Redis(二)延遲隊列

Redis(二)延遲隊列

sem and con fastjson this val oid uek json

1.目錄 

  • 延遲隊列
  • 進一步優化

2.延遲隊列

package com.redis;

import java.lang.reflect.Type;
import java.util.Set;
import java.util.UUID;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import redis.clients.jedis.Jedis;

public class RedisDelayingQueue<T> {
    
static class TaskItem<T> { public String id; public T msg; } // fastjson 序列化對象中存在 generic 類型時,需要使用 TypeReference private Type TaskType = new TypeReference<TaskItem<T>>() { }.getType(); private Jedis jedis; private String queueKey;
public RedisDelayingQueue(Jedis jedis, String queueKey) { this.jedis = jedis; this.queueKey = queueKey; } public void delay(T msg) { TaskItem<T> task = new TaskItem<T>(); task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid task.msg = msg; String s
= JSON.toJSONString(task); // fastjson 序列化 jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延時隊列 ,5s 後再試 } public void loop() { while (!Thread.interrupted()) { // 只取一條 Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis()); if (values.isEmpty()) { try { Thread.sleep(500); // 歇會繼續 } catch (InterruptedException e) { break; } continue; } String s = values.iterator().next(); if (jedis.zrem(queueKey, s) > 0) { // 搶到了 TaskItem<T> task = JSON.parseObject(s, TaskType); // fastjson 反序列化 this.handleMsg(task.msg); } } } public void handleMsg(T msg) { System.out.println(msg); } public static void main(String[] args) { Jedis jedis = new Jedis(); RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo"); Thread producer = new Thread(() -> { for (int i = 0; i < 10; i++) { queue.delay("codehole" + i); } }); Thread consumer = new Thread(() -> queue.loop()); producer.start(); consumer.start(); try { producer.join(); Thread.sleep(6000); consumer.interrupt(); consumer.join(); } catch (InterruptedException e) { } } }

3.進一步優化

上面的算法中同一個任務可能會被多個進程取到之後再使用zrem進行爭搶,那 些沒搶到的進程都是白取了一次任務,這是浪費。可以考慮使用lua scripting來 優化一下這個邏輯,將zrangebyscore和zrem一同挪到服務器端進行原子化操 作,這樣多個進程之間爭搶任務時就不會出現這種浪費了

Redis(二)延遲隊列