1. 程式人生 > >Spring Boot Quartz 分散式叢集任務排程實現

Spring Boot Quartz 分散式叢集任務排程實現

Spring Boot Quartz

主要內容

  • Spring Scheduler 框架
  • Quartz 框架,功能強大,配置靈活
  • Quartz 叢集
  • mysql 持久化定時任務指令碼(tables_mysql.sql)

介紹

在工程中時常會遇到一些需求,例如定時重新整理一下配置、隔一段時間檢查下網路狀態併發送郵件等諸如此類的定時任務。
定時任務本質就是一個非同步的執行緒,執行緒可以查詢或修改並執行一系列的操作。由於本質是執行緒,在 Java 中可以自行編寫一個執行緒池對定時任務進行控制,但這樣效率太低了,且功能有限,屬於重複造輪子。

分散式任務排程應用場景

Quartz的叢集功能通過故障轉移和負載平衡功能為您的排程程式帶來高可用性和可擴充套件性。

排程程式中會有很多定時任務需要執行,一臺伺服器已經不能滿足使用,需要解決定時任務單機單點故障問題

用Quartz框架,在叢集環境下,通過資料庫鎖機制來實現定時任務的執行;獨立的 Quartz 節點並不與另一其的節點或是管理節點通訊。

Spring Scheduler 實現定時任務

1.定義 Task 類

/**
 * Spring Scheduled示例
 */
@Component
public class ScheduledTask {

    private static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private Integer count0 = 1;
    private Integer count1 = 1;
    private Integer count2 = 1;

    @Scheduled(fixedRate = 5000)
    public void reportCurrentTime() throws InterruptedException {
        System.out.println(String.format("reportCurrentTime第%s次執行,當前時間為:%s", count0++, dateFormat.format(new Date())));
    }

    @Scheduled(fixedDelay = 5000)
    public void reportCurrentTimeAfterSleep() throws InterruptedException {
        System.out.println(String.format("reportCurrentTimeAfterSleep第%s次執行,當前時間為:%s", count1++, dateFormat.format(new Date())));
    }

    @Scheduled(cron = "0 0 1 * * *")
    public void reportCurrentTimeCron() throws InterruptedException {
        System.out.println(String.format("reportCurrentTimeCron第%s次執行,當前時間為:%s", count2++, dateFormat.format(new Date())));
    }
}

2.啟動定時任務

在Spring Boot的主類中加入@EnableScheduling註解,啟用定時任務的配置

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
@EnableScheduling
public class ScheduledTaskTests {
    @Test
    public void test() {
        log.info("啟動了ScheduledTask定時作業");
        while (true) {
        }
    }
}

quartz實現分散式定時任務

quartz 是一個開源的分散式排程庫,它基於java實現。
> 它有著強大的排程功能,支援豐富多樣的排程方式,比如簡單排程,基於cron表示式的排程等等。
> 支援排程任務的多種持久化方式。比如支援記憶體儲存,資料庫儲存,Terracotta server 儲存。
> 支援分散式和叢集能力。
> 採用JDBCJobStore方式儲存時,針對事務的處理方式支援全域性事務(和業務服務共享同一個事務)和區域性事務(quarzt 單獨管理自己的事務)
> 基於plugin機制以及listener機制支援靈活的擴充套件。

1.pom.xml配置

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

<!-- mysql -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
</dependency>

<!-- orm -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

2.spring-quartz.properties叢集配置

#============================================================================
# 配置JobStore
#============================================================================
# JobDataMaps是否都為String型別,預設false
org.quartz.jobStore.useProperties=false

# 表的字首,預設QRTZ_
org.quartz.jobStore.tablePrefix = QRTZ_

# 是否加入叢集
org.quartz.jobStore.isClustered = true

# 排程例項失效的檢查時間間隔 ms
org.quartz.jobStore.clusterCheckinInterval = 5000

# 當設定為“true”時,此屬性告訴Quartz 在非託管JDBC連線上呼叫setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED)。
org.quartz.jobStore.txIsolationLevelReadCommitted = true

# 資料儲存方式為資料庫持久化
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX

# 資料庫代理類,一般org.quartz.impl.jdbcjobstore.StdJDBCDelegate可以滿足大部分資料庫
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate

#============================================================================
# Scheduler 排程器屬性配置
#============================================================================
# 排程標識名 叢集中每一個例項都必須使用相同的名稱
org.quartz.scheduler.instanceName = ClusterQuartz
# ID設定為自動獲取 每一個必須不同
org.quartz.scheduler.instanceId= AUTO

#============================================================================
# 配置ThreadPool
#============================================================================
# 執行緒池的實現類(一般使用SimpleThreadPool即可滿足幾乎所有使用者的需求)
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool

# 指定執行緒數,一般設定為1-100直接的整數,根據系統資源配置
org.quartz.threadPool.threadCount = 5

# 設定執行緒的優先順序(可以是Thread.MIN_PRIORITY(即1)和Thread.MAX_PRIORITY(這是10)之間的任何int 。預設值為Thread.NORM_PRIORITY(5)。)
org.quartz.threadPool.threadPriority = 5

3.定義兩個job

  • QuartzJob.java
//持久化
@PersistJobDataAfterExecution
//禁止併發執行(Quartz不要併發地執行同一個job定義(這裡指一個job類的多個例項))
@DisallowConcurrentExecution
@Slf4j
public class QuartzJob extends QuartzJobBean {
    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        String taskName = context.getJobDetail().getJobDataMap().getString("name");
        log.info("---> Quartz job {}, {} <----", new Date(), taskName);
    }
}
  • QuartzJob2.java
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
@Slf4j
public class QuartzJob2 extends QuartzJobBean {

    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        String taskName = context.getJobDetail().getJobDataMap().getString("name");
        log.info("---> Quartz job 2 {}, {} <----", new Date(), taskName);
    }
}

4.初始化觸發器等資訊,這裡通過Listener初始化

@Slf4j
public class StartApplicationListener implements ApplicationListener<ContextRefreshedEvent> {
    @Autowired
    SchedulerConfig schedulerConfig;
    public static AtomicInteger count = new AtomicInteger(0);
    private static String TRIGGER_GROUP_NAME = "test_trriger";
    private static String JOB_GROUP_NAME = "test_job";

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // 防止重複執行
        if (event.getApplicationContext().getParent() == null && count.incrementAndGet() <= 1) {
            initMyJob();
        }
    }

    public void initMyJob() {
        Scheduler scheduler = null;
        try {
            scheduler = schedulerConfig.scheduler();

            TriggerKey triggerKey = TriggerKey.triggerKey("trigger1", TRIGGER_GROUP_NAME);
            CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
            if (null == trigger) {
                Class clazz = QuartzJob.class;
                JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity("job1", JOB_GROUP_NAME).build();
                CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule("0/10 * * * * ?");
                trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", TRIGGER_GROUP_NAME)
                        .withSchedule(scheduleBuilder).build();
                scheduler.scheduleJob(jobDetail, trigger);
                log.info("Quartz 建立了job:...:{}", jobDetail.getKey());
            } else {
                log.info("job已存在:{}", trigger.getKey());
            }

            TriggerKey triggerKey2 = TriggerKey.triggerKey("trigger2", TRIGGER_GROUP_NAME);
            CronTrigger trigger2 = (CronTrigger) scheduler.getTrigger(triggerKey2);
            if (null == trigger2) {
                Class clazz = QuartzJob2.class;
                JobDetail jobDetail2 = JobBuilder.newJob(clazz).withIdentity("job2", JOB_GROUP_NAME).build();
                CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule("0/15 * * * * ?");
                trigger2 = TriggerBuilder.newTrigger().withIdentity("trigger2", TRIGGER_GROUP_NAME)
                        .withSchedule(scheduleBuilder).build();
                scheduler.scheduleJob(jobDetail2, trigger2);
                log.info("Quartz 建立了job:...:{}", jobDetail2.getKey());
            } else {
                log.info("job已存在:{}", trigger2.getKey());
            }
            scheduler.start();
        } catch (Exception e) {
            log.info(e.getMessage());
        }
    }
}

5.啟動定時器

啟動兩個Application,分別是示例中的DemoQuartzApplication和DemoQuartzApplication2,會發現,兩個Job會分別在兩個應用執行。

當手動停止一個應用的時候,另一個應用會自動接管所有任務並繼續執行,如果任務太多,我們可以再開一臺服務即可。實現了排程任務的高可用性和可擴充套件性

執行效果如圖:

資料

  • 示例程式碼-github
  • spring-scheduling參考
  • quartz-github