1. 程式人生 > >Elastic Job入門(1) - 簡介

Elastic Job入門(1) - 簡介

介紹 構建一般的業務系統來說,使用 Quartz或者 Spring Task即可基本滿足我們的 單體服用應用需要。然而隨著線上業務量的不斷髮展,這兩種定時任務已經日漸無法滿足我們的需求。一般,使用這兩種定時任務框架都會遇到如下的兩個痛點問題:
1、如果業務工程採用 叢集化的部署,可能會多次重複執行定時任務而導致系統的業務邏輯錯誤,併產生系統故障。
2、Quartz的叢集方案具備HA功能,可以實現定時任務的分發,但是通過增加機器節點數量的方式並不能提高每次定時任務的執行效率,無法實現任務的 彈性分片

開源產品Elastic-Job是噹噹開源的一款分散式彈性定時任務排程框架,在2.X版本以後主要分為Elastic-Job-Lite

Elastic-Job-Cloud兩個子專案。其中,Elastic-Job-Lite定位為輕量級無中心化解決方案,使用jar包的形式提供分散式任務的協調服務。而Elastic-Job-Cloud使用Mesos + Docker的解決方案,額外提供資源治理、應用分發以及程序隔離等服務。本文主要學習Elastic-Job-Lite

GitHub地址為:https://github.com/elasticjob/elastic-job-lite

中文官網為:http://elasticjob.io/index_zh.html

從Elastic-Job的架構圖上基本就可以看出,其以Jar的形式為業務工程(諸如,Spring Boot工程)的快速整合提供了簡便的方式。同時,其提供的定時任務分片、彈性擴縮容、失效轉移、作業監控和支援多種作業模式等強大的功能,使業務開發人員 無需在這些方面花費較大多的精力,而可以更加專注於平臺的業務開發
。其主要的功能如下:
1、 定時任務:基於成熟的定時任務作業框架Quartz cron表示式執行定時任務;
2、 作業註冊中心:基於Zookeeper和其客戶端Curator實現的全域性作業註冊控制中心;作業註冊中心僅用於作業任務註冊和監控資訊的暫存;
3、 定時任務分片:可以將原本一個較大任務分片成為多小的子任務項分別在多個伺服器上同時執行,提高總任務的執行處理效率;
4、 彈性擴容縮容:執行中定時任務所在的伺服器崩潰,或新增加n臺作業伺服器,作業框架將在下次任務執行前重新進行任務排程分發,不影響當前任務的處理與執行;
5、 支援多種任務模式:分別支援Simple、Dataflow和Script型別的定時任務;
6、 失效轉移
:執行中的定時任務所在的伺服器崩潰不會導致重新分片,會在下次定時任務啟動時重新分發和排程;
7、 執行時定時任務狀態收集:監控任務執行時的狀態,統計最近一段時間任務處理成功和失敗的數量,記錄作業上次執行開始時間,結束時間和下次執行時間;
8、 支援配置定時任務停止、恢復和禁用:用於操作定時任務的啟停,並可以禁止某任務的執行;
9、 Spring支援:Elastic-Job-Lite專案完美支援spring的容器,自定義名稱空間,支援佔位符
10、 運維平臺:提供運維介面,方便開發和運維人員管理生產環境上已經發布的定時任務和註冊中心;   水平分片     橫向拓容     Springboot2預設資料庫連線池選擇了HikariCP
<dependency>
    <groupId>com.zaxxer</groupId>
    <artifactId>HikariCP</artifactId>
</dependency>
 spring:
     datasource:
       url: jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&verifyServerCertificate=false&useSSL=false&requireSSL=false
       driver-class-name: com.mysql.jdbc.Driver
       username: root
       password: root
       type: com.zaxxer.hikari.HikariDataSource
   #  自動建立更新驗證資料庫結構
     jpa:
       hibernate:
         ddl-auto: update
       show-sql: true
       database: mysql

 

==========================================API配置啟動==========================================

引入Maven

<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>${latest.release.version}</version>
</dependency>

 

方法引數ShardingContext包含作業配置,分片和執行時資訊。可通過getShardingTotalCount(),getShardingItems()等方法分別獲取分片總數,執行在本作業伺服器的分片序列號集合等。

