1. 程式人生 > >Elastic-Job——分散式定時任務框架

Elastic-Job——分散式定時任務框架

什麼是作業?

作業即定時任務.無需做複雜的控制,在指定的時間執行指定的任務.

為什麼需要作業?

        時間驅動/事件驅動: 內部系統一般可以通過事件來驅動,但涉及到外部系統,則只能使用時間驅動.如:抓取外部系統價格.每小時抓取,由於是外部系統,不能像內部系統一樣傳送事件觸發事件.

批量處理/逐條處理:批量處理堆積的資料更加高效,在不需要實時性的情況下比訊息中介軟體更有優勢,而且有的業務邏輯只能批量處理.

系統內部/系統解耦:作業一般封裝在系統內部,而訊息中介軟體可用於系統間解耦.

Elastic-Job是什麼?

        elastic-job主要的設計理念是無中心化的分散式定時排程框架,思路來源於Quartz的基於資料庫的高可用方案。但資料庫沒有分散式協調功能,所以在高可用方案的基礎上增加了彈性擴容和資料分片的思路,以便於更大限度的利用分散式伺服器的資源。Elastic-Job是ddframe中dd-job的作業模組中分離出來的分散式彈性作業框架。去掉了和dd-job中的監控和ddframe接入規範部分。


1.簡介

http://elasticjob.io/docs/elastic-job-lite/00-overview/intro/


Elastic-Job-Lite定位為輕量級無中心化解決方案,使用jar包的形式提供最輕量級的分散式任務的協調服務,外部依賴僅Zookeeper。
基本概念
1. 分片概念

任務的分散式執行,需要將一個任務拆分為多個獨立的任務項,然後由分散式的伺服器分別執行某一個或幾個分片項。

例如:有一個遍歷資料庫某張表的作業,現有2臺伺服器。為了快速的執行作業,那麼每臺伺服器應執行作業的50%。為滿足此需求,可將作業分成2片,每臺伺服器執行1片。作業遍歷資料的邏輯應為:伺服器A遍歷ID以奇數結尾的資料;伺服器B遍歷ID以偶數結尾的資料。如果分成10片,則作業遍歷資料的邏輯應為:每片分到的分片項應為ID%10,而伺服器A被分配到分片項0,1,2,3,4;伺服器B被分配到分片項5,6,7,8,9,直接的結果就是伺服器A遍歷ID以0-4結尾的資料;伺服器B遍歷ID以5-9結尾的資料。

2. 分片項與業務處理解耦

Elastic-Job並不直接提供資料處理的功能,框架只會將分片項分配至各個執行中的作業伺服器,開發者需要自行處理分片項與真實資料的對應關係。那一片對應那是資料是在程式碼裡面開發者自己定義的

3. 個性化引數的適用場景

個性化引數即shardingItemParameter,可以和分片項匹配對應關係,用於將分片項的數字轉換為更加可讀的業務程式碼。

例如:按照地區水平拆分資料庫,資料庫A是北京的資料;資料庫B是上海的資料;資料庫C是廣州的資料。如果僅按照分片項配置,開發者需要了解0表示北京;1表示上海;2表示廣州。合理使用個性化引數可以讓程式碼更可讀,如果配置為0=北京,1=上海,2=廣州,那麼程式碼中直接使用北京,上海,廣州的列舉值即可完成分片項和業務邏輯的對應關係。
核心理念
1. 分散式排程

Elastic-Job-Lite並無作業排程中心節點,而是基於部署作業框架的程式在到達相應時間點時各自觸發排程。

註冊中心僅用於作業註冊和監控資訊儲存。而主作業節點僅用於處理分片和清理等功能。

2. 作業高可用

Elastic-Job-Lite提供最安全的方式執行作業。將分片總數設定為1,並使用多於1臺的伺服器執行作業,作業將會以1主n從的方式執行。

一旦執行作業的伺服器崩潰,等待執行的伺服器將會在下次作業啟動時替補執行。開啟失效轉移功能效果更好,可以保證在本次作業執行時崩潰,備機立即啟動替補執行。
3. 最大限度利用資源

Elastic-Job-Lite也提供最靈活的方式,最大限度的提高執行作業的吞吐量。將分片項設定為大於伺服器的數量,最好是大於伺服器倍數的數量,作業將會合理的利用分散式資源,動態的分配分片項。

例如:3臺伺服器,分成10片,則分片項分配結果為伺服器A=0,1,2;伺服器B=3,4,5;伺服器C=6,7,8,9。如果伺服器C崩潰,則分片項分配結果為伺服器A=0,1,2,3,4;伺服器B=5,6,7,8,9。在不丟失分片項的情況下,最大限度的利用現有資源提高吞吐量。
整體架構圖

