TBSchedule原始碼學習筆記-啟動過程 轉自https://blog.csdn.net/weiythi/article/details/78742651
TBSchedule基本概念及原理
概念介紹
TBSchedule是一個支援分散式的排程框架,能讓一種批量任務或者不斷變化的任務,被動態的分配到多個主機的JVM中,不同的執行緒組中並行執行。基於ZooKeeper的純Java實現,由Alibaba開源。
程式碼實現
起步配置
/* (non-Javadoc)
* @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
*/
@Override
public void afterPropertiesSet() throws Exception {
InputStream in = TaskCenter.class.getClassLoader().getResourceAsStream("schedule-conf.properties");
Properties p = new Properties();
p.load(in);
TBScheduleManagerFactory scheduleManagerFactory = new TBScheduleManagerFactory();
scheduleManagerFactory.setApplicationContext(applicationContext);
try {
scheduleManagerFactory.init(p);
} catch (Exception e) {
e.printStackTrace();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
TBScheduleManagerFactory(com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory)是個什麼?程式碼裡對他的描述是“排程伺服器構造器”,只關心他的啟動過程
public void init(Properties p) throws Exception {
//這裡initialThread是個什麼,作用和意義?
if(this.initialThread != null){
this.initialThread.stopThread();
}
//lock是一個鎖(ReentrantLock)使用預設的非公平鎖
this.lock.lock();
try{
this.scheduleDataManager = null;
this.scheduleStrategyManager = null;
ConsoleManager.setScheduleManagerFactory(this);
if(this.zkManager != null){
this.zkManager.close();
}
this.zkManager = new ZKManager(p);
this.errorMessage = "Zookeeper connecting ......" + this.zkManager.getConnectStr();
initialThread = new InitialThread(this);
initialThread.setName("TBScheduleManagerFactory-initialThread");
initialThread.start();
}finally{
this.lock.unlock();
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
到這裡需要知道,initalThread是個什麼執行緒,該執行緒的作用?點開看一下run方法的實現
@Override
public void run() {
//factory就是這個排程伺服器
facotry.lock.lock();
try {
int count =0;
while(facotry.zkManager.checkZookeeperState() == false){
count = count + 1;
if(count % 50 == 0){
facotry.errorMessage = "Zookeeper connecting ......" + facotry.zkManager.getConnectStr() + " spendTime:" + count * 20 +"(ms)";
log.error(facotry.errorMessage);
}
Thread.sleep(20);//貌似每20ms 檢測一次zk的連線狀態
if(this.isStop ==true){
return;
}
}
//如果連線成功則初始化資料
facotry.initialData();
} catch (Throwable e) {
log.error(e.getMessage(),e);
}finally{
facotry.lock.unlock();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
這個initalThread內部會競爭factory的鎖,所以如果zk一直檢查失敗,鎖得不到釋放,有可能一直阻塞執行緒,導致啟動專案啟動問題?while出口的isStop是什麼?isStop莫非是一個取消狀態?代表停止這個執行緒,這個執行緒可能更多的做一個zk狀態的啟動前自檢。 多慮了,後邊他的主執行緒會釋放鎖…..
scheduleDataManager是什麼?具體作用是`com.taobao.pamirs.schedule.taskmanager.IScheduleDataManager
他是一個排程中心客戶端,啟動過程幾乎沒怎麼參與。
scheduleStrategyManager是什麼?類名`com.taobao.pamirs.schedule.zk.ScheduleStrategyDataManager4ZK
官方沒有給出他的命名,暫且取名為排程策略管理器,提供了一些策略查詢和更新的功能。
ConsoleManager.setScheduleManagerFactory(this);,貌似與管理控制檯有關? 實際使用中,控制檯和排程專案是不相關的,那麼這個ConsoleManager是什麼?
然後剩下的就是初始化一個zk的管理器,最後釋放鎖
問題是整合tbschedule只需這樣就行,剩下的都是一些zk配置操作。如果和zk相關應該包含兩個點: 1.在哪裡新增監視點的?即tbschedule如何響應 rootPath 配置的節點路徑下的相關變化的? 2.監視點的動作是怎樣的
目前為止值得看下去的就兩行,分別是com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory
類下的init(Properties p)方法。
public void init(Properties p) throws Exception {
//.......//
ConsoleManager.setScheduleManagerFactory(this);
//.......//
- 1
- 2
- 3
- 4
- 5
還有com.taobao.pamirs.schedule.strategy.InitialThread
類下的
public void run() {
//......//
facotry.initialData();
//......//
}
- 1
- 2
- 3
- 4
- 5
其中TBScheduleManagerFactory類的init方法裡的程式碼行 更像一個setter方法,正常編碼時會避免在setter中做賦值以外操作的。暫且不管 然後facotry.initialData(); 方法名告訴我該方法會初始化資料。
/**
* 在Zk狀態正常後回撥資料初始化
* @throws Exception
*/
public void initialData() throws Exception{
//初始化zk管理器
this.zkManager.initial();
//初始化排程中心客戶端
this.scheduleDataManager = new ScheduleDataManager4ZK(this.zkManager);
//初始化排程策略管理器
this.scheduleStrategyManager = new ScheduleStrategyDataManager4ZK(this.zkManager);
if (this.start == true) {
// 註冊排程管理器
this.scheduleStrategyManager.registerManagerFactory(this);
if(timer == null){
timer = new Timer("TBScheduleManagerFactory-Timer");
}
if(timerTask == null){
timerTask = new ManagerFactoryTimerTask(this);
timer.schedule(timerTask, 2000,this.timerInterval);
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
其中timer和timerTask定義如下:其中ManagerFactoryTimerTask繼承自java.util.TimerTask
,所以會定時執行任務。這裡是2秒後開始執行,之後每2秒執行一次。
private java.util.Timer timer;
private com.taobao.pamirs.schedule.strategy.ManagerFactoryTimerTask timerTask;
- 1
- 2
排程任務ManagerFactoryTimerTask 內部做了些啥?看樣子是做了一些ZK連線狀態監測的操作,不斷重試連不上就重連,連上了就重新整理資訊。等等……重新整理?刷的是啥????
public void run() {
try {
Thread.currentThread().setPriority(Thread.MAX_PRIORITY);
if(this.factory.zkManager.checkZookeeperState() == false){
if(count > 5){
log.error("Zookeeper連線失敗,關閉所有的任務後,重新連線Zookeeper伺服器......");
this.factory.reStart();
}else{
count = count + 1;
}
}else{
count = 0;
//重點是這裡、
this.factory.refresh();
}
} catch (Throwable ex) {
log.error(ex.getMessage(), ex);
} finally {
//這個後續有什麼作用???
factory.timerTaskHeartBeatTS = System.currentTimeMillis();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
那麼initialData() 的執行流程就是這樣了?
1.初始化排程中心客戶端 2.初始化排程策略管理器 3.註冊排程管理器 4.啟動一個定時任務,這個定時任務會以2秒為一個週期對當前的zk狀態做檢查,如果zk連線失敗(重試次數5)會停止當前服務,否則重新整理排程伺服器
那麼問題來了,這個定時器裡所做的”重新整理”操作又在做什麼操作,哪些東西被刷了??
public void refresh() throws Exception {
this.lock.lock();
try {
// 判斷狀態是否終止
ManagerFactoryInfo stsInfo = null;
boolean isException = false;
try {
//貌似是從策略管理器中拿到策略資訊
stsInfo = this.getScheduleStrategyManager().loadManagerFactoryInfo(this.getUuid());
} catch (Exception e) {
isException = true;
logger.error("獲取伺服器資訊有誤:uuid="+this.getUuid(), e);
}
if (isException == true) {
try {
stopServer(null); // 停止所有的排程任務
this.getScheduleStrategyManager().unRregisterManagerFactory(this);
} finally {
reRegisterManagerFactory();
}
} else if (stsInfo.isStart() == false) {
stopServer(null); // 停止所有的排程任務
this.getScheduleStrategyManager().unRregisterManagerFactory(
this);
} else {
reRegisterManagerFactory();
}
} finally {
this.lock.unlock();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
這個方法裡有幾個疑點: 1.getUuid() 拿到的是個啥?貌似是這個排程中心的唯一ID?沒有在目前的程式碼邏輯看到對他的初始化?初始化在哪做的,對應官方的哪個名詞? 2.stsInfo = this.getScheduleStrategyManager().loadManagerFactoryInfo(this.getUuid()); 取到的資訊有哪些作用,資料來源是哪裡,源資料的格式是什麼?什麼情況下會丟擲異常?? 3.ManagerFactoryInfo物件的isStart()方法,宣告的是什麼開始狀態?? 4.除去失敗的情況需要終止排程任務,那麼這個正確的情況reRegisterManagerFactory(); 做了哪些事情??
問題1: 關於getUuid(),uuid哪裡初始化來的? 一般有getter就有setter ,通過setUuid(String uuid) 方法,找到注入值的位置,發現在com.taobao.pamirs.schedule.zk.ScheduleStrategyDataManager4ZK類的public List registerManagerFactory(TBScheduleManagerFactory managerFactory) throws Exception方法中有呼叫。 巧了正好這個registerManagerFactory(factory) 反覆看見好幾遍了,他到底是個啥,乾點啥好呢,現在有了問題5
5.ScheduleStrategyDataManager4ZK 所謂排程策略管理器,起到了啥作用,registerManagerFactory方法在幹些什麼??
其他操作都省略,就看uuid他的生成策略,到底是個啥,能標識個什麼的唯一性??
/**
* 註冊ManagerFactory
* @param managerFactory
* @return 需要全部登出的排程,例如當IP不在列表中
* @throws Exception
*/
public List<String> registerManagerFactory(TBScheduleManagerFactory managerFactory) throws Exception{
//第一波進來肯定無值啊
if(managerFactory.getUuid() == null){
String uuid = managerFactory.getIp() +"$" + managerFactory.getHostName() +"$"+ UUID.randomUUID().toString().replaceAll("-", "").toUpperCase();
//...省略...//
managerFactory.setUuid(zkPath.substring(zkPath.lastIndexOf("/") + 1));
}
//...省略...//
return result;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
生成策略是當不存在uuid時才會生成uuid,這個uuid很容易發現是跟當前機器的ip還有hostname有關的,為了避免重複還追加了一個uuid,我想這裡可能是考慮到了一個機器下有多個專案整合tbschedule的情況吧 所以這裡uuid代表了
當前應用(注意是應用)在整個排程伺服器下的唯一ID,當前這個uuid在console(管理頁面)上也有所體現。
問題2:
知道了uuid是個什麼,那麼這個程式碼stsInfo = this.getScheduleStrategyManager().loadManagerFactoryInfo(this.getUuid());
字面意思可能就好理解了一些,貌似是拿應用的uuid去排程策略管理器中拿自己的排程策略?
public ManagerFactoryInfo loadManagerFactoryInfo(String uuid) throws Exception {
//這裡會拼裝一個zk的路徑,帶有uuid哦
String zkPath = this.PATH_ManagerFactory + "/" + uuid;
//判斷是否存在這個節點,沒有相關節點就丟擲異常?
if(this.getZooKeeper().exists(zkPath, false)==null){
throw new Exception("工作管理員不存在:" + uuid);
}
//拿到節點的儲存值
byte[] value = this.getZooKeeper().getData(zkPath,false,null);
ManagerFactoryInfo result = new ManagerFactoryInfo();
result.setUuid(uuid);
//如果值為null代表已啟動的??
if(value== null){
result.setStart(true);
}else{
//根據節點值來判斷是否start
result.setStart(Boolean.parseBoolean(new String(value)));
}
return result;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
那麼可見會已uuid拼裝一個zk路徑,去判斷對應節點是否存在,如果不存在就丟擲異常!如果存在就去判斷節點值(true/false),標識了當前節點的啟動狀態。那麼問題又來了。
6.應用在排程策略管理器下的節點是什麼時候變更狀態的?
問題3: 貌似和問題6有關?
問題4: reRegisterManagerFactory()
public void reRegisterManagerFactory() throws Exception{
//重新分配排程器
List<String> stopList = this.getScheduleStrategyManager().registerManagerFactory(this);
for (String strategyName : stopList) {
this.stopServer(strategyName);
}
//根據策略重新分配排程任務的機器
this.assignScheduleServer();
//
this.reRunScheduleServer();
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
重新註冊了一次排程策略管理器,what這個方法有返回值??之前為什麼不對這個返回值做處理呢????!所以這個registerManagerFactory(this) 方法變得愈發的混亂了,有必要開啟詳細看看了。
/**
* 註冊ManagerFactory
* @param managerFactory
* @return 需要全部登出的排程,例如當IP不在列表中
* @throws Exception
*/
public List<String> registerManagerFactory(TBScheduleManagerFactory managerFactory) throws Exception{
if(managerFactory.getUuid() == null){
String uuid = managerFactory.getIp() +"$" + managerFactory.getHostName() +"$"+ UUID.randomUUID().toString().replaceAll("-", "").toUpperCase();
//拼裝了一個zk節點的路徑,等等這個路徑的拼裝格式貌似見過,所以loadManagerFactoryInfo方法中資料節點是在這一步時候建立的了?
String zkPath = this.PATH_ManagerFactory + "/" + uuid +"$";
zkPath = this.getZooKeeper().create(zkPath, null, this.zkManager.getAcl(), CreateMode.EPHEMERAL_SEQUENTIAL);
managerFactory.setUuid(zkPath.substring(zkPath.lastIndexOf("/") + 1));
}else{
String zkPath = this.PATH_ManagerFactory + "/" + managerFactory.getUuid();
if(this.getZooKeeper().exists(zkPath, false)==null){
zkPath = this.getZooKeeper().create(zkPath, null, this.zkManager.getAcl(), CreateMode.EPHEMERAL);
}
}
// 方法的返回值是這個list
List<String> result = new ArrayList<String>();
//貌似會載入所有策略,這個loadAllScheduleStrategy() 方法,會把 this.zkManager.getRootPath() + "/strategy" 這個節點下的所有子節點全都返回
//然後將每個節點的內容取出,再呼叫這個方法ScheduleStrategy result = (ScheduleStrategy)this.gson.fromJson(valueString, ScheduleStrategy.class); 可以看作對json反序列化操作。
//所以控制檯操作建立的策略,是儲存在this.zkManager.getRootPath() + "/strategy" 節點的子節點上的
for(ScheduleStrategy scheduleStrategy:loadAllScheduleStrategy()){
boolean isFind = false;
//暫停或者不在IP範圍
//猜測是要判斷策略在當前機器上是否可用,如果不可用則加入列表返回,如果可用則在對應的策略節點下建立一個臨時節點
if(ScheduleStrategy.STS_PAUSE.equalsIgnoreCase(scheduleStrategy.getSts()) == false && scheduleStrategy.getIPList() != null){
for(String ip:scheduleStrategy.getIPList()){
//這裡就是為什麼控制太介面說“127.0.0.1或者localhost會在所有機器上執行” 的原因了
if(ip.equals("127.0.0.1") || ip.equalsIgnoreCase("localhost") || ip.equals(managerFactory.getIp())|| ip.equalsIgnoreCase(managerFactory.getHostName())){
//新增可管理TaskType
//建立一個臨時節點,這個臨時節點與之前的排程伺服器中生成的uuid有關。
//這一步建立的臨時節點如下文。
String zkPath = this.PATH_Strategy+"/"+ scheduleStrategy.getStrategyName()+ "/"+ managerFactory.getUuid();
if(this.getZooKeeper().exists(zkPath, false)==null){
zkPath = this.getZooKeeper().create(zkPath, null, this.zkManager.getAcl(), CreateMode.EPHEMERAL);
}
isFind = true;
break;
}
}
}
if(isFind == false){//清除原來註冊的Factory
String zkPath = this.PATH_Strategy+"/"+ scheduleStrategy.getStrategyName()+ "/"+ managerFactory.getUuid();
if(this.getZooKeeper().exists(zkPath, false)!=null){
ZKTools.deleteTree(this.getZooKeeper(), zkPath);
result.add(scheduleStrategy.getStrategyName());
}
}
}
return result;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
針對我的環境,此處rootPath配置為”/dsp_official_0928_wd/schedule”,其zk的策略節點為,可見其與通過控制檯錄入的策略是對應的
[zk: localhost:2181(CONNECTED) 7] ls /dsp_official_0928_wd/schedule/strategy [commonSyncAdvertiserTask, commonSyncCreativeTask]
按照loadAllScheduleStrategy() 方法的邏輯來說,會把這兩個節點上的內容都拿出來,反序列化成com.taobao.pamirs.schedule.strategy.ScheduleStrategy 物件 以/dsp_official_0928_wd/schedule/strategy/commonSyncAdvertiserTask 節點內容為例
{ “strategyName”: “commonSyncAdvertiserTask”, “IPList”: [ “127.0.0.1” ], “numOfSingleServer”: 0, “assignNum”: 2, “kind”: “Schedule”, “taskName”: “commonSyncAdvertiserTask”, “taskParameter”: “”, “sts”: “resume” }
某策略可用的應用節點:
[zk: localhost:2181(CONNECTED) 8] ls /dsp_official_0928_wd/schedule/strategy/commonSyncAdvertiserTask [127.0.0.1admadc01admadc010000000081]
通常情況下,策略應用節點的內容如下,但在registerManagerFactory此時還沒有初始化該節點的值,預設為null:
{ “strategyName”: “commonSyncAdvertiserTask”, “uuid”: “127.0.0.1admadc01admadc01FC8BBDAC658C4B00B0F47377E50A570D$0000000080”, “requestNum”: 0, “currentNum”: 0, “message”: “” }
先不管如上節點內容的意義,其實理解上也簡單。
問題5: 所以registerManagerFactory() 方法就是在當前機器支援的策略配置上註冊當前機器節點用於同步,並返回當前機器上所有不支援的策略
那麼回到 reRegisterManagerFactory() 方法上來,
public void reRegisterManagerFactory() throws Exception{
//重新分配排程器
//因為當前應用不支援這些策略,所以要在當前機器上停止這些策略的服務。
List<String> stopList = this.getScheduleStrategyManager().registerManagerFactory(this);
for (String strategyName : stopList) {
this.stopServer(strategyName);
}
//根據策略重新分配排程任務的機器
this.assignScheduleServer();
this.reRunScheduleServer();
}
```
這裡迫切的需要知道以下兩行的含義。
```java
this.assignScheduleServer();
this.reRunScheduleServer();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
assignScheduleServer() 方法的官方解釋是 根據策略重新分配排程任務的機器 那麼分配策略是什麼呢? 1.載入所有本應用能夠執行的策略(注意上文說道的策略應用節點的資料格式,返回預設格式)。 2.遍歷1返回策略列表,按策略taskType獲取到所有的可執行策略應用項列表 3.遍歷跟當前factory例項相關的strategy,選舉出每個strategy的leader factory例項,由leader重新計算每個factory例項能夠分到的reqNum,即根據strategy身上的assignNum“numOfSingleServer,將assignNum平分給每個factory例項。 4.根據2的返回結果(總的伺服器數量),和3的策略配置資訊,為2返回的策略服務項分配值(修改zk策略應用節點的requestNum)
reRunScheduleServer() 方法沒有給出官方的解釋,這裡暫且認為其是為了重新執行排程服務,怎麼做的????
public void reRunScheduleServer() throws Exception{
//拿到所有本機可執行的策略
for (ScheduleStrategyRunntime run : this.scheduleStrategyManager.loadAllScheduleStrategyRunntimeByUUID(this.uuid)) {
//在快取中取
List<IStrategyTask> list = this.managerMap.get(run.getStrategyName());
if(list == null){
list = new ArrayList<IStrategyTask>();
this.managerMap.put(run.getStrategyName(),list);
}
//如果已經有在執行的任務組,且當前任務組數大於已分配的最大任務組數量,那麼就停止最後新增的任務組
while(list.size() > run.getRequestNum() && list.size() >0){
//what??先從列表裡刪了????如果停止失敗呢????????
IStrategyTask task = list.remove(list.size() - 1);
try {
task.stop(run.getStrategyName());
} catch (Throwable e) {
logger.error("登出任務錯誤:strategyName=" + run.getStrategyName(), e);
}
}
//不足,增加排程器
ScheduleStrategy strategy = this.scheduleStrategyManager.loadStrategy(run.getStrategyName());
//注意這個迴圈,建立執行緒組直到執行的執行緒組滿了
while(list.size() < run.getRequestNum()){
//厲害了這裡做了什麼????
IStrategyTask result = this.createStrategyTask(strategy);
if(null==result){
logger.error("strategy 對應的配置有問題。strategy name="+strategy.getStrategyName());
}
list.add(result);
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
看起來該方法會遍歷所有當前應用支援的策略,並針對策略建立一個叫做IStrategyTask的東西,並維護在一個map中,這個this.createStrategyTask(strategy); 好吧,官方有說明“建立排程伺服器”
/**
* 建立排程伺服器
* @param baseTaskType
* @param ownSign
* @return
* @throws Exception
*/
public IStrategyTask createStrategyTask(ScheduleStrategy strategy)
throws Exception {
IStrategyTask result = null;
try{
if(ScheduleStrategy.Kind.Schedule == strategy.getKind()){
//就是這裡。。。。。。。。。
String baseTaskType = ScheduleUtil.splitBaseTaskTypeFromTaskType(strategy.getTaskName());
String ownSign =ScheduleUtil.splitOwnsignFromTaskType(strategy.getTaskName());
result = new TBScheduleManagerStatic(this,baseTaskType,ownSign,scheduleDataManager);
}else if(ScheduleStrategy.Kind.Java == strategy.getKind()){
result=(IStrategyTask)Class.forName(strategy.getTaskName()).newInstance();
result.initialTaskParameter(strategy.getStrategyName(),strategy.getTaskParameter());
}else if(ScheduleStrategy.Kind.Bean == strategy.getKind()){
result=(IStrategyTask)this.getBean(strategy.getTaskName());
result.initialTaskParameter(strategy.getStrategyName(),strategy.getTaskParameter());
}
}catch(Exception e ){
logger.error("strategy 獲取對應的java or bean 出錯,schedule並沒有載入該任務,請確認" +strategy.getStrategyName(),e);
}
return result;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
根據上文和日常配置,也知道來自於會走第一個條件分支。最後通過一句result = new TBScheduleManagerStatic(this,baseTaskType,ownSign,scheduleDataManager);
返回一個com.taobao.pamirs.schedule.strategy.IStrategyTask
介面的實現。
繼續看com.taobao.pamirs.schedule.taskmanager.TBScheduleManagerStatic
類。
public TBScheduleManagerStatic(TBScheduleManagerFactory aFactory,
String baseTaskType, String ownSign,IScheduleDataManager aScheduleCenter) throws Exception {
super(aFactory, baseTaskType, ownSign, aScheduleCenter);
}
- 1
- 2
- 3
- 4
- 5
呼叫了父類com.taobao.pamirs.schedule.taskmanager.TBScheduleManager
的建構函式,原始碼裡面少有的對這個類寫了一大堆文件註釋,先拿出來學習下。
/**
* 1、任務排程分配器的目標: 讓所有的任務不重複,不遺漏的被快速處理。
* 2、一個Manager只管理一種任務型別的一組工作執行緒。
* 3、在一個JVM裡面可能存在多個處理相同任務型別的Manager,也可能存在處理不同任務型別的Manager。
* 4、在不同的JVM裡面可以存在處理相同任務的Manager
* 5、排程的Manager可以動態的隨意增加和停止
*
* 主要的職責:
* 1、定時向集中的資料配置中心更新當前排程伺服器的心跳狀態
* 2、向資料配置中心獲取所有伺服器的狀態來重新計算任務的分配。這麼做的目標是避免集中任務排程中心的單點問題。
* 3、在每個批次資料處理完畢後,檢查是否有其它處理伺服器申請自己把持的任務佇列,如果有,則釋放給相關處理伺服器。
*
* 其它:
* 如果當前伺服器在處理當前任務的時候超時,需要清除當前佇列,並釋放已經把持的任務。並向控制主動中心報警。
*
* @author xuannan
*
*/
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
第2點:一個Manager只管理一種任務型別的一組工作執行緒。
通過分析啟動過程已經能夠確認。發現針對每一個策略和執行緒組都建立了一個com.taobao.pamirs.schedule.strategy.IStrategyTask
例項。
第3點:在一個JVM裡面可能存在多個處理相同任務型別的Manager,也可能存在處理不同任務型別的Manager。
如果啟動過程初始化多個com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory
物件時可能會有可能有”在一個JVM裡面可能存在多個處理相同任務型別的Manager”這種情況,還有會有requsetNum個相同任務型別的Manager在同一個JVM
第4點:在不同的JVM裡面可以存在處理相同任務的Manager 這種情況是肯定存在的,取決於策略的”IP地址(逗號分隔)”配置
第5點:排程的Manager可以動態的隨意增加和停止 這個要自己去驗證了。
疑問: 1. List list = this.managerMap.get(run.getStrategyName()); 到底起到什麼作用?是為了統計當前JVM已經執行的執行緒組數量麼??? 目前tbschedule啟動過程到這就結束了,2秒一次的zk健康檢查(心跳),並重新整理任務組(及時響應控制檯設定),貌似到目前為止,為什麼這裡並沒有使用監視點來響應控制檯的設定,而是自己主動查詢,出發點是什麼???