1. 程式人生 > >分散式定時任務排程框架實踐

分散式定時任務排程框架實踐

本文首發於 vivo網際網路技術 微信公眾號 
連結: https://mp.weixin.qq.com/s/l4vuYpNRjKxQRkRTDhyg2Q
作者:陳王榮

分散式任務排程框架幾乎是每個大型應用必備的工具,本文介紹了任務排程框架使用的需求背景和痛點,對業界普遍使用的開源分散式任務排程框架的使用進行了探究實踐,並分析了這幾種框架的優劣勢和對自身業務的思考。

一、業務背景

1.1 為什麼需要使用定時任務排程

(1)時間驅動處理場景:整點發送優惠券,每天更新收益,每天重新整理標籤資料和人群資料。

(2)批量處理資料:按月批量統計報表資料,批量更新簡訊狀態,實時性要求不高。

(3)非同步執行解耦:活動狀態重新整理,非同步執行離線查詢,與內部邏輯解耦。

1.2 使用需求和痛點

(1)任務執行監控告警能力。

(2)任務可靈活動態配置,無需重啟。

(3)業務透明,低耦合,配置精簡,開發方便。

(4)易測試。

(5)高可用,無單點故障。

(6)任務不可重複執行,防止邏輯異常。

(7)大任務的分發並行處理能力。

二、開源框架實踐與探索

 2.1 Java 原生 Timer 和ScheduledExecutorService

2.1.1 Timer使用

 

Timer缺陷:

  1. Timer底層是使用單執行緒來處理多個Timer任務,這意味著所有任務實際上都是序列執行,前一個任務的延遲會影響到之後的任務的執行。

  2. 由於單執行緒的緣故,一旦某個定時任務在執行時,產生未處理的異常,那麼不僅當前這個執行緒會停止,所有的定時任務都會停止。

  3. Timer任務執行是依賴於系統絕對時間,系統時間變化會導致執行計劃的變更。

由於上述缺陷,儘量不要使用Timer, idea中也會明確提示,使用ScheduledThreadPoolExecutor替代Timer 。

2.1.2 ScheduledExecutorService使用

ScheduledExecutorService對於Timer的缺陷進行了修補,首先ScheduledExecutorService內部實現是ScheduledThreadPool執行緒池,可以支援多個任務併發執行。

對於某一個執行緒執行的任務出現異常,也會處理,不會影響其他執行緒任務的執行,另外ScheduledExecutorService是基於時間間隔的延遲,執行不會由於系統時間的改變發生變化。

當然,ScheduledExecutorService也有自己的侷限性:只能根據任務的延遲來進行排程,無法滿足基於絕對時間和日曆排程的需求。

2.2 Spring Task

2.2.1 Spring Task 使用

spring task 是spring自主開發的輕量級定時任務框架,不需要依賴其他額外的包,配置較為簡單。

此處使用註解配置

2.2.2 Spring Task缺陷

Spring Task 本身不支援持久化,也沒有推出官方的分散式叢集模式,只能靠開發者在業務應用中自己手動擴充套件實現,無法滿足視覺化,易配置的需求。

2.3 永遠經典的 Quartz

2.3.1 基本介紹

Quartz框架是Java領域最著名的開源任務排程工具,也是目前事實上的定時任務標準,幾乎全部的開源定時任務框架都是基於Quartz核心排程構建而成。

2.3.2 原理解析

核心元件和架構

 關鍵概念

(1)Scheduler:任務排程器,是執行任務排程的控制器。本質上是一個計劃排程容器,註冊了全部Trigger和對應的JobDetail, 使用執行緒池作為任務執行的基礎元件,提高任務執行效率。

(2)Trigger:觸發器,用於定義任務排程的時間規則,告訴任務排程器什麼時候觸發任務,其中CronTrigger是基於cron表示式構建的功能強大的觸發器。

(3)Calendar:日曆特定時間點的集合。一個trigger可以包含多個Calendar,可用於排除或包含某些時間點。

(4)JobDetail:是一個可執行的工作,用來描述Job實現類及其它相關的靜態資訊,如Job的名稱、監聽器等相關資訊。 

(5)Job:任務執行介面,只有一個execute方法,用於執行真正的業務邏輯。

(6)JobStore:任務儲存方式,主要有RAMJobStore和JDBCJobStore,RAMJobStore是儲存在JVM的記憶體中,有丟失和數量受限的風險,JDBCJobStore是將任務資訊持久化到資料庫中,支援叢集。

