spring boot 整合quartz實現定時任務排程
阿新 • • 發佈:2019-01-26
1.pom檔案引入
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.2.1</version>
<exclusions>
<exclusion>
<artifactId >slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
2.application.xml配置
#定時任務配置
asyn-tasks:
triggers:
- job-name: CacheJob
cron: 0 0/1 * * * ?
enabled: true
3.配置檔案context-epasquartzmanage.properties
org.quartz.group.name=AIMS_JOBS
org.quartz.scheduler.instanceName = DefaultQuartzScheduler
org.quartz.scheduler.instanceId = AUTO
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount = 10
org.quartz.jobStore.misfireThreshold = 60000
#org.quartz.jobStore.tablePrefix = QRTZ_
#org.quartz.jobStore.isClustered = true
#org.quartz.jobStore.clusterCheckinInterval = 2000
4.配置讀取-job配置
package com.paic.aims.cache.config;
import com.paic.aims.cache.base.InvokeJobDetailFactory;
import com.paic.aims.cache.base.Job;
import com.paic.icore.agr.common.exception.ServiceRuntimeException;
import com.paic.icore.agr.common.um.PrincipalUtil;
import com.paic.icore.agr.common.utils.AgrLogUtils;
import org.apache.commons.lang.StringUtils;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.Trigger;
import org.quartz.listeners.JobListenerSupport;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.quartz.CronTriggerFactoryBean;
import org.springframework.scheduling.quartz.JobDetailFactoryBean;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.text.ParseException;
import java.util.*;
/**
* job配置
* 掃描所有的Task類,並按照配置檔案中配置的Cron表示式,啟動job
**/
/**
* Demo class
*
* @author yjw
* @date 2018/04/01
*/
@Configuration
public class JobConfig {
private static Logger logger = AgrLogUtils.getLogger(JobConfig.class);
@Autowired(required = false)
private List<Job> jobList;
@Autowired
private TaskConfig taskConfig;
@Bean
public SchedulerFactoryBean schedulerFactoryBeans() {
SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
if(CollectionUtils.isEmpty(jobList)){
return schedulerFactoryBean;
}
List<TriggerConfig> triggerConfigs = taskConfig.triggers;
List<Trigger> triggers = new ArrayList<>();
logger.info("jobs is+ "+ jobList);
for(Job job : jobList){
String jobName = job.getClass().getSimpleName();
TriggerConfig triggerConfig = triggerConfigs.stream().filter(e -> e.getJobName().equals(jobName)).findFirst()
.orElse(null);
if(triggerConfig == null || StringUtils.isBlank(triggerConfig.getCron())){
throw new ServiceRuntimeException("No cron expression found for job [" + jobName + "]");
}
if(!triggerConfig.isEnabled()){
continue;
}
JobDetailFactoryBean jobFactoryBean = new JobDetailFactoryBean();
jobFactoryBean.setJobClass(InvokeJobDetailFactory.class);
jobFactoryBean.setDurability(true);
jobFactoryBean.setRequestsRecovery(true);
jobFactoryBean.setGroup("AIMS_JOBS");
jobFactoryBean.setBeanName(jobName);
Map<String, Object> map = new HashMap<>(100);
map.put("taskClass", job.getClass());
jobFactoryBean.setJobDataAsMap(map);
jobFactoryBean.afterPropertiesSet();
JobDetail jobDetail = jobFactoryBean.getObject();
//是否不允許併發執行
logger.info("jobDetailIsConcurrentExectionDisallowed="+jobDetail.isConcurrentExectionDisallowed());
CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();
cronTriggerFactoryBean.setJobDetail(jobDetail);
cronTriggerFactoryBean.setName("trigger_" + jobName);
cronTriggerFactoryBean.setGroup("AIMS_JOBS");
cronTriggerFactoryBean.setCronExpression(triggerConfig.getCron());
try {
cronTriggerFactoryBean.afterPropertiesSet();
} catch (ParseException e) {
throw new ServiceRuntimeException("Invalid cron expression [" + triggerConfig.getCron() + "] for " +
"task [" + jobName + "]");
}
triggers.add(cronTriggerFactoryBean.getObject());
}
logger.info("triggers is+ "+ triggers);
schedulerFactoryBean.setConfigLocation( new ClassPathResource("context-epasquartzmanage.properties"));
Properties p = new Properties();
try {
// 讀取配置檔案pro.properties
p.load(JobConfig.class.getClassLoader().getResourceAsStream("context-epasquartzmanage.properties"));
logger.info("org.quartz.group.name="+p.getProperty("org.quartz.group.name"));
logger.info("org.quartz.jobStore.tablePrefix="+p.getProperty("org.quartz.jobStore.tablePrefix"));
}catch ( Exception e){
logger.info("err"+e.getMessage());
}
schedulerFactoryBean.setQuartzProperties(p);
schedulerFactoryBean.setApplicationContextSchedulerContextKey("applicationContext");
schedulerFactoryBean.setTriggers(triggers.toArray(new Trigger[triggers.size()]));
//設定job監聽器,設定UserExecutionContext
schedulerFactoryBean.setGlobalJobListeners(new JobListenerSupport() {
@Override
public String getName() {
return "UserExecutionContextInit";
}
@Override
public void jobToBeExecuted(JobExecutionContext context) {
//設定預設執行身份
PrincipalUtil.setUser(null);
}
@Override
public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
PrincipalUtil.clear();
}
@Override
public void jobExecutionVetoed(JobExecutionContext context) {
PrincipalUtil.clear();
}
});
return schedulerFactoryBean;
}
@Component
@ConfigurationProperties(prefix = "asyn-tasks")
static class TaskConfig {
private List<TriggerConfig> triggers = new ArrayList<>();
public void setTriggers(List<TriggerConfig> triggers) {
this.triggers = triggers;
}
public List<TriggerConfig> getTriggers() {
return triggers;
}
}
}
5.trigger配置
package com.paic.aims.cache.config;
/**
* trigger配置
*/
/**
* Demo class
*
* @author yjw
* @date 2018/04/01
*/
public class TriggerConfig {
private String jobName;
private String cron;
private boolean enabled;
public boolean isEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public String getJobName() {
return jobName;
}
public void setJobName(String jobName) {
this.jobName = jobName;
}
public String getCron() {
return cron;
}
public void setCron(String cron) {
this.cron = cron;
}
}
6.基礎類
/**
*定義一個job介面
**/
package com.paic.aims.cache.base;
public interface Job {
/**
* 這就是個介面
*/
void execute();
}
package com.paic.aims.cache.base;
import com.paic.icore.agr.common.utils.RedisUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* Demo class
*
* @author yjw
* @date 2018/04/01
*/
@Component
public class CacheJob implements Job {
/**
分散式事務鎖的超時時間 120s
*/
public final static long NX_TIME_OUT = 60L;
private final String BLOCK_REDIS_REFRESH = "BLOCKREDISREFRESHAIMS";
/**
* 定時重新整理所有快取
*/
@Autowired
private List<CacheDataPojoService> list;
/**
* 初始化資料
*/
public void init() {
for (CacheDataPojoService cacheDataPojoService : list) {
cacheDataPojoService.initData();
}
}
@Override
public void execute() {
if (RedisUtil.setnx(BLOCK_REDIS_REFRESH, BLOCK_REDIS_REFRESH, NX_TIME_OUT)) {
try {
for (CacheDataPojoService cacheDataPojoService : list) {
cacheDataPojoService.refreshData();
}
} finally {
RedisUtil.deleteString(BLOCK_REDIS_REFRESH);
}
}
}
}
package com.paic.aims.cache.base;
import com.paic.icore.agr.common.utils.AgrLogUtils;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* Demo class
*
* @author yjw
* @date 2018/04/01
*/
@Component
public class CacheServiceImpl {
public final static Logger log = AgrLogUtils.getLogger(CacheServiceImpl.class);
/**
* 定時重新整理所有快取
*/
@Autowired
private List<CacheDataPojoService> list;
public void refresh(){
for (CacheDataPojoService cacheDataPojoService: list){
cacheDataPojoService.refreshData();
}
}
/**
* 初始化資料
*/
public void init(){
log.info("init cache listSize = {}",list.size());
for (CacheDataPojoService cacheDataPojoService: list){
log.info("CacheDataPojoService pojo class,{}",cacheDataPojoService.getClass().toString());
try {
cacheDataPojoService.initData();
}catch (Exception e){
log.error("init data happen error",e);
}
}
}
}
package com.paic.aims.cache.base;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.quartz.QuartzJobBean;
/**
* Demo class
*
* @author yjw
* @date 2018/04/01
*/
public class InvokeJobDetailFactory extends QuartzJobBean {
private Class<? extends Job> taskClass;
private ApplicationContext ctx;
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
try {
Job job = ctx.getBean(taskClass);
job.execute();
} catch (Exception e) {
throw new JobExecutionException(e);
}
}
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
ctx = applicationContext;
}
public void setTaskClass(Class<? extends Job> taskClass) {
this.taskClass = taskClass;
}
}
package com.paic.aims.cache.base;
import com.paic.icore.agr.common.utils.AgrLogUtils;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* Demo class
*
* @author yjw
* @date 2018/04/01
*/
@Component
public class CacheServiceImpl {
public final static Logger log = AgrLogUtils.getLogger(CacheServiceImpl.class);
/**
* 定時重新整理所有快取
*/
@Autowired
private List<CacheDataPojoService> list;
public void refresh(){
for (CacheDataPojoService cacheDataPojoService: list){
cacheDataPojoService.refreshData();
}
}
/**
* 初始化資料
*/
public void init(){
log.info("init cache listSize = {}",list.size());
for (CacheDataPojoService cacheDataPojoService: list){
log.info("CacheDataPojoService pojo class,{}",cacheDataPojoService.getClass().toString());
try {
cacheDataPojoService.initData();
}catch (Exception e){
log.error("init data happen error",e);
}
}
}
}
7.啟動類
自定義啟動類
package com.paic.aims;
import com.paic.aims.cache.base.CacheDataPojoService;
import com.paic.aims.cache.base.CacheServiceImpl;
import com.paic.icore.agr.common.utils.AgrLogUtils;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
/**
* Demo class
*
* @author yjw
* @date 2018/04/01
*/
@Component
public class CacheServiceStartedRunner implements ApplicationRunner {
public final static Logger log = AgrLogUtils.getLogger(CacheServiceStartedRunner.class);
private CacheDataPojoService cacheDataPojoService;
@Autowired
private CacheServiceImpl acheServiceImpl;
public CacheServiceStartedRunner(CacheServiceImpl acheServiceImpl) {
this.acheServiceImpl =acheServiceImpl;
}
@Override
public void run(ApplicationArguments args) throws Exception {
//初始化基礎資料
log.info("初始化快取資料");
acheServiceImpl.init();
}
}
系統啟動類
package com.paic.aims;
import com.paic.icore.agr.common.config.AgrBootstrap;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.springframework.boot.autoconfigure.web.MultipartAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.netflix.feign.EnableFeignClients;
import org.springframework.context.annotation.Import;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.transaction.annotation.EnableTransactionManagement;
/**
* Demo class
*
* @author yjw
* @date 2018/04/01
*/
@SpringBootApplication
@Import(AgrBootstrap.class)
@EnableTransactionManagement
@EnableFeignClients(basePackages = {"com.paic.aims.*"})
@EnableAutoConfiguration(exclude = {MultipartAutoConfiguration.class})
@EnableAsync
@EnableScheduling
public class ApplicationJobs {
public static void main(String[] args) {
new SpringApplicationBuilder(ApplicationJobs.class).web(true).run(args);
}
}
8.自定義service類,實現自己的業務需求
package com.paic.aims.service;
import com.paic.aims.cache.base.CacheDataPojoService;
import com.paic.aims.dto.InformationModuleModel;
import com.paic.aims.mapper.InformationCacheMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* Demo class
*
* @author yjw
* @date 2018/04/01
*/
@Service
public class InformationAppServiceImpl implements CacheDataPojoService {
private static final Log logger = LogFactory.getLog(InformationAppServiceImpl.class);
@Autowired
private InformationCacheMapper informationMapper;
/**
* 分頁查詢三農資訊模組
* @return
*/
public List<InformationModuleModel> selectInformationModuleList(int belong) throws Exception {
List<InformationModuleModel> module = informationMapper.selectInformationModuleList();
return module;
}
@Override
public void initData() {
List<InformationModuleModel> module = informationMapper.selectInformationModuleList();
System.out.println("初始化資料");
}
@Override
public void refreshData() {
System.out.println("更新資料");
}
}
9.工具類
package com.paic.icore.agr.common.utils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* @Author
* @Date 2017/11/1 15:33
*/
@Component
public class RedisUtil {
private static final Logger logger = LoggerFactory.getLogger(RedisUtil.class);
private static StringRedisTemplate stringRedisTemplate;
private static RedisTemplate<String, Object> redisTemplate;
private static String applicationName;
private static String wrapKey(String key) {
return key;
}
public static void setObject(String key, Object o) {
redisTemplate.opsForValue().set(wrapKey(key), o);
}
public static void setObject(String key, Object o, int timeout) {
redisTemplate.opsForValue().set(wrapKey(key), o, timeout, TimeUnit.DAYS);
}
public static <T> T getObject(String key) {
return (T) redisTemplate.opsForValue().get(wrapKey(key));
}
public static void deleteObject(String key) {
redisTemplate.delete(wrapKey(key));
}
public static void deleteString(String key) {
stringRedisTemplate.delete(wrapKey(key));
}
public static String getString(String key) {
if (StringUtils.isBlank(key)) {
return null;
}
return stringRedisTemplate.opsForValue().get(wrapKey(key));
}
public static void setString(String key, String val) {
stringRedisTemplate.opsForValue().set(wrapKey(key), val);
}
public static void setString(String key, String val, int timeout) {
stringRedisTemplate.opsForValue().set(wrapKey(key), val, timeout, TimeUnit.SECONDS);
}
/**
* 分散式事務鎖
*
* @param key
* @param val
* @param time 超時時間
* @return true 為上鎖成功,false為上鎖失敗
*/
public static boolean setnx(String key, String val, long time) {
boolean flag;
RedisConnection connection = stringRedisTemplate.getConnectionFactory().getConnection();
try {
flag = connection.setNX(key.getBytes(), val.getBytes());
} finally {
connection.close();
}
// flag = stringRedisTemplate.getConnectionFactory().getConnection().setNX(key.getBytes(),val.getBytes());
//上鎖成功則進行設定超時時間
if (flag){
stringRedisTemplate.expire(key, time, TimeUnit.SECONDS);}
return flag;
}
public static long ttl(String key) {
return stringRedisTemplate.getConnectionFactory().getConnection().ttl(key.getBytes());
}
/**
* 相當於 map.put(key,valure)
*
* @param mapName
* @param key
* @param object
* @param <T>
*/
public static <T> void mapPut(String mapName, String key, T object) {
redisTemplate.opsForHash().put(mapName, key, object);
}
/**
* 相當於map.get(key)
*
* @param mapName
* @param key
* @param <T>
* @return
*/
public static <T> T mapGetByKey(String mapName, String key) {
return (T) redisTemplate.opsForHash().get(mapName, key);
}
/**
* 獲取整個Map
*
* @param mapName
* @return
*/
public static Map getMap(String mapName) {
return redisTemplate.opsForHash().entries(mapName);
}
/**
* 將要查詢的條件當做key進行ZSet儲存
* @param pattern
* @return
*/
public static Set<String> keys(String pattern) {
return stringRedisTemplate.keys(pattern);
}
@Autowired
public void setStringRedisTemplate(StringRedisTemplate stringRedisTemplate) {
RedisSerializer stringSerializer = new StringRedisSerializer();
stringRedisTemplate.setKeySerializer(stringSerializer);
stringRedisTemplate.setValueSerializer(stringSerializer);
stringRedisTemplate.setHashKeySerializer(stringSerializer);
stringRedisTemplate.setHashValueSerializer(stringSerializer);
RedisUtil.stringRedisTemplate = stringRedisTemplate;
}
@Autowired
public void setRedisTemplate(RedisTemplate redisTemplate) {
RedisUtil.redisTemplate = redisTemplate;
}
@Value("${spring.application.name:unknown}")
public void setApplicationName(String applicationName) {
RedisUtil.applicationName = applicationName;
}
public static void main(String[] args)
{
RedisUtil.setString("c","中文");
}
}