1. 程式人生 > >ElasticJob -- 分散式作業排程

ElasticJob -- 分散式作業排程

Elastic-Job是一個分散式排程解決方案,由兩個相互獨立的子專案Elastic-Job-Lite和Elastic-Job-Cloud組成。

Elastic-Job-Lite定位為輕量級無中心化解決方案,使用jar包的形式提供最輕量級的分散式任務的協調服務,外部依賴僅Zookeeper。

Elastic-Job-Lite定位為輕量級無中心化解決方案,使用jar包的形式提供分散式任務的協調服務。

專案開源地址:https://github.com/dangdangdotcom/elastic-job

場景分析:

任務的分散式執行,需要將一個任務拆分為多個獨立的任務項,然後由分散式的伺服器分別執行某一個或幾個分片項。

場景1:有一個遍歷資料庫某張表的作業,現有2臺伺服器。為了快速的執行作業,那麼每臺伺服器應執行作業的50%。 為滿足此需求,可將作業分成2片,每臺伺服器執行1片。作業遍歷資料的邏輯應為:伺服器A遍歷ID以奇數結尾的資料;伺服器B遍歷ID以偶數結尾的資料。 如果分成10片,則作業遍歷資料的邏輯應為:每片分到的分片項應為ID%10,而伺服器A被分配到分片項0,1,2,3,4;伺服器B被分配到分片項5,6,7,8,9,直接的結果就是伺服器A遍歷ID以0-4結尾的資料;伺服器B遍歷ID以5-9結尾的資料。

場景2:餘額寶裡的昨日收益,系統需要job在每天某個時間點開始,給所有餘額寶使用者計算收益。如果使用者數量不多,我們可以輕易使用quartz來完成,我們讓計息job在某個時間點開始執行,迴圈遍歷所有使用者計算利息,這沒問題。可是,如果使用者體量特別大,我們可能會面臨著在第二天之前處理不完這麼多使用者。另外,我們部署job的時候也得注意,我們可能會把job直接放在我們的webapp裡,webapp通常是多節點部署的,這樣,我們的job也就是多節點,多個job同時執行,很容易造成重複執行,比如使用者重複計息,為了避免這種情況,我們可能會對job的執行加鎖,保證始終只有一個節點能執行,或者乾脆讓job從webapp裡剝離出來,獨自部署一個節點。

elastic-job就可以幫助我們解決上面的問題,elastic底層的任務排程還是使用的quartz,通過zookeeper來動態給job節點分片

整體架構圖

Elastic-Job-Lite

「每日分享」ElasticJob—分散式作業排程神器,你還在用Quartz嗎

 

Elastic-Job-Cloud

「每日分享」ElasticJob—分散式作業排程神器,你還在用Quartz嗎

 

 作業啟動流程

  • 彈性分散式實現

    1. 第一臺伺服器上線觸發主伺服器選舉。主伺服器一旦下線,則重新觸發選舉,選舉過程中阻塞,只有主伺服器選舉完成,才會執行其他任務。

    2. 某作業伺服器上線時會自動將伺服器資訊註冊到註冊中心,下線時會自動更新伺服器狀態。

    3. 主節點選舉,伺服器上下線,分片總數變更均更新重新分片標記。

    4. 定時任務觸發時,如需重新分片,則通過主伺服器分片,分片過程中阻塞,分片結束後才可執行任務。如分片過程中主伺服器下線,則先選舉主伺服器,再分片。

    5. 通過4可知,為了維持作業執行時的穩定性,執行過程中只會標記分片狀態,不會重新分片。分片僅可能發生在下次任務觸發前。

    6. 每次分片都會按伺服器IP排序,保證分片結果不會產生較大波動。

    7. 實現失效轉移功能,在某臺伺服器執行完畢後主動抓取未分配的分片,並且在某臺伺服器下線後主動尋找可用的伺服器執行任務。
       

 

 

作業執行流程

 

應用:

1. 引入框架的jar包

<!-- 引入elastic-job-lite核心模組 -->
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>2.0.5</version>
</dependency>
<!-- 使用springframework自定義名稱空間時引入 -->
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>2.0.5</version>
</dependency>

2. 構建job

public class MyTask implements SimpleJob{

    public void execute(ShardingContext context) {
    System.out.println("定時任務測試");
    }

}

3. spring 配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
    xmlns:job="http://www.dangdang.com/schema/ddframe/job"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.dangdang.com/schema/ddframe/reg
        http://www.dangdang.com/schema/ddframe/reg/reg.xsd
        http://www.dangdang.com/schema/ddframe/job
        http://www.dangdang.com/schema/ddframe/job/job.xsd
        ">

<!-- 配置註冊中心 ,任務的資訊都會在zk中儲存 -->
<reg:zookeeper id="regCenter" server-lists="127.0.0.1:2181" namespace="test-job"
    base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />

<!-- 配置簡單作業   -->
<job:simple id="myTask"
    class="com.xxx.MyTask"
    registry-center-ref="regCenter" cron="0 10 * * * ?"
    sharding-total-count="1" overwrite="true"><!-- 分片為1,即不需要分片;支援覆蓋,即會用本次的配置覆蓋快取在zk中的配置 -->
    <job:event-log /><!-- job執行日誌記錄到log -->
    <job:event-rdb driver="${ds1.jdbc.driver_class_name}" <!-- job執行日誌記錄到DB, 詳細參考:http://dangdangdotcom.github.io/elastic-job/post/user_guide/common/event_trace/-->
        url="${ds1.jdbc.url}" username="${ds1.jdbc.username}" password="${ds1.jdbc.password}"
        log-level="INFO" />