Elastic-Job-Lite Architecture

2.作業開發

Elastic-Job-Lite和Elastic-Job-Cloud提供統一作業介面,開發者僅需對業務作業進行一次開發,之後可根據不同的配置以及部署至不同的Lite或Cloud環境。

Elastic-Job提供Simple、Dataflow和Script 3種作業型別。方法引數shardingContext包含作業配置、片和執行時資訊。可通過getShardingTotalCount(), getShardingItem()等方法分別獲取分片總數,執行在本作業伺服器的分片序列號等。

a. Simple型別作業

意為簡單實現,未經任何封裝的型別。需實現SimpleJob介面。該介面僅提供單一方法用於覆蓋,此方法將定時執行。與Quartz原生介面相似,但提供了彈性擴縮容和分片等功能。

public classMyElasticJobimplementsSimpleJob{
    
    @Override
    publicvoidexecute(ShardingContext context){
        switch (context.getShardingItem()) {
            case 0: 
                // do something by sharding item 0
                break;
            case 1: 
                // do something by sharding item 1
                break;
            case 2: 
                // do something by sharding item 2
                break;
            // case n: ...
        }
    }
}

b. Dataflow型別作業

Dataflow型別用於處理資料流,需實現DataflowJob介面。該介面提供2個方法可供覆蓋,分別用於抓取(fetchData)和處理(processData)資料。

public classMyElasticJobimplementsDataflowJob<Foo> {
    
    @Override
    public List<Foo> fetchData(ShardingContext context){
        switch (context.getShardingItem()) {
            case 0: 
                List<Foo> data = // get data from database by sharding item 0
                return data;
            case 1: 
                List<Foo> data = // get data from database by sharding item 1
                return data;
            case 2: 
                List<Foo> data = // get data from database by sharding item 2
                return data;
            // case n: ...
        }
    }
    
    @Override
    publicvoidprocessData(ShardingContext shardingContext, List<Foo> data){
        // process data
        // ...
    }
}

流式處理

可通過DataflowJobConfiguration配置是否流式處理。

流式處理資料只有fetchData方法的返回值為null或集合長度為空時,作業才停止抓取,否則作業將一直執行下去;非流式處理資料則只會在每次作業執行過程中執行一次fetchData方法和processData方法,隨即完成本次作業。

如果採用流式作業處理方式,建議processData處理資料後更新其狀態,避免fetchData再次抓取到,從而使得作業永不停止。流式資料處理參照TbSchedule設計,適用於不間歇的資料處理。

c. Script型別作業

Script型別作業意為指令碼型別作業,支援shell,python,perl等所有型別指令碼。只需通過控制檯或程式碼配置scriptCommandLine即可,無需編碼。執行指令碼路徑可包含引數,引數傳遞完畢後,作業框架會自動追加最後一個引數為作業執行時資訊。

#!/bin/bash
echo sharding execution context is $*

作業執行時輸出

sharding execution context is {“jobName”:“scriptElasticDemoJob”,“shardingTotalCount”:10,“jobParameter”:“”,“shardingItem”:0,“shardingParameter”:“A”}

2. 作業配置

Elastic-Job配置分為3個層級,分別是Core, Type和Root。每個層級使用相似於裝飾者模式的方式裝配。

Core對應JobCoreConfiguration,用於提供作業核心配置資訊,如:作業名稱、分片總數、CRON表示式等。

Type對應JobTypeConfiguration,有3個子類分別對應SIMPLE, DATAFLOW和SCRIPT型別作業,提供3種作業需要的不同配置,如:DATAFLOW型別是否流式處理或SCRIPT型別的命令列等。

Root對應JobRootConfiguration,有2個子類分別對應Lite和Cloud部署型別,提供不同部署型別所需的配置,如:Lite型別的是否需要覆蓋本地配置或Cloud佔用CPU或Memory數量等。

a. 使用Java程式碼配置

通用作業配置

    // 定義作業核心配置
    JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob", "0/15 * * * * ?", 10).build();
    // 定義SIMPLE型別配置
    SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, SimpleDemoJob.class.getCanonicalName());
    // 定義Lite作業根配置
    JobRootConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
    
    // 定義作業核心配置
    JobCoreConfiguration dataflowCoreConfig = JobCoreConfiguration.newBuilder("demoDataflowJob", "0/30 * * * * ?", 10).build();
    // 定義DATAFLOW型別配置
    DataflowJobConfiguration dataflowJobConfig = new DataflowJobConfiguration(dataflowCoreConfig, DataflowDemoJob.class.getCanonicalName(), true);
    // 定義Lite作業根配置
    JobRootConfiguration dataflowJobRootConfig = LiteJobConfiguration.newBuilder(dataflowJobConfig).build();
    
    // 定義作業核心配置配置
    JobCoreConfiguration scriptCoreConfig = JobCoreConfiguration.newBuilder("demoScriptJob", "0/45 * * * * ?", 10).build();
    // 定義SCRIPT型別配置
    ScriptJobConfiguration scriptJobConfig = new ScriptJobConfiguration(scriptCoreConfig, "test.sh");
    // 定義Lite作業根配置
    JobRootConfiguration scriptJobRootConfig = LiteJobConfiguration.newBuilder(scriptCoreConfig).build();

