Schedule 排程系統設計(單機版)
鑑於對Spring實現的@Scheduled的排程和SchedulerFactoryBean的研究發現,基於Spring的排程封裝雖滿足了大多需求,但為了簡化使用方式使得Job並不容易得到控制,導致開發對Job的控制和運維成本上升;下面是本人基於Quartz和Spring及Annotation開發的單機版排程配置,滿足單機排程的大部分需求和管理、運維操作並解放對配置檔案的繁瑣操作;
功能點描述
功能點 | Spring @Scheduled | 自定義@SchedulerJob |
---|---|---|
可控制 | 否 | 是 |
可運維 | 否 | 是 |
可頁面化 | 否 | 是 |
可統一跟蹤業務狀態 | 否 | 是 |
可統一跟蹤排程狀態 | 否 | 是 |
支援cron表示式 | 是 | 是 |
支援類似ScheduledExecutorService的定時排程 | 是 | 否 |
程式碼演示
- 基於註解進行作業配置
@Slf4j(topic = "dynamic-datasource") @Component public class DetectJob { /** * 作業配置 value=作業名,group=作業所屬組,init=true為容器建立完畢時立即觸發 */ @SchedulerJob(value = "detectDataSource",cron = "${cron.detect.data.source}",group = "dynamic-datasource", descrption="動態資料來源切換",init = true) public void detectDataSource(){ log.info(VariableUtils.join(SystemConstants.Symbol.DELIMITER,"dynamic-datasource","detectDataSource")); } } 複製程式碼
##cron表示式 cron.detect.data.source=1 * * * * ? 複製程式碼
- 程式碼執行效果

頁面演示
通過頁面可對作業進行統一的監控和管理(觸發、暫停、恢復、動態新增、引數下發)及報警等操作;
簡要列出以下功能點:
- 作業展示
- 作業運維報警
- 作業引數下發
- 作業事件跟蹤

