1. 程式人生 > >spring boot 整合quartz實現定時任務排程

spring boot 整合quartz實現定時任務排程

1.pom檔案引入

        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz</artifactId>
            <version>2.2.1</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

2.application.yml配置

#定時任務配置
asyn-tasks:
  triggers:
    - job-name: CacheJob
      cron: 0 0/1 * * * ?
      enabled: true

3.配置檔案context-epasquartzmanage.properties

org.quartz.group.name=AIMS_JOBS
org.quartz.scheduler.instanceName = DefaultQuartzScheduler
org.quartz.scheduler.instanceId = AUTO
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount = 10
org.quartz.jobStore.misfireThreshold = 60000
#org.quartz.jobStore.tablePrefix = QRTZ_
#org.quartz.jobStore.isClustered = true
#org.quartz.jobStore.clusterCheckinInterval = 2000

4.配置讀取-job配置

package com.paic.aims.cache.config;

import com.paic.aims.cache.base.InvokeJobDetailFactory;
import com.paic.aims.cache.base.Job;
import com.paic.icore.agr.common.exception.ServiceRuntimeException;
import com.paic.icore.agr.common.um.PrincipalUtil;
import com.paic.icore.agr.common.utils.AgrLogUtils;
import org.apache.commons.lang.StringUtils;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.Trigger;
import org.quartz.listeners.JobListenerSupport;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.quartz.CronTriggerFactoryBean;
import org.springframework.scheduling.quartz.JobDetailFactoryBean;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.text.ParseException;
import java.util.*;

/**
 * Quartz job configuration.
 *
 * <p>Collects every {@link Job} bean, looks up its trigger settings (bound from the
 * {@code asyn-tasks} configuration prefix) by the job's simple class name, and
 * registers one cron trigger per enabled job on the {@link SchedulerFactoryBean}.
 *
 * @author yjw
 * @date 2018/04/01
 */
@Configuration
public class JobConfig {
    private static final Logger logger = AgrLogUtils.getLogger(JobConfig.class);

    /** Classpath location of the Quartz properties file. */
    private static final String QUARTZ_PROPERTIES = "context-epasquartzmanage.properties";

    /** Quartz group used for every job detail and trigger built by this class. */
    private static final String JOB_GROUP = "AIMS_JOBS";

    @Autowired(required = false)
    private List<Job> jobList;
    @Autowired
    private TaskConfig taskConfig;