Simple型別作業

與Quartz介面相似,用於執行普通的定時任務,只是增加了彈性拓容和分片等功能:

public class MyElasticJob implements SimpleJob {
    
    @Override
    public void execute(ShardingContext context) {
        switch (context.getShardingItem()) {
            case 0: 
                // do something by sharding item 0
                break;
            case 1: 
                // do something by sharding item 1
                break;
            case 2: 
                // do something by sharding item 2
                break;
            // case n: ...
        }
    }
}

 

Dataflow型別作業

Dataflow型別用於處理資料流,需實現DataflowJob介面。該介面提供2個方法可供覆蓋,分別用於抓取(fetchData)和處理(processData)資料。

public class MyElasticJob implements DataflowJob<Foo> {
    
    @Override
    public List<Foo> fetchData(ShardingContext context) {
        switch (context.getShardingItem()) {
            case 0: 
                List<Foo> data = // get data from database by sharding item 0
                return data;
            case 1: 
                List<Foo> data = // get data from database by sharding item 1
                return data;
            case 2: 
                List<Foo> data = // get data from database by sharding item 2
                return data;
            // case n: ...
        }
    }
    
    @Override
    public void processData(ShardingContext shardingContext, List<Foo> data) {
        // process data
        // ...
    }
}

流式處理

  可通過DataflowJobConfiguration配置是否流式處理。流式處理資料只有fetchData方法的返回值為null或集合長度為空時,作業才停止抓取,否則作業將一直執行下去; 非流式處理資料則只會在每次作業執行過程中執行一次fetchData方法和processData方法,隨即完成本次作業。如果採用流式作業處理方式,建議processData處理資料後更新其狀態,避免fetchData再次抓取到,從而使得作業永不停止。 流式資料處理參照TbSchedule設計,適用於不間歇的資料處理。

 

作業配置

Elastic-Job配置分為3個層級,分別是Core, Type和Root。每個層級使用相似於裝飾者模式的方式裝配。

Core對應JobCoreConfiguration,用於提供作業核心配置資訊,如:作業名稱、分片總數、CRON表示式等。

Type對應JobTypeConfiguration,有3個子類分別對應SIMPLE, DATAFLOW和SCRIPT型別作業,提供3種作業需要的不同配置,如:DATAFLOW型別是否流式處理或SCRIPT型別的命令列等。

Root對應JobRootConfiguration,有2個子類分別對應Lite和Cloud部署型別,提供不同部署型別所需的配置,如:Lite型別的是否需要覆蓋本地配置或Cloud佔用CPU或Memory數量等。

// 定義作業核心配置
JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob", "0/15 * * * * ?", 10).build();
// 定義SIMPLE型別配置
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, SimpleDemoJob.class.getCanonicalName());
// 定義Lite作業根配置
LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();

 

作業啟動

public class JobDemo {
    
    public static void main(String[] args) {
        new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
    }
    
    private static CoordinatorRegistryCenter createRegistryCenter() {
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("zk_host:2181", "elastic-job-demo"));
        regCenter.init();
        return regCenter;
    }
    
    private static LiteJobConfiguration createJobConfiguration() {
        // 建立作業配置
        // ...
    }
}

 

==========================================Spring配置啟動==========================================

引入Maven

<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>${latest.release.version}</version>
</dependency>

 

作業配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
    xmlns:job="http://www.dangdang.com/schema/ddframe/job"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.dangdang.com/schema/ddframe/reg
                        http://www.dangdang.com/schema/ddframe/reg/reg.xsd
                        http://www.dangdang.com/schema/ddframe/job
                        http://www.dangdang.com/schema/ddframe/job/job.xsd
                        ">
    <!--配置作業註冊中心 -->
    <reg:zookeeper id="regCenter" server-lists="yourhost:2181" namespace="dd-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />
    
    <!-- 配置作業-->
    <job:simple id="demoSimpleSpringJob" class="xxx.MyElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />
</beans>

 

作業啟動

將配置Spring名稱空間的xml通過Spring啟動,作業將自動載入。