設計思路
- 應當滿足什麼業務場景
- 如何簡化操作、降低開發成本
- 如何對業務、系統功能進行監控、控制、運維
- 如何設計才能便於後期業務和功能的擴充套件
功能設計
- 設計思路
- 如何獲取方法上的註解及配置
- 如何實現通過Quartz定時執行註解方法
- 如何對每個方法上的註解進行統一的資源管理和監控、控制、運維
- 如何對排程進行效能的優化
- 功能點分析
- 基本排程
- 初始化立即排程
- 人工或系統控制排程(任務建立後不執行排程,控制權交給外部)
- 定時執行排程(及按照指定cron配置週期排程)
- 是否可併發執行
- 資源管理
- 統一管理系統內全部的配置資源(作業所屬組、描述、cron表示式、是否開啟報警、是否開啟監控等)
- 排程管理
- 排程狀態管理(系統狀態、業務狀態)
- 排程行為管理
- 作業業務引數下發(彌補業務過失)
- 排程跟蹤、業務跟蹤
- 排程報警、業務報警
- 基本排程
- 基本功能點實現
- 註解配置
@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface SchedulerJob { /** * 作業名 * @return */ String value(); /** * 表示式 * @return */ String cron(); /** * 是否初始化時立即執行 * @return */ boolean init() default false; /** * 是否人為控制 * @return */ boolean control() default false; /** * 所屬組 * @return */ String group() default "default"; /** * 作業描述 * @return */ String descrption() default ""; /** * 作業執行器 * @return */ Class jobClass() default SimpleJob.class; } @Target({ElementType.METHOD,ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface SchedulerJobs { /** * 註解集 * @return */ SchedulerJob[] value(); } 複製程式碼
- 排程建立
@Slf4j @Configuration public class SchedulerBean implements InitializingBean, DisposableBean { private Scheduler scheduler; @Value("#{schdulerProperties['quartz.thread.count']}") private String threadCount; @Override public void destroy() throws Exception { scheduler.shutdown(); } @Override public void afterPropertiesSet() throws Exception { createScheduler(); } /** * 建立排程 * @throws SchedulerException */ public void createScheduler() throws SchedulerException { StdSchedulerFactory factory = new StdSchedulerFactory(); factory.initialize(getBaseQuartzProperties()); this.scheduler = factory.getScheduler(); } /** * 作業配置 * @return */ private Properties getBaseQuartzProperties() { Properties result = new Properties(); result.put("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName()); result.put("org.quartz.threadPool.threadCount", threadCount); result.put("org.quartz.scheduler.threadName", "baiyunpeng-scheduler"); result.put("org.quartz.scheduler.instanceName", "baiyunpeng-scheduler"); result.put("org.quartz.jobStore.misfireThreshold", "1"); return result; } /** * 建立作業 * @param jobParam * @throws SchedulerException */ public void createJob(JobParam jobParam) throws SchedulerException { SchedulerJob schedulerJob = jobParam.getSchedulerJob(); JobDetail jobDetail = JobBuilder.newJob(schedulerJob.jobClass()) .withIdentity(jobParam.getJobKey()) .withDescription(jobParam.getJobKey().getName()) .build(); addJobDataMap(jobDetail,jobParam.getTarget(),jobParam.getTargetMethod()); this.scheduler.scheduleJob(jobDetail,createTrigger(jobParam.getJobKey(),jobParam.getCron())); } /** * 建立觸發器 * @param jobKey * @param cron * @return */ private Trigger createTrigger(JobKey jobKey, String cron) { return TriggerBuilder.newTrigger().withIdentity(jobKey.getName(),jobKey.getGroup()) .withSchedule(CronScheduleBuilder.cronSchedule(cron) .withMisfireHandlingInstructionDoNothing()).build(); } /** * 新增作業map * @param jobDetail * @param target * @param targetMethod */ private void addJobDataMap(JobDetail jobDetail, Object target, Method targetMethod) { JobDataMap jobDataMap = jobDetail.getJobDataMap(); jobDataMap.put("executeJob",target); jobDataMap.put("executeMethod",targetMethod); } public Scheduler getScheduler() { return scheduler; } public void start() throws SchedulerException { this.scheduler.start(); } } 複製程式碼
- 簡單的作業執行器建立
/** * 作業抽象類 * @author baiyunpeng */ public abstract class ExecuteJob implements Job { protected Object executeJob; protected Method executeMethod; public void setExecuteJob(Object executeJob) { this.executeJob = executeJob; } public void setExecuteMethod(Method executeMethod) { this.executeMethod = executeMethod; } } /** * 非併發執行 * @author baiyunpeng */ @Slf4j public class SimpleJob extends ExecuteJob { @Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { try { executeMethod.invoke(executeJob); } catch (IllegalAccessException | InvocationTargetException e) { log.error(VariableUtils.join(SystemConstants.Symbol.DELIMITER,this.getClass().getName(), ExceptionUtils.getRootCauseMessage(e))); } } } /** * 可併發執行 * @author baiyunpeng */ @Slf4j @PersistJobDataAfterExecution @DisallowConcurrentExecution public class ConcurrentJob extends ExecuteJob{ @Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { try { executeMethod.invoke(executeJob); } catch (IllegalAccessException | InvocationTargetException e) { log.error(VariableUtils.join(SystemConstants.Symbol.DELIMITER,this.getClass().getName(), ExceptionUtils.getRootCauseMessage(e))); } } } 複製程式碼
- 作業建立
/** * 作業配置解析 * @param scheduled * @param method * @param bean */ protected void processScheduled(SchedulerJob scheduled, Method method, Object bean) { Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass()); String cron = scheduled.cron(); if(StringUtils.hasText(cron)){ if(Objects.nonNull(this.embeddedValueResolver)){ cron = this.embeddedValueResolver.resolveStringValue(cron); } jobParams.add(new JobParam(scheduled,bean,invocableMethod,new JobKey(scheduled.value(),scheduled.group()),cron)); } } /** * 作業初始化 */ private void finishRegister() { if(Objects.isNull(this.schedulerBean)){ SchedulerBean schedulerBean = beanFactory.getBean(SCHEDULER_BEAN, SchedulerBean.class); AssertUtil.assertNull(schedulerBean, SystemErrorCode.NS000000,"the scheduler bean init error"); this.schedulerBean = schedulerBean; try { jobParams.parallelStream().forEach(jobParam -> { try { this.schedulerBean.createJob(jobParam); SchedulerJob schedulerJob = jobParam.getSchedulerJob(); if(!schedulerJob.control()){ if (schedulerJob.init()){ this.schedulerBean.getScheduler().triggerJob(jobParam.getJobKey()); } }else { this.schedulerBean.getScheduler().pauseJob(jobParam.getJobKey()); } } catch (SchedulerException e) { log.error(VariableUtils.join(SystemConstants.Symbol.DELIMITER,"the scheduler job init error", ExceptionUtils.getRootCauseMessage(e))); System.exit(1); } }); schedulerBean.start(); }catch (Exception e){ log.error(VariableUtils.join(SystemConstants.Symbol.DELIMITER,"the scheduler job init error", ExceptionUtils.getRootCauseMessage(e))); System.exit(1); } } } 複製程式碼
總結
- 如何非同步執行方法,首先得獲取該方法的例項
- 如何定時執行,首先建立並獲取定時器
- 如何基於Quarzt監控作業執行,需獲Schedule和Jobkey等
後續更新
- 如何統一監控排程狀態和業務狀態
- 如何解決work執行緒池被任務阻塞的問題
- 如何做任務補發(note:除了misfire機制外還有哪些做法)
- 如何基於單機排程實現基本的分散式排程
- 分散式排程需要考慮的點有哪些