    /**
     * Builds the scheduler factory with one cron trigger per enabled {@link Job} bean.
     *
     * @return the scheduler factory; left unconfigured when no {@link Job} beans exist
     * @throws ServiceRuntimeException if a job has no trigger entry, a blank cron
     *         expression, or a cron expression that cannot be parsed
     */
    @Bean
    public SchedulerFactoryBean schedulerFactoryBeans() {
        SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
        if (CollectionUtils.isEmpty(jobList)) {
            return schedulerFactoryBean;
        }
        List<TriggerConfig> triggerConfigs = taskConfig.getTriggers();
        List<Trigger> triggers = new ArrayList<>();
        logger.info("jobs is+ "+ jobList);
        for (Job job : jobList) {
            String jobName = job.getClass().getSimpleName();
            TriggerConfig triggerConfig = findTriggerConfig(triggerConfigs, jobName);
            if (triggerConfig == null || StringUtils.isBlank(triggerConfig.getCron())) {
                throw new ServiceRuntimeException("No cron expression found for job [" + jobName + "]");
            }
            if (!triggerConfig.isEnabled()) {
                continue;
            }
            JobDetail jobDetail = buildJobDetail(job, jobName);
            // Logs whether concurrent execution is disallowed for this job detail.
            logger.info("jobDetailIsConcurrentExectionDisallowed="+jobDetail.isConcurrentExectionDisallowed());
            triggers.add(buildCronTrigger(jobDetail, jobName, triggerConfig.getCron()));
        }
        logger.info("triggers is+ "+ triggers);
        schedulerFactoryBean.setConfigLocation(new ClassPathResource(QUARTZ_PROPERTIES));
        schedulerFactoryBean.setQuartzProperties(loadQuartzProperties());
        // Key under which jobs find the ApplicationContext in the scheduler context.
        schedulerFactoryBean.setApplicationContextSchedulerContextKey("applicationContext");
        schedulerFactoryBean.setTriggers(triggers.toArray(new Trigger[0]));
        // Job listener that sets and clears the user context around each execution.
        schedulerFactoryBean.setGlobalJobListeners(new JobListenerSupport() {
            @Override
            public String getName() {
                return "UserExecutionContextInit";
            }
            @Override
            public void jobToBeExecuted(JobExecutionContext context) {
                // Set the default execution identity.
                PrincipalUtil.setUser(null);
            }
            @Override
            public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
                PrincipalUtil.clear();
            }
            @Override
            public void jobExecutionVetoed(JobExecutionContext context) {
                PrincipalUtil.clear();
            }
        });
        return schedulerFactoryBean;
    }

    /** Returns the first trigger entry whose job name equals {@code jobName}, or {@code null}. */
    private static TriggerConfig findTriggerConfig(List<TriggerConfig> triggerConfigs, String jobName) {
        if (triggerConfigs == null) {
            return null;
        }
        // jobName.equals(...) so an entry with a missing job-name cannot cause an NPE.
        return triggerConfigs.stream()
                .filter(e -> e != null && jobName.equals(e.getJobName()))
                .findFirst()
                .orElse(null);
    }

    /** Builds a durable, recoverable job detail that carries the job's class as {@code taskClass}. */
    private static JobDetail buildJobDetail(Job job, String jobName) {
        JobDetailFactoryBean jobFactoryBean = new JobDetailFactoryBean();
        jobFactoryBean.setJobClass(InvokeJobDetailFactory.class);
        jobFactoryBean.setDurability(true);
        jobFactoryBean.setRequestsRecovery(true);
        jobFactoryBean.setGroup(JOB_GROUP);
        jobFactoryBean.setBeanName(jobName);

        Map<String, Object> jobData = new HashMap<>();
        jobData.put("taskClass", job.getClass());
        jobFactoryBean.setJobDataAsMap(jobData);

        jobFactoryBean.afterPropertiesSet();
        return jobFactoryBean.getObject();
    }

    /** Builds the cron trigger named {@code trigger_<jobName>} for the given job detail. */
    private static Trigger buildCronTrigger(JobDetail jobDetail, String jobName, String cron) {
        CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();
        cronTriggerFactoryBean.setJobDetail(jobDetail);
        cronTriggerFactoryBean.setName("trigger_" + jobName);
        cronTriggerFactoryBean.setGroup(JOB_GROUP);
        cronTriggerFactoryBean.setCronExpression(cron);
        try {
            cronTriggerFactoryBean.afterPropertiesSet();
        } catch (ParseException e) {
            String message = "Invalid cron expression [" + cron + "] for " + "task [" + jobName + "]";
            // Log the parse failure here so its detail is not lost when rethrowing.
            logger.error(message, e);
            throw new ServiceRuntimeException(message);
        }
        return cronTriggerFactoryBean.getObject();
    }

    /**
     * Loads the Quartz properties file from the classpath on a best-effort basis.
     *
     * @return the loaded properties; empty when the file is missing or unreadable
     */
    private static Properties loadQuartzProperties() {
        Properties p = new Properties();
        try (java.io.InputStream in =
                     JobConfig.class.getClassLoader().getResourceAsStream(QUARTZ_PROPERTIES)) {
            if (in == null) {
                logger.error("Quartz properties file not found on classpath: " + QUARTZ_PROPERTIES);
                return p;
            }
            p.load(in);
            logger.info("org.quartz.group.name="+p.getProperty("org.quartz.group.name"));
            logger.info("org.quartz.jobStore.tablePrefix="+p.getProperty("org.quartz.jobStore.tablePrefix"));
        } catch (java.io.IOException | IllegalArgumentException e) {
            logger.error("Failed to load " + QUARTZ_PROPERTIES, e);
        }
        return p;
    }

    /** Trigger settings bound from the {@code asyn-tasks} configuration prefix. */
    @Component
    @ConfigurationProperties(prefix = "asyn-tasks")
    static class TaskConfig {
        private List<TriggerConfig> triggers = new ArrayList<>();
        public void setTriggers(List<TriggerConfig> triggers) {
            this.triggers = triggers;
        }
        public List<TriggerConfig> getTriggers() {
            return triggers;
        }
    }
}

5.trigger配置

