1. 程式人生 > >SpringBoot整合Elastic-Job,實現動態建立定時任務,任務持久化

SpringBoot整合Elastic-Job,實現動態建立定時任務,任務持久化

SpringBoot使用Elastic-Job-lite,實現動態建立定時任務,任務持久化

Elastic-Job是噹噹開源的一個分散式排程解決方案,由兩個相互獨立的子專案Elastic-Job-Lite和Elastic-Job-Cloud組成。

Elastic-Job-Lite定位為輕量級無中心化解決方案,使用jar包的形式提供分散式任務的協調服務;Elastic-Job-Cloud採用自研Mesos Framework的解決方案,額外提供資源治理、應用分發以及程序隔離等功能。

這裡以Elastic-Job-lite為例,跟SpringBoot進行整合,噹噹的官方文件中並沒有對SpringBoot整合作說明,所有的配置都是基於文件中的xml的配置修改出來的。

起步

準備好一個SpringBoot的專案,pom.xml中引入Elastic-job,mysql,jpa等依賴

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
<groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>com.dangdang</groupId> <artifactId
>
elastic-job-lite-spring</artifactId> <version>2.1.5</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <dependency> <groupId>com.zaxxer</groupId> <artifactId>HikariCP</artifactId> </dependency> </dependencies>

配置

使用yaml進行相關屬性的配置,主要配置的是資料庫連線池,jpa

elasticjob:
     serverlists: 172.31.31.48:2181
     namespace: boot-job

   spring:
     datasource:
       url: jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&verifyServerCertificate=false&useSSL=false&requireSSL=false
       driver-class-name: com.mysql.jdbc.Driver
       username: root
       password: root
       type: com.zaxxer.hikari.HikariDataSource
   #  自動建立更新驗證資料庫結構
     jpa:
       hibernate:
         ddl-auto: update
       show-sql: true
       database: mysql

elastic-job相關的配置使用java配置實現,代替官方文件的xml配置

@Configuration
@Data
@ConfigurationProperties(prefix = "elasticjob")
public class ElasticJobConfig {
    private String serverlists;
    private String namespace;
    @Resource
    private HikariDataSource dataSource;

    @Bean
    public ZookeeperConfiguration zkConfig() {
        return new ZookeeperConfiguration(serverlists, namespace);
    }

    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter(ZookeeperConfiguration config) {
        return new ZookeeperRegistryCenter(config);
    }

    /**
     * 將作業執行的痕跡進行持久化到DB
     *
     * @return
     */
    @Bean
    public JobEventConfiguration jobEventConfiguration() {
        return new JobEventRdbConfiguration(dataSource);
    }

    @Bean
    public ElasticJobListener elasticJobListener() {
        return new ElasticJobListener(100, 100);
    }
}

所有相關的配置到這裡就已經OK了,接下來開始具體的編碼實現

定時任務實現

先實現一個自己的任務類,需要實現elastic-job提供的SimpleJob介面,實現它的execute(ShardingContext shardingContext)方法

@Slf4j
public class MyElasticJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        //打印出任務相關資訊,JobParameter用於傳遞任務的ID
        log.info("任務名:{}, 片數:{}, id={}", shardingContext.getJobName(), shardingContext.getShardingTotalCount(),
                shardingContext.getJobParameter());
    }
}

接下來實現一個分散式的任務監聽器,如果任務有分片,分散式監聽器會在總的任務開始前執行一次,結束時執行一次。監聽器在之前的ElasticJobConfig已經註冊到了Spring容器之中。

public class ElasticJobListener extends AbstractDistributeOnceElasticJobListener {
    @Resource
    private TaskRepository taskRepository;

    public ElasticJobListener(long startedTimeoutMilliseconds, long completedTimeoutMilliseconds) {
        super(startedTimeoutMilliseconds, completedTimeoutMilliseconds);
    }

    @Override
    public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {
    }

    @Override
    public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {
        //任務執行完成後更新狀態為已執行
        JobTask jobTask = taskRepository.findOne(Long.valueOf(shardingContexts.getJobParameter()));
        jobTask.setStatus(1);
        taskRepository.save(jobTask);
    }
}

實現一個ElasticJobHandler,用於向Elastic-job中新增指定的作業配置,作業配置分為3級,分別是JobCoreConfiguration,JobTypeConfiguration和LiteJobConfiguration。LiteJobConfiguration使用JobTypeConfiguration,JobTypeConfiguration使用JobCoreConfiguration,層層巢狀。

@Component
public class ElasticJobHandler {
    @Resource
    private ZookeeperRegistryCenter registryCenter;
    @Resource
    private JobEventConfiguration jobEventConfiguration;
    @Resource
    private ElasticJobListener elasticJobListener;

