1. 程式人生 > >springboot和quartz整合實現動態定時任務(持久化單節點)

springboot和quartz整合實現動態定時任務(持久化單節點)

依賴 1.5 ostc read 自動 1.8 自動註入 etc string

  Quartz是一個完全由java編寫的開源作業調度框架,為在Java應用程序中進行作業調度提供了簡單卻強大的機制,它支持定時任務持久化到數據庫,從而避免了重啟服務器時任務丟失,支持分布式多節點,大大的提高了單節點定時任務的容錯性。springboot在2.0版本以前沒有對quartz做自動配置,因此需要我們自己去手動配置,網上找了許多資料,但是大都不能完全滿足自己的需要,因此自己整合了一下方便以後參考(copy),整合代碼基於springboot1.5.9和quartz2.3.0,過程如下:

  1、pom.xml

<?xml version="1.0" encoding="UTF-8"
?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>powerx.io</groupId> <artifactId
>springboot-quartz</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>springboot-quartz</name> <description>Demo project for Spring Boot</description> <parent> <groupId>org.springframework.boot</
groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.9.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> <druid.version>1.1.5</druid.version> <quartz.version>2.3.0</quartz.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>${druid.version}</version> </dependency> <!--quartz相關依賴--> <dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artifactId> <version>${quartz.version}</version> </dependency> <dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz-jobs</artifactId> <version>${quartz.version}</version> </dependency> <!--定時任務需要依賴context模塊--> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>

  2、整合配置類

  采用jobDetail使用Spring Ioc托管方式來完成整合,我們可以在定時任務實例中使用Spring註入註解完成業務邏輯處理,代碼如下

package com.example.demo.config;

import org.quartz.spi.JobFactory;
import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.scheduling.quartz.SpringBeanJobFactory;

import javax.sql.DataSource;

@Configuration
@EnableScheduling
public class QuartzConfiguration {
    /**
     * 繼承org.springframework.scheduling.quartz.SpringBeanJobFactory
     * 實現任務實例化方式
     */
    public static class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory implements
            ApplicationContextAware {

        private transient AutowireCapableBeanFactory beanFactory;

        @Override
        public void setApplicationContext(final ApplicationContext context) {
            beanFactory = context.getAutowireCapableBeanFactory();
        }

        /**
         * 將job實例交給spring ioc托管
         * 我們在job實例實現類內可以直接使用spring註入的調用被spring ioc管理的實例
         *
         * @param bundle
         * @return
         * @throws Exception
         */
        @Override
        protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception {
            final Object job = super.createJobInstance(bundle);
            /**
             * 將job實例交付給spring ioc
             */
            beanFactory.autowireBean(job);
            return job;
        }
    }

    /**
     * 配置任務工廠實例
     *
     * @return
     */
    @Bean
    public JobFactory jobFactory() {
        /**
         * 采用自定義任務工廠 整合spring實例來完成構建任務*/
        AutowiringSpringBeanJobFactory jobFactory = new AutowiringSpringBeanJobFactory();
        return jobFactory;
    }