package com.paic.aims.cache.config;

/**
 * Trigger settings for one scheduled job: the job's name, its cron expression,
 * and whether the job is enabled.
 *
 * @author yjw
 * @date 2018/04/01
 */
public class TriggerConfig {

    /** Name of the job these settings apply to; {@code null} until set. */
    private String jobName;

    /** Cron expression for the job; {@code null} until set. */
    private String cron;

    /** Whether the job is enabled; {@code false} until set. */
    private boolean enabled;

    /** @return the job name, or {@code null} if none was set */
    public String getJobName() {
        return this.jobName;
    }

    /** @param jobName the job name to store */
    public void setJobName(String jobName) {
        this.jobName = jobName;
    }

    /** @return the cron expression, or {@code null} if none was set */
    public String getCron() {
        return this.cron;
    }

    /** @param cron the cron expression to store */
    public void setCron(String cron) {
        this.cron = cron;
    }

    /** @return {@code true} if the job is enabled */
    public boolean isEnabled() {
        return this.enabled;
    }

    /** @param enabled the enabled flag to store */
    public void setEnabled(boolean enabled) {
        this.enabled = enabled;
    }
}

6.基礎類

/**
*定義一個job介面
**/
package com.paic.aims.cache.base;
/**
 * Contract for a schedulable task: a single no-argument {@link #execute()} method.
 */
public interface Job {
    /**
     * Runs the task. Takes no arguments and returns nothing.
     */
    void execute();
}
package com.paic.aims.cache.base;

import com.paic.icore.agr.common.utils.RedisUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;
/**
 * Job that refreshes every injected {@link CacheDataPojoService}, guarded by a
 * Redis lock so that a refresh only runs when the lock is acquired.
 *
 * @author yjw
 * @date 2018/04/01
 */
@Component
public class CacheJob implements Job {
    /**
     * Expiry of the Redis refresh lock, in seconds (60s).
     */
    public static final long NX_TIME_OUT = 60L;

    /** Redis key (also stored as the value) of the refresh lock. */
    private static final String BLOCK_REDIS_REFRESH = "BLOCKREDISREFRESHAIMS";

    /**
     * All cache services to initialise and refresh.
     */
    @Autowired
    private List<CacheDataPojoService> list;

    /**
     * Calls {@code initData()} on every cache service.
     */
    public void init() {
        for (CacheDataPojoService cacheDataPojoService : list) {
            cacheDataPojoService.initData();
        }
    }

    /**
     * Refreshes every cache service if the Redis lock can be acquired; otherwise
     * does nothing. The lock key is deleted once the refresh ends, even on failure.
     */
    @Override
    public void execute() {
        if (!RedisUtil.setnx(BLOCK_REDIS_REFRESH, BLOCK_REDIS_REFRESH, NX_TIME_OUT)) {
            // Lock not acquired: skip this run.
            return;
        }
        try {
            for (CacheDataPojoService cacheDataPojoService : list) {
                cacheDataPojoService.refreshData();
            }
        } finally {
            // NOTE(review): the key is deleted unconditionally. If a refresh outlives
            // NX_TIME_OUT, this may remove a lock since acquired elsewhere - confirm
            // that refreshes always finish within the timeout.
            RedisUtil.deleteString(BLOCK_REDIS_REFRESH);
        }
    }
}
package com.paic.aims.cache.base;

import com.paic.icore.agr.common.utils.AgrLogUtils;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;
/**
 * Runs the init and refresh hooks of every injected {@link CacheDataPojoService}.
 *
 * @author yjw
 * @date 2018/04/01
 */
@Component
public class CacheServiceImpl {

    public static final Logger log = AgrLogUtils.getLogger(CacheServiceImpl.class);

    /** All cache services found in the application context. */
    @Autowired
    private List<CacheDataPojoService> list;

    /**
     * Calls {@code refreshData()} on each cache service in list order.
     * An exception from any service propagates to the caller.
     */
    public void refresh() {
        list.forEach(CacheDataPojoService::refreshData);
    }

    /**
     * Calls {@code initData()} on each cache service in list order.
     * A failure in one service is logged and does not stop the remaining ones.
     */
    public void init() {
        log.info("init cache listSize = {}", list.size());
        for (CacheDataPojoService service : list) {
            String serviceClass = service.getClass().toString();
            log.info("CacheDataPojoService pojo class,{}", serviceClass);
            try {
                service.initData();
            } catch (Exception failure) {
                log.error("init data happen error", failure);
            }
        }
    }

}

