這篇文章的誕生要感謝一位讀者,是他讓這篇優秀的文章有了和大家見面的機會,重點是優秀文章,哈哈。

事情的經過是這樣的...

不用謝我,送人玫瑰,手有餘香。相信接下來的內容一定不會讓你失望,因為它將是目前市面上最好的關於“延遲任務”的文章,這也一直是我寫作追求的目標,讓我的每一篇文章都比市面上的好那麼一點點。

好了,話不多說,直接進入今天的主題,本文的主要內容如下圖所示:

什麼是延遲任務?

顧明思議,我們把需要延遲執行的任務叫做延遲任務。

延遲任務的使用場景有以下這些:

  1. 紅包 24 小時未被查收,需要延遲執退還業務;
  2. 每個月賬單日,需要給使用者傳送當月的對賬單;
  3. 訂單下單之後 30 分鐘後,使用者如果沒有付錢,系統需要自動取消訂單。

等事件都需要使用延遲任務。

延遲任務實現思路分析

延遲任務實現的關鍵是在某個時間節點執行某個任務。基於這個資訊我們可以想到實現延遲任務的手段有以下兩個:

  1. 自己手寫一個“死迴圈”一直判斷當前時間節點有沒有要執行的任務;
  2. 藉助 JDK 或者第三方提供的工具類來實現延遲任務。

而通過 JDK 實現延遲任務我們能想到的關鍵詞是:DelayQueue、ScheduledExecutorService,而第三方提供的延遲任務執行方法就有很多了,例如:Redis、Netty、MQ 等手段。

延遲任務實現

下面我們將結合程式碼來講解每種延遲任務的具體實現。

1.無限迴圈實現延遲任務

此方式我們需要開啟一個無限迴圈一直掃描任務,然後使用一個 Map 集合用來儲存任務和延遲執行的時間,實現程式碼如下:

import java.time.Instant;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * 延遲任務執行方法彙總
 */
public class DelayTaskExample {
    // 存放定時任務
    private static Map<String, Long> _TaskMap = new HashMap<>();

    public static void main(String[] args) {
        System.out.println("程式啟動時間:" + LocalDateTime.now());
        // 新增定時任務
        _TaskMap.put("task-1", Instant.now().plusSeconds(3).toEpochMilli()); // 延遲 3s

        // 呼叫無限迴圈實現延遲任務
        loopTask();
    }

    /**
     * 無限迴圈實現延遲任務
     */
    public static void loopTask() {
        Long itemLong = 0L;
        while (true) {
            Iterator it = _TaskMap.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry entry = (Map.Entry) it.next();
                itemLong = (Long) entry.getValue();
                // 有任務需要執行
                if (Instant.now().toEpochMilli() >= itemLong) {
                    // 延遲任務,業務邏輯執行
                    System.out.println("執行任務:" + entry.getKey() +
                            " ,執行時間:" + LocalDateTime.now());
                    // 刪除任務
                    _TaskMap.remove(entry.getKey());
                }
            }
        }
    }
}

以上程式執行的結果為:

程式啟動時間:2020-04-12T18:51:28.188

執行任務:task-1 ,執行時間:2020-04-12T18:51:31.189

可以看出任務延遲了 3s 鍾執行了,符合我們的預期。

2.Java API 實現延遲任務

Java API 提供了兩種實現延遲任務的方法:DelayQueue 和 ScheduledExecutorService。

① ScheduledExecutorService 實現延遲任務

我們可以使用 ScheduledExecutorService 來以固定的頻率一直執行任務,實現程式碼如下:

public class DelayTaskExample {
    public static void main(String[] args) {
        System.out.println("程式啟動時間:" + LocalDateTime.now());
        scheduledExecutorServiceTask();
    }

    /**
     * ScheduledExecutorService 實現固定頻率一直迴圈執行任務
     */
    public static void scheduledExecutorServiceTask() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        executor.scheduleWithFixedDelay(
                new Runnable() {
                    @Override
                    public void run() {
                        // 執行任務的業務程式碼
                        System.out.println("執行任務" +
                                " ,執行時間:" + LocalDateTime.now());
                    }
                },
                2, // 初次執行間隔
                2, // 2s 執行一次
                TimeUnit.SECONDS);
    }
}

以上程式執行的結果為:

程式啟動時間:2020-04-12T21:28:10.416

執行任務 ,執行時間:2020-04-12T21:28:12.421

執行任務 ,執行時間:2020-04-12T21:28:14.422

......

可以看出使用 ScheduledExecutorService#scheduleWithFixedDelay(...) 方法之後,會以某個頻率一直迴圈執行延遲任務。

② DelayQueue 實現延遲任務

DelayQueue 是一個支援延時獲取元素的無界阻塞佇列,佇列中的元素必須實現 Delayed 介面,並重寫 getDelay(TimeUnit) 和 compareTo(Delayed) 方法,DelayQueue 實現延遲佇列的完整程式碼如下:

public class DelayTest {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue delayQueue = new DelayQueue();
        // 新增延遲任務
        delayQueue.put(new DelayElement(1000));
        delayQueue.put(new DelayElement(3000));
        delayQueue.put(new DelayElement(5000));
        System.out.println("開始時間:" +  DateFormat.getDateTimeInstance().format(new Date()));
        while (!delayQueue.isEmpty()){
            // 執行延遲任務
            System.out.println(delayQueue.take());
        }
        System.out.println("結束時間:" +  DateFormat.getDateTimeInstance().format(new Date()));
    }

    static class DelayElement implements Delayed {
        // 延遲截止時間(單面:毫秒)
        long delayTime = System.currentTimeMillis();
        public DelayElement(long delayTime) {
            this.delayTime = (this.delayTime + delayTime);
        }
        @Override
        // 獲取剩餘時間
        public long getDelay(TimeUnit unit) {
            return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        @Override
        // 佇列裡元素的排序依據
        public int compareTo(Delayed o) {
            if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
                return 1;
            } else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
                return -1;
            } else {
                return 0;
            }
        }
        @Override
        public String toString() {
            return DateFormat.getDateTimeInstance().format(new Date(delayTime));
        }
    }
}

以上程式執行的結果為:

開始時間:2020-4-12 20:40:38

2020-4-12 20:40:39

2020-4-12 20:40:41

2020-4-12 20:40:43

結束時間:2020-4-12 20:40:43

3.Redis 實現延遲任務

使用 Redis 實現延遲任務的方法大體可分為兩類:通過 zset 資料判斷的方式,和通過鍵空間通知的方式。

① 通過資料判斷的方式

我們藉助 zset 資料型別,把延遲任務儲存在此資料集合中,然後在開啟一個無線迴圈查詢當前時間的所有任務進行消費,實現程式碼如下(需要藉助 Jedis 框架):

import redis.clients.jedis.Jedis;
import utils.JedisUtils;
import java.time.Instant;
import java.util.Set;

public class DelayQueueExample {
    // zset key
    private static final String _KEY = "myDelayQueue";
    
    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = JedisUtils.getJedis();
        // 延遲 30s 執行(30s 後的時間)
        long delayTime = Instant.now().plusSeconds(30).getEpochSecond();
        jedis.zadd(_KEY, delayTime, "order_1");
        // 繼續新增測試資料
        jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_2");
        jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_3");
        jedis.zadd(_KEY, Instant.now().plusSeconds(7).getEpochSecond(), "order_4");
        jedis.zadd(_KEY, Instant.now().plusSeconds(10).getEpochSecond(), "order_5");
        // 開啟延遲佇列
        doDelayQueue(jedis);
    }

    /**
     * 延遲佇列消費
     * @param jedis Redis 客戶端
     */
    public static void doDelayQueue(Jedis jedis) throws InterruptedException {
        while (true) {
            // 當前時間
            Instant nowInstant = Instant.now();
            long lastSecond = nowInstant.plusSeconds(-1).getEpochSecond(); // 上一秒時間
            long nowSecond = nowInstant.getEpochSecond();
            // 查詢當前時間的所有任務
            Set<String> data = jedis.zrangeByScore(_KEY, lastSecond, nowSecond);
            for (String item : data) {
                // 消費任務
                System.out.println("消費:" + item);
            }
            // 刪除已經執行的任務
            jedis.zremrangeByScore(_KEY, lastSecond, nowSecond);
            Thread.sleep(1000); // 每秒輪詢一次
        }
    }
}

② 通過鍵空間通知

預設情況下 Redis 伺服器端是不開啟鍵空間通知的,需要我們通過 config set notify-keyspace-events Ex 的命令手動開啟,開啟鍵空間通知後,我們就可以拿到每個鍵值過期的事件,我們利用這個機制實現了給每個人開啟一個定時任務的功能,實現程式碼如下:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import utils.JedisUtils;

public class TaskExample {
    public static final String _TOPIC = "[email protected]__:expired"; // 訂閱頻道名稱
    public static void main(String[] args) {
        Jedis jedis = JedisUtils.getJedis();
        // 執行定時任務
        doTask(jedis);
    }

    /**
     * 訂閱過期訊息,執行定時任務
     * @param jedis Redis 客戶端
     */
    public static void doTask(Jedis jedis) {
        // 訂閱過期訊息
        jedis.psubscribe(new JedisPubSub() {
            @Override
            public void onPMessage(String pattern, String channel, String message) {
                // 接收到訊息,執行定時任務
                System.out.println("收到訊息:" + message);
            }
        }, _TOPIC);
    }
}

4.Netty 實現延遲任務