    /**
     * 配置任務調度器
     * 使用項目數據源作為quartz數據源
     *
     * @param jobFactory 自定義配置任務工廠
     * @param dataSource 數據源實例
     * @return
     * @throws Exception
     */
    @Bean(destroyMethod = "destroy", autowire = Autowire.NO)
    public SchedulerFactoryBean schedulerFactoryBean(JobFactory jobFactory, DataSource dataSource) throws Exception {
        SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
        //將spring管理job自定義工廠交由調度器維護
        schedulerFactoryBean.setJobFactory(jobFactory);
        //設置覆蓋已存在的任務
        schedulerFactoryBean.setOverwriteExistingJobs(true);
        //項目啟動完成後,等待2秒後開始執行調度器初始化
        schedulerFactoryBean.setStartupDelay(2);
        //設置調度器自動運行
        schedulerFactoryBean.setAutoStartup(true);
        //設置數據源,使用與項目統一數據源
        schedulerFactoryBean.setDataSource(dataSource);
        //設置上下文spring bean name
        schedulerFactoryBean.setApplicationContextSchedulerContextKey("applicationContext");
        //設置配置文件位置
        schedulerFactoryBean.setConfigLocation(new ClassPathResource("/quartz.properties"));
        return schedulerFactoryBean;
    }
}

  AutowiringSpringBeanJobFactory:可以看到上面配置類中,AutowiringSpringBeanJobFactory我們繼承了SpringBeanJobFactory類,並且通過實現ApplicationContextAware接口獲取ApplicationContext設置方法,通過外部實例化時設置ApplicationContext實例對象,在createJobInstance方法內,我們采用AutowireCapableBeanFactory來托管SpringBeanJobFactory類中createJobInstance方法返回的定時任務實例,這樣我們就可以在定時任務類內使用Spring Ioc相關的註解進行註入業務邏輯實例了。

  JobFactory:自定義任務工廠

  SchedulerFactoryBean:使用項目內部數據源的方式來設置調度器的jobSotre,官方quartz有兩種持久化的配置方案。

  第一種:采用quartz.properties配置文件配置獨立的定時任務數據源,可以與使用項目的數據庫完全獨立。
  第二種:采用與創建項目統一個數據源,定時任務持久化相關的表與業務邏輯在同一個數據庫內。

  可以根據實際的項目需求采取不同的方案,我采用了第二種方案,在上面配置類中可以看到方法schedulerFactoryBean內自動註入了JobFactory實例,也就是我們自定義的AutowiringSpringBeanJobFactory任務工廠實例,另外一個參數就是DataSource,在我們引入spring-boot-starter-jdbc依賴後會根據application.yml文件內的數據源相關配置自動實例化DataSource實例,這裏直接註入是沒有問題的。我們通過調用SchedulerFactoryBean對象的setConfigLocation方法來設置quartz定時任務框架的基本配置,配置文件所在位置:resources/quartz.properties => classpath:/quartz.properties下。

  quartz.properties內容如下:

org.quartz.scheduler.instanceName = schedulerFactoryBean

org.quartz.scheduler.instanceId = AUTO

org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX

org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate

org.quartz.jobStore.tablePrefix = QRTZ_

org.quartz.jobStore.useProperties = false

org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool

org.quartz.threadPool.threadCount = 10

org.quartz.threadPool.threadPriority = 5

  在上面配置中org.quartz.jobStore.classorg.quartz.jobStore.driverDelegateClass是定時任務持久化的關鍵配置,配置了數據庫持久化定時任務以及采用MySQL數據庫進行連接,當然你也可以連接別的數據庫org.quartz.jobStore.tablePrefix屬性配置了定時任務數據表的前綴,在quartz官方提供的創建表SQL腳本默認就是qrtz_,我們需要解壓quartz2.3.0的jar,在quartz-2.2.3/docs/dbTables下找到tables_mysql_innodb.sql,然後在mysql客戶端執行來創建相應的表。

  3、動態定時任務demo

  QuartzService.java

package com.example.demo.service;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.annotation.PostConstruct;

import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.DateBuilder;
import org.quartz.DateBuilder.IntervalUnit;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.quartz.impl.matchers.GroupMatcher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.stereotype.Service;

@Service
public class QuartzService {

    @Autowired
    private Scheduler scheduler;