b. Spring名稱空間配置

與Spring容器配合使用作業,可將作業Bean配置為Spring Bean,並在作業中通過依賴注入使用Spring容器管理的資料來源等物件。可用placeholder佔位符從屬性檔案中取值。Lite可考慮使用Spring名稱空間方式簡化配置。

<?xml version="1.0" encoding="UTF-8"?>
<beansxmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"xmlns:job="http://www.dangdang.com/schema/ddframe/job"xsi:schemaLocation="http://www.springframework.org/schema/beans 
                        http://www.springframework.org/schema/beans/spring-beans.xsd 
                        http://www.dangdang.com/schema/ddframe/reg 
                        http://www.dangdang.com/schema/ddframe/reg/reg.xsd 
                        http://www.dangdang.com/schema/ddframe/job 
                        http://www.dangdang.com/schema/ddframe/job/job.xsd 
                        ">
    <!--配置作業註冊中心 -->
    <reg:zookeeperid="regCenter"server-lists="yourhost:2181"namespace="dd-job"base-sleep-time-milliseconds="1000"max-sleep-time-milliseconds="3000"max-retries="3" />
    
    <!-- 配置簡單作業-->
    <job:simpleid="simpleElasticJob"class="xxx.MySimpleElasticJob"registry-center-ref="regCenter"cron="0/10 * * * * ?"sharding-total-count="3"sharding-item-parameters="0=A,1=B,2=C" />
    
    <beanid="yourRefJobBeanId"class="xxx.MySimpleRefElasticJob">
        <propertyname="fooService"ref="xxx.FooService"/>
    </bean>
    
    <!-- 配置關聯Bean作業-->
    <job:simpleid="simpleRefElasticJob"job-ref="yourRefJobBeanId"registry-center-ref="regCenter"cron="0/10 * * * * ?"sharding-total-count="3"sharding-item-parameters="0=A,1=B,2=C" />
    
    <!-- 配置資料流作業-->
    <job:dataflowid="throughputDataflow"class="xxx.MyThroughputDataflowElasticJob"registry-center-ref="regCenter"cron="0/10 * * * * ?"sharding-total-count="3"sharding-item-parameters="0=A,1=B,2=C" />
    
    <!-- 配置指令碼作業-->
    <job:scriptid="scriptElasticJob"registry-center-ref="regCenter"cron="0/10 * * * * ?"sharding-total-count="3"sharding-item-parameters="0=A,1=B,2=C"script-command-line="/your/file/path/demo.sh" />
    
    <!-- 配置帶監聽的簡單作業-->
    <job:simpleid="listenerElasticJob"class="xxx.MySimpleListenerElasticJob"registry-center-ref="regCenter"cron="0/10 * * * * ?"sharding-total-count="3"sharding-item-parameters="0=A,1=B,2=C">
        <job:listenerclass="xx.MySimpleJobListener"/>
        <job:distributed-listenerclass="xx.MyOnceSimpleJobListener"started-timeout-milliseconds="1000"completed-timeout-milliseconds="2000" />
    </job:simple>
    
    <!-- 配置帶作業資料庫事件追蹤的簡單作業-->
    <job:simpleid="eventTraceElasticJob"class="xxx.MySimpleListenerElasticJob"registry-center-ref="regCenter"cron="0/10 * * * * ?"sharding-total-count="3"sharding-item-parameters="0=A,1=B,2=C"event-trace-rdb-data-source="yourDataSource">
    </job:simple>
</beans>

配置項詳細說明請參見配置手冊

3. 作業啟動

a. Java啟動方式

public classJobDemo{
    
    publicstaticvoidmain(String[] args){
        new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
    }
    
    privatestatic CoordinatorRegistryCenter createRegistryCenter(){
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("zk_host:2181", "elastic-job-demo"));
        regCenter.init();
        return regCenter;
    }
    
    privatestatic LiteJobConfiguration createJobConfiguration(){
        // 建立作業配置
        ...
    }
}

b. Spring啟動方式

將配置Spring名稱空間的xml通過Spring啟動,作業將自動載入。