2.3.3 實踐說明

(1)關於Quartz的基本使用

  • 可參考Quartz官方文件和網上部落格實踐教程。

(2)業務使用要滿足動態修改和重啟不丟失, 一般需要使用資料庫進行儲存。

  • Quartz本身支援JDBCJobStore,但是其配置的資料表比較多,官方推薦配置可參照官方文件,超過10張表,業務使用比較重。

  • 在使用的時候只需要存在基本trigger配置和對應任務以及相關執行日誌的表即可滿足絕大部分需求。

(3)元件化

  • 將quartz動態任務配置資訊持久化到資料庫,將資料操作包裝成基本jar包,供專案之間使用,引用專案只需要引入jar包依賴和配置對應的資料表,使用時就可以對Quartz配置透明。

(4)擴充套件

  • 叢集模式

    通過故障轉移和負載均衡實現了任務的高可用性,通過資料庫的鎖機制來確保任務執行的唯一性,但是叢集特性僅僅只是用來HA,節點數量的增加並不會提升單個任務的執行效率,不能實現水平擴充套件。

  • Quartz外掛

    可以對特定需要進行擴充套件,比如增加觸發器和任務執行日誌,任務依賴序列處理場景,可參考:quartz外掛——實現任務之間的序列排程

2.3.4 缺陷和不足

(1)需要把任務資訊持久化到業務資料表,和業務有耦合。

(2)排程邏輯和執行邏輯並存於同一個專案中,在機器效能固定的情況下,業務和排程之間不可避免地會相互影響。

(3)quartz叢集模式下,是通過資料庫獨佔鎖來唯一獲取任務,任務執行並沒有實現完善的負載均衡機制。

2.4 輕量級神器 XXL-JOB

2.4.1 基本介紹

XXL-JOB是一個輕量級分散式任務排程平臺,主打特點是平臺化,易部署,開發迅速、學習簡單、輕量級、易擴充套件,程式碼仍在持續更新中。

 “排程中心”是任務排程控制檯,平臺自身並不承擔業務邏輯,只是負責任務的統一管理和排程執行,並且提供任務管理平臺,  “執行器” 負責接收“排程中心”的排程並執行,可直接部署執行器,也可以將執行器整合到現有業務專案中。 通過將任務的排程控制和任務的執行解耦,業務使用只需要關注業務邏輯的開發。

主要提供了任務的動態配置管理、任務監控和統計報表以及排程日誌幾大功能模組,支援多種執行模式和路由策略,可基於對應執行器機器叢集數量進行簡單分片資料處理。

2.4.2 原理解析

2.1.0版本前核心排程模組都是基於quartz框架,2.1.0版本開始自研排程元件,移除quartz依賴 ,使用時間輪排程。

 

2.4.3 實踐說明

詳細配置和介紹參考官方文件。

 2.4.3.1 demo使用:

示例1:實現簡單任務配置,只需要繼承IJobHandler 抽象類,並宣告註解 

@JobHandler(value="offlineTaskJobHandler") ,實現業務邏輯即可。(注:此次引入了dubbo,後文介紹)。

@JobHandler(value="offlineTaskJobHandler")
@Component
public class OfflineTaskJobHandler extends IJobHandler {
  
   @Reference(check = false,version = "cms-dev",group="cms-service")
   private OfflineTaskExecutorFacade offlineTaskExecutorFacade;
  
   @Override
   public ReturnT<String> execute(String param) throws Exception {
      XxlJobLogger.log(" offlineTaskJobHandler start.");
  
      try {
         offlineTaskExecutorFacade.executeOfflineTask();
      } catch (Exception e) {
         XxlJobLogger.log("offlineTaskJobHandler-->exception." , e);
         return FAIL;
      }
  
      XxlJobLogger.log("XXL-JOB, offlineTaskJobHandler end.");
      return SUCCESS;
   }
}

 示例2:分片廣播任務。

@JobHandler(value="shardingJobHandler")
@Service
public class ShardingJobHandler extends IJobHandler {
  
