1. 程式人生 > >幾種實現延時任務的方式(二)

幾種實現延時任務的方式(二)

在上一節中,我們講了三種方式來實現延時任務,其實,將三種方式結合起來用,對於一些中小型公司已經足夠了,但是在中大型網際網路公司還是遠遠不夠的。

想必大家對Redis起碼有一個初步的概念:基於記憶體的非關係型資料庫。在平時的業務開發中,Redis經常會被用做快取,來提高網站的效能,減少資料庫的訪問,所以一想到Redis,腦海中第一個浮現出來的就是快取。

沒辦法,對於搬磚業務開發,所使用到的Redis基本也僅僅侷限於快取/代替Session了。但是Redis的應用場景很多很多。下面我就用Redis來實現延時任務。

我在這裡假設大家已經對Redis有了一個基本的瞭解,並且使用過Redis,所以我不再過多的講述Redis的基本知識了。

開門見山,我們會使用到Redis的ZSet資料結構。

ZSet可以理解為Set的升級版本,Set是無序的,ZSet是有序的,而且會實時排序,所以ZSet也被稱為 Sort Set,我是比較傾向於後面的稱呼的,因為從字面上的意思就可以知道這個Set擁有一個特點:排序。

ZSet有兩個元素:member ,score,ZSet會根據score進行排序,member就是成員的意思。

要想完成這個需求,我們第一個想到的應該就是怎麼把資料推送給Redis,第二個想到的是怎麼把Redis資料給讀取出來。

我所用的是Jedis

通過IDEA的自動提示功能,我們很容易找到zadd這方法,看起來推送資料給Redis應該是這個方法:
image.png


讓我們試一下吧。

    private static JedisPoolConfig jedisPoolConfig = null;

    private static JedisPool jedisPool;

    private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");

    static {
        jedisPoolConfig = new JedisPoolConfig();
        jedisPool = new JedisPool(jedisPoolConfig, "", 0, 1500, "", 0);
    }

    private static void zadd(String key, String member, double score) {
        Jedis jedis = jedisPool.getResource();
        jedis.zadd(key, score, member);
        jedis.close();
    }

    private static Set<Tuple> zrangeWithScores(String key, int start, int end) {
        Jedis jedis = jedisPool.getResource();
        Set<Tuple> set = jedis.zrangeWithScores(key, start, end);
        jedis.close();
        return set;
    }

    private static void zrem(String key, String member) {
        Jedis jedis = jedisPool.getResource();
        jedis.zrem(key, member);
        jedis.close();
    }

在這裡我把ip和埠號,密碼,DBNum都給隱藏了 ,大家應該可以看懂。

我並沒有用Spring或者是Spring Boot來管理Redis,因為如果加上這些東西,還需要有Spring或者Spring Boot的知識,而且這裡我僅僅是想演示下用Redis實現延遲任務的一個思路而已,基於此原因,大家也別糾結 沒有分不同的類,沒有異常處理 等等。

我們在main方法呼叫下zadd方法:

  zadd("haha","order1",2018005130);
  zadd("haha","order2",2017005130);
  zadd("haha","order3",2016005130);
  zadd("haha","order4",2021115130);

執行完成後,開啟Redis管理工具,找到這個key,看一下結果:
image.png
score小的排前面,這也符合orderTime小的,越先過期。但是 score引數型別是double,所以我們不能把Date型的orderTime直接放進去了,但是我們可以把Date經過轉換,再放進去。

資料已經放進去了,怎麼拿出來呢?這個方法比較難找,只能藉助於搜尋引擎了,最後確定了方法:
image.png
不管是引數,還是返回型別,都有點奇怪,難怪找不到。
我們呼叫下這個方法試一下。 根據引數名稱,猜測start是開始,end是結束,我們要讀取第一個資料,應該是傳0,1,或者是1,2。但是 其實應該是傳0,0。。。真讓人沮喪。。返回的資料也很奇怪,看不懂啊:
image.png

這個先放一下,我們發現 這個資料雖然已經被讀取出來了,但是Redis並麼有把這個資料刪掉。

讓我們想想,這個ZSet並不像DelayQueue一樣,這個沒有延遲的功能。我們把資料推到Redis,下一秒就可以讀出來。所以我們需要先把資料讀取出來,然後判斷這個訂單有沒有超時,如果沒有超時的話,Sleep一段時間,再次判斷是否超時。超時了,則修改資料庫狀態,如果還是沒超時,繼續Sleep,再判斷。所以上面讀取方法沒有刪除資料,是符合我們要求的。

我們需要找到刪除的方法,最後確定了方法:
image.png
這個方法比較簡單,就是一個key,一個可變的member。
我們呼叫下這個方法:

 zrem("haha", "order3");

再看下Redis的管理工具:
image.png
order3 被刪除了。

接下來的問題就是讀取資料方法那個奇怪的返回值怎麼使用了,也沒有什麼好的辦法,就是在執行的時候,ALT+F8 調出視窗,各種嘗試唄。

下面,直接把最終程式碼貼出來吧:

   private static final int expireTime = 15000;
   //其他略,上面有
    public static void main(String[] args) {
        Thread productThread = new Thread(() -> {
            for (int i = 0; i < 15; i++) {
                try {
                    Thread.sleep(1200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                produce(i);
            }
        });
        productThread.start();

        Thread consumThread = new Thread(() -> {
            consum();
        });
        consumThread.start();
    }

    private static void produce(int orderId) {
        Date date = new Date();
        String dateStr = simpleDateFormat.format(date);
        System.out.printf("現在時間是%s,訂單%s加入佇列%n", dateStr, orderId);
        zadd("order", String.valueOf(orderId), date.getTime());
    }

    private static void consum() {
        while (true) {
            Set<Tuple> set = zrangeWithScores("order", 0, 0);
            for (Tuple item : set) {
                Date date = new Date();
                if (date.getTime() - item.getScore() > expireTime) {
                    String dateStr = simpleDateFormat.format(date);
                    System.out.printf("現在時間是%s,訂單%s已過期%n", dateStr, item.getElement());
                    zrem("order", item.getElement());
                } else {
                    try {
                        Thread.sleep(300);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

最後讓我們執行一下:
image.png
沒有問題。

在這裡有兩個細節需要說明下:

  • 如果取出來的訂單沒有超時,會睡300毫秒,然後繼續判斷。這個數值直接影響著訂單過期的延遲情況,如果把數值設定的很大,比如20秒,顯而易見,訂單真正過期的時間遠遠不止15秒了。如果把資料設定的很小,那麼伺服器壓力也會增大。

  • 取出資料——》刪除資料 會有一定的時間差,如果開幾個執行緒同時消費,或者部署幾個應用同時消費,會有重複執行的情況,如果僅僅是修改下訂單的狀態,沒什麼問題,因為這是一個冪等性操作,不管執行幾次,都是同樣的結果,但是如何還有其他的非冪等性操作,比如 使用者信譽-10,就要用其他的手段來避免 重複執行了,這裡就不展開了。

這種實現方式其實也比較簡單,因為大家都或多或少的使用過Redis,只要知道這幾個方法,很容易就可以實現。
但是相比上一節的三個方法來說,這個方法就高階很多了,支援多執行緒消費,支援多應用(部署)消費,應用伺服器宕機也沒事。

好了,到這裡,延時任務的第四種方式——使用Redis 就講完了。