LTS原理--輕量級分散式任務排程框架(Light Task Schedule)(一)
LTS(light-task-scheduler)主要用於解決分散式任務排程問題,支援實時任務,定時任務和Cron任務。有較好的伸縮性,擴充套件性,健壯穩定性而被多家公司使用,同時也希望開源愛好者一起貢獻。
專案地址
這兩個地址都會同步更新。感興趣,請加QQ群:109500214 (加群密碼: hello world)一起探討、完善。越多人支援,就越有動力去更新,喜歡記得右上角star哈。
##1.7.2-SNAPSHOT(master)變更主要點
- 優化JobContext中的BizLogger,由原來的去掉了threadlocal,解決taskTracker多執行緒的問題, 去掉LtsLoggerFactory.getLogger()用法
框架概況
LTS 有主要有以下四種節點:
- JobClient:主要負責提交任務, 並接收任務執行反饋結果。
- JobTracker:負責接收並分配任務,任務排程。
- TaskTracker:負責執行任務,執行完反饋給JobTracker。
- LTS-Admin:(管理後臺)主要負責節點管理,任務佇列管理,監控管理等。
其中JobClient,JobTracker,TaskTracker節點都是無狀態
的。 可以部署多個並動態的進行刪減,來實現負載均衡,實現更大的負載量, 並且框架採用FailStore策略使LTS具有很好的容錯能力。
LTS註冊中心提供多種實現(Zookeeper,redis等),註冊中心進行節點資訊暴露,master選舉。(Mongo or Mysql)儲存任務佇列和任務執行日誌, netty or mina做底層通訊, 並提供多種序列化方式fastjson, hessian2, java等。
LTS支援任務型別:
- 實時任務:提交了之後立即就要執行的任務。
- 定時任務:在指定時間點執行的任務,譬如 今天3點執行(單次)。
- Cron任務:CronExpression,和quartz類似(但是不是使用quartz實現的)譬如 0 0/1 * * * ?
支援動態修改任務引數,任務執行時間等設定,支援後臺動態新增任務,支援Cron任務暫停,支援手動停止正在執行的任務(有條件),支援任務的監控統計,支援各個節點的任務執行監控,JVM監控等等.
架構圖
概念說明
###節點組
- 英文名稱 NodeGroup,一個節點組等同於一個小的叢集,同一個節點組中的各個節點是對等的,等效的,對外提供相同的服務。
- 每個節點組中都有一個master節點,這個master節點是由LTS動態選出來的,當一個master節點掛掉之後,LTS會立馬選出另外一個master節點,框架提供API監聽介面給使用者。
###FailStore
- 顧名思義,這個主要是用於失敗了儲存的,主要用於節點容錯,當遠端資料互動失敗之後,儲存在本地,等待遠端通訊恢復的時候,再將資料提交。
- FailStore主要使用者JobClient的任務提交,TaskTracker的任務反饋,TaskTracker的業務日誌傳輸的場景下。
- FailStore目前提供幾種實現:leveldb,rocksdb,berkeleydb,mapdb,ltsdb,用於可以自由選擇使用哪種,使用者也可以採用SPI擴充套件使用自己的實現。
流程圖
下圖是一個標準的實時任務執行流程。
LTS-Admin新版介面預覽
目前後臺帶有由ztajy提供的一個簡易的認證功能. 使用者名稱密碼在auth.cfg中,使用者自行修改.
##特性 ###1、Spring支援 LTS可以完全不用Spring框架,但是考慮到很用使用者專案中都是用了Spring框架,所以LTS也提供了對Spring的支援,包括Xml和註解,引入lts-spring.jar
即可。 ###2、業務日誌記錄器 在TaskTracker端提供了業務日誌記錄器,供應用程式使用,通過這個業務日誌器,可以將業務日誌提交到JobTracker,這些業務日誌可以通過任務ID串聯起來,可以在LTS-Admin中實時檢視任務的執行進度。 ###3、SPI擴充套件支援 SPI擴充套件可以達到零侵入,只需要實現相應的介面,並實現即可被LTS使用,目前開放出來的擴充套件介面有
- 對任務佇列的擴充套件,使用者可以不選擇使用mysql或者mongo作為佇列儲存,也可以自己實現。
- 對業務日誌記錄器的擴充套件,目前主要支援console,mysql,mongo,使用者也可以通過擴充套件選擇往其他地方輸送日誌。
###4、故障轉移 當正在執行任務的TaskTracker宕機之後,JobTracker會立馬將分配在宕機的TaskTracker的所有任務再分配給其他正常的TaskTracker節點執行。 ###5、節點監控 可以對JobTracker,TaskTracker節點進行資源監控,任務監控等,可以實時的在LTS-Admin管理後臺檢視,進而進行合理的資源調配。 ###6、多樣化任務執行結果支援 LTS框架提供四種執行結果支援,EXECUTE_SUCCESS
,EXECUTE_FAILED
,EXECUTE_LATER
,EXECUTE_EXCEPTION
,並對每種結果採取相應的處理機制,譬如重試。
- EXECUTE_SUCCESS: 執行成功,這種情況,直接反饋客戶端(如果任務被設定了要反饋給客戶端)。
- EXECUTE_FAILED:執行失敗,這種情況,直接反饋給客戶端,不進行重試。
- EXECUTE_LATER:稍後執行(需要重試),這種情況,不反饋客戶端,重試策略採用1min,2min,3min的策略,預設最大重試次數為10次,使用者可以通過引數設定修改這個重試次數。
- EXECUTE_EXCEPTION:執行異常, 這種情況也會重試(重試策略,同上)
###7、FailStore容錯 採用FailStore機制來進行節點容錯,Fail And Store,不會因為遠端通訊的不穩定性而影響當前應用的執行。具體FailStore說明,請參考概念說明中的FailStore說明。
##專案編譯打包 專案主要採用maven進行構建,目前提供shell指令碼的打包。 環境依賴:Java(jdk1.6+)
Maven
使用者使用一般分為兩種: ###1、Maven構建 可以通過maven命令將lts的jar包上傳到本地倉庫中。在父pom.xml中新增相應的repository,並用deploy命令上傳即可。具體引用方式可以參考lts中的例子即可。 ###2、直接Jar引用 需要將lts的各個模組打包成單獨的jar包,並且將所有lts依賴包引入。具體引用哪些jar包可以參考lts中的例子即可。
##JobTracker和LTS-Admin部署 提供(cmd)windows
和(shell)linux
兩種版本指令碼來進行編譯和部署:
-
執行根目錄下的
sh build.sh
或build.cmd
指令碼,會在dist
目錄下生成lts-{version}-bin
資料夾 -
下面是其目錄結構,其中bin目錄主要是JobTracker和LTS-Admin的啟動指令碼。
jobtracker
中是 JobTracker的配置檔案和需要使用到的jar包,lts-admin
是LTS-Admin相關的war包和配置檔案。 lts-{version}-bin的檔案結構
-- lts-${version}-bin
|-- bin
| |-- jobtracker.cmd
| |-- jobtracker.sh
| |-- lts-admin.cmd
| |-- lts-admin.sh
| |-- lts-monitor.cmd
| |-- lts-monitor.sh
| |-- tasktracker.sh
|-- conf
| |-- log4j.properties
| |-- lts-admin.cfg
| |-- lts-monitor.cfg
| |-- readme.txt
| |-- tasktracker.cfg
| |-- zoo
| |-- jobtracker.cfg
| |-- log4j.properties
| |-- lts-monitor.cfg
|-- lib
| |-- *.jar
|-- war
|-- jetty
| |-- lib
| |-- *.jar
|-- lts-admin.war
- JobTracker啟動。如果你想啟動一個節點,直接修改下
conf/zoo
下的配置檔案,然後執行sh jobtracker.sh zoo start
即可,如果你想啟動兩個JobTracker節點,那麼你需要拷貝一份zoo,譬如命名為zoo2
,修改下zoo2
下的配置檔案,然後執行sh jobtracker.sh zoo2 start
即可。logs資料夾下生成jobtracker-zoo.out
日誌。 - LTS-Admin啟動.修改
conf/lts-monitor.cfg
和conf/lts-admin.cfg
下的配置,然後執行bin
下的sh lts-admin.sh
或lts-admin.cmd
指令碼即可。logs資料夾下會生成lts-admin.out
日誌,啟動成功在日誌中會打印出訪問地址,使用者可以通過這個訪問地址訪問了。
##JobClient(部署)使用 需要引入lts的jar包有lts-jobclient-{version}.jar
,lts-core-{version}.jar
及其它第三方依賴jar。 ###API方式啟動
JobClient jobClient = new RetryJobClient();
jobClient.setNodeGroup("test_jobClient");
jobClient.setClusterName("test_cluster");
jobClient.setRegistryAddress("zookeeper://127.0.0.1:2181");
jobClient.start();
// 提交任務
Job job = new Job();
job.setTaskId("3213213123");
job.setParam("shopId", "11111");
job.setTaskTrackerNodeGroup("test_trade_TaskTracker");
// job.setCronExpression("0 0/1 * * * ?"); // 支援 cronExpression表示式
// job.setTriggerTime(new Date()); // 支援指定時間執行
Response response = jobClient.submitJob(job);
###Spring XML方式啟動
<bean id="jobClient" class="com.github.ltsopensource.spring.JobClientFactoryBean">
<property name="clusterName" value="test_cluster"/>
<property name="registryAddress" value="zookeeper://127.0.0.1:2181"/>
<property name="nodeGroup" value="test_jobClient"/>
<property name="masterChangeListeners">
<list>
<bean class="com.github.ltsopensource.example.support.MasterChangeListenerImpl"/>
</list>
</property>
<property name="jobFinishedHandler">
<bean class="com.github.ltsopensource.example.support.JobFinishedHandlerImpl"/>
</property>
<property name="configs">
<props>
<!-- 引數 -->
<prop key="job.fail.store">leveldb</prop>
</props>
</property>
</bean>
###Spring 全註解方式
@Configuration
public class LTSSpringConfig {
@Bean(name = "jobClient")
public JobClient getJobClient() throws Exception {
JobClientFactoryBean factoryBean = new JobClientFactoryBean();
factoryBean.setClusterName("test_cluster");
factoryBean.setRegistryAddress("zookeeper://127.0.0.1:2181");
factoryBean.setNodeGroup("test_jobClient");
factoryBean.setMasterChangeListeners(new MasterChangeListener[]{
new MasterChangeListenerImpl()
});
Properties configs = new Properties();
configs.setProperty("job.fail.store", "leveldb");
factoryBean.setConfigs(configs);
factoryBean.afterPropertiesSet();
return factoryBean.getObject();
}
}
##TaskTracker(部署使用) 需要引入lts的jar包有lts-tasktracker-{version}.jar
,lts-core-{version}.jar
及其它第三方依賴jar。 ###定義自己的任務執行類
public class MyJobRunner implements JobRunner {
@Override
public Result run(JobContext jobContext) throws Throwable {
try {
// TODO 業務邏輯
// 會發送到 LTS (JobTracker上)
jobContext.getBizLogger().info("測試,業務日誌啊啊啊啊啊");
} catch (Exception e) {
return new Result(Action.EXECUTE_FAILED, e.getMessage());
}
return new Result(Action.EXECUTE_SUCCESS, "執行成功了,哈哈");
}
}
###API方式啟動
TaskTracker taskTracker = new TaskTracker();
taskTracker.setJobRunnerClass(MyJobRunner.class);
taskTracker.setRegistryAddress("zookeeper://127.0.0.1:2181");
taskTracker.setNodeGroup("test_trade_TaskTracker");
taskTracker.setClusterName("test_cluster");
taskTracker.setWorkThreads(20);
taskTracker.start();
###Spring XML方式啟動
<bean id="taskTracker" class="com.github.ltsopensource.spring.TaskTrackerAnnotationFactoryBean" init-method="start">
<property name="jobRunnerClass" value="com.github.ltsopensource.example.support.MyJobRunner"/>
<property name="bizLoggerLevel" value="INFO"/>
<property name="clusterName" value="test_cluster"/>
<property name="registryAddress" value="zookeeper://127.0.0.1:2181"/>
<property name="nodeGroup" value="test_trade_TaskTracker"/>
<property name="workThreads" value="20"/>
<property name="masterChangeListeners">
<list>
<bean class="com.github.ltsopensource.example.support.MasterChangeListenerImpl"/>
</list>
</property>
<property name="configs">
<props>
<prop key="job.fail.store">leveldb</prop>
</props>
</property>
</bean>
###Spring註解方式啟動
@Configuration
public class LTSSpringConfig implements ApplicationContextAware {
private ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Bean(name = "taskTracker")
public TaskTracker getTaskTracker() throws Exception {
TaskTrackerAnnotationFactoryBean factoryBean = new TaskTrackerAnnotationFactoryBean();
factoryBean.setApplicationContext(applicationContext);
factoryBean.setClusterName("test_cluster");
factoryBean.setJobRunnerClass(MyJobRunner.class);
factoryBean.setNodeGroup("test_trade_TaskTracker");
factoryBean.setBizLoggerLevel("INFO");
factoryBean.setRegistryAddress("zookeeper://127.0.0.1:2181");
factoryBean.setMasterChangeListeners(new MasterChangeListener[]{
new MasterChangeListenerImpl()
});
factoryBean.setWorkThreads(20);
Properties configs = new Properties();
configs.setProperty("job.fail.store", "leveldb");
factoryBean.setConfigs(configs);
factoryBean.afterPropertiesSet();
// factoryBean.start();
return factoryBean.getObject();
}
}
##引數說明 引數說明
##使用建議 一般在一個JVM中只需要一個JobClient例項即可,不要為每種任務都新建一個JobClient例項,這樣會大大的浪費資源,因為一個JobClient可以提交多種任務。相同的一個JVM一般也儘量保持只有一個TaskTracker例項即可,多了就可能造成資源浪費。當遇到一個TaskTracker要執行多種任務的時候,請參考下面的 "一個TaskTracker執行多種任務"。 ##一個TaskTracker執行多種任務 有的時候,業務場景需要執行多種任務,有些人會問,是不是要每種任務型別都要一個TaskTracker去執行。我的答案是否定的,如果在一個JVM中,最好使用一個TaskTracker去執行多種任務,因為一個JVM中使用多個TaskTracker例項比較浪費資源(當然當你某種任務量比較多的時候,可以將這個任務單獨使用一個TaskTracker節點來執行)。那麼怎麼才能實現一個TaskTracker執行多種任務呢。下面是我給出來的參考例子。
/**
* 總入口,在 taskTracker.setJobRunnerClass(JobRunnerDispatcher.class)
* JobClient 提交 任務時指定 Job 型別 job.setParam("type", "aType")
*/
public class JobRunnerDispatcher implements JobRunner {
private static final ConcurrentHashMap<String/*type*/, JobRunner>
JOB_RUNNER_MAP = new ConcurrentHashMap<String, JobRunner>();
static {
JOB_RUNNER_MAP.put("aType", new JobRunnerA()); // 也可以從Spring中拿
JOB_RUNNER_MAP.put("bType", new JobRunnerB());
}
@Override
public Result run(JobContext jobContext) throws Throwable {
Job job = jobContext.getJob();
String type = job.getParam("type");
return JOB_RUNNER_MAP.get(type).run(job);
}
}
class JobRunnerA implements JobRunner {
@Override
public Result run(JobContext jobContext) throws Throwable {
// TODO A型別Job的邏輯
return null;
}
}
class JobRunnerB implements JobRunner {
@Override
public Result run(JobContext jobContext) throws Throwable {
// TODO B型別Job的邏輯
return null;
}
}
##TaskTracker的JobRunner測試 一般在編寫TaskTracker的時候,只需要測試JobRunner的實現邏輯是否正確,又不想啟動LTS進行遠端測試。為了方便測試,LTS提供了JobRunner的快捷測試方法。自己的測試類整合com.github.ltsopensource.tasktracker.runner.JobRunnerTester
即可,並實現initContext
和newJobRunner
方法即可。如lts-examples中的例子:
public class TestJobRunnerTester extends JobRunnerTester {
public static void main(String[] args) throws Throwable {
// Mock Job 資料
Job job = new Job();
job.setTaskId("2313213");
JobContext jobContext = new JobContext();
jobContext.setJob(job);
JobExtInfo jobExtInfo = new JobExtInfo();
jobExtInfo.setRetry(false);
jobContext.setJobExtInfo(jobExtInfo);
// 執行測試
TestJobRunnerTester tester = new TestJobRunnerTester();
Result result = tester.run(jobContext);
System.out.println(JSON.toJSONString(result));
}
@Override
protected void initContext() {
// TODO 初始化Spring容器
}
@Override
protected JobRunner newJobRunner() {
return new TestJobRunner();
}
}
##Spring Quartz Cron任務無縫接入 對於Quartz的Cron任務只需要在Spring配置中增加一下程式碼就可以接入LTS平臺
<bean class="com.github.ltsopensource.spring.quartz.QuartzLTSProxyBean">
<property name="clusterName" value="test_cluster"/>
<property name="registryAddress" value="zookeeper://127.0.0.1:2181"/>
<property name="nodeGroup" value="quartz_test_group"/>
</bean>
##Spring Boot 支援
@SpringBootApplication
@EnableJobTracker // 啟動JobTracker
@EnableJobClient // 啟動JobClient
@EnableTaskTracker // 啟動TaskTracker
@EnableMonitor // 啟動Monitor
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
剩下的就只是在application.properties中新增相應的配置就行了, 具體見lts-example中的com.github.ltsopensource.examples.springboot
包下的例子
##多網絡卡選擇問題 當機器有內網兩個網絡卡的時候,有時候,使用者想讓LTS的流量走外網網絡卡,那麼需要在host中,把主機名稱的對映地址改為外網網絡卡地址即可,內網同理。
##關於節點標識問題 如果在節點啟動的時候設定節點標識,LTS會預設設定一個UUID為節點標識,可讀性會比較差,但是能保證每個節點的唯一性,如果使用者能自己保證節點標識的唯一性,可以通過 setIdentity
來設定,譬如如果每個節點都是部署在一臺機器(一個虛擬機器)上,那麼可以將identity設定為主機名稱
##SPI擴充套件說明 支援JobLogger,JobQueue等等的SPI擴充套件
##LTS-Admin使用jetty啟動(預設),不定期掛掉解決方案 見issue#389