package com.paic.aims.cache.base;

import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.quartz.QuartzJobBean;
/**
 * Quartz job bean that fetches the bean of the configured task class from the
 * Spring {@link ApplicationContext} and calls its {@code execute()} method.
 *
 * @author yjw
 * @date 2018/04/01
 */
public class InvokeJobDetailFactory extends QuartzJobBean {
    private ApplicationContext ctx;
    private Class<? extends Job> taskClass;

    /**
     * @param taskClass type of the {@link Job} bean to run
     */
    public void setTaskClass(Class<? extends Job> taskClass) {
        this.taskClass = taskClass;
    }

    /**
     * @param applicationContext context from which the job bean is fetched
     */
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.ctx = applicationContext;
    }

    /**
     * Fetches the task bean by type and runs it.
     *
     * @throws JobExecutionException wrapping any exception raised during lookup or execution
     */
    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        try {
            ctx.getBean(taskClass).execute();
        } catch (Exception cause) {
            throw new JobExecutionException(cause);
        }
    }
}
package com.paic.aims.cache.base;

import com.paic.icore.agr.common.utils.AgrLogUtils;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;
/**
 * Runs the init and refresh hooks of every injected {@link CacheDataPojoService}.
 *
 * @author yjw
 * @date 2018/04/01
 */
@Component
public class CacheServiceImpl {

    public static final Logger log = AgrLogUtils.getLogger(CacheServiceImpl.class);

    /** All cache services found in the application context. */
    @Autowired
    private List<CacheDataPojoService> list;

    /**
     * Calls {@code refreshData()} on each cache service in list order.
     * An exception from any service propagates to the caller.
     */
    public void refresh() {
        list.forEach(CacheDataPojoService::refreshData);
    }

    /**
     * Calls {@code initData()} on each cache service in list order.
     * A failure in one service is logged and does not stop the remaining ones.
     */
    public void init() {
        log.info("init cache listSize = {}", list.size());
        for (CacheDataPojoService service : list) {
            String serviceClass = service.getClass().toString();
            log.info("CacheDataPojoService pojo class,{}", serviceClass);
            try {
                service.initData();
            } catch (Exception failure) {
                log.error("init data happen error", failure);
            }
        }
    }

}

7.啟動類
自定義啟動類

package com.paic.aims;

import com.paic.aims.cache.base.CacheDataPojoService;
import com.paic.aims.cache.base.CacheServiceImpl;
import com.paic.icore.agr.common.utils.AgrLogUtils;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

/**
 * Application runner that initialises the cache data once the application has started.
 *
 * @author yjw
 * @date 2018/04/01
 */
@Component
public class CacheServiceStartedRunner implements ApplicationRunner {
    public final static Logger log = AgrLogUtils.getLogger(CacheServiceStartedRunner.class);

    /** Service whose {@code init()} is invoked at startup. */
    private final CacheServiceImpl cacheService;

    /**
     * Constructor injection is the only injection point for the cache service.
     *
     * @param acheServiceImpl the cache service to initialise on startup
     */
    @Autowired
    public CacheServiceStartedRunner(CacheServiceImpl acheServiceImpl) {
        this.cacheService = acheServiceImpl;
    }

