1. 程式人生 > >LTS原理--輕量級分散式任務排程框架(Light Task Schedule)(一)

LTS原理--輕量級分散式任務排程框架(Light Task Schedule)(一)

LTS(light-task-scheduler)主要用於解決分散式任務排程問題,支援實時任務,定時任務和Cron任務。有較好的伸縮性,擴充套件性,健壯穩定性而被多家公司使用,同時也希望開源愛好者一起貢獻。

專案地址

這兩個地址都會同步更新。感興趣,請加QQ群:109500214 (加群密碼: hello world)一起探討、完善。越多人支援,就越有動力去更新,喜歡記得右上角star哈。

##1.7.2-SNAPSHOT(master)變更主要點

  1. 優化JobContext中的BizLogger,由原來的去掉了threadlocal,解決taskTracker多執行緒的問題, 去掉LtsLoggerFactory.getLogger()用法

框架概況

LTS 有主要有以下四種節點:

  • JobClient:主要負責提交任務, 並接收任務執行反饋結果。
  • JobTracker:負責接收並分配任務,任務排程。
  • TaskTracker:負責執行任務,執行完反饋給JobTracker。
  • LTS-Admin:(管理後臺)主要負責節點管理,任務佇列管理,監控管理等。

其中JobClient,JobTracker,TaskTracker節點都是無狀態的。 可以部署多個並動態的進行刪減,來實現負載均衡,實現更大的負載量, 並且框架採用FailStore策略使LTS具有很好的容錯能力。

LTS註冊中心提供多種實現(Zookeeper,redis等),註冊中心進行節點資訊暴露,master選舉。(Mongo or Mysql)儲存任務佇列和任務執行日誌, netty or mina做底層通訊, 並提供多種序列化方式fastjson, hessian2, java等。

LTS支援任務型別:

  • 實時任務:提交了之後立即就要執行的任務。
  • 定時任務:在指定時間點執行的任務,譬如 今天3點執行(單次)。
  • Cron任務:CronExpression,和quartz類似(但是不是使用quartz實現的)譬如 0 0/1 * * * ?

支援動態修改任務引數,任務執行時間等設定,支援後臺動態新增任務,支援Cron任務暫停,支援手動停止正在執行的任務(有條件),支援任務的監控統計,支援各個節點的任務執行監控,JVM監控等等.

架構圖

LTS architecture

概念說明

###節點組

  1. 英文名稱 NodeGroup,一個節點組等同於一個小的叢集,同一個節點組中的各個節點是對等的,等效的,對外提供相同的服務。
  2. 每個節點組中都有一個master節點,這個master節點是由LTS動態選出來的,當一個master節點掛掉之後,LTS會立馬選出另外一個master節點,框架提供API監聽介面給使用者。

###FailStore

  1. 顧名思義,這個主要是用於失敗了儲存的,主要用於節點容錯,當遠端資料互動失敗之後,儲存在本地,等待遠端通訊恢復的時候,再將資料提交。
  2. FailStore主要使用者JobClient的任務提交,TaskTracker的任務反饋,TaskTracker的業務日誌傳輸的場景下。
  3. FailStore目前提供幾種實現:leveldb,rocksdb,berkeleydb,mapdb,ltsdb,用於可以自由選擇使用哪種,使用者也可以採用SPI擴充套件使用自己的實現。

流程圖

下圖是一個標準的實時任務執行流程。

LTS progress

LTS-Admin新版介面預覽

LTS Admin目前後臺帶有由ztajy提供的一個簡易的認證功能. 使用者名稱密碼在auth.cfg中,使用者自行修改.