   @Override
   public ReturnT<String> execute(String param) throws Exception {
  
      // 分片引數
      ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
      XxlJobLogger.log("分片引數:當前分片序號 = {}, 總分片數 = {}", shardingVO.getIndex(), shardingVO.getTotal());
  
      // 業務邏輯
      for (int i = 0; i < shardingVO.getTotal(); i++) {
         if (i == shardingVO.getIndex()) {
            XxlJobLogger.log("第 {} 片, 命中分片開始處理", i);
         } else {
            XxlJobLogger.log("第 {} 片, 忽略", i);
         }
      }
  
      return SUCCESS;
   }
}

 2.4.3.2 整合dubbo 

(1)引入dubbo-spring-boot-starter和業務facade jar包依賴。

<dependency>
    <groupId>com.alibaba.spring.boot</groupId>
    <artifactId>dubbo-spring-boot-starter</artifactId>
    <version>2.0.0</version>
</dependency>
  
<dependency>
    <groupId>com.demo.service</groupId>
    <artifactId>xxx-facade</artifactId>
    <version>1.9-SNAPSHOT</version>
</dependency>

(2)配置檔案加入dubbo消費端配置(可根據環境定義多個配置檔案,通過profile切換)。

## Dubbo 服務消費者配置
spring.dubbo.application.name=xxl-job
  
spring.dubbo.registry.address=zookeeper://zookeeper.xyz:2183
spring.dubbo.port=20880
  
spring.dubbo.version=demo
spring.dubbo.group=demo-service

(3)程式碼中通過@Reference注入facade介面即可。

@Reference(check = false,version = "demo",group="demo-service")
private OfflineTaskExecutorFacade offlineTaskExecutorFacade;

(4)啟動程式加入@EnableDubboConfiguration註解。

@SpringBootApplication
@EnableDubboConfiguration
public class XxlJobExecutorApplication {
   public static void main(String[] args) {
        SpringApplication.run(XxlJobExecutorApplication.class, args);
   }
}

2.4.4 任務視覺化配置 

內建了平臺專案,方便了開發者對任務的管理和執行日誌的監控,並提供了一些便於測試的功能。

 

2.4.5 擴充套件 

(1)任務監控和報表的優化。

(2)任務報警方式的擴充套件,比如加入告警中心,提供內部訊息,簡訊告警。

(3)對實際業務內部執行出現異常情況下的不同監控告警和重試策略。

2.5 高可用 Elastic-Job

2.5.1 基本介紹

Elastic-Job是一個分散式排程解決方案,由兩個相互獨立的子專案Elastic-Job-Lite和Elastic-Job-Cloud組成。

Elastic-Job-Lite定位為輕量級無中心化解決方案,使用jar包的形式提供分散式任務的協調服務。

Elastic-Job-Cloud使用Mesos + Docker的解決方案,額外提供資源治理、應用分發以及程序隔離等服務。

可惜的是已經兩年沒有迭代更新記錄。   

2.5.2 原理解析

 

 

2.5.3 實踐說明

2.5.3.1 demo使用

(1)安裝zookeeper,配置註冊中心config,配置檔案加入註冊中心zk的配置。​​​​​​​

@Configuration
@ConditionalOnExpression("'${regCenter.serverList}'.length() > 0")
public class JobRegistryCenterConfig {
  
    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList,
                                             @Value("${regCenter.namespace}") final String namespace) {
        return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
    }
}

​​​​​​​​​​

spring.application.name=demo_elasticjob
  
regCenter.serverList=localhost:2181
regCenter.namespace=demo_elasticjob
  
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl-job?Unicode=true&characterEncoding=UTF-8
spring.datasource.username=user
spring.datasource.password=pwd

(2)配置資料來源config,並配置檔案中加入資料來源配置。​​​​​​​

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
@Configuration
@ConfigurationProperties(prefix = "spring.datasource")
public class DataSourceProperties {
    private String url;
    private String username;
    private String password;
  
    @Bean
    @Primary
    public DataSource getDataSource() {
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setUrl(url);
        dataSource.setUsername(username);
        dataSource.setPassword(password);
        return dataSource;
    }
}

​​​​​​​

spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl-job?Unicode=true&characterEncoding=UTF-8
spring.datasource.username=user
spring.datasource.password=pwd

(3)配置事件config。​​​​​​​

@Configuration
public class JobEventConfig {
    @Autowired
    private DataSource dataSource;
  
    @Bean
    public JobEventConfiguration jobEventConfiguration() {
        return new JobEventRdbConfiguration(dataSource);
    }
}

