Using elastic-job + ZooKeeper for distributed scheduled-job scheduling (Spring Boot version)

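The overall idea: defining a scheduled job takes a cron expression plus a jobDetail.

To coordinate that job across several application instances, ZooKeeper is added as the coordinator. In short, three elements are needed: a ZooKeeper object, a cron expression, and a jobDetail.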

Overall project structure

1. Add the Maven dependencies

<dependencies>
   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
   </dependency>

   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
   </dependency>
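   <!-- Not used by the scheduling code in this post. This starter is not version-managed by the Spring Boot parent POM, so add an explicit version element (unless your own dependencyManagement supplies one) if you keep it. -->
   <dependency>
      <groupId>org.mybatis.spring.boot</groupId>
      <artifactId>mybatis-spring-boot-starter</artifactId>
   </dependency>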
   <dependency>
      <groupId>com.dangdang</groupId>
      <artifactId>elastic-job-lite-core</artifactId>
      <version>2.1.5</version>
   </dependency>
   <dependency>
      <groupId>com.dangdang</groupId>
      <artifactId>elastic-job-lite-spring</artifactId>
      <version>2.1.5</version>
   </dependency>
   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
   </dependency>
</dependencies>

application.properties is as follows:

server.port=8766
spring.application.name=scheduler-service



regCenter.serverList = localhost:2181
regCenter.namespace = elastic-job-lite-springboot

stockJob.cron = 0/5 * * * * ?

stockJob.shardingTotalCount = 2


stockJob.shardingItemParameters = 0=Chengdu0,1=Chengdu1

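Where:

stockJob.cron is the cron expression of the scheduled job (here it fires every 5 seconds);
stockJob.shardingTotalCount is the total number of shards for the job (that is, how many pieces of the job run at the same time);
stockJob.shardingItemParameters holds the parameter carried by each shard, written as comma-separated item=parameter pairs;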
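To make the item=parameter format more concrete, here is a minimal sketch of how a string such as `0=Chengdu0,1=Chengdu1` can be read as a map from shard item to parameter. The class `ShardingItemParametersDemo` and its `parse` method are hypothetical names made up for this illustration; they are not part of elastic-job, which does its own parsing internally.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; elastic-job parses this value itself.
final class ShardingItemParametersDemo {

    // Turns "0=Chengdu0,1=Chengdu1" into {0=Chengdu0, 1=Chengdu1}.
    static Map<Integer, String> parse(String raw) {
        Map<Integer, String> result = new LinkedHashMap<>();
        if (raw == null || raw.trim().isEmpty()) {
            return result; // no parameters configured
        }
        for (String pair : raw.split(",")) {
            // Split on the first '=' only, so parameters may themselves contain '='.
            String[] kv = pair.trim().split("=", 2);
            if (kv.length != 2) {
                throw new IllegalArgumentException("Expected item=parameter but got: " + pair);
            }
            result.put(Integer.parseInt(kv[0].trim()), kv[1].trim());
        }
        return result;
    }

    public static void main(String[] args) {
        // Same value as stockJob.shardingItemParameters in application.properties.
        System.out.println(parse("0=Chengdu0,1=Chengdu1")); // prints {0=Chengdu0, 1=Chengdu1}
    }
}
```

With shardingTotalCount = 2, shard item 0 would see the parameter Chengdu0 and shard item 1 would see Chengdu1.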


Element 1: ZooKeeper
Create a bean that configures the ZooKeeper registry center

package com.example.elasticjobdemo.config;

import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnExpression("'${regCenter.serverList}'.length() > 0")
public class JobRegistryCenterConfig {

    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList, @Value("${regCenter.namespace}") final String namespace) {
        return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
    }

}

Create a job (the jobDetail)

package com.example.elasticjobdemo.config;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import org.springframework.beans.factory.annotation.Autowired;

public class StockSimpleJob implements SimpleJob {

    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println(String.format("------Thread ID: %s, total shard count: %s, " +
                        "current shard item: %s, current shard parameter: %s, " +
                        "current job name: %s, current job parameter: %s"
                ,
                Thread.currentThread().getId(),
                shardingContext.getShardingTotalCount(),
                shardingContext.getShardingItem(),
                shardingContext.getShardingParameter(),
                shardingContext.getJobName(),
                shardingContext.getJobParameter()

        ));

    }
}
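The job above only prints its sharding context. In a real job the shard item is usually what decides which slice of the data an instance handles. A minimal sketch of one common approach, splitting record ids by modulo, is shown here. `ShardFilterDemo` and `idsForShard` are hypothetical names for this illustration and the ids are invented sample values; inside `execute` the two numbers would come from `shardingContext.getShardingTotalCount()` and `shardingContext.getShardingItem()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of modulo-based data partitioning per shard.
final class ShardFilterDemo {

    // Keeps only the ids owned by the given shard: id mod shardingTotalCount == shardingItem.
    static List<Long> idsForShard(List<Long> allIds, int shardingTotalCount, int shardingItem) {
        List<Long> owned = new ArrayList<>();
        for (long id : allIds) {
            // floorMod keeps the result non-negative even for negative ids.
            if (Math.floorMod(id, (long) shardingTotalCount) == shardingItem) {
                owned.add(id);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        List<Long> ids = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L); // sample ids, made up
        System.out.println(idsForShard(ids, 2, 0)); // prints [2, 4, 6]
        System.out.println(idsForShard(ids, 2, 1)); // prints [1, 3, 5]
    }
}
```

Because every id falls into exactly one shard, two instances running shard 0 and shard 1 never process the same record.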

Then create one more class that wires the three elements together

package com.example.elasticjobdemo.config;

import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class StockJobConfig {

    @Autowired
    private JobRegistryCenterConfig jobRegistryCenterConfig;
    @Autowired
    private ZookeeperRegistryCenter regCenter;

    public StockJobConfig() {
    }

    @Bean
    public SimpleJob stockJob(){
        return new StockSimpleJob();
    }



    @Bean(initMethod = "init")
    public JobScheduler simpleJobScheduler(final SimpleJob simpleJob, @Value("${stockJob.cron}") final String cron, @Value("${stockJob.shardingTotalCount}") final int shardingTotalCount,
                                           @Value("${stockJob.shardingItemParameters}") final String shardingItemParameters) {
        return new SpringJobScheduler(simpleJob, regCenter, getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters));
    }

    /**
     * Builds the job configuration (job name, cron, shard count, shard parameters) for the given job class.
     */
    private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass,
                                                         final String cron,
                                                         final int shardingTotalCount,
                                                         final String shardingItemParameters){


        return LiteJobConfiguration
                .newBuilder(
                        new SimpleJobConfiguration(
                                JobCoreConfiguration.newBuilder(
                                        jobClass.getName(),cron,shardingTotalCount)
                                        .shardingItemParameters(shardingItemParameters)
                                        .build()
                                ,jobClass.getCanonicalName()
                        )
                )
                .overwrite(true)
                .build();

    }
}

Result after the Spring Boot application starts (ZooKeeper must be running first; here I set the shard count to 2)
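With a shard count of 2, a single running instance executes both shard items itself, and starting a second instance should move one item over to it. The sketch below illustrates that kind of even split. It is a simplified model written for this post, assuming an "average allocation" rule where leftover items go to the first instances; it is not elastic-job's actual code, and the instance names are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of spreading shard items evenly over instances.
final class AverageAllocationSketch {

    static Map<String, List<Integer>> allocate(List<String> instances, int shardingTotalCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (instances.isEmpty()) {
            return result;
        }
        int perInstance = shardingTotalCount / instances.size();
        int next = 0;
        // First pass: give every instance the same number of consecutive items.
        for (String instance : instances) {
            List<Integer> items = new ArrayList<>();
            for (int i = 0; i < perInstance; i++) {
                items.add(next++);
            }
            result.put(instance, items);
        }
        // Second pass: hand out the remainder one item at a time, starting with the first instance.
        for (String instance : instances) {
            if (next >= shardingTotalCount) {
                break;
            }
            result.get(instance).add(next++);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(allocate(Arrays.asList("instance-A"), 2));
        // prints {instance-A=[0, 1]}
        System.out.println(allocate(Arrays.asList("instance-A", "instance-B"), 2));
        // prints {instance-A=[0], instance-B=[1]}
    }
}
```

To see this behavior with the real application, start a second copy on a different server.port and compare the shard items printed by each process.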