1. 程式人生 > >SpringBoot基礎教程3-1-3 Quartz定時任務單點持久化

SpringBoot基礎教程3-1-3 Quartz定時任務單點持久化

1 概述

實際專案中,複雜的定時任務都會結合持久化,動態改變定時任務狀態,本文將介紹基於Quartz的定時任務單點持久化方式,通過RESTful風格,演示定時任務的CRUD,最後使用Swagger測試。

2 資料庫表說明

//Quartz表
qrtz_calendars:以 Blob 型別儲存 Quartz 的 Calendar 資訊
qrtz_cron_triggers:儲存 Cron Trigger,包括 Cron 表示式和時區資訊
qrtz_fired_triggers:儲存與已觸發的 Trigger 相關的狀態資訊,以及相聯 Job 的執行資訊
qrtz_paused_trigger_grps:儲存已暫停的 Trigger 組的資訊
qrtz_scheduler_state:儲存少量的有關排程器 (Scheduler) 的狀態,和別的 排程器 (Scheduler)例項(假如是用於一個叢集中)
qrtz_locks:儲程式的非觀鎖的資訊(假如使用了悲觀鎖)
qrtz_job_details:儲存每一個已配置的 Job 的詳細資訊(jobDetail)
qrtz_job_listeners:儲存有關已配置的 Job 監聽器 的資訊
qrtz_simple_triggers:儲存簡單的 Trigger,包括重複次數,間隔,以及已觸的次數
qrtz_blog_triggers:以 Blob 型別儲存的Trigger(用於 Quartz 使用者用 JDBC 建立他們自己定製的 Trigger 型別,JobStore 並不知道如何儲存例項的時候)
qrtz_trigger_listeners:儲存已配置的觸發器監聽器 ( Trigger Listener ) 的資訊
qrtz_triggers:儲存已配置的 觸發器 (Trigger) 的資訊

//新建表
ScheduleJob:自定義定時任務詳細狀態表,方便管理定時任務
  • 建表指令碼:/resourecs/quartz.sql

3 新增依賴

<!--quartz相關依賴-->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

<!--資料庫相關依賴-->
<dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
	<scope>runtime</scope>
</dependency>

<dependency>
	<groupId>com.alibaba</groupId>
	<artifactId>druid-spring-boot-starter</artifactId>
	<version>1.1.9</version>
</dependency>

<dependency>
	<groupId>org.mybatis.spring.boot</groupId>
	<artifactId>mybatis-spring-boot-starter</artifactId>
	<version>1.3.2</version>
</dependency>

4 新增配置

#資料庫連線池配置
spring:
  datasource:
    name: mysql_test
    type: com.alibaba.druid.pool.DruidDataSource
    #druid相關配置
    druid:
      #監控統計攔截的filters
      filters: stat
      driver-class-name: com.mysql.jdbc.Driver
      #基本屬性
      url: jdbc:mysql://127.0.0.1:3306/springboot?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true
      username: root
      password: 123456
      #配置初始化大小/最小/最大
      initial-size: 1
      min-idle: 1
      max-active: 20
      #獲取連線等待超時時間
      max-wait: 60000
      #間隔多久進行一次檢測,檢測需要關閉的空閒連線
      time-between-eviction-runs-millis: 60000
      #一個連線在池中最小生存的時間
      min-evictable-idle-time-millis: 300000
      validation-query: SELECT 'x'
      test-while-idle: true
      test-on-borrow: false
      test-on-return: false
      #開啟PSCache,並指定每個連線上PSCache的大小。oracle設為true,mysql設為false。分庫分表較多推薦設定為false
      pool-prepared-statements: false
      max-pool-prepared-statement-per-connection-size: 20
  #Quartz配置
  quartz:
    jdbc:
      initialize-schema: always
    job-store-type: jdbc

