分散式任務排程元件 LTS 使用者文件
LTS使用者文件
LTS(light-task-scheduler)主要用於解決分散式任務排程問題,支援實時任務,定時任務和Cron任務。有較好的伸縮性,擴充套件性,健壯穩定性而被多家公司使用,同時也希望開源愛好者一起貢獻。
專案地址
github地址: https://github.com/ltsopensource/light-task-scheduler
oschina地址: http://git.oschina.net/hugui/light-task-scheduler
例子: https://github.com/ltsopensource/lts-examples
框架概況
LTS 有主要有以下四種節點:
- JobClient:主要負責提交任務, 並接收任務執行反饋結果。
- JobTracker:負責接收並分配任務,任務排程。
- TaskTracker:負責執行任務,執行完反饋給JobTracker。
- LTS-Admin:(管理後臺)主要負責節點管理,任務佇列管理,監控管理等。
其中JobClient,JobTracker,TaskTracker節點都是 無狀態
的。 可以部署多個並動態的進行刪減,來實現負載均衡,實現更大的負載量, 並且框架採用FailStore策略使LTS具有很好的容錯能力。
LTS註冊中心提供多種實現(Zookeeper,redis等),註冊中心進行節點資訊暴露,master選舉。(Mongo or Mysql)儲存任務佇列和任務執行日誌, netty or mina做底層通訊, 並提供多種序列化方式fastjson, hessian2, java等。
LTS支援任務型別:
- 實時任務:提交了之後立即就要執行的任務。
- 定時任務:在指定時間點執行的任務,譬如 今天3點執行(單次)。
- Cron任務:CronExpression,和quartz類似(但是不是使用quartz實現的)譬如 0 0/1 * * * ?
支援動態修改任務引數,任務執行時間等設定,支援後臺動態新增任務,支援Cron任務暫停,支援手動停止正在執行的任務(有條件),支援任務的監控統計,支援各個節點的任務執行監控,JVM監控等等.
架構圖
概念說明
節點組
- 英文名稱 NodeGroup,一個節點組等同於一個小的叢集,同一個節點組中的各個節點是對等的,等效的,對外提供相同的服務。
- 每個節點組中都有一個master節點,這個master節點是由LTS動態選出來的,當一個master節點掛掉之後,LTS會立馬選出另外一個master節點,框架提供API監聽介面給使用者。
FailStore
- 顧名思義,這個主要是用於失敗了儲存的,主要用於節點容錯,當遠端資料互動失敗之後,儲存在本地,等待遠端通訊恢復的時候,再將資料提交。
- FailStore主要使用者JobClient的任務提交,TaskTracker的任務反饋,TaskTracker的業務日誌傳輸的場景下。
- FailStore目前提供幾種實現:leveldb,rocksdb,berkeleydb,mapdb,ltsdb,用於可以自由選擇使用哪種,使用者也可以採用SPI擴充套件使用自己的實現。
流程圖
下圖是一個標準的實時任務執行流程。

image.png
目前後臺帶有由ztajy提供的一個簡易的認證功能. 使用者名稱密碼在auth.cfg中,使用者自行修改.
特性
1、Spring支援
LTS可以完全不用Spring框架,但是考慮到很用使用者專案中都是用了Spring框架,所以LTS也提供了對Spring的支援,包括Xml和註解,引入lts-spring.jar即可。
2、業務日誌記錄器
在TaskTracker端提供了業務日誌記錄器,供應用程式使用,通過這個業務日誌器,可以將業務日誌提交到JobTracker,這些業務日誌可以通過任務ID串聯起來,可以在LTS-Admin中實時檢視任務的執行進度。
3、SPI擴充套件支援
SPI擴充套件可以達到零侵入,只需要實現相應的介面,並實現即可被LTS使用,目前開放出來的擴充套件介面有
對任務佇列的擴充套件,使用者可以不選擇使用mysql或者mongo作為佇列儲存,也可以自己實現。
對業務日誌記錄器的擴充套件,目前主要支援console,mysql,mongo,使用者也可以通過擴充套件選擇往其他地方輸送日誌。
4、故障轉移
當正在執行任務的TaskTracker宕機之後,JobTracker會立馬將分配在宕機的TaskTracker的所有任務再分配給其他正常的TaskTracker節點執行。
5、節點監控
可以對JobTracker,TaskTracker節點進行資源監控,任務監控等,可以實時的在LTS-Admin管理後臺檢視,進而進行合理的資源調配。
6、多樣化任務執行結果支援
LTS框架提供四種執行結果支援,EXECUTE_SUCCESS,EXECUTE_FAILED,EXECUTE_LATER,EXECUTE_EXCEPTION,並對每種結果採取相應的處理機制,譬如重試。
EXECUTE_SUCCESS: 執行成功,這種情況,直接反饋客戶端(如果任務被設定了要反饋給客戶端)。
EXECUTE_FAILED:執行失敗,這種情況,直接反饋給客戶端,不進行重試。
EXECUTE_LATER:稍後執行(需要重試),這種情況,不反饋客戶端,重試策略採用1min,2min,3min的策略,預設最大重試次數為10次,使用者可以通過引數設定修改這個重試次數。
EXECUTE_EXCEPTION:執行異常, 這種情況也會重試(重試策略,同上)
7、FailStore容錯
採用FailStore機制來進行節點容錯,Fail And Store,不會因為遠端通訊的不穩定性而影響當前應用的執行。具體FailStore說明,請參考概念說明中的FailStore說明。
專案編譯打包
專案主要採用maven進行構建,目前提供shell指令碼的打包。 環境依賴:Java(jdk1.6+) Maven
使用者使用一般分為兩種:
1、Maven構建
可以通過maven命令將lts的jar包上傳到本地倉庫中。在父pom.xml中新增相應的repository,並用deploy命令上傳即可。具體引用方式可以參考lts中的例子即可。
2、直接Jar引用
需要將lts的各個模組打包成單獨的jar包,並且將所有lts依賴包引入。具體引用哪些jar包可以參考lts中的例子即可。
JobTracker和LTS-Admin部署
提供(cmd)windows和(shell)linux兩種版本指令碼來進行編譯和部署:
執行根目錄下的sh build.sh或build.cmd指令碼,會在dist目錄下生成lts-{version}-bin資料夾
下面是其目錄結構,其中bin目錄主要是JobTracker和LTS-Admin的啟動指令碼。jobtracker 中是 JobTracker的配置檔案和需要使用到的jar包,lts-admin是LTS-Admin相關的war包和配置檔案。 lts-{version}-bin的檔案結構
-- lts-${version}-bin
|-- bin
| |-- jobtracker.cmd
| |-- jobtracker.sh
| |-- lts-admin.cmd
| |-- lts-admin.sh
| |-- lts-monitor.cmd
| |-- lts-monitor.sh
| |-- tasktracker.sh
|-- conf
| |-- log4j.properties
| |-- lts-admin.cfg
| |-- lts-monitor.cfg
| |-- readme.txt
| |-- tasktracker.cfg
| |-- zoo
| |-- jobtracker.cfg
| |-- log4j.properties
| |-- lts-monitor.cfg
|-- lib
| |-- *.jar
|-- war
|-- jetty
| |-- lib
| |-- *.jar
|-- lts-admin.war
JobTracker啟動。如果你想啟動一個節點,直接修改下conf/zoo下的配置檔案,然後執行 sh jobtracker.sh zoo start即可,如果你想啟動兩個JobTracker節點,那麼你需要拷貝一份zoo,譬如命名為zoo2,修改下zoo2下的配置檔案,然後執行sh jobtracker.sh zoo2 start即可。logs資料夾下生成jobtracker-zoo.out日誌。
LTS-Admin啟動.修改conf/lts-monitor.cfg和conf/lts-admin.cfg下的配置,然後執行bin下的sh lts-admin.sh或lts-admin.cmd指令碼即可。logs資料夾下會生成lts-admin.out日誌,啟動成功在日誌中會打印出訪問地址,使用者可以通過這個訪問地址訪問了。
JobClient(部署)使用
需要引入lts的jar包有lts-jobclient-{version}.jar,lts-core-{version}.jar 及其它第三方依賴jar。
API方式啟動
JobClient jobClient = new RetryJobClient(); jobClient.setNodeGroup("test_jobClient"); jobClient.setClusterName("test_cluster"); jobClient.setRegistryAddress("zookeeper://127.0.0.1:2181"); jobClient.start(); // 提交任務 Job job = new Job(); job.setTaskId("3213213123"); job.setParam("shopId", "11111"); job.setTaskTrackerNodeGroup("test_trade_TaskTracker"); // job.setCronExpression("0 0/1 * * * ?");// 支援 cronExpression表示式 // job.setTriggerTime(new Date()); // 支援指定時間執行 Response response = jobClient.submitJob(job);
Spring XML方式啟動
<bean id="jobClient" class="com.github.ltsopensource.spring.JobClientFactoryBean"> <property name="clusterName" value="test_cluster"/> <property name="registryAddress" value="zookeeper://127.0.0.1:2181"/> <property name="nodeGroup" value="test_jobClient"/> <property name="masterChangeListeners"> <list> <bean class="com.github.ltsopensource.example.support.MasterChangeListenerImpl"/> </list> </property> <property name="jobFinishedHandler"> <bean class="com.github.ltsopensource.example.support.JobFinishedHandlerImpl"/> </property> <property name="configs"> <props> <!-- 引數 --> <prop key="job.fail.store">leveldb</prop> </props> </property> </bean>
Spring 全註解方式
@Configuration public class LTSSpringConfig { @Bean(name = "jobClient") public JobClient getJobClient() throws Exception { JobClientFactoryBean factoryBean = new JobClientFactoryBean(); factoryBean.setClusterName("test_cluster"); factoryBean.setRegistryAddress("zookeeper://127.0.0.1:2181"); factoryBean.setNodeGroup("test_jobClient"); factoryBean.setMasterChangeListeners(new MasterChangeListener[]{ new MasterChangeListenerImpl() }); Properties configs = new Properties(); configs.setProperty("job.fail.store", "leveldb"); factoryBean.setConfigs(configs); factoryBean.afterPropertiesSet(); return factoryBean.getObject(); } }
TaskTracker(部署使用)
需要引入lts的jar包有lts-tasktracker-{version}.jar,lts-core-{version}.jar 及其它第三方依賴jar。 ###定義自己的任務執行類
public class MyJobRunner implements JobRunner { @Override public Result run(JobContext jobContext) throws Throwable { try { // TODO 業務邏輯 // 會發送到 LTS (JobTracker上) jobContext.getBizLogger().info("測試,業務日誌啊啊啊啊啊"); } catch (Exception e) { return new Result(Action.EXECUTE_FAILED, e.getMessage()); } return new Result(Action.EXECUTE_SUCCESS, "執行成功了,哈哈"); } }
API方式啟動
TaskTracker taskTracker = new TaskTracker(); taskTracker.setJobRunnerClass(MyJobRunner.class); taskTracker.setRegistryAddress("zookeeper://127.0.0.1:2181"); taskTracker.setNodeGroup("test_trade_TaskTracker"); taskTracker.setClusterName("test_cluster"); taskTracker.setWorkThreads(20); taskTracker.start();
Spring XML方式啟動
<bean id="taskTracker" class="com.github.ltsopensource.spring.TaskTrackerAnnotationFactoryBean" init-method="start"> <property name="jobRunnerClass" value="com.github.ltsopensource.example.support.MyJobRunner"/> <property name="bizLoggerLevel" value="INFO"/> <property name="clusterName" value="test_cluster"/> <property name="registryAddress" value="zookeeper://127.0.0.1:2181"/> <property name="nodeGroup" value="test_trade_TaskTracker"/> <property name="workThreads" value="20"/> <property name="masterChangeListeners"> <list> <bean class="com.github.ltsopensource.example.support.MasterChangeListenerImpl"/> </list> </property> <property name="configs"> <props> <prop key="job.fail.store">leveldb</prop> </props> </property> </bean>
Spring註解方式啟動
@Configuration public class LTSSpringConfig implements ApplicationContextAware { private ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } @Bean(name = "taskTracker") public TaskTracker getTaskTracker() throws Exception { TaskTrackerAnnotationFactoryBean factoryBean = new TaskTrackerAnnotationFactoryBean(); factoryBean.setApplicationContext(applicationContext); factoryBean.setClusterName("test_cluster"); factoryBean.setJobRunnerClass(MyJobRunner.class); factoryBean.setNodeGroup("test_trade_TaskTracker"); factoryBean.setBizLoggerLevel("INFO"); factoryBean.setRegistryAddress("zookeeper://127.0.0.1:2181"); factoryBean.setMasterChangeListeners(new MasterChangeListener[]{ new MasterChangeListenerImpl() }); factoryBean.setWorkThreads(20); Properties configs = new Properties(); configs.setProperty("job.fail.store", "leveldb"); factoryBean.setConfigs(configs); factoryBean.afterPropertiesSet(); //factoryBean.start(); return factoryBean.getObject(); } }
引數說明
引數說明
使用建議
一般在一個JVM中只需要一個JobClient例項即可,不要為每種任務都新建一個JobClient例項,這樣會大大的浪費資源,因為一個JobClient可以提交多種任務。相同的一個JVM一般也儘量保持只有一個TaskTracker例項即可,多了就可能造成資源浪費。當遇到一個TaskTracker要執行多種任務的時候,請參考下面的 "一個TaskTracker執行多種任務"。
一個TaskTracker執行多種任務
有的時候,業務場景需要執行多種任務,有些人會問,是不是要每種任務型別都要一個TaskTracker去執行。我的答案是否定的,如果在一個JVM中,最好使用一個TaskTracker去執行多種任務,因為一個JVM中使用多個TaskTracker例項比較浪費資源(當然當你某種任務量比較多的時候,可以將這個任務單獨使用一個TaskTracker節點來執行)。那麼怎麼才能實現一個TaskTracker執行多種任務呢。下面是我給出來的參考例子。
/** * 總入口,在 taskTracker.setJobRunnerClass(JobRunnerDispatcher.class) * JobClient 提交 任務時指定 Job 型別job.setParam("type", "aType") */ public class JobRunnerDispatcher implements JobRunner { private static final ConcurrentHashMap<String/*type*/, JobRunner> JOB_RUNNER_MAP = new ConcurrentHashMap<String, JobRunner>(); static { JOB_RUNNER_MAP.put("aType", new JobRunnerA()); // 也可以從Spring中拿 JOB_RUNNER_MAP.put("bType", new JobRunnerB()); } @Override public Result run(JobContext jobContext) throws Throwable { Job job = jobContext.getJob(); String type = job.getParam("type"); return JOB_RUNNER_MAP.get(type).run(job); } } class JobRunnerA implements JobRunner { @Override public Result run(JobContext jobContext) throws Throwable { //TODO A型別Job的邏輯 return null; } } class JobRunnerB implements JobRunner { @Override public Result run(JobContext jobContext) throws Throwable { // TODO B型別Job的邏輯 return null; } }
TaskTracker的JobRunner測試
一般在編寫TaskTracker的時候,只需要測試JobRunner的實現邏輯是否正確,又不想啟動LTS進行遠端測試。為了方便測試,LTS提供了JobRunner的快捷測試方法。自己的測試類整合com.github.ltsopensource.tasktracker.runner.JobRunnerTester即可,並實現initContext和newJobRunner方法即可。如lts-examples中的例子:
public class TestJobRunnerTester extends JobRunnerTester { public static void main(String[] args) throws Throwable { //Mock Job 資料 Job job = new Job(); job.setTaskId("2313213"); JobContext jobContext = new JobContext(); jobContext.setJob(job); JobExtInfo jobExtInfo = new JobExtInfo(); jobExtInfo.setRetry(false); jobContext.setJobExtInfo(jobExtInfo); // 執行測試 TestJobRunnerTester tester = new TestJobRunnerTester(); Result result = tester.run(jobContext); System.out.println(JSON.toJSONString(result)); } @Override protected void initContext() { // TODO 初始化Spring容器 } @Override protected JobRunner newJobRunner() { return new TestJobRunner(); } }
Spring Quartz Cron任務無縫接入
對於Quartz的Cron任務只需要在Spring配置中增加一下程式碼就可以接入LTS平臺
<bean class="com.github.ltsopensource.spring.quartz.QuartzLTSProxyBean"> <property name="clusterName" value="test_cluster"/> <property name="registryAddress" value="zookeeper://127.0.0.1:2181"/> <property name="nodeGroup" value="quartz_test_group"/> </bean>
Spring Boot 支援
@SpringBootApplication @EnableJobTracker// 啟動JobTracker @EnableJobClient// 啟動JobClient @EnableTaskTracker// 啟動TaskTracker @EnableMonitor// 啟動Monitor public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
剩下的就只是在application.properties中新增相應的配置就行了, 具體見lts-example中的com.github.ltsopensource.examples.springboot包下的例子
多網絡卡選擇問題
當機器有內網兩個網絡卡的時候,有時候,使用者想讓LTS的流量走外網網絡卡,那麼需要在host中,把主機名稱的對映地址改為外網網絡卡地址即可,內網同理。
關於節點標識問題
如果在節點啟動的時候設定節點標識,LTS會預設設定一個UUID為節點標識,可讀性會比較差,但是能保證每個節點的唯一性,如果使用者能自己保證節點標識的唯一性,可以通過 setIdentity 來設定,譬如如果每個節點都是部署在一臺機器(一個虛擬機器)上,那麼可以將identity設定為主機名稱
SPI擴充套件說明
支援JobLogger,JobQueue等等的SPI擴充套件
和其它解決方案比較
LTS-Admin使用jetty啟動(預設),不定期掛掉解決方案 見issue#389
歡迎關注高廣超的簡書部落格 與 收藏文章 !
歡迎關注 頭條號:網際網路技術棧 !
個人介紹:
高廣超:多年一線網際網路研發與架構設計經驗,擅長設計與落地高可用、高效能、可擴充套件的網際網路架構。目前從事大資料相關研發與架構工作。
本文首發在高廣超的簡書部落格 轉載請註明!