幾種實現延時任務的方式(二)
在上一節中,我們講了三種方式來實現延時任務,其實,將三種方式結合起來用,對於一些中小型公司已經足夠了,但是在中大型網際網路公司還是遠遠不夠的。
想必大家對Redis起碼有一個初步的概念:基於記憶體的非關係型資料庫。在平時的業務開發中,Redis經常會被用做快取,來提高網站的效能,減少資料庫的訪問,所以一想到Redis,腦海中第一個浮現出來的就是快取。
沒辦法,對於搬磚業務開發,所使用到的Redis基本也僅僅侷限於快取/代替Session了。但是Redis的應用場景很多很多。下面我就用Redis來實現延時任務。
我在這裡假設大家已經對Redis有了一個基本的瞭解,並且使用過Redis,所以我不再過多的講述Redis的基本知識了。
開門見山,我們會使用到Redis的ZSet資料結構。
ZSet可以理解為Set的升級版本,Set是無序的,ZSet是有序的,而且會實時排序,所以ZSet也被稱為 Sort Set,我是比較傾向於後面的稱呼的,因為從字面上的意思就可以知道這個Set擁有一個特點:排序。
ZSet有兩個元素:member ,score,ZSet會根據score進行排序,member就是成員的意思。
要想完成這個需求,我們第一個想到的應該就是怎麼把資料推送給Redis,第二個想到的是怎麼把Redis資料給讀取出來。
我所用的是Jedis
通過IDEA的自動提示功能,我們很容易找到zadd這方法,看起來推送資料給Redis應該是這個方法:

讓我們試一下吧。
private static JedisPoolConfig jedisPoolConfig = null; private static JedisPool jedisPool; private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); static { jedisPoolConfig = new JedisPoolConfig(); jedisPool = new JedisPool(jedisPoolConfig, "", 0, 1500, "", 0); } private static void zadd(String key, String member, double score) { Jedis jedis = jedisPool.getResource(); jedis.zadd(key, score, member); jedis.close(); } private static Set<Tuple> zrangeWithScores(String key, int start, int end) { Jedis jedis = jedisPool.getResource(); Set<Tuple> set = jedis.zrangeWithScores(key, start, end); jedis.close(); return set; } private static void zrem(String key, String member) { Jedis jedis = jedisPool.getResource(); jedis.zrem(key, member); jedis.close(); }
在這裡我把ip和埠號,密碼,DBNum都給隱藏了 ,大家應該可以看懂。
我並沒有用Spring或者是Spring Boot來管理Redis,因為如果加上這些東西,還需要有Spring或者Spring Boot的知識,而且這裡我僅僅是想演示下用Redis實現延遲任務的一個思路而已,基於此原因,大家也別糾結 沒有分不同的類,沒有異常處理 等等。
我們在main方法呼叫下zadd方法:
zadd("haha","order1",2018005130); zadd("haha","order2",2017005130); zadd("haha","order3",2016005130); zadd("haha","order4",2021115130);
執行完成後,開啟Redis管理工具,找到這個key,看一下結果:

score小的排前面,這也符合orderTime小的,越先過期。但是 score引數型別是double,所以我們不能把Date型的orderTime直接放進去了,但是我們可以把Date經過轉換,再放進去。
資料已經放進去了,怎麼拿出來呢?這個方法比較難找,只能藉助於搜尋引擎了,最後確定了方法:

不管是引數,還是返回型別,都有點奇怪,難怪找不到。
我們呼叫下這個方法試一下。 根據引數名稱,猜測start是開始,end是結束,我們要讀取第一個資料,應該是傳0,1,或者是1,2。但是 其實應該是傳0,0。。。真讓人沮喪。。返回的資料也很奇怪,看不懂啊:

這個先放一下,我們發現 這個資料雖然已經被讀取出來了,但是Redis並麼有把這個資料刪掉。
讓我們想想,這個ZSet並不像DelayQueue一樣,這個沒有延遲的功能。我們把資料推到Redis,下一秒就可以讀出來。所以我們需要先把資料讀取出來,然後判斷這個訂單有沒有超時,如果沒有超時的話,Sleep一段時間,再次判斷是否超時。超時了,則修改資料庫狀態,如果還是沒超時,繼續Sleep,再判斷。所以上面讀取方法沒有刪除資料,是符合我們要求的。
我們需要找到刪除的方法,最後確定了方法:

這個方法比較簡單,就是一個key,一個可變的member。
我們呼叫下這個方法:
zrem("haha", "order3");
再看下Redis的管理工具:

order3 被刪除了。
接下來的問題就是讀取資料方法那個奇怪的返回值怎麼使用了,也沒有什麼好的辦法,就是在執行的時候,ALT+F8 調出視窗,各種嘗試唄。
下面,直接把最終程式碼貼出來吧:
private static final int expireTime = 15000; //其他略,上面有 public static void main(String[] args) { Thread productThread = new Thread(() -> { for (int i = 0; i < 15; i++) { try { Thread.sleep(1200); } catch (InterruptedException e) { e.printStackTrace(); } produce(i); } }); productThread.start(); Thread consumThread = new Thread(() -> { consum(); }); consumThread.start(); } private static void produce(int orderId) { Date date = new Date(); String dateStr = simpleDateFormat.format(date); System.out.printf("現在時間是%s,訂單%s加入佇列%n", dateStr, orderId); zadd("order", String.valueOf(orderId), date.getTime()); } private static void consum() { while (true) { Set<Tuple> set = zrangeWithScores("order", 0, 0); for (Tuple item : set) { Date date = new Date(); if (date.getTime() - item.getScore() > expireTime) { String dateStr = simpleDateFormat.format(date); System.out.printf("現在時間是%s,訂單%s已過期%n", dateStr, item.getElement()); zrem("order", item.getElement()); } else { try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
最後讓我們執行一下:

沒有問題。
在這裡有兩個細節需要說明下:
-
如果取出來的訂單沒有超時,會睡300毫秒,然後繼續判斷。這個數值直接影響著訂單過期的延遲情況,如果把數值設定的很大,比如20秒,顯而易見,訂單真正過期的時間遠遠不止15秒了。如果把資料設定的很小,那麼伺服器壓力也會增大。
-
取出資料——》刪除資料 會有一定的時間差,如果開幾個執行緒同時消費,或者部署幾個應用同時消費,會有重複執行的情況,如果僅僅是修改下訂單的狀態,沒什麼問題,因為這是一個冪等性操作,不管執行幾次,都是同樣的結果,但是如何還有其他的非冪等性操作,比如 使用者信譽-10,就要用其他的手段來避免 重複執行了,這裡就不展開了。
這種實現方式其實也比較簡單,因為大家都或多或少的使用過Redis,只要知道這幾個方法,很容易就可以實現。
但是相比上一節的三個方法來說,這個方法就高階很多了,支援多執行緒消費,支援多應用(部署)消費,應用伺服器宕機也沒事。
好了,到這裡,延時任務的第四種方式——使用Redis 就講完了。