##Mybatis配置
mybatis:
  #Mapper.xml所在的位置
  mapper-locations: classpath:mapping/*.xml
  #entity掃描的包名
  type-aliases-package: com.mkeeper.entity

5 Spring接管Quartz

@Component
public class ScheduleJobFactory extends AdaptableJobFactory {
    // 讓不受spring管理的類具有spring自動注入的特性
    @Autowired
    private AutowireCapableBeanFactory autowireCapableBeanFactory;

    @Override
    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
        Object jobInstance = super.createJobInstance(bundle);
        autowireCapableBeanFactory.autowireBean(jobInstance);
        return jobInstance;
    }
}

6 Quartz配置SchedulerFactoryBean初始化

@Configuration
public class ScheduleConfig {
    @Autowired
    private ScheduleJobFactory scheduleJobFactory;

    @Bean
    @Qualifier("scheduleBean")
    public SchedulerFactoryBean schedulerFactoryBean(@Qualifier("dataSource") DataSource dataSource) {
        SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
        // 名稱
        schedulerFactoryBean.setSchedulerName("TASK_EXECUTOR");
        // 延遲10秒啟動Scheduler
        schedulerFactoryBean.setStartupDelay(10);
        // 通過applicationContextSchedulerContextKey屬性配置spring上下文
        schedulerFactoryBean.setApplicationContextSchedulerContextKey("applicationContextKey");
        // 設定是否任意一個已定義的Job會覆蓋現有的Job。預設為false,即已定義的Job不會覆蓋現有的Job。
        schedulerFactoryBean.setOverwriteExistingJobs(true);
        // 自動開始
        schedulerFactoryBean.setAutoStartup(true);
        // 資料來源
        schedulerFactoryBean.setDataSource(dataSource);
        // 將JobFactory改為自定義的,否則在 Job 中注入 Bean 會失敗
        schedulerFactoryBean.setJobFactory(scheduleJobFactory);
        return schedulerFactoryBean;
    }
}

7 自定義任務管理

實體

@Data
public class ScheduleJob implements Serializable {

    private static final Long serialVersionUID = 1435515995276255188L;

    private Long id;

    private String className;

    private String cronExpression;

    private String jobName;

    private String jobGroup;

    private String triggerName;

    private String triggerGroup;

    private Boolean pause;

    private Boolean enable;

    private String description;

    private Date createTime;

    private Date lastUpdateTime;

}

為了節約篇幅,mapping,dao省略,請參考原始碼

9 建立Quartz任務排程工具類(重點)

@Slf4j
public class ScheduleUtil {

    /**
     * 獲取 Trigger Key
     *
     * @param scheduleJob
     * @return
     */
    public static TriggerKey getTriggerKey(ScheduleJob scheduleJob) {
        return TriggerKey.triggerKey(scheduleJob.getTriggerName(), scheduleJob.getTriggerGroup());
    }

    /**
     * 獲取 Job Key
     *
     * @param scheduleJob
     * @return
     */
    public static JobKey getJobKey(ScheduleJob scheduleJob) {
        return JobKey.jobKey(scheduleJob.getJobName(), scheduleJob.getJobGroup());
    }

    /**
     * 獲取 Cron Trigger
     *
     * @param scheduler
     * @param scheduleJob
     * @return
     * @throws ServiceException
     */
    public static CronTrigger getCronTrigger(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException {
        try {
            return (CronTrigger) scheduler.getTrigger(getTriggerKey(scheduleJob));
        } catch (SchedulerException e) {
            throw new ServiceException("Get Cron trigger failed", e);
        }
    }

    /**
     * 建立任務
     *
     * @param scheduler
     * @param scheduleJob
     * @throws ServiceException
     */
    public static void createScheduleJob(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException {

        validateCronExpression(scheduleJob);

        try {
            // 要執行的 Job 的類
            Class<? extends Job> jobClass = (Class<? extends Job>) Class.forName(scheduleJob.getClassName()).newInstance().getClass();

            JobDetail jobDetail = JobBuilder.newJob(jobClass)
                    .withIdentity(scheduleJob.getJobName(), scheduleJob.getJobGroup())
                    .withDescription(scheduleJob.getDescription())
                    .build();

            CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob.getCronExpression())
                    .withMisfireHandlingInstructionDoNothing();

            CronTrigger cronTrigger = TriggerBuilder.newTrigger()
                    .withIdentity(scheduleJob.getTriggerName(), scheduleJob.getTriggerGroup())
                    .withDescription(scheduleJob.getDescription())
                    .withSchedule(scheduleBuilder)
                    .startNow()
                    .build();

            scheduler.scheduleJob(jobDetail, cronTrigger);

            log.info("Create schedule job {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName());

            if (scheduleJob.getPause()) {
                pauseJob(scheduler, scheduleJob);
            }
        } catch (Exception e) {
            e.printStackTrace();
            log.error("Execute schedule job failed");
            throw new ServiceException("Execute schedule job failed", e);
        }
    }

    /**
     * 更新任務
     *
     * @param scheduler
     * @param scheduleJob
     * @throws ServiceException
     */
    public static void updateScheduleJob(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException {

        validateCronExpression(scheduleJob);

        try {

            TriggerKey triggerKey = getTriggerKey(scheduleJob);

            CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob.getCronExpression())
                    .withMisfireHandlingInstructionDoNothing();

            CronTrigger cronTrigger = getCronTrigger(scheduler, scheduleJob);

            cronTrigger = cronTrigger.getTriggerBuilder()
                    .withIdentity(triggerKey)
                    .withDescription(scheduleJob.getDescription())
                    .withSchedule(cronScheduleBuilder).build();

            scheduler.rescheduleJob(triggerKey, cronTrigger);

            log.info("Update schedule job {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName());

            if (scheduleJob.getPause()) {
                pauseJob(scheduler, scheduleJob);
            }
        } catch (SchedulerException e) {
            e.printStackTrace();
            log.error("Update schedule job failed");
            throw new ServiceException("Update schedule job failed", e);
        }
    }

    /**
     * 執行任務
     *
     * @param scheduler
     * @param scheduleJob
     * @throws ServiceException
     */
    public static void run(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException {
        try {
            scheduler.triggerJob(getJobKey(scheduleJob));
            log.info("Run schedule job {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName());
        } catch (SchedulerException e) {
            e.printStackTrace();
            log.error("Run schedule job failed");
            throw new ServiceException("Run schedule job failed", e);
        }
    }

    /**
     * 暫停任務
     *
     * @param scheduler
     * @param scheduleJob
     */
    public static void pauseJob(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException {
        try {
            scheduler.pauseJob(getJobKey(scheduleJob));
            log.info("Pause schedule job {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName());
        } catch (SchedulerException e) {
            e.printStackTrace();
            log.error("Pause schedule job failed");
            throw new ServiceException("Pause job failed", e);
        }
    }

    /**
     * 繼續執行任務
     *
     * @param scheduler
     * @param scheduleJob
     * @throws ServiceException
     */
    public static void resumeJob(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException {
        try {
            scheduler.resumeJob(getJobKey(scheduleJob));
            log.info("Resume schedule job {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName());
        } catch (SchedulerException e) {
            e.printStackTrace();
            log.error("Resume schedule job failed");
            throw new ServiceException("Resume job failed", e);
        }
    }

    /**
     * 刪除任務
     *
     * @param scheduler
     * @param scheduleJob
     * @throws ServiceException
     */
    public static void deleteJob(Scheduler scheduler, ScheduleJob scheduleJob) throws ServiceException {
        try {
            scheduler.deleteJob(getJobKey(scheduleJob));
            log.info("Delete schedule job {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName());
        } catch (SchedulerException e) {
            e.printStackTrace();
            log.error("Delete schedule job failed");
            throw new ServiceException("Delete job failed", e);
        }
    }

    /**
     * 校驗Cron表示式
     */
    public static void validateCronExpression(ScheduleJob scheduleJob) throws ServiceException {
        if (!CronExpression.isValidExpression(scheduleJob.getCronExpression())) {
            throw new ServiceException(String.format("Job %s expression %s is not correct!", scheduleJob.getClassName(), scheduleJob.getCronExpression()));
        }
    }
}

10 建立定時任務服務類

@Service
public class JobService {

    @Resource
    private JobMapper jobMapper;

    @Resource
    private Scheduler scheduler;

    public List<ScheduleJob> getAllEnableJob() {
        return jobMapper.getAllEnableJob();
    }

    public ScheduleJob select(Long jobId) throws ServiceException {
        ScheduleJob scheduleJob = jobMapper.select(jobId);
        if (scheduleJob == null) {
            throw new ServiceException("ScheduleJob:" + jobId + " not found");
        }
        return scheduleJob;
    }

    @Transactional(rollbackFor = DataAccessException.class)
    public ScheduleJob update(Long jobId, ScheduleJob scheduleJob) throws ServiceException {

        if (jobMapper.update(scheduleJob) <= 0) {
            throw new ServiceException("Update product:" + jobId + "failed");
        }

        ScheduleUtil.updateScheduleJob(scheduler, scheduleJob);

        return scheduleJob;
    }

    @Transactional(rollbackFor = DataAccessException.class)
    public boolean add(ScheduleJob scheduleJob) throws ServiceException {
        Integer num = jobMapper.insert(scheduleJob);
        if (num <= 0) {
            throw new ServiceException("Add product failed");
        }

        ScheduleUtil.createScheduleJob(scheduler, scheduleJob);

        return true;
    }

    @Transactional(rollbackFor = DataAccessException.class)
    public boolean delete(Long jobId) throws ServiceException {
        ScheduleJob scheduleJob = select(jobId);

        Integer num = jobMapper.delete(jobId);
        if (num <= 0) {
            throw new ServiceException("Delete product:" + jobId + "failed");
        }

        ScheduleUtil.deleteJob(scheduler, scheduleJob);

        return true;
    }

    public List<ScheduleJob> getAllJob() {
        return jobMapper.getAllJob();
    }

    public boolean resume(Long jobId) throws ServiceException {
        ScheduleJob scheduleJob = updateScheduleJobStatus(jobId, false);
        ScheduleUtil.resumeJob(scheduler, scheduleJob);
        return true;
    }

    public boolean pause(Long jobId) throws ServiceException {
        ScheduleJob scheduleJob = updateScheduleJobStatus(jobId, true);
        ScheduleUtil.pauseJob(scheduler, scheduleJob);
        return true;
    }

    public boolean run(Long jobId) throws ServiceException {
        ScheduleJob scheduleJob = updateScheduleJobStatus(jobId, false);
        ScheduleUtil.run(scheduler, scheduleJob);
        return true;
    }

    private ScheduleJob updateScheduleJobStatus(Long jobId, Boolean isPause) throws ServiceException {
        ScheduleJob scheduleJob = select(jobId);
        scheduleJob.setPause(isPause);
        update(scheduleJob.getId(), scheduleJob);
        return scheduleJob;
    }
}

11 建立應用啟動監聽類

/**
 *     啟動應用時執行定時任務
 *
 * @author mkeeper
 * @create 2018/10/19 10:05
 */
@Slf4j
@Component
public class ApplicationListener implements CommandLineRunner {

    @Resource
    private JobService jobService;

    @Resource
    private Scheduler scheduler;

    @Override
    public void run(String... args) {
        List<ScheduleJob> scheduleJobList = jobService.getAllEnableJob();
        for (ScheduleJob scheduleJob : scheduleJobList) {
            try {
                CronTrigger cronTrigger = ScheduleUtil.getCronTrigger(scheduler, scheduleJob);
                if (cronTrigger == null) {
                    ScheduleUtil.createScheduleJob(scheduler, scheduleJob);
                } else {
                    ScheduleUtil.updateScheduleJob(scheduler, scheduleJob);
                }
                log.info("Startup {}-{} success", scheduleJob.getJobGroup(), scheduleJob.getJobName());
            } catch (ServiceException e) {
                log.error("Job ERROR", e);
            }
        }
    }
}

12 新建任務

@Slf4j
@Component
public class TestJob implements Job {

    @Override
    public void execute(JobExecutionContext jobExecutionContext){
        // Do what you want here
        log.info("Test job is executing at: " + 		  	 System.currentTimeMillis()/1000);
    }
}

13 Controller

@RestController
@RequestMapping("/job")
public class JobController {

    @Autowired
    private JobService jobService;

    @GetMapping
    public R getAllJob() {
        return R.isOk().data(jobService.getAllJob());
    }

    @GetMapping("/{id}")
    public R getJob(@PathVariable("id") Long jobId) throws ServiceException {
        return R.isOk().data(jobService.select(jobId));
    }

    @PutMapping("/update/{id}")
    public R updateJob(@PathVariable("id") Long jobId, @RequestBody ScheduleJob newScheduleJob) throws ServiceException {
        return R.isOk().data(jobService.update(jobId, newScheduleJob));
    }

    @DeleteMapping("/delete/{id}")
    public R deleteJob(@PathVariable("id") Long jobId) throws ServiceException {
        return R.isOk().data(jobService.delete(jobId));
    }

    @PostMapping("/add")
    public R saveJob(@RequestBody ScheduleJob newScheduleJob) throws ServiceException {
        return R.isOk().data(jobService.add(newScheduleJob));
    }


    @GetMapping("/run/{id}")
    public R runJob(@PathVariable("id") Long jobId) throws ServiceException {
        return R.isOk().data(jobService.run(jobId));
    }


    @GetMapping("/pause/{id}")
    public R pauseJob(@PathVariable("id") Long jobId) throws ServiceException {
        return R.isOk().data(jobService.pause(jobId));
    }

    @GetMapping("/resume/{id}")
    public R resumeJob(@PathVariable("id") Long jobId) throws ServiceException {
        return R.isOk().data(jobService.resume(jobId));
    }
}

14 測試結果

考慮到要測試的介面很多,這裡推薦Swagger
Swagger是一個規範和完整的框架,用於生成、描述、呼叫和視覺化RESTful風格的Web服務
新增依賴

<!--swagger2-->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.6.1</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.6.1</version>
        </dependency>

新增配置檔案

@Configuration
@EnableSwagger2
public class SwaggerConfig {

    @Value("${swagger.enable:false}")
    private boolean enable;

    @Bean
    public Docket createRestApi() {
        return new Docket(DocumentationType.SWAGGER_2)
                .enable(enable)
                .apiInfo(apiInfo())
                .select()
                .apis(RequestHandlerSelectors.basePackage("com.mkeeper.controller"))
                .paths(PathSelectors.any())
                .build();
    }
    private ApiInfo apiInfo() {
        return new ApiInfoBuilder()
                .title("Quartz定時任務單點持久化介面文件")
                .description("Quartz定時任務單點持久化")
                .version("1.0")
                .build();
    }

}

application.yml中開啟Swagger

swagger:
  enable: true

15 工程目錄

16 結束語

說點什麼呢,有任何建議,歡迎留言探討,本文原始碼

歡迎關注博主公眾號,第一時間推送最新文章

歡迎關注博主公眾號