    @PostConstruct
    public void startScheduler() {
        try {
            scheduler.start();
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }

    /**
     * 增加一個job
     * 
     * @param jobClass
     *            任務實現類
     * @param jobName
     *            任務名稱
     * @param jobGroupName
     *            任務組名
     * @param jobTime
     *            時間表達式 (這是每隔多少秒為一次任務)
     * @param jobTimes
     *            運行的次數 (<0:表示不限次數)
     */
    public void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, int jobTime,
            int jobTimes) {
        try {
            JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName)// 任務名稱和組構成任務key
                    .build();
            // 使用simpleTrigger規則
            Trigger trigger = null;
            if (jobTimes < 0) {
                trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName)
                        .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(1).withIntervalInSeconds(jobTime))
                        .startNow().build();
            } else {
                trigger = TriggerBuilder
                        .newTrigger().withIdentity(jobName, jobGroupName).withSchedule(SimpleScheduleBuilder
                                .repeatSecondlyForever(1).withIntervalInSeconds(jobTime).withRepeatCount(jobTimes))
                        .startNow().build();
            }
            scheduler.scheduleJob(jobDetail, trigger);
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }

    /**
     * 增加一個job
     * 
     * @param jobClass
     *            任務實現類
     * @param jobName
     *            任務名稱
     * @param jobGroupName
     *            任務組名
     * @param jobTime
     *            時間表達式 (如:0/5 * * * * ? )
     */
    public void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, String jobTime) {
        try {
            // 創建jobDetail實例,綁定Job實現類
            // 指明job的名稱,所在組的名稱,以及綁定job類
            JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName)// 任務名稱和組構成任務key
                    .build();
            // 定義調度觸發規則
            // 使用cornTrigger規則
            Trigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName)// 觸發器key
                    .startAt(DateBuilder.futureDate(1, IntervalUnit.SECOND))
                    .withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).startNow().build();
            // 把作業和觸發器註冊到任務調度中
            scheduler.scheduleJob(jobDetail, trigger);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 修改 一個job的 時間表達式
     * 
     * @param jobName
     * @param jobGroupName
     * @param jobTime
     */
    public void updateJob(String jobName, String jobGroupName, String jobTime) {
        try {
            TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);
            CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
            trigger = trigger.getTriggerBuilder().withIdentity(triggerKey)
                    .withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).build();
            // 重啟觸發器
            scheduler.rescheduleJob(triggerKey, trigger);
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }

    /**
     * 刪除任務一個job
     * 
     * @param jobName
     *            任務名稱
     * @param jobGroupName
     *            任務組名
     */
    public void deleteJob(String jobName, String jobGroupName) {
        try {
            scheduler.deleteJob(new JobKey(jobName, jobGroupName));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 暫停一個job
     * 
     * @param jobName
     * @param jobGroupName
     */
    public void pauseJob(String jobName, String jobGroupName) {
        try {
            JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
            scheduler.pauseJob(jobKey);
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }

    /**
     * 恢復一個job
     * 
     * @param jobName
     * @param jobGroupName
     */
    public void resumeJob(String jobName, String jobGroupName) {
        try {
            JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
            scheduler.resumeJob(jobKey);
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }

    /**
     * 立即執行一個job
     * 
     * @param jobName
     * @param jobGroupName
     */
    public void runAJobNow(String jobName, String jobGroupName) {
        try {
            JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
            scheduler.triggerJob(jobKey);
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }

    /**
     * 獲取所有計劃中的任務列表
     * 
     * @return
     */
    public List<Map<String, Object>> queryAllJob() {
        List<Map<String, Object>> jobList = null;
        try {
            GroupMatcher<JobKey> matcher = GroupMatcher.anyJobGroup();
            Set<JobKey> jobKeys = scheduler.getJobKeys(matcher);
            jobList = new ArrayList<Map<String, Object>>();
            for (JobKey jobKey : jobKeys) {
                List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
                for (Trigger trigger : triggers) {
                    Map<String, Object> map = new HashMap<>();
                    map.put("jobName", jobKey.getName());
                    map.put("jobGroupName", jobKey.getGroup());
                    map.put("description", "觸發器:" + trigger.getKey());
                    Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
                    map.put("jobStatus", triggerState.name());
                    if (trigger instanceof CronTrigger) {
                        CronTrigger cronTrigger = (CronTrigger) trigger;
                        String cronExpression = cronTrigger.getCronExpression();
                        map.put("jobTime", cronExpression);
                    }
                    jobList.add(map);
                }
            }
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
        return jobList;
    }

    /**
     * 獲取所有正在運行的job
     * 
     * @return
     */
    public List<Map<String, Object>> queryRunJob() {
        List<Map<String, Object>> jobList = null;
        try {
            List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs();
            jobList = new ArrayList<Map<String, Object>>(executingJobs.size());
            for (JobExecutionContext executingJob : executingJobs) {
                Map<String, Object> map = new HashMap<String, Object>();
                JobDetail jobDetail = executingJob.getJobDetail();
                JobKey jobKey = jobDetail.getKey();
                Trigger trigger = executingJob.getTrigger();
                map.put("jobName", jobKey.getName());
                map.put("jobGroupName", jobKey.getGroup());
                map.put("description", "觸發器:" + trigger.getKey());
                Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
                map.put("jobStatus", triggerState.name());
                if (trigger instanceof CronTrigger) {
                    CronTrigger cronTrigger = (CronTrigger) trigger;
                    String cronExpression = cronTrigger.getCronExpression();
                    map.put("jobTime", cronExpression);
                }
                jobList.add(map);
            }
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
        return jobList;
    }

}

  UserService.java

package com.example.demo.service;

import org.springframework.stereotype.Service;

@Service
public class UserService {

    public void play() {
        System.out.println("user id play");
    }
    
    public void study() {
        System.out.println("user id study");
    }
}

  TestJob1.java

package com.example.demo.job;

import java.util.Date;

import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean;

public class TestJob1 extends QuartzJobBean {

    @Override
    protected void executeInternal(JobExecutionContext arg0) throws JobExecutionException {

        System.out.println(new Date() + "    job執行");
    }

}

  TestJob2.java

package com.example.demo.job;

import java.util.Date;

import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.stereotype.Component;

import com.example.demo.service.UserService;
@Component
public class TestJob2 extends QuartzJobBean {

    //註入業務service,完成定時任務邏輯
    @Autowired
    private UserService service;
    @Override
    protected void executeInternal(JobExecutionContext arg0) throws JobExecutionException {
        System.out.println(new Date() + "    job2執行");
        service.play();
    }

}

  TestController.java

package com.example.demo.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.example.demo.job.TestJob1;
import com.example.demo.job.TestJob2;
import com.example.demo.service.QuartzService;

@RestController
public class TestController {

    @Autowired
    private QuartzService quartzService;
    @RequestMapping("/addjob")
    public void startJob(String type) {
        if("TestJob1".equals(type)) {
            quartzService.addJob(TestJob1.class, "job1", "test", "0/5 * * * * ?");
            
        }else {
            quartzService.addJob(TestJob2.class, "job2", "test", "0/5 * * * * ?");
        }
    }
    
    @RequestMapping("/updatejob")
    public void updatejob() {
            quartzService.updateJob("job1", "test", "0/10 * * * * ?");
    }
    
    @RequestMapping("/deletejob")
    public void deletejob() {
            quartzService.deleteJob("job1", "test");
    }
    
    @RequestMapping("/pauseJob")
    public void pauseJob() {
            quartzService.pauseJob("job1", "test");
    }
    
    @RequestMapping("/resumeJob")
    public void resumeJob() {
            quartzService.resumeJob("job1", "test");
    }
    
    @RequestMapping("/queryAllJob")
    public Object queryAllJob() {
            return quartzService.queryAllJob();
    }
    

    @RequestMapping("/queryRunJob")
    public Object queryRunJob() {
            return quartzService.queryRunJob();
    }
}

  4、application.yml

spring:
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/quartz?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true
    username: root
    password: root
  jpa:
    hibernate:
      ddl-auto: update #ddl-auto:設為update表示每次都不會重新建表
    show-sql: true
  application:
    name: quartz-cluster-node-first
server:
  port: 8081
# 打印日誌
logging:
  level:
    root: INFO
    org.hibernate: INFO
    org.hibernate.type.descriptor.sql.BasicBinder: TRACE
    org.hibernate.type.descriptor.sql.BasicExtractor: TRACE
    com.springms: DEBUG

  至此,springboot整合quartz實現動態定時任務和任務持久化完畢,各功能我都測試過,符合預期,具體結果不再貼出,小夥伴們在使用時要註意springboot和quartz的版本問題,很多時候並不是代碼有問題,而是jar不匹配。springboot2.0以後,直接有了spring-boot-starter-quartz包,我們只需要把原來的jar替換掉,無需任何配置就完成了整合。

springboot和quartz整合實現動態定時任務(持久化單節點)