1. 程式人生 > >Spring-boot使用Quartz實現多執行緒排程任務

Spring-boot使用Quartz實現多執行緒排程任務

背景:專案的背景還原一下,專案的App需要去呼叫第三方的介面獲取資料。我們要建立很多個任務(Task),每個Task都要去獲取資料,但是每個Task的執行週期和頻率不一樣。所以使用多執行緒來解決。為每一個Task啟動一個執行緒。然後迴圈不停的去獲取資料。剛開始的時候打算自己寫Thread然後對Thread進行管理。後面發現有一個框架Quartz可以很Spring-boot進行整合。非常好用,但是使用的過程中還是遇到了很多的問題。自己網上查資料找結果才將問題最後都解決了。在這個工程中深感查詢資料的不容易,很多人寫的部落格表述不清楚。所以自己寫一個希望可以幫助到更多的人。

因為Quartz的用法網上還是很多的很容易找到。難的是如何和Spring-boot結合起來是比較麻煩的。所以我可能重點會放在這個上面。

具體實現步驟:

1、首先在專案的Gradle裡面新增Quartz 依賴(Maven的話就自己找一個依賴的源)

compile('org.quartz-scheduler:quartz:2.3.0')  

一、實現觀察者模式。實現一個Listener去觀察Service的動向,減少耦合關係。

1、先讓被觀察者繼承Java.util.Observable物件。表示可以擁有給觀察者傳送訊息的能力

@Service
public class OutlierServiceImpl extends Observable {
    
    @Override
    public void addTask(OutlierDetection outlierDetection){
      
        MessageObject messageObject =new MessageObject();
        messageObject.setOperate(OutlierOperate.ADD);
        messageObject.setOutlierDetection(outlierDetection);
        setChanged();
        notifyObservers(messageObject);
    }
    @Override
    public void removeTask(String assetId,String outlierName){

       
        setChanged();
        notifyObservers(messageObject);
    }

    @Override
    public void updateTask(OutlierDetection outlierDetection){
        MessageObject messageObject=new MessageObject();
        messageObject.setOperate(OutlierOperate.UPDATE);
        messageObject.setOutlierDetection(outlierDetection);
        setChanged();
        notifyObservers(messageObject);
    }

2、建立一個觀察者實現Observer的介面。表示可以擁有觀察的能力
@Component
public class OutlierServiceListener implements Observer{

    //private Observable ob;
    private String name;
    private Scheduler scheduler;


    @Autowired
    OutlierServiceImpl outlierServiceImpl;

    @Autowired
    @Qualifier("schedulerFactoryBean")
    private SchedulerFactoryBean schedulerFactoryBean;

    @Autowired
    JobFactory jobFactory;

    @PostConstruct
    public void registryOutlier(){
        outlierServiceImpl.addObserver(this);
        scheduler = schedulerFactoryBean.getScheduler();
    }


  

    @Override
    public void update(Observable o, java.lang.Object arg) {

        MessageObject messageObject =(MessageObject) arg;
        if(messageObject.getOperate().equals(OutlierOperate.ADD)){
            OutlierDetection outlier = messageObject.getOutlierDetection();
            addJob(outlier);
        }else if(messageObject.getOperate().equals(OutlierOperate.REMOVE)){
            String outlierName = messageObject.getOutlierName();
            String assetId = messageObject.getAssetId();
            removeJob(assetId,outlierName);
        }else if(messageObject.getOperate().equals(OutlierOperate.UPDATE)){
            OutlierDetection outlier = messageObject.getOutlierDetection();
            updateJob(outlier);
        }

    }

3.建立觀察者與被觀察者之間的聯絡

通過Spring的註解將outlierService注入到outlierListener裡面。然後設定Outlier的觀察者是自己。這樣觀察者就可以與被觀察者聯絡起來了。(註解PostConstruct的意思是在初始化完整個類後會執行這個函式)

4.被觀察者發出通知訊息。觀察者接收到通知訊息然後執行相應的操作。

綠色方框裡面是我自己定義的MessageObject。用於在觀察者和被觀察者之前傳遞訊息。被觀察者先配置好訊息,然後呼叫SetChanged()表示被觀察者已經有了變化。然後呼叫notifyObservers(messageObject)函式。通知所有的觀察者。並將訊息傳送過去。

二、在Listener裡面利用Quartz實現週期性執行任務

Quartz簡單介紹一下:Quartz是一個完全由java編寫的開源作業排程框架、可以按照計劃建立簡單或者複雜的幾十,幾百。甚至數十萬的作業數。

重點:如何和Spring整合(Spring-boot這種沒有xml配置的專案)如何整合。Quartz的使用方法網上介紹挺多的,也挺詳細的。

1、 建立一個自己的job繼承於Quartz的job,Overrideexecute函式。這個函式就是每一個job執行一次所要做的功能。把一個執行緒應該做的工作就寫在裡面就好了。

public class OutlierJob implements Job{