    /**
     * Logs a startup message and initialises the base cache data.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("初始化快取資料");
        cacheService.init();
    }
}

系統啟動類

package com.paic.aims;

import com.paic.icore.agr.common.config.AgrBootstrap;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.springframework.boot.autoconfigure.web.MultipartAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.netflix.feign.EnableFeignClients;
import org.springframework.context.annotation.Import;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.transaction.annotation.EnableTransactionManagement;

/**
 * Spring Boot entry point for the jobs application.
 *
 * <p>Imports {@code AgrBootstrap}, enables transaction management, Feign clients
 * under {@code com.paic.aims.*}, async execution and scheduling, and excludes
 * {@code MultipartAutoConfiguration} from auto-configuration.
 *
 * @author yjw
 * @date 2018/04/01
 */
@SpringBootApplication
@Import(AgrBootstrap.class)
@EnableTransactionManagement
@EnableFeignClients(basePackages = {"com.paic.aims.*"})
@EnableAutoConfiguration(exclude = {MultipartAutoConfiguration.class})
@EnableAsync
@EnableScheduling
public class ApplicationJobs {
    /**
     * Builds the application with the web environment enabled and runs it.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplicationBuilder builder = new SpringApplicationBuilder(ApplicationJobs.class);
        builder.web(true).run(args);
    }
}

8.自定義service類,實現自己的業務需求

package com.paic.aims.service;


import com.paic.aims.cache.base.CacheDataPojoService;
import com.paic.aims.dto.InformationModuleModel;
import com.paic.aims.mapper.InformationCacheMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;

/**
 * Example {@link CacheDataPojoService} implementation backed by
 * {@link InformationCacheMapper}.
 *
 * @author yjw
 * @date 2018/04/01
 */
@Service
public class InformationAppServiceImpl implements CacheDataPojoService {

    private static final Log logger = LogFactory.getLog(InformationAppServiceImpl.class);

    @Autowired
    private InformationCacheMapper informationMapper;

    /**
     * Returns the information module list from the mapper.
     *
     * @param belong currently unused; NOTE(review): it is not passed to the mapper,
     *               confirm whether filtering by it was intended
     * @return the list returned by the mapper
     */
    public List<InformationModuleModel> selectInformationModuleList(int belong) throws Exception {
        return informationMapper.selectInformationModuleList();
    }

    /**
     * Queries the information modules and logs that initialisation ran.
     */
    @Override
    public void initData() {
        // NOTE(review): the query result is discarded, as in the original code;
        // presumably it should be written to a cache - confirm.
        informationMapper.selectInformationModuleList();
        logger.info("初始化資料");
    }

    /**
     * Logs that a refresh ran; no data is updated here.
     */
    @Override
    public void refreshData() {
        logger.info("更新資料");
    }


}

9.工具類

package com.paic.icore.agr.common.utils;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * Static helpers around {@link StringRedisTemplate} and {@link RedisTemplate}.
 *
 * <p>The templates live in static fields that are only assigned through the
 * {@code @Autowired} setters below, so the helpers work only after Spring has
 * created this component.
 *
 * @Author
 * @Date 2017/11/1 15:33
 */
@Component
public class RedisUtil {

    private static final Logger logger = LoggerFactory.getLogger(RedisUtil.class);

    private static StringRedisTemplate stringRedisTemplate;
    private static RedisTemplate<String, Object> redisTemplate;
    private static String applicationName;

    /** Maps a caller key to the key stored in Redis; currently returns it unchanged. */
    private static String wrapKey(String key) {
        return key;
    }

    /** Stores {@code o} under {@code key} with no expiry. */
    public static void setObject(String key, Object o) {
        redisTemplate.opsForValue().set(wrapKey(key), o);
    }

    /**
     * Stores {@code o} under {@code key} with an expiry.
     *
     * @param timeout expiry in DAYS (unlike {@link #setString(String, String, int)},
     *                which takes seconds)
     */
    public static void setObject(String key, Object o, int timeout) {
        redisTemplate.opsForValue().set(wrapKey(key), o, timeout, TimeUnit.DAYS);
    }

    /**
     * Reads the object stored under {@code key}.
     *
     * @return the stored value cast to the caller's expected type; the cast is
     *         unchecked, so a wrong expectation fails at the call site
     */
    @SuppressWarnings("unchecked")
    public static <T> T getObject(String key) {
        return (T) redisTemplate.opsForValue().get(wrapKey(key));
    }

    /** Deletes {@code key} through the object template. */
    public static void deleteObject(String key) {
        redisTemplate.delete(wrapKey(key));
    }

    /** Deletes {@code key} through the string template. */
    public static void deleteString(String key) {
        stringRedisTemplate.delete(wrapKey(key));
    }

    /**
     * Reads the string stored under {@code key}.
     *
     * @return the value, or {@code null} when {@code key} is blank
     */
    public static String getString(String key) {
        if (StringUtils.isBlank(key)) {
            return null;
        }
        return stringRedisTemplate.opsForValue().get(wrapKey(key));
    }

    /** Stores {@code val} under {@code key} with no expiry. */
    public static void setString(String key, String val) {
        stringRedisTemplate.opsForValue().set(wrapKey(key), val);
    }