##特性 ###1、Spring支援 LTS可以完全不用Spring框架,但是考慮到很用使用者專案中都是用了Spring框架,所以LTS也提供了對Spring的支援,包括Xml和註解,引入lts-spring.jar即可。 ###2、業務日誌記錄器 在TaskTracker端提供了業務日誌記錄器,供應用程式使用,通過這個業務日誌器,可以將業務日誌提交到JobTracker,這些業務日誌可以通過任務ID串聯起來,可以在LTS-Admin中實時檢視任務的執行進度。 ###3、SPI擴充套件支援 SPI擴充套件可以達到零侵入,只需要實現相應的介面,並實現即可被LTS使用,目前開放出來的擴充套件介面有

  1. 對任務佇列的擴充套件,使用者可以不選擇使用mysql或者mongo作為佇列儲存,也可以自己實現。
  2. 對業務日誌記錄器的擴充套件,目前主要支援console,mysql,mongo,使用者也可以通過擴充套件選擇往其他地方輸送日誌。

###4、故障轉移 當正在執行任務的TaskTracker宕機之後,JobTracker會立馬將分配在宕機的TaskTracker的所有任務再分配給其他正常的TaskTracker節點執行。 ###5、節點監控 可以對JobTracker,TaskTracker節點進行資源監控,任務監控等,可以實時的在LTS-Admin管理後臺檢視,進而進行合理的資源調配。 ###6、多樣化任務執行結果支援 LTS框架提供四種執行結果支援,EXECUTE_SUCCESSEXECUTE_FAILEDEXECUTE_LATEREXECUTE_EXCEPTION,並對每種結果採取相應的處理機制,譬如重試。

  • EXECUTE_SUCCESS: 執行成功,這種情況,直接反饋客戶端(如果任務被設定了要反饋給客戶端)。
  • EXECUTE_FAILED:執行失敗,這種情況,直接反饋給客戶端,不進行重試。
  • EXECUTE_LATER:稍後執行(需要重試),這種情況,不反饋客戶端,重試策略採用1min,2min,3min的策略,預設最大重試次數為10次,使用者可以通過引數設定修改這個重試次數。
  • EXECUTE_EXCEPTION:執行異常, 這種情況也會重試(重試策略,同上)

###7、FailStore容錯 採用FailStore機制來進行節點容錯,Fail And Store,不會因為遠端通訊的不穩定性而影響當前應用的執行。具體FailStore說明,請參考概念說明中的FailStore說明。

##專案編譯打包 專案主要採用maven進行構建,目前提供shell指令碼的打包。 環境依賴:Java(jdk1.6+) Maven

使用者使用一般分為兩種: ###1、Maven構建 可以通過maven命令將lts的jar包上傳到本地倉庫中。在父pom.xml中新增相應的repository,並用deploy命令上傳即可。具體引用方式可以參考lts中的例子即可。 ###2、直接Jar引用 需要將lts的各個模組打包成單獨的jar包,並且將所有lts依賴包引入。具體引用哪些jar包可以參考lts中的例子即可。

##JobTracker和LTS-Admin部署 提供(cmd)windows(shell)linux兩種版本指令碼來進行編譯和部署:

  1. 執行根目錄下的sh build.shbuild.cmd指令碼,會在dist目錄下生成lts-{version}-bin資料夾

  2. 下面是其目錄結構,其中bin目錄主要是JobTracker和LTS-Admin的啟動指令碼。jobtracker 中是 JobTracker的配置檔案和需要使用到的jar包,lts-admin是LTS-Admin相關的war包和配置檔案。 lts-{version}-bin的檔案結構

-- lts-${version}-bin
    |-- bin
    |   |-- jobtracker.cmd
    |   |-- jobtracker.sh
    |   |-- lts-admin.cmd
    |   |-- lts-admin.sh
    |   |-- lts-monitor.cmd
    |   |-- lts-monitor.sh
    |   |-- tasktracker.sh
    |-- conf
    |   |-- log4j.properties
    |   |-- lts-admin.cfg
    |   |-- lts-monitor.cfg
    |   |-- readme.txt
    |   |-- tasktracker.cfg
    |   |-- zoo
    |       |-- jobtracker.cfg
    |       |-- log4j.properties
    |       |-- lts-monitor.cfg
    |-- lib
    |   |-- *.jar
    |-- war
        |-- jetty
        |   |-- lib
        |       |-- *.jar
        |-- lts-admin.war
  1. JobTracker啟動。如果你想啟動一個節點,直接修改下conf/zoo下的配置檔案,然後執行 sh jobtracker.sh zoo start即可,如果你想啟動兩個JobTracker節點,那麼你需要拷貝一份zoo,譬如命名為zoo2,修改下zoo2下的配置檔案,然後執行sh jobtracker.sh zoo2 start即可。logs資料夾下生成jobtracker-zoo.out日誌。
  2. LTS-Admin啟動.修改conf/lts-monitor.cfgconf/lts-admin.cfg下的配置,然後執行bin下的sh lts-admin.shlts-admin.cmd指令碼即可。logs資料夾下會生成lts-admin.out日誌,啟動成功在日誌中會打印出訪問地址,使用者可以通過這個訪問地址訪問了。

##JobClient(部署)使用 需要引入lts的jar包有lts-jobclient-{version}.jarlts-core-{version}.jar 及其它第三方依賴jar。 ###API方式啟動

JobClient jobClient = new RetryJobClient();
jobClient.setNodeGroup("test_jobClient");
jobClient.setClusterName("test_cluster");
jobClient.setRegistryAddress("zookeeper://127.0.0.1:2181");
jobClient.start();

// 提交任務
Job job = new Job();
job.setTaskId("3213213123");
job.setParam("shopId", "11111");
job.setTaskTrackerNodeGroup("test_trade_TaskTracker");
// job.setCronExpression("0 0/1 * * * ?");  // 支援 cronExpression表示式
// job.setTriggerTime(new Date()); // 支援指定時間執行
Response response = jobClient.submitJob(job);

###Spring XML方式啟動

<bean id="jobClient" class="com.github.ltsopensource.spring.JobClientFactoryBean">
    <property name="clusterName" value="test_cluster"/>
    <property name="registryAddress" value="zookeeper://127.0.0.1:2181"/>
    <property name="nodeGroup" value="test_jobClient"/>
    <property name="masterChangeListeners">
        <list>
            <bean class="com.github.ltsopensource.example.support.MasterChangeListenerImpl"/>
        </list>
    </property>
    <property name="jobFinishedHandler">
        <bean class="com.github.ltsopensource.example.support.JobFinishedHandlerImpl"/>
    </property>
    <property name="configs">
        <props>
            <!-- 引數 -->
            <prop key="job.fail.store">leveldb</prop>
        </props>
    </property>
</bean>

###Spring 全註解方式

@Configuration
public class LTSSpringConfig {

    @Bean(name = "jobClient")
    public JobClient getJobClient() throws Exception {
        JobClientFactoryBean factoryBean = new JobClientFactoryBean();
        factoryBean.setClusterName("test_cluster");
        factoryBean.setRegistryAddress("zookeeper://127.0.0.1:2181");
        factoryBean.setNodeGroup("test_jobClient");
        factoryBean.setMasterChangeListeners(new MasterChangeListener[]{
                new MasterChangeListenerImpl()
        });
        Properties configs = new Properties();
        configs.setProperty("job.fail.store", "leveldb");
        factoryBean.setConfigs(configs);
        factoryBean.afterPropertiesSet();
        return factoryBean.getObject();
    }
}

##TaskTracker(部署使用) 需要引入lts的jar包有lts-tasktracker-{version}.jarlts-core-{version}.jar 及其它第三方依賴jar。 ###定義自己的任務執行類

public class MyJobRunner implements JobRunner {
    @Override
    public Result run(JobContext jobContext) throws Throwable {
        try {
            // TODO 業務邏輯
            // 會發送到 LTS (JobTracker上)
            jobContext.getBizLogger().info("測試,業務日誌啊啊啊啊啊");

        } catch (Exception e) {
            return new Result(Action.EXECUTE_FAILED, e.getMessage());
        }
        return new Result(Action.EXECUTE_SUCCESS, "執行成功了,哈哈");
    }
}

###API方式啟動

TaskTracker taskTracker = new TaskTracker();
taskTracker.setJobRunnerClass(MyJobRunner.class);
taskTracker.setRegistryAddress("zookeeper://127.0.0.1:2181");
taskTracker.setNodeGroup("test_trade_TaskTracker");
taskTracker.setClusterName("test_cluster");
taskTracker.setWorkThreads(20);
taskTracker.start();

###Spring XML方式啟動

<bean id="taskTracker" class="com.github.ltsopensource.spring.TaskTrackerAnnotationFactoryBean" init-method="start">
    <property name="jobRunnerClass" value="com.github.ltsopensource.example.support.MyJobRunner"/>
    <property name="bizLoggerLevel" value="INFO"/>
    <property name="clusterName" value="test_cluster"/>
    <property name="registryAddress" value="zookeeper://127.0.0.1:2181"/>
    <property name="nodeGroup" value="test_trade_TaskTracker"/>
    <property name="workThreads" value="20"/>
    <property name="masterChangeListeners">
        <list>
            <bean class="com.github.ltsopensource.example.support.MasterChangeListenerImpl"/>
        </list>
    </property>
    <property name="configs">
        <props>
            <prop key="job.fail.store">leveldb</prop>
        </props>
    </property>
</bean>

###Spring註解方式啟動

@Configuration
public class LTSSpringConfig implements ApplicationContextAware {
    private ApplicationContext applicationContext;
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
	@Bean(name = "taskTracker")
    public TaskTracker getTaskTracker() throws Exception {
        TaskTrackerAnnotationFactoryBean factoryBean = new TaskTrackerAnnotationFactoryBean();
        factoryBean.setApplicationContext(applicationContext);
        factoryBean.setClusterName("test_cluster");
        factoryBean.setJobRunnerClass(MyJobRunner.class);
        factoryBean.setNodeGroup("test_trade_TaskTracker");
        factoryBean.setBizLoggerLevel("INFO");
        factoryBean.setRegistryAddress("zookeeper://127.0.0.1:2181");
        factoryBean.setMasterChangeListeners(new MasterChangeListener[]{
                new MasterChangeListenerImpl()
        });
        factoryBean.setWorkThreads(20);
        Properties configs = new Properties();
        configs.setProperty("job.fail.store", "leveldb");
        factoryBean.setConfigs(configs);

        factoryBean.afterPropertiesSet();
//        factoryBean.start();
        return factoryBean.getObject();
    }
}

##引數說明 引數說明

##使用建議 一般在一個JVM中只需要一個JobClient例項即可,不要為每種任務都新建一個JobClient例項,這樣會大大的浪費資源,因為一個JobClient可以提交多種任務。相同的一個JVM一般也儘量保持只有一個TaskTracker例項即可,多了就可能造成資源浪費。當遇到一個TaskTracker要執行多種任務的時候,請參考下面的 "一個TaskTracker執行多種任務"。 ##一個TaskTracker執行多種任務 有的時候,業務場景需要執行多種任務,有些人會問,是不是要每種任務型別都要一個TaskTracker去執行。我的答案是否定的,如果在一個JVM中,最好使用一個TaskTracker去執行多種任務,因為一個JVM中使用多個TaskTracker例項比較浪費資源(當然當你某種任務量比較多的時候,可以將這個任務單獨使用一個TaskTracker節點來執行)。那麼怎麼才能實現一個TaskTracker執行多種任務呢。下面是我給出來的參考例子。

/**
 * 總入口,在 taskTracker.setJobRunnerClass(JobRunnerDispatcher.class)
 * JobClient 提交 任務時指定 Job 型別  job.setParam("type", "aType")
 */
public class JobRunnerDispatcher implements JobRunner {

    private static final ConcurrentHashMap<String/*type*/, JobRunner>
            JOB_RUNNER_MAP = new ConcurrentHashMap<String, JobRunner>();

    static {
        JOB_RUNNER_MAP.put("aType", new JobRunnerA()); // 也可以從Spring中拿
        JOB_RUNNER_MAP.put("bType", new JobRunnerB());
    }

    @Override
    public Result run(JobContext jobContext) throws Throwable {
        Job job = jobContext.getJob();
        String type = job.getParam("type");
        return JOB_RUNNER_MAP.get(type).run(job);
    }
}

class JobRunnerA implements JobRunner {
    @Override
    public Result run(JobContext jobContext) throws Throwable {
        //  TODO A型別Job的邏輯
        return null;
    }
}

class JobRunnerB implements JobRunner {
    @Override
    public Result run(JobContext jobContext) throws Throwable {
        // TODO B型別Job的邏輯
        return null;
    }
}

##TaskTracker的JobRunner測試 一般在編寫TaskTracker的時候,只需要測試JobRunner的實現邏輯是否正確,又不想啟動LTS進行遠端測試。為了方便測試,LTS提供了JobRunner的快捷測試方法。自己的測試類整合com.github.ltsopensource.tasktracker.runner.JobRunnerTester即可,並實現initContextnewJobRunner方法即可。如lts-examples中的例子:

public class TestJobRunnerTester extends JobRunnerTester {

    public static void main(String[] args) throws Throwable {
        //  Mock Job 資料
        Job job = new Job();
        job.setTaskId("2313213");

        JobContext jobContext = new JobContext();
        jobContext.setJob(job);

        JobExtInfo jobExtInfo = new JobExtInfo();
        jobExtInfo.setRetry(false);

        jobContext.setJobExtInfo(jobExtInfo);

        // 執行測試
        TestJobRunnerTester tester = new TestJobRunnerTester();
        Result result = tester.run(jobContext);
        System.out.println(JSON.toJSONString(result));
    }

    @Override
    protected void initContext() {
        // TODO 初始化Spring容器
    }

    @Override
    protected JobRunner newJobRunner() {
        return new TestJobRunner();
    }
}

##Spring Quartz Cron任務無縫接入 對於Quartz的Cron任務只需要在Spring配置中增加一下程式碼就可以接入LTS平臺

<bean class="com.github.ltsopensource.spring.quartz.QuartzLTSProxyBean">
    <property name="clusterName" value="test_cluster"/>
    <property name="registryAddress" value="zookeeper://127.0.0.1:2181"/>
    <property name="nodeGroup" value="quartz_test_group"/>
</bean>

##Spring Boot 支援

@SpringBootApplication
@EnableJobTracker       // 啟動JobTracker
@EnableJobClient        // 啟動JobClient
@EnableTaskTracker      // 啟動TaskTracker
@EnableMonitor          // 啟動Monitor
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

剩下的就只是在application.properties中新增相應的配置就行了, 具體見lts-example中的com.github.ltsopensource.examples.springboot包下的例子

##多網絡卡選擇問題 當機器有內網兩個網絡卡的時候,有時候,使用者想讓LTS的流量走外網網絡卡,那麼需要在host中,把主機名稱的對映地址改為外網網絡卡地址即可,內網同理。

##關於節點標識問題 如果在節點啟動的時候設定節點標識,LTS會預設設定一個UUID為節點標識,可讀性會比較差,但是能保證每個節點的唯一性,如果使用者能自己保證節點標識的唯一性,可以通過 setIdentity 來設定,譬如如果每個節點都是部署在一臺機器(一個虛擬機器)上,那麼可以將identity設定為主機名稱

##SPI擴充套件說明 支援JobLogger,JobQueue等等的SPI擴充套件

##LTS-Admin使用jetty啟動(預設),不定期掛掉解決方案 見issue#389