</job:simple>

</beans>

 

分片:

public interface SimpleJob extends ElasticJob {
     
    /**
     * 執行作業.
     *
     * @param shardingContext 分片上下文
     */
    void execute(ShardingContext shardingContext);
}

注意這裡面有一個shardingContext引數,看下原始碼:

/**
 * 分片上下文.
 *
 * @author zhangliang
 */
@Getter
@ToString
public final class ShardingContext {
     
    /**
     * 作業名稱.
     */
    private final String jobName;
     
    /**
     * 作業任務ID.
     */
    private final String taskId;
     
    /**
     * 分片總數.
     */
    private final int shardingTotalCount;
     
    /**
     * 作業自定義引數.
     * 可以配置多個相同的作業, 但是用不同的引數作為不同的排程例項.
     */
    private final String jobParameter;
     
    /**
     * 分配於本作業例項的分片項.
     */
    private final int shardingItem;
     
    /**
     * 分配於本作業例項的分片引數.
     */
    private final String shardingParameter;
     
    public ShardingContext(final ShardingContexts shardingContexts, final int shardingItem) {
        jobName = shardingContexts.getJobName();
        taskId = shardingContexts.getTaskId();
        shardingTotalCount = shardingContexts.getShardingTotalCount();
        jobParameter = shardingContexts.getJobParameter();
        this.shardingItem = shardingItem;
        shardingParameter = shardingContexts.getShardingItemParameters().get(shardingItem);
    }
}

這裡面有2個很重要的屬性:shardingTotalCount 分片總數(比如:2)、shardingItem 當前分片索引(比如:1),前面提到的效能擴容,就可以根據2個引數進行簡單的處理,假設在電商系統中,每天晚上有個定時任務,要統計每家店的銷量。商家id一般在表設計上是一個自增數字,如果總共2個分片(注:通常也就是部署2個節點),可以把 id為奇數的放到分片0,id為偶數的放到分片1,這樣2個機器各跑一半,相對只有1臺機器而言,就快多了。

虛擬碼如下:

public class TestJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        int shardIndx = shardingContext.getShardingItem();
        if (shardIndx == 0) {
            //處理id為奇數的商家
        } else {
            //處理id為偶數的商家
        }
    }
}

這個還可以進一步簡化,如果使用mysql查詢商家列表,mysql中有一個mod函式,直接可以對商家id進行取模運算

select * from shop where mod(shop_id,2)=0

如果把上面的2、0換成引數,mybatis中類似這樣:

select * from shop where mod(shop_id,#{shardTotal})=#{shardIndex}

 

作業型別:

elastic-job提供了三種類型的作業:Simple型別作業、Dataflow型別作業、Script型別作業。這裡主要講解前兩者。Script型別作業意為指令碼型別作業,支援shell,python,perl等所有型別指令碼,使用不多,可以參見github文件。

SimpleJob需要實現SimpleJob介面,意為簡單實現,未經過任何封裝,與quartz原生介面相似,比如示例程式碼中所使用的job。

Dataflow型別用於處理資料流,需實現DataflowJob介面。該介面提供2個方法可供覆蓋,分別用於抓取(fetchData)和處理(processData)資料。
可通過DataflowJobConfiguration配置是否流式處理。
流式處理資料只有fetchData方法的返回值為null或集合長度為空時,作業才停止抓取,否則作業將一直執行下去; 非流式處理資料則只會在每次作業執行過程中執行一次fetchData方法和processData方法,隨即完成本次作業。
實際開發中,Dataflow型別的job還是很有好用的。

public class MyDataFlowJob implements DataflowJob<User> {
 
    /*
        status
        0:待處理
        1:已處理
     */
 
    @Override
    public List<User> fetchData(ShardingContext shardingContext) {
        List<User> users = null;
        /**
         * users = SELECT * FROM user WHERE status = 0 AND MOD(id, shardingTotalCount) = shardingItem Limit 0, 30
         */
        return users;
    }
 
    @Override
    public void processData(ShardingContext shardingContext, List<User> data) {
        for (User user: data) {
            user.setStatus(1);
            /**
             * update user
             */
        }
    }
}

<job:dataflow id="myDataFlowJob" class="com.fanfan.sample001.MyDataFlowJob" registry-center-ref="regCenter"
              sharding-total-count="2" cron="0 0 02 * * ?" streaming-process="true" overwrite="true" />

 

控制檯:

elastic-job還提供了一個不錯的UI控制檯,專案原始碼git clone到本地,mvn install就能得到一個elastic-job-lite-console-${version}.tar.gz的包,解壓,然後執行裡面的bin/start.sh 就能跑起來,介面類似如下:

ç¹å»æ¥çåå¾

  • 作業詳細資訊頁

 

通過這個控制檯,可以動態調整每個定時任務的觸發時間(即:cornExpress)。詳情可參考官網文件-運維平臺部分。



 


Refrence:
https://www.cnblogs.com/yjmyzz/p/elastic-job-tutorial.html
https://github.com/elasticjob/elastic-job-lite
https://www.cnblogs.com/wyb628/p/7682580.html