1. 程式人生 > >Elastic-job實戰(分散式作業排程框架)

Elastic-job實戰(分散式作業排程框架)

就拿一個場景來說吧,如果我們的專案是部署到多臺機器上,那麼某一時刻,我們的定時任務肯定每臺機器上都會執行一遍,那這肯定不是我們想要的結果,我們只希望有一臺機器能執行。

一.前言

Elastic job是噹噹網架構師基於Zookepper、Quartz開發並開源的一個Java分散式定時任務,解決了Quartz不支援分散式的弊端。Elastic job主要的功能有支援彈性擴容,通過Zookepper集中管理和監控job,支援失效轉移等,這些都是Quartz等其他定時任務無法比擬的。

官網說明:

目前Elastic job的最新版本已經由原來的elastic-job-core分離除了兩個專案,分別為Elastic-Job-Lite

Elastic-Job-Cloud。Elastic-Job是一個分散式排程解決方案,由兩個相互獨立的子專案Elastic-Job-Lite和Elastic-Job-Cloud組成。
Elastic-Job-Lite定位為輕量級無中心化解決方案,使用jar包的形式提供分散式任務的協調服務。
Elastic-Job-Cloud使用Mesos +Docker(TBD)的解決方案,額外提供資源治理、應用分發以及程序隔離等服務,Elastic-Job-Lite和Elastic-Job-Cloud提供同一套API開發作業,開發者僅需一次開發,即可根據需要以Lite或Cloud的方式部署。

二.如何使用Elastic-Job

Elastic-job提供3種類型作業:

1.Simple型別作業
2.Dataflow型別作業
3.Script型別作業

這裡我們用第一種測試。

2.1 新增依賴

  <dependency>
      <groupId>com.dangdang</groupId>
      <artifactId>elastic-job-spring</artifactId>
      <version>1.1.1</version>
 </dependency>

2.2 新增配置檔案application.properties

################################## Elastic-Job ##################################
regCenter.namespace=my-job
regCenter.zk.address=localhost1:2181,localhost2:2181
regCenter.baseSleepTimeMilliseconds=1000
regCenter.maxSleepTimeMilliseconds=3000
regCenter.maxRetries=3
regCenter.sessionTimeoutMilliseconds=10000
#如果為true,可以在控制檯檢測到作業的執行狀態
job.monitorExecution=true
#失效轉移,如果為true,當作業在執行過程中異常中斷,作業會被分發到叢集中存活的結點
job.failover=true
#如果為true,在開始部署的時候作業不會自啟動,即使到了觸發時間,需要在控制檯手動觸發。
job.disabled=false
#如果為true,則會覆蓋zk的配置
job.overwrite=true
job.monitorPort=9888
#只有當分片的數量設定為1的時候,整個叢集中只會有一個程序去執行該Job,在伺服器數量沒有波動的情況下,任務總會在固定某個程序上執行。在作業執行前程序如果掛了,那作業會被分配到叢集某一個存活的程序中
job.shardingTotalCount=1
job.shardingItemParameters=

2.3 配置zk和Job application-Context-jobs.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
       xmlns:job="http://www.dangdang.com/schema/ddframe/job"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.dangdang.com/schema/ddframe/reg
                        http://www.dangdang.com/schema/ddframe/reg/reg.xsd
                        http://www.dangdang.com/schema/ddframe/job
                        http://www.dangdang.com/schema/ddframe/job/job.xsd">
    <reg:zookeeper id="regCenter"
                   server-lists="${regCenter.zk.address}"
                   namespace="${regCenter.namespace}-${env}"
                   base-sleep-time-milliseconds="${regCenter.baseSleepTimeMilliseconds}"
                   max-sleep-time-milliseconds="${regCenter.maxSleepTimeMilliseconds}"
                   max-retries="${regCenter.maxRetries}"
                   nested-port="-1"
                   session-timeout-milliseconds="${regCenter.sessionTimeoutMilliseconds}"/>

    <job:simple id="myJob"
                class="com.xx.job.MyJob"
                registry-center-ref="regCenter"
                sharding-total-count="${job.shardingTotalCount}"
                cron="0/5 * * * * ?"
                sharding-item-parameters="${job.shardingItemParameters}"
                monitor-execution="${job.monitorExecution}"
                monitor-port="${job.monitorPort}"
                failover="${job.failover}"
                description="我的job"
                disabled="${job.disabled}"
                overwrite="${job.overwrite}"/>
</beans>

2.4 封裝執行器(並行和序列)

序列任務執行器

/**
 * 序列任務執行器
 */
public abstract class AbstractSerialExecutor extends AbstractSimpleElasticJob {
    public abstract void executeJob(String jobName);

    @Override
    public void process(JobExecutionMultipleShardingContext shardingContext) {
        long t1 = System.currentTimeMillis();
        executeJob(shardingContext.getJobName());
        LogUtil.successJob(shardingContext.getJobName(), (System.currentTimeMillis() - t1));
    }
}

並行任務執行器

/**
 * 並行任務執行器
 */
public abstract class AbstractParallelExecutor extends AbstractSimpleElasticJob implements Runnable {
    @Override
    public void process(JobExecutionMultipleShardingContext shardingContext) {
        something....
    }
}

2.5 編寫Job

@Component
public class MyJob extends AbstractSerialExecutor {
    private final static Logger logger = LoggerFactory.getLogger(MyJob.class);
    @Override
    public void executeJob(String jobName) {
        System.out.println("*****************Myjob.....");
    }
}

3.Elastic-Job監控後臺搭建

具體怎麼弄,可以去搜一個教程…

在這裡插入圖片描述

zookeeper裡面儲存了我們Job的資訊:

在這裡插入圖片描述

4.測試分散式定時任務

我們把我們專案打成jar包,本地啟動一個,另一臺伺服器也啟動。我們發現只有一臺伺服器在執行定時任務:( 0/5 * * * * ?)每5秒跑一次…

在這裡插入圖片描述

在這裡插入圖片描述

這裡我們做一個實驗,我們假設一臺伺服器宕機了(現在在跑定時任務的主機),會怎麼樣?

在這裡插入圖片描述

在這裡插入圖片描述

使用Elastic-job輕鬆幫我們實現分散式定時任務,原理應該都一樣。我推測:zookeeper的Leader選舉,哪臺機器被選為Leader,那他就跑任務。

擴充套件

Elastic-Job監控平臺也給我們提供了便利,我們可以輕鬆監控作業,配置作業…
在這裡插入圖片描述

在這裡插入圖片描述

在這裡插入圖片描述