(4)為了便於靈活配置不同的任務觸發事件,加入ElasticSimpleJob註解。​​​​​​​

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ElasticSimpleJob {
  
    @AliasFor("cron")
    String value() default "";
  
    @AliasFor("value")
    String cron() default "";
  
    String jobName() default "";
  
    int shardingTotalCount() default 1;
  
    String shardingItemParameters() default "";
  
    String jobParameter() default "";
}

(5)對配置進行初始化。​​​​​​​

@Configuration
@ConditionalOnExpression("'${elaticjob.zookeeper.server-lists}'.length() > 0")
public class ElasticJobAutoConfiguration {
  
    @Value("${regCenter.serverList}")
    private String serverList;
  
    @Value("${regCenter.namespace}")
    private String namespace;
  
    @Autowired
    private ApplicationContext applicationContext;
    @Autowired
    private DataSource dataSource;
  
    @PostConstruct
    public void initElasticJob() {
        ZookeeperRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
        regCenter.init();
        Map<String, SimpleJob> map = applicationContext.getBeansOfType(SimpleJob.class);
  
        for (Map.Entry<String, SimpleJob> entry : map.entrySet()) {
            SimpleJob simpleJob = entry.getValue();
            ElasticSimpleJob elasticSimpleJobAnnotation = simpleJob.getClass().getAnnotation(ElasticSimpleJob.class);
  
            String cron = StringUtils.defaultIfBlank(elasticSimpleJobAnnotation.cron(), elasticSimpleJobAnnotation.value());
            SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(simpleJob.getClass().getName(), cron, elasticSimpleJobAnnotation.shardingTotalCount()).shardingItemParameters(elasticSimpleJobAnnotation.shardingItemParameters()).build(), simpleJob.getClass().getCanonicalName());
            LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build();
  
            JobEventRdbConfiguration jobEventRdbConfiguration = new JobEventRdbConfiguration(dataSource);
            SpringJobScheduler jobScheduler = new SpringJobScheduler(simpleJob, regCenter, liteJobConfiguration, jobEventRdbConfiguration);
            jobScheduler.init();
        }
    }
}

(6)實現 SimpleJob介面,按上文中方法整合dubbo, 完成業務邏輯。​​​​​​​

@ElasticSimpleJob(
        cron = "*/10 * * * * ?",
        jobName = "OfflineTaskJob",
        shardingTotalCount = 2,
        jobParameter = "測試引數",
        shardingItemParameters = "0=A,1=B")
@Component
public class MySimpleJob implements SimpleJob {
    Logger logger = LoggerFactory.getLogger(OfflineTaskJob.class);
  
    @Reference(check = false, version = "cms-dev", group = "cms-service")
    private OfflineTaskExecutorFacade offlineTaskExecutorFacade;
  
  
    @Override
    public void execute(ShardingContext shardingContext) {
  
        offlineTaskExecutorFacade.executeOfflineTask();
  
        logger.info(String.format("Thread ID: %s, 作業分片總數: %s, " +
                        "當前分片項: %s.當前引數: %s," +
                        "作業名稱: %s.作業自定義引數: %s"
                ,
                Thread.currentThread().getId(),
                shardingContext.getShardingTotalCount(),
                shardingContext.getShardingItem(),
                shardingContext.getShardingParameter(),
                shardingContext.getJobName(),
                shardingContext.getJobParameter()
        ));
    }
}

 

2.6 其餘開源框架

(1)Saturn:Saturn是唯品會開源的一個分散式任務排程平臺,在Elastic Job的基礎上進行了改造。

(2)SIA-TASK:是宜信開源的分散式任務排程平臺。

三、優劣勢對比和業務場景適配思考

​​​​​​​

業務思考:

  1. 豐富任務監控資料和告警策略。

  2. 接入統一登入和許可權控制。

  3. 進一步簡化業務接入步驟。

四、結語

對於併發場景不是特別高的系統來說,xxl-job配置部署簡單易用,不需要引入多餘的元件,同時提供了視覺化的控制檯,使用起來非常友好,是一個比較好的選擇。希望直接利用開源分散式框架能力的系統,建議根據自身的情況來進行合適的選型。

附:參考文獻

  • quartz外掛——實現任務之間的序列排程

更多內容敬請關注 vivo 網際網路技術 微信公眾號

注:轉載文章請先與微訊號:labs2020 聯