    /**
     * @param jobName
     * @param jobClass
     * @param shardingTotalCount
     * @param cron
     * @param id                 資料ID
     * @return
     */
    private static LiteJobConfiguration.Builder simpleJobConfigBuilder(String jobName,
                                                                       Class<? extends SimpleJob> jobClass,
                                                                       int shardingTotalCount,
                                                                       String cron,
                                                                       String id) {
        return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(
                JobCoreConfiguration.newBuilder(jobName, cron, shardingTotalCount).jobParameter(id).build(), jobClass.getCanonicalName()));
    }

    /**
     * 新增一個定時任務
     *
     * @param jobName            任務名
     * @param cron               表示式
     * @param shardingTotalCount 分片數
     */
    public void addJob(String jobName, String cron, Integer shardingTotalCount, String id) {
        LiteJobConfiguration jobConfig = simpleJobConfigBuilder(jobName, MyElasticJob.class, shardingTotalCount, cron, id)
                .overwrite(true).build();

        new SpringJobScheduler(new MyElasticJob(), registryCenter, jobConfig, jobEventConfiguration, elasticJobListener).init();
    }
}

到這裡,elastic-job的註冊中心,資料來源相關配置,以及動態新增的邏輯已經做完了,接下來在service中呼叫上面寫好的方法,驗證功能是否正常。

編寫一個ElasticJobService類,掃描資料庫中狀態為0的任務,並且把這些任務新增到Elastic-job中,這裡的相關資料庫操作使用了spring-data-jpa,dao層相關程式碼就不貼了,可以在原始碼中檢視。

@Service
public class ElasticJobService {
    @Resource
    private ElasticJobHandler jobHandler;
    @Resource
    private TaskRepository taskRepository;

    /**
     * 掃描db,並新增任務
     */
    public void scanAddJob() {
        Specification query = (Specification<JobTask>) (root, criteriaQuery, criteriaBuilder) -> criteriaBuilder
                .and(criteriaBuilder.equal(root.get("status"), 0));
        List<JobTask> jobTasks = taskRepository.findAll(query);
        jobTasks.forEach(jobTask -> {
            Long current = System.currentTimeMillis();
            String jobName = "job" + jobTask.getSendTime();
            String cron;
            //說明消費未傳送,但是已經過了訊息的傳送時間,調整時間繼續執行任務
            if (jobTask.getSendTime() < current) {
                //設定為一分鐘之後執行,把Date轉換為cron表示式
                cron = CronUtils.getCron(new Date(current + 60000));
            } else {
                cron = CronUtils.getCron(new Date(jobTask.getSendTime()));
            }
            jobHandler.addJob(jobName, cron, 1, String.valueOf(jobTask.getId()));
        });
    }
}

在Junit中新增幾條測試資料

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class JobTaskTest {
    @Resource
    private TaskRepository taskRepository;

    @Test
    public void add() {
        //生成幾個任務,第一任務在三分鐘之後
        Long unixTime = System.currentTimeMillis() + 60000;
        JobTask task = new JobTask("test-msg-1", 0, unixTime);
        taskRepository.save(task);
        unixTime += 60000;
        task = new JobTask("test-msg-2", 0, unixTime);
        taskRepository.save(task);
        unixTime += 60000;
        task = new JobTask("test-msg-3", 0, unixTime);
        taskRepository.save(task);
        unixTime += 60000;
        task = new JobTask("test-msg-4", 0, unixTime);
        taskRepository.save(task);
    }
}

此時,資料庫中多了四條狀態為0的資料

最後,就可以開始驗證整個流程了,程式碼如下

@SpringBootApplication
public class ElasticJobApplication implements CommandLineRunner {
    @Resource
    private ElasticJobService elasticJobService;

    public static void main(String[] args) {
        SpringApplication.run(ElasticJobApplication.class, args);
    }

    @Override
    public void run(String... strings) throws Exception {
        elasticJobService.scanAddJob();
    }
}

可以看到,在啟動過程中,多個任務被加入到了Elastic-job中,並且一小段時間之後,任務一次執行,執行成功之後,因為我們配置了監聽器,會列印資料庫的更新SQL,當任務執行完成,再檢視資料庫,發現狀態也更改成功。資料庫中同時也會多出兩張表JOB_EXECUTION_LOG,JOB_STATUS_TRACE_LOG,這是我們之前配置的JobEventConfiguration,通過資料來源持久化了作業配置的相關資料,這兩張表的資料可以供Elastic-job提供的運維平臺使用,具體請檢視官方文件。

總結

至此,整個流程就已經走完了,整個demo中主要用到了Elastic-job和spring-data-jpa相關的技術,作為demo,肯定會有一些缺陷,沒考慮到的地方,可以根據自己的業務場景進行改進。

最後,附上github原始碼,歡迎star,一起交流。上面涉及到的資料庫,請自行建立,表會自動生成。原始碼地址