    @Autowired
    OutlierService outlierService;

    OutlierReport lastReport;

    @Override
    public void execute(JobExecutionContext context){
        JobDataMap dataMap = context.getJobDetail().getJobDataMap();
        String name = getName(dataMap);
        String assetId = getAssetId(dataMap);
        System.out.println("Outlier task 【"+name+"】is running");
        OutlierDetection outlierDetection=outlierService.getOutlierDetectionCfg(assetId,name);
        if (lastReport==null){
             lastReport = outlierService.findLastReportForTask(assetId,name);
        }
        lastReport = outlierService.runOutlierDetection(lastReport,outlierDetection);
    }

    public String getAssetId(JobDataMap dataMap){
        return dataMap.getString("assetId");
    }

    public String getName(JobDataMap dataMap){
        return dataMap.getString("name");
    }
}

2、 建立Scheduler。給Scheduler新增job和job的trigger。然後將job加入到Scheduler裡面。Job就會開始按照trigger設定的週期定時的執行了。

private void addJob(OutlierDetection outlierDetection){
        String taskName = outlierDetection.getTaskName();
        String assetId = outlierDetection.getAssetId();
        int interval = new Long(outlierDetection.getInterval()).intValue();
        JobDetail job = JobBuilder.newJob(OutlierJob.class)
                .withIdentity(taskName,assetId).build();
        job.getJobDataMap().put("name",taskName);
        job.getJobDataMap().put("assetId",assetId);

        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity(taskName,assetId)
                .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(interval).repeatForever())
                .build();
        try {
            scheduler.scheduleJob(job,trigger);
            scheduler.start();
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }
3、 停止一個Job。當job執行起來以後如何停止是應該需要考慮到的。不然就會一直執行了。先暫停Trigger。然後將Trigger移出Scheduler。最後deleteJob
 private void removeJob(String assetId,String jobName){
        System.out.println("Job【"+jobName+"】exit");
        try {
            JobKey jobKey = JobKey.jobKey(jobName,assetId);
            TriggerKey triggerKey = TriggerKey.triggerKey(jobName,assetId);
            scheduler.pauseTrigger(triggerKey);
            scheduler.unscheduleJob(triggerKey);
            scheduler.deleteJob(jobKey);
            outlierServiceImpl.deleteOutlierDetectionTask(assetId,jobName);
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }

小結一下:在這裡。我們已經完成了建立一個Listener去觀察Service。當Service發出需要新增一個task的時候,Listener就能得到訊息。然後在Quartz的Scheduler裡面新增一個job。Job就會按照時間和週期定時的去執行我們之前寫好在execute裡面的程式碼。

問題:但是我們在job裡面如何去呼叫由Spring管理的bean。這個問題就很麻煩了。因為我們每次新增一個task都是手動去new一個job。那麼在new job的時候不是由Spring的容器在管理。所以在這種情況下,使用@autowired依賴注入Spring的bean類會出現注入不進來的情況。outlierService為空。這樣我們就沒有辦法呼叫Spring的bean。經過各種查詢,找到了解決這個問題的方法。

4、建立一個JobFactory類繼承於AdaptableJobFactory。注入AutowireCapableBeansFactory.這樣就完成Spring對Job的注入功能。

@Component
public class JobFactory extends AdaptableJobFactory{

    @Autowired
    private AutowireCapableBeanFactory capableBeanFactory;

    @Override
    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
        //呼叫父類的方法
        Object jobInstance = super.createJobInstance(bundle);
        //進行注入
        capableBeanFactory.autowireBean(jobInstance);
        return jobInstance;
    }

}
5、 新增一個QuartzConfig類。對Scheduer進行重新配置設定Scheduler的JobFactory使用我們自己建立的JobFactory
@Configuration
public class QuartzConfig {

    @Autowired
    private JobFactory jobFactory;

    @Bean(name = "schedulerFactoryBean")
    public SchedulerFactoryBean createSchedulerFactoryBean(){
        SchedulerFactoryBean schedulerFactoryBean=new SchedulerFactoryBean();
        schedulerFactoryBean.setOverwriteExistingJobs(true);
        schedulerFactoryBean.setJobFactory(jobFactory);
        return schedulerFactoryBean;
    }

    @Bean
    public JobDetailImpl createJobDetailsImpl(){
        return new JobDetailImpl();
    }
}
6、 構造Scheduler使用我們自己建立的SchedulerFactoryBean
@PostConstruct
    public void registryOutlier(){
        outlierServiceImpl.addObserver(this);
        scheduler = schedulerFactoryBean.getScheduler();
    }

到現在為止。這樣在Job裡面就可以注入Spring的bean類了。並且將執行緒的處理邏輯分開。執行緒負責排程和邏輯的跳轉。