Netty 是由 JBOSS 提供的一個 Java 開源框架,它是一個基於 NIO 的客戶、伺服器端的程式設計框架,使用 Netty 可以確保你快速和簡單的開發出一個網路應用,例如實現了某種協議的客戶、服務端應用。Netty 相當於簡化和流線化了網路應用的程式設計開發過程,例如:基於 TCP 和 UDP 的 socket 服務開發。

可以使用 Netty 提供的工具類 HashedWheelTimer 來實現延遲任務,實現程式碼如下。

首先在專案中新增 Netty 引用,配置如下:

<!-- https://mvnrepository.com/artifact/io.netty/netty-common -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-common</artifactId>
    <version>4.1.48.Final</version>
</dependency>

Netty 實現的完整程式碼如下:

public class DelayTaskExample {
    public static void main(String[] args) {
        System.out.println("程式啟動時間:" + LocalDateTime.now());
        NettyTask();
    }

    /**
     * 基於 Netty 的延遲任務
     */
    private static void NettyTask() {
        // 建立延遲任務例項
        HashedWheelTimer timer = new HashedWheelTimer(3, // 時間間隔
                TimeUnit.SECONDS,
                100); // 時間輪中的槽數
        // 建立一個任務
        TimerTask task = new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                System.out.println("執行任務" +
                        " ,執行時間:" + LocalDateTime.now());
            }
        };
        // 將任務新增到延遲佇列中
        timer.newTimeout(task, 0, TimeUnit.SECONDS);

    }
}

以上程式執行的結果為:

程式啟動時間:2020-04-13T10:16:23.033

執行任務 ,執行時間:2020-04-13T10:16:26.118

HashedWheelTimer 是使用定時輪實現的,定時輪其實就是一種環型的資料結構,可以把它想象成一個時鐘,分成了許多格子,每個格子代表一定的時間,在這個格子上用一個連結串列來儲存要執行的超時任務,同時有一個指標一格一格的走,走到那個格子時就執行格子對應的延遲任務,如下圖所示:

(圖片來源於網路)

以上的圖片可以理解為,時間輪大小為 8,某個時間轉一格(例如 1s),每格指向一個連結串列,儲存著待執行的任務。

5.MQ 實現延遲任務

如果專門開啟一個 MQ 中介軟體來執行延遲任務,就有點殺雞用宰牛刀般的奢侈了,不過已經有了 MQ 環境的話,用它來實現延遲任務的話,還是可取的。

幾乎所有的 MQ 中介軟體都可以實現延遲任務,在這裡更準確的叫法應該叫延佇列。本文就使用 RabbitMQ 為例,來看它是如何實現延遲任務的。

RabbitMQ 實現延遲佇列的方式有兩種:

  • 通過訊息過期後進入死信交換器,再由交換器轉發到延遲消費佇列,實現延遲功能;
  • 使用 rabbitmq-delayed-message-exchange 外掛實現延遲功能。

注意: 延遲外掛 rabbitmq-delayed-message-exchange 是在 RabbitMQ 3.5.7 及以上的版本才支援的,依賴 Erlang/OPT 18.0 及以上執行環境。

由於使用死信交換器比較麻煩,所以推薦使用第二種實現方式 rabbitmq-delayed-message-exchange 外掛的方式實現延遲佇列的功能。

首先,我們需要下載並安裝 rabbitmq-delayed-message-exchange 外掛,下載地址:http://www.rabbitmq.com/community-plugins.html

選擇相應的對應的版本進行下載,然後拷貝到 RabbitMQ 伺服器目錄,使用命令 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 開啟外掛,在使用命令 rabbitmq-plugins list 查詢安裝的所有外掛,安裝成功如下圖所示:

最後重啟 RabbitMQ 服務,使外掛生效。

首先,我們先要配置訊息佇列,實現程式碼如下:

import com.example.rabbitmq.mq.DirectConfig;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayedConfig {
    final static String QUEUE_NAME = "delayed.goods.order";
    final static String EXCHANGE_NAME = "delayedec";
    @Bean
    public Queue queue() {
        return new Queue(DelayedConfig.QUEUE_NAME);
    }

    // 配置預設的交換機
    @Bean
    CustomExchange customExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        //引數二為型別:必須是x-delayed-message
        return new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }
    // 繫結佇列到交換器
    @Bean
    Binding binding(Queue queue, CustomExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();
    }
}

然後新增增加訊息的程式碼,具體實現如下:

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;

@Component
public class DelayedSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(String msg) {
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("傳送時間:" + sf.format(new Date()));

        rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setHeader("x-delay", 3000);
                return message;
            }
        });
    }
}

再新增消費訊息的程式碼:

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;

@Component
@RabbitListener(queues = "delayed.goods.order")
public class DelayedReceiver {
    @RabbitHandler
    public void process(String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("接收時間:" + sdf.format(new Date()));
        System.out.println("訊息內容:" + msg);
    }
}

最後,我們使用程式碼測試一下:

import com.example.rabbitmq.RabbitmqApplication;
import com.example.rabbitmq.mq.delayed.DelayedSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.text.SimpleDateFormat;
import java.util.Date;

@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayedTest {

    @Autowired
    private DelayedSender sender;

    @Test
    public void Test() throws InterruptedException {
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");
        sender.send("Hi Admin.");
        Thread.sleep(5 * 1000); //等待接收程式執行之後,再退出測試
    }
}

以上程式的執行結果如下:

傳送時間:2020-04-13 20:47:51

接收時間:2020-04-13 20:47:54

訊息內容:Hi Admin.

從結果可以看出,以上程式執行符合延遲任務的實現預期。

6.使用 Spring 定時任務

如果你使用的是 Spring 或 SpringBoot 的專案的話,可以使用藉助 Scheduled 來實現,本文將使用 SpringBoot 專案來演示 Scheduled 的實現,實現我們需要宣告開啟 Scheduled,實現程式碼如下:

@SpringBootApplication
@EnableScheduling
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

然後新增延遲任務,實現程式碼如下:

@Component
public class ScheduleJobs {
    @Scheduled(fixedDelay = 2 * 1000)
    public void fixedDelayJob() throws InterruptedException {
        System.out.println("任務執行,時間:" + LocalDateTime.now());
    }
}

此時當我們啟動專案之後就可以看到任務以延遲了 2s 的形式一直迴圈執行,結果如下:

任務執行,時間:2020-04-13T14:07:53.349

任務執行,時間:2020-04-13T14:07:55.350

任務執行,時間:2020-04-13T14:07:57.351

...

我們也可以使用 Corn 表示式來定義任務執行的頻率,例如使用 @Scheduled(cron = "0/4 * * * * ?") 。

7.Quartz 實現延遲任務

Quartz 是一款功能強大的任務排程器,可以實現較為複雜的排程功能,它還支援分散式的任務排程。

我們使用 Quartz 來實現一個延遲任務,首先定義一個執行任務程式碼如下:

import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean;

import java.time.LocalDateTime;

public class SampleJob extends QuartzJobBean {
    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext)
            throws JobExecutionException {
        System.out.println("任務執行,時間:" + LocalDateTime.now());
    }
}

在定義一個 JobDetail 和 Trigger 實現程式碼如下:

import org.quartz.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SampleScheduler {
    @Bean
    public JobDetail sampleJobDetail() {
        return JobBuilder.newJob(SampleJob.class).withIdentity("sampleJob")
                .storeDurably().build();
    }

    @Bean
    public Trigger sampleJobTrigger() {
        // 3s 後執行
        SimpleScheduleBuilder scheduleBuilder =
                SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(3).withRepeatCount(1);
        return TriggerBuilder.newTrigger().forJob(sampleJobDetail()).withIdentity("sampleTrigger")
                .withSchedule(scheduleBuilder).build();
    }
}

最後在 SpringBoot 專案啟動之後開啟延遲任務,實現程式碼如下:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

/**
 * SpringBoot 專案啟動後執行
 */
public class MyStartupRunner implements CommandLineRunner {

    @Autowired
    private SchedulerFactoryBean schedulerFactoryBean;

    @Autowired
    private SampleScheduler sampleScheduler;

    @Override
    public void run(String... args) throws Exception {
        // 啟動定時任務
        schedulerFactoryBean.getScheduler().scheduleJob(
                sampleScheduler.sampleJobTrigger());
    }
}

以上程式的執行結果如下:

2020-04-13 19:02:12.331  INFO 17768 --- [  restartedMain] com.example.demo.DemoApplication         : Started DemoApplication in 1.815 seconds (JVM running for 3.088)

任務執行,時間:2020-04-13T19:02:15.019

從結果可以看出在專案啟動 3s 之後執行了延遲任務。

總結

本文講了延遲任務的使用場景,以及延遲任務的 10 種實現方式:

  1. 手動無線迴圈;
  2. ScheduledExecutorService;
  3. DelayQueue;
  4. Redis zset 資料判斷的方式;
  5. Redis 鍵空間通知的方式;
  6. Netty 提供的 HashedWheelTimer 工具類;
  7. RabbitMQ 死信佇列;
  8. RabbitMQ 延遲訊息外掛 rabbitmq-delayed-message-exchange;
  9. Spring Scheduled;
  10. Quartz。

最後的話

俗話說:臺上一分鐘,臺下十年功。本文的所有內容皆為作者多年工作積累的結晶,以及熬夜嘔心瀝血的整理,如果覺得本文有幫助到你,請幫我分享出去,讓更多的人看到,謝謝你。

本文由部落格一文多發平臺 OpenWrite 釋出!