    /**
     * Stores {@code val} under {@code key} with an expiry.
     *
     * @param timeout expiry in seconds
     */
    public static void setString(String key, String val, int timeout) {
        stringRedisTemplate.opsForValue().set(wrapKey(key), val, timeout, TimeUnit.SECONDS);
    }

    /**
     * Distributed lock: sets {@code key} only if it does not exist, then applies an expiry.
     *
     * <p>Runs through the string template so the key uses the same serializer as
     * {@link #deleteString(String)} and connections are managed by the template.
     * Set-if-absent and expire remain two separate commands, so a failure between
     * them leaves the key without an expiry.
     *
     * @param key  lock key
     * @param val  value stored under the key
     * @param time expiry in seconds
     * @return {@code true} if the lock was acquired, {@code false} otherwise
     */
    public static boolean setnx(String key, String val, long time) {
        String lockKey = wrapKey(key);
        Boolean acquired = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, val);
        // Boolean.TRUE.equals guards against a null result being unboxed.
        if (!Boolean.TRUE.equals(acquired)) {
            return false;
        }
        // Lock acquired: set its expiry.
        stringRedisTemplate.expire(lockKey, time, TimeUnit.SECONDS);
        return true;
    }

    /**
     * Returns the remaining time to live of {@code key} as reported by Redis.
     *
     * <p>Uses the template so that no connection is left open.
     */
    public static long ttl(String key) {
        return stringRedisTemplate.getExpire(wrapKey(key));
    }

    /**
     * Equivalent to {@code map.put(key, value)} on the Redis hash {@code mapName}.
     *
     * @param mapName name of the hash
     * @param key     hash field
     * @param object  value to store
     * @param <T>     value type
     */
    public static <T> void mapPut(String mapName, String key, T object) {
        redisTemplate.opsForHash().put(mapName, key, object);
    }

    /**
     * Equivalent to {@code map.get(key)} on the Redis hash {@code mapName}.
     *
     * @param mapName name of the hash
     * @param key     hash field
     * @param <T>     expected value type (unchecked cast)
     * @return the stored value
     */
    @SuppressWarnings("unchecked")
    public static <T> T mapGetByKey(String mapName, String key) {
        return (T) redisTemplate.opsForHash().get(mapName, key);
    }

    /**
     * Returns all entries of the Redis hash {@code mapName}.
     *
     * @param mapName name of the hash
     * @return the hash entries
     */
    public static Map getMap(String mapName) {
        return redisTemplate.opsForHash().entries(mapName);
    }


    /**
     * Returns the keys matching {@code pattern}, looked up through the string template.
     *
     * @param pattern key pattern
     * @return the matching keys
     */
    public static Set<String> keys(String pattern) {
        return stringRedisTemplate.keys(pattern);
    }

    /**
     * Receives the string template from Spring, sets string serializers for its
     * keys, values, hash keys and hash values, and stores it statically.
     */
    @Autowired
    public void setStringRedisTemplate(StringRedisTemplate stringRedisTemplate) {
        RedisSerializer<String> stringSerializer = new StringRedisSerializer();
        stringRedisTemplate.setKeySerializer(stringSerializer);
        stringRedisTemplate.setValueSerializer(stringSerializer);
        stringRedisTemplate.setHashKeySerializer(stringSerializer);
        stringRedisTemplate.setHashValueSerializer(stringSerializer);
        RedisUtil.stringRedisTemplate = stringRedisTemplate;
    }

    /**
     * Receives the object template from Spring and stores it statically.
     * The parameter stays a raw type so the injection point is unchanged.
     */
    @Autowired
    @SuppressWarnings("unchecked")
    public void setRedisTemplate(RedisTemplate redisTemplate) {
        RedisUtil.redisTemplate = redisTemplate;
    }

    /** Receives {@code spring.application.name}, defaulting to {@code unknown}. */
    @Value("${spring.application.name:unknown}")
    public void setApplicationName(String applicationName) {
        RedisUtil.applicationName = applicationName;
    }

    /**
     * Manual smoke test.
     *
     * <p>NOTE(review): the templates are only assigned through the autowired
     * setters, so running this outside a Spring context fails on a null template.
     */
    public static void main(String[] args)
    {
        RedisUtil.setString("c","中文");
    }

}