1. 程式人生 > >TBSchedule原始碼學習筆記-啟動過程 轉自https://blog.csdn.net/weiythi/article/details/78742651

TBSchedule原始碼學習筆記-啟動過程 轉自https://blog.csdn.net/weiythi/article/details/78742651

TBSchedule基本概念及原理

概念介紹

TBSchedule是一個支援分散式的排程框架,能讓一種批量任務或者不斷變化的任務,被動態的分配到多個主機的JVM中,不同的執行緒組中並行執行。基於ZooKeeper的純Java實現,由Alibaba開源。

程式碼實現

起步配置

/* (non-Javadoc)
     * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
     */
    @Override
    public void afterPropertiesSet() throws
Exception { InputStream in = TaskCenter.class.getClassLoader().getResourceAsStream("schedule-conf.properties"); Properties p = new Properties(); p.load(in); TBScheduleManagerFactory scheduleManagerFactory = new TBScheduleManagerFactory(); scheduleManagerFactory.setApplicationContext(applicationContext); try
{ scheduleManagerFactory.init(p); } catch (Exception e) { e.printStackTrace(); } }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

TBScheduleManagerFactory(com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory)是個什麼?程式碼裡對他的描述是“排程伺服器構造器”,只關心他的啟動過程

public void init(Properties p) throws
Exception { //這裡initialThread是個什麼,作用和意義? if(this.initialThread != null){ this.initialThread.stopThread(); } //lock是一個鎖(ReentrantLock)使用預設的非公平鎖 this.lock.lock(); try{ this.scheduleDataManager = null; this.scheduleStrategyManager = null; ConsoleManager.setScheduleManagerFactory(this); if(this.zkManager != null){ this.zkManager.close(); } this.zkManager = new ZKManager(p); this.errorMessage = "Zookeeper connecting ......" + this.zkManager.getConnectStr(); initialThread = new InitialThread(this); initialThread.setName("TBScheduleManagerFactory-initialThread"); initialThread.start(); }finally{ this.lock.unlock(); }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

到這裡需要知道,initalThread是個什麼執行緒,該執行緒的作用?點開看一下run方法的實現

@Override
    public void run() {
        //factory就是這個排程伺服器
        facotry.lock.lock();
        try {
            int count =0;
            while(facotry.zkManager.checkZookeeperState() == false){
                count = count + 1;
                if(count % 50 == 0){
                    facotry.errorMessage = "Zookeeper connecting ......" + facotry.zkManager.getConnectStr() + " spendTime:" + count * 20 +"(ms)";
                    log.error(facotry.errorMessage);
                }
                Thread.sleep(20);//貌似每20ms 檢測一次zk的連線狀態
                if(this.isStop ==true){
                    return;
                }
            }
            //如果連線成功則初始化資料
            facotry.initialData();
        } catch (Throwable e) {
             log.error(e.getMessage(),e);
        }finally{
            facotry.lock.unlock();
        }

    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

這個initalThread內部會競爭factory的鎖,所以如果zk一直檢查失敗,鎖得不到釋放,有可能一直阻塞執行緒,導致啟動專案啟動問題?while出口的isStop是什麼?isStop莫非是一個取消狀態?代表停止這個執行緒,這個執行緒可能更多的做一個zk狀態的啟動前自檢。 多慮了,後邊他的主執行緒會釋放鎖…..

scheduleDataManager是什麼?具體作用是`com.taobao.pamirs.schedule.taskmanager.IScheduleDataManager 他是一個排程中心客戶端,啟動過程幾乎沒怎麼參與。

scheduleStrategyManager是什麼?類名`com.taobao.pamirs.schedule.zk.ScheduleStrategyDataManager4ZK 官方沒有給出他的命名,暫且取名為排程策略管理器,提供了一些策略查詢和更新的功能。

ConsoleManager.setScheduleManagerFactory(this);,貌似與管理控制檯有關? 實際使用中,控制檯和排程專案是不相關的,那麼這個ConsoleManager是什麼?

然後剩下的就是初始化一個zk的管理器,最後釋放鎖

問題是整合tbschedule只需這樣就行,剩下的都是一些zk配置操作。如果和zk相關應該包含兩個點: 1.在哪裡新增監視點的?即tbschedule如何響應 rootPath 配置的節點路徑下的相關變化的? 2.監視點的動作是怎樣的

目前為止值得看下去的就兩行,分別是com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory類下的init(Properties p)方法。


public void init(Properties p) throws Exception {
       //.......//
            ConsoleManager.setScheduleManagerFactory(this);
        //.......//   
  • 1
  • 2
  • 3
  • 4
  • 5

還有com.taobao.pamirs.schedule.strategy.InitialThread 類下的

public void run() {
        //......//
            facotry.initialData();
         //......//
    }
  • 1
  • 2
  • 3
  • 4
  • 5

其中TBScheduleManagerFactory類的init方法裡的程式碼行 更像一個setter方法,正常編碼時會避免在setter中做賦值以外操作的。暫且不管 然後facotry.initialData(); 方法名告訴我該方法會初始化資料。

/**
     * 在Zk狀態正常後回撥資料初始化
     * @throws Exception
     */
    public void initialData() throws Exception{
            //初始化zk管理器
            this.zkManager.initial();
            //初始化排程中心客戶端
            this.scheduleDataManager = new ScheduleDataManager4ZK(this.zkManager);
            //初始化排程策略管理器
            this.scheduleStrategyManager  = new ScheduleStrategyDataManager4ZK(this.zkManager);
            if (this.start == true) {
                // 註冊排程管理器
                this.scheduleStrategyManager.registerManagerFactory(this);
                if(timer == null){
                    timer = new Timer("TBScheduleManagerFactory-Timer");
                }
                if(timerTask == null){
                    timerTask = new ManagerFactoryTimerTask(this);
                    timer.schedule(timerTask, 2000,this.timerInterval);
                }
            }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

其中timer和timerTask定義如下:其中ManagerFactoryTimerTask繼承自java.util.TimerTask,所以會定時執行任務。這裡是2秒後開始執行,之後每2秒執行一次。

private java.util.Timer timer;
private com.taobao.pamirs.schedule.strategy.ManagerFactoryTimerTask timerTask;
  • 1
  • 2

排程任務ManagerFactoryTimerTask 內部做了些啥?看樣子是做了一些ZK連線狀態監測的操作,不斷重試連不上就重連,連上了就重新整理資訊。等等……重新整理?刷的是啥????

public void run() {
        try {

            Thread.currentThread().setPriority(Thread.MAX_PRIORITY);
            if(this.factory.zkManager.checkZookeeperState() == false){
                if(count > 5){
                   log.error("Zookeeper連線失敗,關閉所有的任務後,重新連線Zookeeper伺服器......");
                   this.factory.reStart();

                }else{
                   count = count + 1;
                }
            }else{
                count = 0;
                //重點是這裡、
                this.factory.refresh();
            }
        }  catch (Throwable ex) {
            log.error(ex.getMessage(), ex);
        } finally {
            //這個後續有什麼作用???
            factory.timerTaskHeartBeatTS = System.currentTimeMillis();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

那麼initialData() 的執行流程就是這樣了?

1.初始化排程中心客戶端 2.初始化排程策略管理器 3.註冊排程管理器 4.啟動一個定時任務,這個定時任務會以2秒為一個週期對當前的zk狀態做檢查,如果zk連線失敗(重試次數5)會停止當前服務,否則重新整理排程伺服器

那麼問題來了,這個定時器裡所做的”重新整理”操作又在做什麼操作,哪些東西被刷了??

    public void refresh() throws Exception {
        this.lock.lock();
        try {
            // 判斷狀態是否終止
            ManagerFactoryInfo stsInfo = null;
            boolean isException = false;
            try {
                //貌似是從策略管理器中拿到策略資訊
                stsInfo = this.getScheduleStrategyManager().loadManagerFactoryInfo(this.getUuid());
            } catch (Exception e) {
                isException = true;
                logger.error("獲取伺服器資訊有誤:uuid="+this.getUuid(), e);
            }
            if (isException == true) {
                try {
                    stopServer(null); // 停止所有的排程任務
                    this.getScheduleStrategyManager().unRregisterManagerFactory(this);
                } finally {
                    reRegisterManagerFactory();
                }
            } else if (stsInfo.isStart() == false) {
                stopServer(null); // 停止所有的排程任務
                this.getScheduleStrategyManager().unRregisterManagerFactory(
                        this);
            } else {
                reRegisterManagerFactory();
            }
        } finally {
            this.lock.unlock();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

這個方法裡有幾個疑點: 1.getUuid() 拿到的是個啥?貌似是這個排程中心的唯一ID?沒有在目前的程式碼邏輯看到對他的初始化?初始化在哪做的,對應官方的哪個名詞? 2.stsInfo = this.getScheduleStrategyManager().loadManagerFactoryInfo(this.getUuid()); 取到的資訊有哪些作用,資料來源是哪裡,源資料的格式是什麼?什麼情況下會丟擲異常?? 3.ManagerFactoryInfo物件的isStart()方法,宣告的是什麼開始狀態?? 4.除去失敗的情況需要終止排程任務,那麼這個正確的情況reRegisterManagerFactory(); 做了哪些事情??

問題1: 關於getUuid(),uuid哪裡初始化來的? 一般有getter就有setter ,通過setUuid(String uuid) 方法,找到注入值的位置,發現在com.taobao.pamirs.schedule.zk.ScheduleStrategyDataManager4ZK類的public List registerManagerFactory(TBScheduleManagerFactory managerFactory) throws Exception方法中有呼叫。 巧了正好這個registerManagerFactory(factory) 反覆看見好幾遍了,他到底是個啥,乾點啥好呢,現在有了問題5

5.ScheduleStrategyDataManager4ZK 所謂排程策略管理器,起到了啥作用,registerManagerFactory方法在幹些什麼??

其他操作都省略,就看uuid他的生成策略,到底是個啥,能標識個什麼的唯一性??

/**
     * 註冊ManagerFactory
     * @param managerFactory
     * @return 需要全部登出的排程,例如當IP不在列表中
     * @throws Exception
     */
    public List<String> registerManagerFactory(TBScheduleManagerFactory managerFactory) throws Exception{
        //第一波進來肯定無值啊
        if(managerFactory.getUuid() == null){
            String uuid = managerFactory.getIp() +"$" + managerFactory.getHostName() +"$"+ UUID.randomUUID().toString().replaceAll("-", "").toUpperCase();
            //...省略...//
            managerFactory.setUuid(zkPath.substring(zkPath.lastIndexOf("/") + 1));
        }
        //...省略...//
        return result;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

生成策略是當不存在uuid時才會生成uuid,這個uuid很容易發現是跟當前機器的ip還有hostname有關的,為了避免重複還追加了一個uuid,我想這裡可能是考慮到了一個機器下有多個專案整合tbschedule的情況吧 所以這裡uuid代表了

當前應用(注意是應用)在整個排程伺服器下的唯一ID,當前這個uuid在console(管理頁面)上也有所體現。

問題2: 知道了uuid是個什麼,那麼這個程式碼stsInfo = this.getScheduleStrategyManager().loadManagerFactoryInfo(this.getUuid()); 字面意思可能就好理解了一些,貌似是拿應用的uuid去排程策略管理器中拿自己的排程策略?

public ManagerFactoryInfo loadManagerFactoryInfo(String uuid) throws Exception {
        //這裡會拼裝一個zk的路徑,帶有uuid哦
        String zkPath = this.PATH_ManagerFactory + "/" + uuid;
        //判斷是否存在這個節點,沒有相關節點就丟擲異常?
        if(this.getZooKeeper().exists(zkPath, false)==null){
            throw new Exception("工作管理員不存在:" + uuid);
        }
        //拿到節點的儲存值
        byte[] value = this.getZooKeeper().getData(zkPath,false,null);
        ManagerFactoryInfo result = new ManagerFactoryInfo();
        result.setUuid(uuid);
        //如果值為null代表已啟動的??
        if(value== null){
            result.setStart(true);
        }else{
            //根據節點值來判斷是否start
            result.setStart(Boolean.parseBoolean(new String(value)));
        }
        return result;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

那麼可見會已uuid拼裝一個zk路徑,去判斷對應節點是否存在,如果不存在就丟擲異常!如果存在就去判斷節點值(true/false),標識了當前節點的啟動狀態。那麼問題又來了。

6.應用在排程策略管理器下的節點是什麼時候變更狀態的?

問題3: 貌似和問題6有關?

問題4: reRegisterManagerFactory()

public void reRegisterManagerFactory() throws Exception{
        //重新分配排程器
        List<String> stopList = this.getScheduleStrategyManager().registerManagerFactory(this);
        for (String strategyName : stopList) {
            this.stopServer(strategyName);
        }
        //根據策略重新分配排程任務的機器
        this.assignScheduleServer();
        //
        this.reRunScheduleServer();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

重新註冊了一次排程策略管理器,what這個方法有返回值??之前為什麼不對這個返回值做處理呢????!所以這個registerManagerFactory(this) 方法變得愈發的混亂了,有必要開啟詳細看看了。

/**
     * 註冊ManagerFactory
     * @param managerFactory
     * @return 需要全部登出的排程,例如當IP不在列表中
     * @throws Exception
     */
    public List<String> registerManagerFactory(TBScheduleManagerFactory managerFactory) throws Exception{

        if(managerFactory.getUuid() == null){
            String uuid = managerFactory.getIp() +"$" + managerFactory.getHostName() +"$"+ UUID.randomUUID().toString().replaceAll("-", "").toUpperCase();
            //拼裝了一個zk節點的路徑,等等這個路徑的拼裝格式貌似見過,所以loadManagerFactoryInfo方法中資料節點是在這一步時候建立的了?
            String zkPath = this.PATH_ManagerFactory + "/" + uuid +"$";
            zkPath = this.getZooKeeper().create(zkPath, null, this.zkManager.getAcl(), CreateMode.EPHEMERAL_SEQUENTIAL);
            managerFactory.setUuid(zkPath.substring(zkPath.lastIndexOf("/") + 1));
        }else{
            String zkPath = this.PATH_ManagerFactory + "/" + managerFactory.getUuid();
            if(this.getZooKeeper().exists(zkPath, false)==null){
                zkPath = this.getZooKeeper().create(zkPath, null, this.zkManager.getAcl(), CreateMode.EPHEMERAL);           
            }
        }
        // 方法的返回值是這個list
        List<String> result = new ArrayList<String>();
        //貌似會載入所有策略,這個loadAllScheduleStrategy() 方法,會把 this.zkManager.getRootPath() +  "/strategy" 這個節點下的所有子節點全都返回
        //然後將每個節點的內容取出,再呼叫這個方法ScheduleStrategy result = (ScheduleStrategy)this.gson.fromJson(valueString, ScheduleStrategy.class); 可以看作對json反序列化操作。
        //所以控制檯操作建立的策略,是儲存在this.zkManager.getRootPath() +  "/strategy" 節點的子節點上的
        for(ScheduleStrategy scheduleStrategy:loadAllScheduleStrategy()){
            boolean isFind = false;
            //暫停或者不在IP範圍
            //猜測是要判斷策略在當前機器上是否可用,如果不可用則加入列表返回,如果可用則在對應的策略節點下建立一個臨時節點
            if(ScheduleStrategy.STS_PAUSE.equalsIgnoreCase(scheduleStrategy.getSts()) == false &&  scheduleStrategy.getIPList() != null){
                for(String ip:scheduleStrategy.getIPList()){
                    //這裡就是為什麼控制太介面說“127.0.0.1或者localhost會在所有機器上執行” 的原因了
                    if(ip.equals("127.0.0.1") || ip.equalsIgnoreCase("localhost") || ip.equals(managerFactory.getIp())|| ip.equalsIgnoreCase(managerFactory.getHostName())){
                        //新增可管理TaskType
                        //建立一個臨時節點,這個臨時節點與之前的排程伺服器中生成的uuid有關。
                        //這一步建立的臨時節點如下文。
                        String zkPath = this.PATH_Strategy+"/"+ scheduleStrategy.getStrategyName()+ "/"+ managerFactory.getUuid();
                        if(this.getZooKeeper().exists(zkPath, false)==null){
                            zkPath = this.getZooKeeper().create(zkPath, null, this.zkManager.getAcl(), CreateMode.EPHEMERAL);           
                        }
                        isFind = true;
                        break;
                    }
                }
            }
            if(isFind == false){//清除原來註冊的Factory
                String zkPath = this.PATH_Strategy+"/"+ scheduleStrategy.getStrategyName()+ "/"+ managerFactory.getUuid();
                if(this.getZooKeeper().exists(zkPath, false)!=null){
                    ZKTools.deleteTree(this.getZooKeeper(), zkPath);
                    result.add(scheduleStrategy.getStrategyName());
                }
            }
        }
        return result;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55

針對我的環境,此處rootPath配置為”/dsp_official_0928_wd/schedule”,其zk的策略節點為,可見其與通過控制檯錄入的策略是對應的

[zk: localhost:2181(CONNECTED) 7] ls /dsp_official_0928_wd/schedule/strategy [commonSyncAdvertiserTask, commonSyncCreativeTask]

按照loadAllScheduleStrategy() 方法的邏輯來說,會把這兩個節點上的內容都拿出來,反序列化成com.taobao.pamirs.schedule.strategy.ScheduleStrategy 物件 以/dsp_official_0928_wd/schedule/strategy/commonSyncAdvertiserTask 節點內容為例

{ “strategyName”: “commonSyncAdvertiserTask”, “IPList”: [ “127.0.0.1” ], “numOfSingleServer”: 0, “assignNum”: 2, “kind”: “Schedule”, “taskName”: “commonSyncAdvertiserTask”, “taskParameter”: “”, “sts”: “resume” }

某策略可用的應用節點:

[zk: localhost:2181(CONNECTED) 8] ls /dsp_official_0928_wd/schedule/strategy/commonSyncAdvertiserTask [127.0.0.1admadc01admadc010000000081]

通常情況下,策略應用節點的內容如下,但在registerManagerFactory此時還沒有初始化該節點的值,預設為null:

{ “strategyName”: “commonSyncAdvertiserTask”, “uuid”: “127.0.0.1admadc01admadc01FC8BBDAC658C4B00B0F47377E50A570D$0000000080”, “requestNum”: 0, “currentNum”: 0, “message”: “” }

先不管如上節點內容的意義,其實理解上也簡單。

問題5: 所以registerManagerFactory() 方法就是在當前機器支援的策略配置上註冊當前機器節點用於同步,並返回當前機器上所有不支援的策略

那麼回到 reRegisterManagerFactory() 方法上來,

  public void reRegisterManagerFactory() throws Exception{
        //重新分配排程器
        //因為當前應用不支援這些策略,所以要在當前機器上停止這些策略的服務。
        List<String> stopList = this.getScheduleStrategyManager().registerManagerFactory(this);
        for (String strategyName : stopList) {
            this.stopServer(strategyName);
        }
        //根據策略重新分配排程任務的機器
        this.assignScheduleServer();
        this.reRunScheduleServer();
    }
  ```
  這裡迫切的需要知道以下兩行的含義。
  ```java
this.assignScheduleServer();
        this.reRunScheduleServer();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

assignScheduleServer() 方法的官方解釋是 根據策略重新分配排程任務的機器 那麼分配策略是什麼呢? 1.載入所有本應用能夠執行的策略(注意上文說道的策略應用節點的資料格式,返回預設格式)。 2.遍歷1返回策略列表,按策略taskType獲取到所有的可執行策略應用項列表 3.遍歷跟當前factory例項相關的strategy,選舉出每個strategy的leader factory例項,由leader重新計算每個factory例項能夠分到的reqNum,即根據strategy身上的assignNum“numOfSingleServer,將assignNum平分給每個factory例項。 4.根據2的返回結果(總的伺服器數量),和3的策略配置資訊,為2返回的策略服務項分配值(修改zk策略應用節點的requestNum)

reRunScheduleServer() 方法沒有給出官方的解釋,這裡暫且認為其是為了重新執行排程服務,怎麼做的????

public void reRunScheduleServer() throws Exception{
    //拿到所有本機可執行的策略
        for (ScheduleStrategyRunntime run : this.scheduleStrategyManager.loadAllScheduleStrategyRunntimeByUUID(this.uuid)) {
            //在快取中取
            List<IStrategyTask> list = this.managerMap.get(run.getStrategyName());
            if(list == null){
                list = new ArrayList<IStrategyTask>();
                this.managerMap.put(run.getStrategyName(),list);
            }
            //如果已經有在執行的任務組,且當前任務組數大於已分配的最大任務組數量,那麼就停止最後新增的任務組
            while(list.size() > run.getRequestNum() && list.size() >0){
                //what??先從列表裡刪了????如果停止失敗呢????????
                IStrategyTask task  =  list.remove(list.size() - 1);
                    try {
                        task.stop(run.getStrategyName());
                    } catch (Throwable e) {
                        logger.error("登出任務錯誤:strategyName=" + run.getStrategyName(), e);
                    }
                }
           //不足,增加排程器
           ScheduleStrategy strategy = this.scheduleStrategyManager.loadStrategy(run.getStrategyName());
           //注意這個迴圈,建立執行緒組直到執行的執行緒組滿了
           while(list.size() < run.getRequestNum()){
               //厲害了這裡做了什麼????
               IStrategyTask result = this.createStrategyTask(strategy);
               if(null==result){
                   logger.error("strategy 對應的配置有問題。strategy name="+strategy.getStrategyName());
               }
               list.add(result);
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

看起來該方法會遍歷所有當前應用支援的策略,並針對策略建立一個叫做IStrategyTask的東西,並維護在一個map中,這個this.createStrategyTask(strategy); 好吧,官方有說明“建立排程伺服器”

/**
     * 建立排程伺服器
     * @param baseTaskType
     * @param ownSign
     * @return
     * @throws Exception
     */
    public IStrategyTask createStrategyTask(ScheduleStrategy strategy)
            throws Exception {
        IStrategyTask result = null;
        try{
            if(ScheduleStrategy.Kind.Schedule == strategy.getKind()){
                //就是這裡。。。。。。。。。
                String baseTaskType = ScheduleUtil.splitBaseTaskTypeFromTaskType(strategy.getTaskName());
                String ownSign =ScheduleUtil.splitOwnsignFromTaskType(strategy.getTaskName());
                result = new TBScheduleManagerStatic(this,baseTaskType,ownSign,scheduleDataManager);
            }else if(ScheduleStrategy.Kind.Java == strategy.getKind()){
                result=(IStrategyTask)Class.forName(strategy.getTaskName()).newInstance();
                result.initialTaskParameter(strategy.getStrategyName(),strategy.getTaskParameter());
            }else if(ScheduleStrategy.Kind.Bean == strategy.getKind()){
                result=(IStrategyTask)this.getBean(strategy.getTaskName());
                result.initialTaskParameter(strategy.getStrategyName(),strategy.getTaskParameter());
            }
        }catch(Exception e ){
            logger.error("strategy 獲取對應的java or bean 出錯,schedule並沒有載入該任務,請確認" +strategy.getStrategyName(),e);
        }
        return result;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

根據上文和日常配置,也知道來自於會走第一個條件分支。最後通過一句result = new TBScheduleManagerStatic(this,baseTaskType,ownSign,scheduleDataManager);返回一個com.taobao.pamirs.schedule.strategy.IStrategyTask 介面的實現。 繼續看com.taobao.pamirs.schedule.taskmanager.TBScheduleManagerStatic類。

public TBScheduleManagerStatic(TBScheduleManagerFactory aFactory,
            String baseTaskType, String ownSign,IScheduleDataManager aScheduleCenter) throws Exception {
        super(aFactory, baseTaskType, ownSign, aScheduleCenter);
    }
  • 1
  • 2
  • 3
  • 4
  • 5

呼叫了父類com.taobao.pamirs.schedule.taskmanager.TBScheduleManager的建構函式,原始碼裡面少有的對這個類寫了一大堆文件註釋,先拿出來學習下。

/**
 * 1、任務排程分配器的目標:    讓所有的任務不重複,不遺漏的被快速處理。
 * 2、一個Manager只管理一種任務型別的一組工作執行緒。
 * 3、在一個JVM裡面可能存在多個處理相同任務型別的Manager,也可能存在處理不同任務型別的Manager。
 * 4、在不同的JVM裡面可以存在處理相同任務的Manager 
 * 5、排程的Manager可以動態的隨意增加和停止
 * 
 * 主要的職責:
 * 1、定時向集中的資料配置中心更新當前排程伺服器的心跳狀態
 * 2、向資料配置中心獲取所有伺服器的狀態來重新計算任務的分配。這麼做的目標是避免集中任務排程中心的單點問題。
 * 3、在每個批次資料處理完畢後,檢查是否有其它處理伺服器申請自己把持的任務佇列,如果有,則釋放給相關處理伺服器。
 *  
 * 其它:
 *   如果當前伺服器在處理當前任務的時候超時,需要清除當前佇列,並釋放已經把持的任務。並向控制主動中心報警。
 * 
 * @author xuannan
 *
 */
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

第2點:一個Manager只管理一種任務型別的一組工作執行緒。 通過分析啟動過程已經能夠確認。發現針對每一個策略和執行緒組都建立了一個com.taobao.pamirs.schedule.strategy.IStrategyTask例項。

第3點:在一個JVM裡面可能存在多個處理相同任務型別的Manager,也可能存在處理不同任務型別的Manager。 如果啟動過程初始化多個com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory物件時可能會有可能有”在一個JVM裡面可能存在多個處理相同任務型別的Manager”這種情況,還有會有requsetNum個相同任務型別的Manager在同一個JVM

第4點:在不同的JVM裡面可以存在處理相同任務的Manager 這種情況是肯定存在的,取決於策略的”IP地址(逗號分隔)”配置

第5點:排程的Manager可以動態的隨意增加和停止 這個要自己去驗證了。

疑問: 1. List list = this.managerMap.get(run.getStrategyName()); 到底起到什麼作用?是為了統計當前JVM已經執行的執行緒組數量麼??? 目前tbschedule啟動過程到這就結束了,2秒一次的zk健康檢查(心跳),並重新整理任務組(及時響應控制檯設定),貌似到目前為止,為什麼這裡並沒有使用監視點來響應控制檯的設定,而是自己主動查詢,出發點是什麼???