Spring Scheduled + Redis 實現分散式定時器(一)
阿新 • • 發佈:2019-01-07
1、需要了解的技術點:
1.1、Redis的命令:SETNX,EXPIRE;
1.2、Spring的Scheduled定時器註解,觸發器,任務,排程器;
1.3、Spring的applicationContext上下文物件,自定義註解,java反射機制;
2、思路: 2.1、建立一個自定義註解,引數:cron(時間格式); 2.2、建立一個@Component元件,用來實現自定義註解的功能, 2.2.1、實現ApplicationContextAware介面,用來獲取spring的ApplicationContext上下文物件; 2.2.2、實現BeanPostProcessor介面,用來獲取自定義註解所對應的方法; 2.2.3、實現SchedulingConfigurer 介面,用來建立定時器任務;
2.2.4、建立一個實現Runabel介面的類,用來反射自定義註解所對應的方法和搶佔redis的鎖;
2.2.5、建立一個實現Trigger介面的觸發器物件,用來獲取下一次執行任務的時間,以便給redis設定鎖的生存時間;
2.3、程式執行流程:
2.3.1、給需要加定時器的方法加上自定義註解
2.3.2、程式啟動,獲取spring上下文物件;
2.3.3、掃描自定義註解所對應的方法; 2.3.4、根據每個自定義註解的資訊建立對應觸發器和任務; 2.3.5、排程器觸發任務時,先去搶佔鎖,再根據情況判斷本例項是否要執行任務; 3、程式碼分解: 3.1、建立自定義註解;
程式碼分解完畢,以上為所有程式碼!!! 下一篇將引入任務持久化,內部心跳檢測定時器任務,以及動態管理定時器的功能;
2、思路: 2.1、建立一個自定義註解,引數:cron(時間格式); 2.2、建立一個@Component元件,用來實現自定義註解的功能, 2.2.1、實現ApplicationContextAware介面,用來獲取spring的ApplicationContext上下文物件; 2.2.2、實現BeanPostProcessor介面,用來獲取自定義註解所對應的方法; 2.2.3、實現SchedulingConfigurer
2.3.3、掃描自定義註解所對應的方法; 2.3.4、根據每個自定義註解的資訊建立對應觸發器和任務; 2.3.5、排程器觸發任務時,先去搶佔鎖,再根據情況判斷本例項是否要執行任務; 3、程式碼分解: 3.1、建立自定義註解;
3.2、建立KyTask類,用來記錄自定義註解的資訊和註解對應方法的資訊;@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) @Inherited @Documented public @interface KyScheduled { /** * A cron-like expression, extending the usual UN*X definition to include * triggers on the second as well as minute, hour, day of month, month * and day of week. e.g. {@code "0 * * * * MON-FRI"} means once per minute on * weekdays (at the top of the minute - the 0th second). * @return an expression that can be parsed to a cron schedule */ String cron() default ""; }
3.3、實現ApplicationContextAware介面,獲取spring上下文物件; 原因:如果單純使用java的反射機制,當定時器任務使用@Autowired註解時,會獲取不到bean例項,所以要實現ApplicationContextAware介面;public class KyScheduledExecution { public class KyTask { private KyScheduled kyScheduled; private Method kyMethod; public KyScheduled getKyScheduled() { return kyScheduled; } public void setKyScheduled(KyScheduled kyScheduled) { this.kyScheduled = kyScheduled; } public Method getKyMethod() { return kyMethod; } public void setKyMethod(Method kyMethod) { this.kyMethod = kyMethod; } } }
@Component
public class KyScheduledExecution implements ApplicationContextAware {
private ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext context) throws BeansException {
this.applicationContext = context;
}
private Object getBean(Class classname) {
try {
return this.applicationContext.getBean(classname);
} catch (Exception e) {
log.error(e);
return "";
}
}
}
3.4、實現BeanPostProcessor介面,獲取自定義註解資訊;
@Component
public class KyScheduledExecution implements BeanPostProcessor {
private Log log = LogFactory.getLog(getClass());
//記錄任務集合
private List<KyTask> kyTaskList = new ArrayList<>();
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
/**
* 獲取所有自定義註解,並記錄註解和方法的資訊
* @param bean bean
* @param beanName beanName
* @return Object
* @throws BeansException BeansException
*/
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Method[] methods = ReflectionUtils.getAllDeclaredMethods(bean.getClass());
if (methods != null) {
for (Method method : methods) {
KyScheduled annotation = AnnotationUtils.findAnnotation(method, KyScheduled.class);
if (annotation != null && !"".equals(annotation.cron())) {
KyTask at = new KyTask();
at.setKyScheduled(annotation);
at.setKyMethod(method);
kyTaskList.add(at);
}
}
}
return bean;
}
}
3.5、在KyScheduledExecution在類中引入redis客戶端,並實現獲取redis鎖的方法,
需要在3.3的setApplicationContext方法中執行createRedisClient();public class KyScheduledExecution{
private Log log = LogFactory.getLog(getClass());
@Value("${spring.redis.host}")
private String redisHost;
@Value("${spring.redis.port}")
private int redisPort;
private Jedis jedis;
//記錄任務集合
private List<KyTask> kyTaskList = new ArrayList<>();
private ApplicationContext applicationContext;
/**
* 建立redis客戶端
*/
private void createRedisClient() {
if (jedis == null) {
jedis = new Jedis(redisHost, redisPort);
}
}
/**
* 獲取分散式鎖
*
* @param lockName 鎖名稱
* @param second 加鎖時間(秒)
* @return 如果獲取到鎖,則返回lockId值,否則為null
*/
private String setnxLock(String lockName, int second) {
synchronized (this) {
//生成隨機的Value值
String lockId = UUID.randomUUID().toString();
//搶佔鎖
Long lock = this.jedis.setnx(lockName, lockId);
if (lock == 1) {
//拿到Lock,設定超時時間
this.jedis.expire(lockName, second - 1);
return lockId;
}
}
return null;
}
}
3.5、建立自定義的觸發器物件,實現Trigger介面nextExecutionTime方法;public class KyTrigger implements Trigger, Serializable {
private String cron;
private boolean syncLock;
public KyTrigger(KyScheduled kyScheduled){
if(kyScheduled.cron() != null && !"".equals(kyScheduled.cron())) {
this.cron = kyScheduled.cron();
}
this.syncLock = kyScheduled.synclock();
}
public boolean getSyncLock(){
return this.syncLock;
}
public String getCron() {
return cron;
}
public void setCron(String cron) {
if(cron != null && !"".equals(cron)) {
this.cron = cron;
}
}
@Override
public Date nextExecutionTime(TriggerContext triggerContext) {
CronTrigger cronTrigger = new CronTrigger(this.cron);
return cronTrigger.nextExecutionTime(triggerContext);
}
}
3.6、在KyScheduledExecution類中建立一個實現Runnable介面的自定義的JOB內部類,需要接收Method和自定義的trigger物件;public class KyScheduledExecution{
private Log log = LogFactory.getLog(getClass());
/**
* 任務物件
*/
public class Job implements Runnable {
private Method method;
private String lockName;
private Object invokeMethod;
private Trigger trigger;
public String getLockName() {
return lockName;
}
Job(Method m, Trigger t) {
this.trigger = t;
this.invokeMethod = getBean(m.getDeclaringClass());//獲取bean例項
this.lockName = m.getDeclaringClass().getName() + "." + m.getName();//構造LockName
this.method = m;
}
@Override
public void run() {
//獲取下次執行時間(秒)
long nextTime = (this.trigger.nextExecutionTime(new SimpleTriggerContext()).getTime() - new Date().getTime()) / 1000;
//搶佔分散式鎖
String result = setnxLock(this.lockName, (int) nextTime);
if (result != null && !"".equals(result)) {
try {
//執行自定義註解的方法
this.method.invoke(this.invokeMethod);
} catch (IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
log.error(e);
}
}
}
}
}
3.7、在KyScheduledExecution類中實現SchedulingConfigurer介面;
@Component
public class KyScheduledExecution implements SchedulingConfigurer{
private Log log = LogFactory.getLog(getClass());
/**
* 配置定時器
*
* @param taskRegistrar ScheduledTaskRegistrar
*/
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
if (taskRegistrar != null) {
for (KyTask kt : kyTaskList) {
Method method = kt.getKyMethod();
//建立觸發器
KyTrigger trigger = new KyTrigger(kt.getKyScheduled());
//建立任務
Job job = new Job(method, trigger);
//將任務加入排程器中
taskRegistrar.addTriggerTask(job, trigger);
}
}
}
}
程式碼分解完畢,以上為所有程式碼!!! 下一篇將引入任務持久化,內部心跳檢測定時器任務,以及動態管理定時器的功能;