Redis延時訊息佇列、非同步訊息佇列的實現
阿新 • • 發佈:2018-12-30
package list; import java.lang.reflect.Type; import java.util.Set; import java.util.UUID; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.TypeReference; import redis.clients.jedis.Jedis; /** * 延時非同步訊息佇列的實現 */ public class RedisDelayingQueue<T> { static class TaskItem<T> { public String id; public T msg; } // fastjson 序列化物件中存在 generic 型別時,需要使用 TypeReference private Type TaskType = new TypeReference<TaskItem<T>>() { }.getType(); private Jedis jedis; private String queueKey; public RedisDelayingQueue(Jedis jedis, String queueKey) { this.jedis = jedis; this.queueKey = queueKey; } public void delay(T msg) { TaskItem<T> task = new TaskItem<T>(); task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid task.msg = msg; String s = JSON.toJSONString(task); // fastjson 序列化 jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延時佇列 ,5s 後再試 } public void loop() { while (!Thread.interrupted()) { // 只取一條 Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1); if (values.isEmpty()) { try { Thread.sleep(500); // 歇會繼續 } catch (InterruptedException e) { break; } continue; } String s = values.iterator().next(); if (jedis.zrem(queueKey, s) > 0) { // 搶到了 TaskItem<T> task = JSON.parseObject(s, TaskType); // fastjson 反序列化 this.handleMsg(task.msg); } } } public void handleMsg(T msg) { System.out.println(msg); } public static void main(String[] args) { Jedis jedis = new Jedis(); final RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo"); Thread producer = new Thread() { public void run() { for (int i = 0; i < 10; i++) { queue.delay("codehole" + i); } } }; Thread consumer = new Thread() { public void run() { queue.loop(); } }; producer.start(); consumer.start(); try { producer.join(); Thread.sleep(6000); consumer.interrupt(); consumer.join(); } catch (InterruptedException e) { } } }
有幾點要提醒一下
1.為什麼使用靜態內部類呢?我這裡定義成兩個靜態變數不行嗎?
(1)內部靜態類不需要有指向外部類的引用。但非靜態內部類需要持有對外部類的引用。
(2)非靜態內部類能夠訪問外部類的靜態和非靜態成員。靜態類不能訪問外部類的非靜態成員。他只能訪問外部類的靜態成員。
(3)一個非靜態內部類不能脫離外部類實體被建立,一個非靜態內部類可以訪問外部類的資料和方法,因為他就在外部類裡面。
主要是由於在傳入引數不確定是String還是int等其他型別,所以這裡呼叫定義的內部類,並且內部類的msg為泛型,可擴充套件性好。
非同步訊息佇列參考如下: