Lilishop 技術棧
官方公眾號 & 開源不易,如有幫助請點Star
介紹
Lilishop 是一款Java開發,基於SpringBoot研發的B2B2C多使用者商城,前端使用 Vue、uniapp開發 系統全端全部程式碼開源
本系統用於教大家如何運用系統中的每一個細節,如:支付、第三方登入、日誌收集、分散式事務、秒殺場景等各個場景學習方案
git地址 https://gitee.com/beijing_hongye_huicheng/lilishop-spring-learning
本文學習 分散式延時任務
延時任務介紹
即指定一個時間,執行提前約定好的任務,例如:定時取消訂單,定時上下架商品,定時開啟活動等。
延時任務與定時任務的區別
延時任務適用於個性化的業務場景,比如某訂單自動取消,某活動自動開啟,某商品自動上下架子。還有一個就是較為精確的,需要實時的事情。
而定時任務適用於全平臺的業務,比如計算商品評分統一結算,分銷中的可提現金額批量結算,平臺統計/店鋪統計資料生成等。總的來說就是定時掃描,每天,每小時,每分鐘,每個月,不管怎麼樣都要執行。比如定時上下架,用定時任務也可以,但是要實現精確的任務排程,建立一個每秒任務,是不太理智的。
兩個場景需要互補,具體應用什麼場景,可以再自己斟酌斟酌。
思路介紹
- 專案啟動時啟用一個執行緒,執行緒用於間隔一定時間去查詢redis的待執行任務。其任務id為物件json格式化之後的字串,值為要執行的時間。
- 查詢到執行的任務時,將其從redis的資訊中進行刪除。(刪除成功才執行延時任務,否則不執行,這樣可以避免分散式系統延時任務多次執行。)
- 刪除redis中的記錄之後,啟用子執行緒執行任務。將執行id,也就是json的字串翻轉回要執行的任務資訊,這樣可以得到用什麼執行器去執行任務,引數有哪些。
- 執行延時任務
實際使用
實際場景中,還會設計延時任務修改,刪除等,這些場景建議在執行任務建立時,redis標記要執行的任務,如果刪除或者修改任務時,修改redis中的標識即可,當然也可以在業務邏輯中做補充的條件判定,都可以。
另外具體執行任務建議使用mq去實現,相當於在執行任務時,執行緒只是釋出一個mq,交給消費者去消費具體的事情。
程式碼中的程序掃描5秒,也就代表一個延時任務最多延遲5秒去執行,實戰場景中可以調整至1秒,或者更低,但是不太建議。另外redis的效能槓槓的,不用太擔心redis的連線數導致效能問題。
使用步驟
啟用redis,可以本地啟動,也可以用ELK中docker-compose啟動。
啟動springboot應用。
請求springboot 應用 http://127.0.0.1:8080
檢視控制檯輸出內容
2021-06-09 12:41:33.168 INFO 40730 --- [nio-8888-exec-1] l.t.p.d.AbstractDelayQueueMachineFactory : 增加延時任務, 快取key test_delay, 等待時間 10
2021-06-09 12:41:33.168 INFO 40730 --- [nio-8888-exec-1] c.l.t.p.i.impl.RedisTimerTrigger : 定時執行在【2021-06-09 12:41:43】,消費【test params】
2021-06-09 12:41:44.399 INFO 40730 --- [ Thread-5] l.t.p.d.AbstractDelayQueueMachineFactory : 延時任務開始執行任務:[{"score":1.623213703E9,"value":"{"triggerTime":1623213703,"triggerExecutor":"testTimeTriggerExecutor","param":"test params"}"}]
2021-06-09 12:41:44.403 INFO 40730 --- [pool-2-thread-2] c.l.t.p.i.e.TestTimeTriggerExecutor : 執行器具執行任務test params
關鍵類介紹
快取操作類 用於延時任務的核型邏輯,間隔查詢需要執行的延時任務,考的就是redis的Sorted Set屬性來試下排序,執行任務。
/**
* 向Zset裡新增成員
*
* @param key key值
* @param score 分數,通常用於排序
* @param value 值
* @return 增加狀態
*/
@Override
public boolean zAdd(String key, long score, String value) {
Boolean result = redisTemplate.opsForZSet().add(key, value, score);
return result;
}
/**
* 獲取 某key 下 某一分值區間的佇列
*
* @param key 快取key
* @param from 開始時間
* @param to 結束時間
* @return 資料
*/
@Override
public Set<ZSetOperations.TypedTuple<Object>> zRangeByScore(String key, int from, long to) {
Set<ZSetOperations.TypedTuple<Object>> set = redisTemplate.opsForZSet().rangeByScoreWithScores(key, from, to);
return set;
}
/**
* 移除 Zset佇列值
*
* @param key key值
* @param value 刪除的集合
* @return 刪除數量
*/
@Override
public Long zRemove(String key, String... value) {
return redisTemplate.opsForZSet().remove(key, value);
}
延時佇列 抽象類,具體延時佇列需繼承
package cn.lili.trigger.plugin.delay;
import cn.hutool.json.JSONUtil;
import cn.lili.trigger.plugin.cache.Cache;
import cn.lili.trigger.plugin.util.ThreadPoolUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.DefaultTypedTuple;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import java.util.Calendar;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* 延時佇列工廠
*
* @author paulG
* @since 2020/11/7
**/
@Slf4j
public abstract class AbstractDelayQueueMachineFactory {
@Autowired
private Cache cache;
/**
* 插入任務id
*
* @param jobId 任務id(佇列內唯一)
* @param time 延時時間(單位 :秒)
* @return 是否插入成功
*/
public boolean addJob(String jobId, Integer time) {
Calendar instance = Calendar.getInstance();
instance.add(Calendar.SECOND, time);
long delaySeconds = instance.getTimeInMillis() / 1000;
boolean result = cache.zAdd(setDelayQueueName(), delaySeconds, jobId);
log.info("增加延時任務, 快取key {}, 等待時間 {}", setDelayQueueName(), time);
return result;
}
/**
* 延時佇列機器開始運作
*/
private void startDelayQueueMachine() {
log.info("延時佇列機器{}開始運作", setDelayQueueName());
// 監聽redis佇列
while (true) {
try {
// 獲取當前時間的時間戳
long now = System.currentTimeMillis() / 1000;
// 獲取當前時間前的任務列表
Set<DefaultTypedTuple> tuples = cache.zRangeByScore(setDelayQueueName(), 0, now);
// 如果任務不為空
if (!CollectionUtils.isEmpty(tuples)) {
log.info("延時任務開始執行任務:{}", JSONUtil.toJsonStr(tuples));
for (DefaultTypedTuple tuple : tuples) {
String jobId = (String) tuple.getValue();
// 移除快取,如果移除成功則表示當前執行緒處理了延時任務,則執行延時任務
Long num = cache.zRemove(setDelayQueueName(), jobId);
// 如果移除成功, 則執行
if (num > 0) {
ThreadPoolUtil.execute(() -> invoke(jobId));
}
}
}
} catch (Exception e) {
log.error("處理延時任務發生異常,異常原因為{}", e.getMessage(), e);
} finally {
// 間隔5秒鐘搞一次
try {
TimeUnit.SECONDS.sleep(5L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 最終執行的任務方法
*
* @param jobId 任務id
*/
public abstract void invoke(String jobId);
/**
* 要實現延時佇列的名字
*/
public abstract String setDelayQueueName();
@PostConstruct
public void init() {
new Thread(this::startDelayQueueMachine).start();
}
}
延時佇列示例實現
package cn.lili.trigger.plugin.delay;
import cn.hutool.json.JSONUtil;
import cn.lili.trigger.plugin.interfaces.TimeTrigger;
import cn.lili.trigger.plugin.interfaces.TimeTriggerExecutor;
import cn.lili.trigger.plugin.model.TimeTriggerMsg;
import cn.lili.trigger.plugin.util.SpringContextUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 測試延時佇列
*
* @author paulG
* @version v4.1
* @date 2020/11/17 7:19 下午
* @description
* @since 1
*/
@Component
public class TestDelayQueue extends AbstractDelayQueueMachineFactory {
@Autowired
private TimeTrigger timeTrigger;
@Override
public void invoke(String jobId) {
TimeTriggerMsg timeTriggerMsg = JSONUtil.toBean(jobId, TimeTriggerMsg.class);
TimeTriggerExecutor executor = (TimeTriggerExecutor) SpringContextUtil.getBean(timeTriggerMsg.getTriggerExecutor());
executor.execute(timeTriggerMsg.getParam());
}
@Override
public String setDelayQueueName() {
return "test_delay";
}
}
延時任務介面
package cn.lili.trigger.plugin.interfaces;
import cn.lili.trigger.plugin.model.TimeTriggerMsg;
/**
* 延時執行介面
*
* @author Chopper
*/
public interface TimeTrigger {
/**
* 新增延時任務
*
* @param timeTriggerMsg 延時任務資訊
*/
void add(TimeTriggerMsg timeTriggerMsg);
}
Redis延時任務實現類
package cn.lili.trigger.plugin.interfaces.impl;
import cn.hutool.json.JSONUtil;
import cn.lili.trigger.plugin.delay.TestDelayQueue;
import cn.lili.trigger.plugin.interfaces.TimeTrigger;
import cn.lili.trigger.plugin.model.TimeTriggerMsg;
import cn.lili.trigger.plugin.util.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* redis 延時任務
*
* @author Chopper
* @version v1.0
* 2021-06-09 11:00
*/
@Component
@Slf4j
public class RedisTimerTrigger implements TimeTrigger {
@Autowired
private TestDelayQueue testDelayQueue;
@Override
public void add(TimeTriggerMsg timeTriggerMsg) {
//計算延遲時間 執行時間-當前時間
Integer delaySecond = Math.toIntExact(timeTriggerMsg.getTriggerTime() - DateUtil.getDateline());
//設定延時任務
if (Boolean.TRUE.equals(testDelayQueue.addJob(JSONUtil.toJsonStr(timeTriggerMsg), delaySecond))) {
log.info("定時執行在【" + DateUtil.toString(timeTriggerMsg.getTriggerTime(), "yyyy-MM-dd HH:mm:ss") + "】,消費【" + timeTriggerMsg.getParam().toString() + "】");
} else {
log.error("延時任務新增失敗:{}", timeTriggerMsg);
}
}
}
延時任務執行器介面
package cn.lili.trigger.plugin.interfaces;
/**
* 延時任務執行器介面
*
* @author Chopper
*/
public interface TimeTriggerExecutor {
/**
* 執行任務
*
* @param object 任務引數
*/
void execute(Object object);
}
延時任務實現
package cn.lili.trigger.plugin.interfaces.execute;
import cn.lili.trigger.plugin.interfaces.TimeTriggerExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* test執行器
*
* @author Chopper
* @version v1.0
* 2021-06-09 10:49
*/
@Component
@Slf4j
public class TestTimeTriggerExecutor implements TimeTriggerExecutor {
@Override
public void execute(Object object) {
log.info("執行器具執行任務{}", object);
}
}
延時任務消5息模型
package cn.lili.trigger.plugin.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* 延時任務訊息
*
* @author Chopper
* @version v1.0
* @since 2019-02-12 下午5:46
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class TimeTriggerMsg implements Serializable {
private static final long serialVersionUID = 8897917127201859535L;
/**
* 執行器 執行時間
*/
private Long triggerTime;
/**
* 執行器beanId
*/
private String triggerExecutor;
/**
* 執行器引數
*/
private Object param;
}
控制器
package cn.lili.trigger.controller;
import cn.lili.trigger.plugin.interfaces.TimeTrigger;
import cn.lili.trigger.plugin.model.TimeTriggerMsg;
import cn.lili.trigger.plugin.util.DateUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TestController {
@Autowired
private TimeTrigger timeTrigger;
@GetMapping
public void test(Integer seconds) {
Long executeTime = DateUtil.getDateline() + 5;
if (seconds != null) {
executeTime = DateUtil.getDateline() + seconds;
}
TimeTriggerMsg timeTriggerMsg = new TimeTriggerMsg(executeTime, "testTimeTriggerExecutor", "test params");
timeTrigger.add(timeTriggerMsg);
}
}