Spring-boot使用Quartz實現多執行緒排程任務
因為Quartz的用法網上還是很多的很容易找到。難的是如何和Spring-boot結合起來是比較麻煩的。所以我可能重點會放在這個上面。
具體實現步驟:
1、首先在專案的Gradle裡面新增Quartz 依賴(Maven的話就自己找一個依賴的源)
compile('org.quartz-scheduler:quartz:2.3.0')
一、實現觀察者模式。實現一個Listener去觀察Service的動向,減少耦合關係。
1、先讓被觀察者繼承Java.util.Observable物件。表示可以擁有給觀察者傳送訊息的能力
@Service public class OutlierServiceImpl extends Observable { @Override public void addTask(OutlierDetection outlierDetection){ MessageObject messageObject =new MessageObject(); messageObject.setOperate(OutlierOperate.ADD); messageObject.setOutlierDetection(outlierDetection); setChanged(); notifyObservers(messageObject); } @Override public void removeTask(String assetId,String outlierName){ setChanged(); notifyObservers(messageObject); } @Override public void updateTask(OutlierDetection outlierDetection){ MessageObject messageObject=new MessageObject(); messageObject.setOperate(OutlierOperate.UPDATE); messageObject.setOutlierDetection(outlierDetection); setChanged(); notifyObservers(messageObject); }
2、建立一個觀察者實現Observer的介面。表示可以擁有觀察的能力
@Component public class OutlierServiceListener implements Observer{ //private Observable ob; private String name; private Scheduler scheduler; @Autowired OutlierServiceImpl outlierServiceImpl; @Autowired @Qualifier("schedulerFactoryBean") private SchedulerFactoryBean schedulerFactoryBean; @Autowired JobFactory jobFactory; @PostConstruct public void registryOutlier(){ outlierServiceImpl.addObserver(this); scheduler = schedulerFactoryBean.getScheduler(); } @Override public void update(Observable o, java.lang.Object arg) { MessageObject messageObject =(MessageObject) arg; if(messageObject.getOperate().equals(OutlierOperate.ADD)){ OutlierDetection outlier = messageObject.getOutlierDetection(); addJob(outlier); }else if(messageObject.getOperate().equals(OutlierOperate.REMOVE)){ String outlierName = messageObject.getOutlierName(); String assetId = messageObject.getAssetId(); removeJob(assetId,outlierName); }else if(messageObject.getOperate().equals(OutlierOperate.UPDATE)){ OutlierDetection outlier = messageObject.getOutlierDetection(); updateJob(outlier); } }
3.建立觀察者與被觀察者之間的聯絡
通過Spring的註解將outlierService注入到outlierListener裡面。然後設定Outlier的觀察者是自己。這樣觀察者就可以與被觀察者聯絡起來了。(註解PostConstruct的意思是在初始化完整個類後會執行這個函式)4.被觀察者發出通知訊息。觀察者接收到通知訊息然後執行相應的操作。
綠色方框裡面是我自己定義的MessageObject。用於在觀察者和被觀察者之前傳遞訊息。被觀察者先配置好訊息,然後呼叫SetChanged()表示被觀察者已經有了變化。然後呼叫notifyObservers(messageObject)函式。通知所有的觀察者。並將訊息傳送過去。
二、在Listener裡面利用Quartz實現週期性執行任務Quartz簡單介紹一下:Quartz是一個完全由java編寫的開源作業排程框架、可以按照計劃建立簡單或者複雜的幾十,幾百。甚至數十萬的作業數。
重點:如何和Spring整合(Spring-boot這種沒有xml配置的專案)如何整合。Quartz的使用方法網上介紹挺多的,也挺詳細的。
1、 建立一個自己的job繼承於Quartz的job,Overrideexecute函式。這個函式就是每一個job執行一次所要做的功能。把一個執行緒應該做的工作就寫在裡面就好了。
public class OutlierJob implements Job{
@Autowired
OutlierService outlierService;
OutlierReport lastReport;
@Override
public void execute(JobExecutionContext context){
JobDataMap dataMap = context.getJobDetail().getJobDataMap();
String name = getName(dataMap);
String assetId = getAssetId(dataMap);
System.out.println("Outlier task 【"+name+"】is running");
OutlierDetection outlierDetection=outlierService.getOutlierDetectionCfg(assetId,name);
if (lastReport==null){
lastReport = outlierService.findLastReportForTask(assetId,name);
}
lastReport = outlierService.runOutlierDetection(lastReport,outlierDetection);
}
public String getAssetId(JobDataMap dataMap){
return dataMap.getString("assetId");
}
public String getName(JobDataMap dataMap){
return dataMap.getString("name");
}
}
2、 建立Scheduler。給Scheduler新增job和job的trigger。然後將job加入到Scheduler裡面。Job就會開始按照trigger設定的週期定時的執行了。
private void addJob(OutlierDetection outlierDetection){
String taskName = outlierDetection.getTaskName();
String assetId = outlierDetection.getAssetId();
int interval = new Long(outlierDetection.getInterval()).intValue();
JobDetail job = JobBuilder.newJob(OutlierJob.class)
.withIdentity(taskName,assetId).build();
job.getJobDataMap().put("name",taskName);
job.getJobDataMap().put("assetId",assetId);
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity(taskName,assetId)
.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(interval).repeatForever())
.build();
try {
scheduler.scheduleJob(job,trigger);
scheduler.start();
} catch (SchedulerException e) {
e.printStackTrace();
}
}
3、 停止一個Job。當job執行起來以後如何停止是應該需要考慮到的。不然就會一直執行了。先暫停Trigger。然後將Trigger移出Scheduler。最後deleteJob
private void removeJob(String assetId,String jobName){
System.out.println("Job【"+jobName+"】exit");
try {
JobKey jobKey = JobKey.jobKey(jobName,assetId);
TriggerKey triggerKey = TriggerKey.triggerKey(jobName,assetId);
scheduler.pauseTrigger(triggerKey);
scheduler.unscheduleJob(triggerKey);
scheduler.deleteJob(jobKey);
outlierServiceImpl.deleteOutlierDetectionTask(assetId,jobName);
} catch (SchedulerException e) {
e.printStackTrace();
}
}
小結一下:在這裡。我們已經完成了建立一個Listener去觀察Service。當Service發出需要新增一個task的時候,Listener就能得到訊息。然後在Quartz的Scheduler裡面新增一個job。Job就會按照時間和週期定時的去執行我們之前寫好在execute裡面的程式碼。
問題:但是我們在job裡面如何去呼叫由Spring管理的bean。這個問題就很麻煩了。因為我們每次新增一個task都是手動去new一個job。那麼在new job的時候不是由Spring的容器在管理。所以在這種情況下,使用@autowired依賴注入Spring的bean類會出現注入不進來的情況。outlierService為空。這樣我們就沒有辦法呼叫Spring的bean。經過各種查詢,找到了解決這個問題的方法。4、建立一個JobFactory類繼承於AdaptableJobFactory。注入AutowireCapableBeansFactory.這樣就完成Spring對Job的注入功能。
@Component
public class JobFactory extends AdaptableJobFactory{
@Autowired
private AutowireCapableBeanFactory capableBeanFactory;
@Override
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
//呼叫父類的方法
Object jobInstance = super.createJobInstance(bundle);
//進行注入
capableBeanFactory.autowireBean(jobInstance);
return jobInstance;
}
}
5、 新增一個QuartzConfig類。對Scheduer進行重新配置設定Scheduler的JobFactory使用我們自己建立的JobFactory@Configuration
public class QuartzConfig {
@Autowired
private JobFactory jobFactory;
@Bean(name = "schedulerFactoryBean")
public SchedulerFactoryBean createSchedulerFactoryBean(){
SchedulerFactoryBean schedulerFactoryBean=new SchedulerFactoryBean();
schedulerFactoryBean.setOverwriteExistingJobs(true);
schedulerFactoryBean.setJobFactory(jobFactory);
return schedulerFactoryBean;
}
@Bean
public JobDetailImpl createJobDetailsImpl(){
return new JobDetailImpl();
}
}
6、 構造Scheduler使用我們自己建立的SchedulerFactoryBean
@PostConstruct
public void registryOutlier(){
outlierServiceImpl.addObserver(this);
scheduler = schedulerFactoryBean.getScheduler();
}
到現在為止。這樣在Job裡面就可以注入Spring的bean類了。並且將執行緒的處理邏輯分開。執行緒負責排程和邏輯的跳轉。