1. 程式人生 > >【JEECG TBSchedule】詳解應對平臺高併發的分散式排程框架TBSchedule

【JEECG TBSchedule】詳解應對平臺高併發的分散式排程框架TBSchedule

原文地址:http://geek.csdn.net/news/detail/65738

【編者按】 TBSchedule是一款非常優秀的高效能分散式排程框架,本文是作者結合多年使用TBSchedule的經驗,在研讀三遍原始碼的基礎上完成。期間作者也與阿里空玄有過不少技術交流,並非常感謝空玄給予的大力支援。另外,作者寫這篇文章的目的一是出於對TBSchedule的一種熱愛,二是現在是一個資源共享、技術共享的時代,希望把它展現給大家(送人玫瑰,手留餘香),能給大家的工作帶來幫助。

以下為文章正文:

一、TBSchedule初識

    時下網際網路和電商領域,各個平臺都存在大資料、高併發的特點,對資料處理的要求越來越高,既要保證高效性,又要保證安全性、準確性。TBSchedule的使命就是將排程作業從業務系統中分離出來,降低或者是消除和業務系統的耦合度,進行高效非同步任務處理。其實在網際網路和電商領域TBSchedule的使用非常廣泛,目前被應用於阿里巴巴、淘寶、支付寶、京東、聚美、汽車之家、國美等很多網際網路企業的流程排程系統。

    在深入瞭解TBSchedule之前我們先從內部和外部形態對它有個初步認識,如圖1.1、圖1.2。

圖片描述

圖1.1 TBSchedule關鍵字

圖片描述

圖1.2 TBSchedule外部形態

    從TBSchedule的內部形態來說,與他有關的關鍵詞包括批量任務、動態擴充套件、多主機、多執行緒、併發、分片……,這些詞看起來非常的高大上,都是時下網際網路技術比較流行的詞彙。從TBSchedule的外部架構來看,一目瞭然,宿主在排程應用中與ZooKeeper進行通訊。一個框架結構是否是優秀的,從美感的角度就可以看出來,一個好的架構一定是隱藏了內部複雜的原理,外部視覺上美好的,讓使用者使用起來簡單易懂。

二、TBSchedule原理

    為什麼TBSchedule值得推廣呢?

  1. 傳統的排程框架spring task、quartz也是可以進行叢集排程作業的,一個節點掛了可以將任務漂移給其他節點執行從而避免單點故障,但是不支援分散式作業,一旦達到單機處理極限也會存在問題。
  2. elastic-job支援分散式,是一個很好的排程框架,但是開源時間較短,還沒有經歷大範圍市場考驗。
  3. Beanstalkd基於C語言開發,使用範圍較小,無法引入到php、java系統平臺。

    TBSchedule到底有多強大呢?我對TBSchedule的優勢特點進行了如下總結:

  1. 支援叢集、分散式
  2. 靈活的任務分片
  3. 動態的服務擴容和資源回收
  4. 任務監控支援
  5. 經歷了多年市場考驗,阿里強大技術團隊支援

    TBSchedule支援Cluster,可以宿主在多臺伺服器多個執行緒組並行進行任務排程,或者說可以將一個大的任務拆成多個小任務分配到不同的伺服器。

    TBSchedule的分散式機制是通過靈活的Sharding方式實現的,比如可以按所有資料的ID按10取模分片(分片規則如圖2.1)、按月份分片等等,根據不同的需求,不同的場景由客戶端配置分片規則。然後就是TBSchedule的宿主伺服器可以進行動態擴容和資源回收,這個特點主要是因為它後端依賴的ZooKeeper,這裡的ZooKeeper對於TBSchedule來說是一個NoSQL,用於儲存策略、任務、心跳資訊資料,它的資料結構類似檔案系統的目錄結構,它的節點有臨時節點、持久節點之分。排程引擎上線後,隨著業務量資料量的增多,當前Cluster可能不能滿足目前的處理需求,那麼就需要增加伺服器數量,一個新的伺服器上線後會在ZooKeeper中建立一個代表當前伺服器的一個唯一性路徑(臨時節點),並且新上線的伺服器會和ZooKeeper保持長連線,當通訊斷開後,節點會自動摘除。

    TBSchedule會定時掃描當前伺服器的數量,重新進行任務分配。TBSchedule不僅提供了服務端的高效能排程服務,還提供了一個scheduleConsole war隨著宿主應用的部署直接部署到伺服器,可以通過web的方式對排程的任務、策略進行監控管理,以及實時更新調整。

圖片描述

圖2.1 TBSchedule分片規則

    是不是已經對TBSchedule稍微了有些好感呢?我們接著往下看。

    TBSchedule提供了兩個核心元件ScheduleServer、TBScheduleManagerFactory和兩類核心介面IScheduleTaskDeal、IScheduleTaskDealSingle、IScheduleTaskDealMuti,這兩部分是客戶端研發的關鍵部分,是使用TBSchedule必須要了解的。

    ScheduleServer即任務處理器,的主要作用是任務和策略的管理、任務採集和執行,由一組工作執行緒組成,這組工作執行緒是基於佇列實現的,進行任務抓取和任務處理(有兩種處理模式,下面會講)。每個任務處理器和ZooKeeper有一個心跳通訊連線,用於檢測Server的狀態和進行任務動態分配。舉個例子,比如3臺伺服器的worker叢集執行出票訊息生成任務,對於這個任務型別每臺伺服器可以配置一個ScheduleSever(即一個執行緒組),也可以配置兩個執行緒組,那麼就相當於6臺伺服器在並行執行此任務型別。當某臺伺服器宕機或者其他原因與ZooKeeper通訊斷開時,它的任務將被其他伺服器接管。ScheduleServer引數定義如圖2.2

圖片描述

圖2.2 ScheduleServer引數定義

    在這些引數中taskItems是一個非常重要的屬性,是客戶單可以自由發揮的地方,是任務分片的基礎,比如我們處理一個任務可以根據ID按10取模,那麼任務項就是0-9,3臺伺服器分別拿到4、 3、 3個任務項,伺服器的上下線都會對任務項進行重新分配。任務項是進行任務分配的最小單位。一個任務項只能由一個ScheduleServer來進行處理,但一個Server可以處理任意數量的任務項。這就是剛才我們說的分片特性。

    排程伺服器TBScheduleManagerFactory的主要工作ZooKeeper連線引數配置和ZooKeeper的初始化、排程管理。

    兩類核心介面是需要被我們定義的目標任務實現的,根據自己的需要進行任務採集(重寫selectTasks方法)和任務執行(重寫execute方法),這兩類介面也是客戶端研發根據需求自由發揮的地方。

    接下來我們深入瞭解下TBSchedule,看看它的內部是如何實現的。圖2.3流程圖是我花了很多心血通過一週時間畫出來的,基本是清晰的展現了TBSchedule內部的執行流程以及每個步驟ZooKeeper節點路徑和資料的變化。因為圖中的註釋已經描述的很詳細了,每個節點右側是ZooKeeper的資訊(資料結構見圖2.4),這裡就不再做過多的文字描述了,有任何建議或者不明白的地方可以找我交流。

圖片描述

圖2.3 TBSchedule內部流程

圖片描述

圖2.4 TBSchedule之ZooKeeper資料結構

    TBSchedule還有個強大之處是它提供了兩種處理器模式模式:

    1. SLEEP模式

    當某一個執行緒任務處理完畢,從任務池中取不到任務的時候,檢查其它執行緒是否處於活動狀態。如果是,則自己休眠;如果其它執行緒都已經因為沒有任務進入休眠,當前執行緒是最後一個活動執行緒的時候,就呼叫業務介面,獲取需要處理的任務,放入任務池中,同時喚醒其它休眠執行緒開始工作。

   2. NOTSLEEP模式

    當一個執行緒任務處理完畢,從任務池中取不到任務的時候,立即呼叫業務介面獲取需要處理的任務,放入任務池中。

    SLEEP模式內部邏輯相對較簡單,如果遇到大任務需要處理較長時間,可能會造成其他執行緒被動阻塞的情況。但其實生產環境一般都是小而快的任務,即使出現阻塞的情況ScheduleConsole也會及時的監控到。NOTSLEEP模式減少了執行緒休眠的時間,避免了因大任務造成阻塞的情況,但為了避免資料被重複處理,增加了CPU在資料比較上的開銷。TBSchedule預設是SLEEP模式。

    到目前為止我相信大家對TBSchedule有了一個深刻的瞭解,心中的疑霧逐漸散開了。理論是實踐的基礎,實踐才是最終的目的,下一節我們將結合理論知識進行TBSchedule實戰。

三、TBSchedule實戰

    在專案中使用TBSchedule需要依賴ZooKeeper、TBSchedule。

    ZooKeeper依賴:

    <dependency>
        <groupId>org.apache.ZooKeeper</groupId>
        <artifactId>ZooKeeper</artifactId>
        <version>3.4.6</version>
    </dependency>

    TBSchedule依賴:

    <dependency>
        <groupId>com.taobao.pamirs.schedule</groupId>
        <artifactId>TBSchedule</artifactId>
        <version>3.3.3.2</version>
    </dependency>

    TBSchedule有三種引入方式:

  1. 通過ScheduleConsole引入

    TBSchedule隨著宿主排程應用部署到伺服器後,可以通過Web瀏覽器的方式訪問其提供監控平臺。

    第一步,初始化ZooKeeper

圖片描述

    第二步,建立排程策略

圖片描述

    第三步,建立排程任務

圖片描述

    第四步,監控排程任務

圖片描述

    2、通過原生Java引入

        // 初始化Spring
        ApplicationContext ctx = new FileSystemXmlApplicationContext(
                "spring-config.xml");

        // 初始化排程工廠
        TBScheduleManagerFactory scheduleManagerFactory = new TBScheduleManagerFactory();

        Properties p = new Properties();
        p.put("zkConnectString", "127.0.0.1:2181");
        p.put("rootPath", "/taobao-schedule/train_worker");
        p.put("zkSessionTimeout", "60000"); 
        p.put("userName", "train_dev");
        p.put("password", " train_dev ");
        p.put("isCheckParentPath", "true");

        scheduleManagerFactory.setApplicationContext(ctx);

        scheduleManagerFactory.init(p); 

                // 建立任務排程任務的基本資訊
String baseTaskTypeName = "DemoTask";
                ScheduleTaskType baseTaskType = new ScheduleTaskType();
                baseTaskType.setBaseTaskType(baseTaskTypeName);
                baseTaskType.setDealBeanName("demoTaskBean");
                baseTaskType.setHeartBeatRate(10000);
                baseTaskType.setJudgeDeadInterval(100000);
                baseTaskType.setTaskParameter("AREA=BJ,YEAR>30");
                baseTaskType.setTaskItems(ScheduleTaskType.splitTaskItem(
                "0:{TYPE=A,KIND=1},1:{TYPE=A,KIND=2},2:{TYPE=A,KIND=3},3:{TYPE=A,KIND=4}," +
                "4:{TYPE=A,KIND=5},5:{TYPE=A,KIND=6},6:{TYPE=A,KIND=7},7:{TYPE=A,KIND=8}," +
                "8:{TYPE=A,KIND=9},9:{TYPE=A,KIND=10}"));
        baseTaskType.setFetchDataNumber(500);
        baseTaskType.setThreadNumber(5);
        this.scheduleManagerFactory.getScheduleDataManager()
                .createBaseTaskType(baseTaskType);
        log.info("建立排程任務成功:" + baseTaskType.toString());

        // 建立任務的排程策略
        String taskName = baseTaskTypeName;
        String strategyName =taskName +"-Strategy";
        try {
            this.scheduleManagerFactory.getScheduleStrategyManager()
                    .deleteMachineStrategy(strategyName,true);
        } catch (Exception e) {
            e.printStackTrace();
        }
        ScheduleStrategy strategy = new ScheduleStrategy();
        strategy.setStrategyName(strategyName);
        strategy.setKind(ScheduleStrategy.Kind.Schedule);
        strategy.setTaskName(taskName);
        strategy.setTaskParameter("china");

        strategy.setNumOfSingleServer(1);
        strategy.setAssignNum(10);
        strategy.setIPList("127.0.0.1".split(","));
        this.scheduleManagerFactory.getScheduleStrategyManager()
                .createScheduleStrategy(strategy);
        log.info("建立排程策略成功:" + strategy.toString());

    3、通過Spring容器引入

<!-- 初始化ZooKeeper -->  
<bean id="scheduleManagerFactory"
            class="xx.xx.TBScheduleManagerFactory">
<property name="zkConfig">
<map>
    <entry key="zkConnectString" value="127.0.0.1:2181" />
    <entry key="rootPath" value="/taobao-schedule/train_worker" />
    <entry key="zkSessionTimeout" value="60000" />
    <entry key="userName" value="train_dev" />
    <entry key="password" value="train_dev" />
    <entry key="isCheckParentPath" value="true" />
</map>
</property> 
</bean>
<!-- 配置排程策略 凌晨1點到3點執行 -->
<bean id="abstractDemoScheduleTask" class="com.xx.core.TBSchedule.InitScheduleTask" abstract="true">
<property name="scheduleTaskType.heartBeatRate" value="10000" />
<property name="scheduleTaskType.judgeDeadInterval" value="100000" />
<property name="scheduleTaskType.permitRunStartTime" value="0 0 1 * * ?"/> 
<property name="scheduleTaskType.permitRunEndTime" value="0 0 3 * * ?"/>  
<property name="scheduleTaskType.taskParameter" value="AREA=BJ,YEAR>30" />
<property name="scheduleTaskType.sleepTimeNoData" value="60000"/>
<property name="scheduleTaskType.sleepTimeInterval" value="60000"/>
<property name="scheduleTaskType.fetchDataNumber" value="500" />
<property name="scheduleTaskType.executeNumber" value="1" />
<property name="scheduleTaskType.threadNumber" value="5" />
<property name="scheduleTaskType.taskItems"> 
<list>
        <value>0:{TYPE=A,KIND=1}</value>
        <value>1:{TYPE=A,KIND=2}</value>
        <value>2:{TYPE=A,KIND=3}</value>
        <value>3:{TYPE=A,KIND=4}</value>
        <value>4:{TYPE=A,KIND=5}</value>
        <value>5:{TYPE=A,KIND=6}</value>
        <value>6:{TYPE=A,KIND=7}</value>
        <value>7:{TYPE=A,KIND=8}</value>
        <value>8:{TYPE=A,KIND=9}</value>
        <value>9:{TYPE=A,KIND=10}</value>
    </list>
</property>
<property name="scheduleStrategy.kind" value="Schedule" />
<property name="scheduleStrategy.numOfSingleServer" value="1" />
<property name="scheduleStrategy.assignNum" value="10" />   
    <property name="scheduleStrategy.iPList">
        <list>
            <value>127.0.0.1</value>
        </list>
    </property>
    </bean>        
<!-- 配置排程任務 -->
<bean id="demoTask" class="com.xx.worker.task.DemoTask" parent="abstractDemoScheduleTask">
<property name="scheduleTaskType.baseTaskType" value="demoTask" />
<property name="scheduleTaskType.dealBeanName" value="demoTaskBean" />
<property name="scheduleStrategy.strategyName" value="demoTaskBean-Strategy" />
<property name="scheduleStrategy.taskName" value="demoTaskBean" />
</bean> 

            排程任務具體實現 DemoTask.java
 /**
 * DemoTask任務類
 */
public class DemoTask  mplements
        IScheduleTaskDealSingle,TScheduleTaskDeal {

 /**
  * 資料採集
  * @param taskItemNum--分配的任務項 taskItemList--總任務項 
  *        eachFetchDataNum--採集任務數量
  */
    @Override
    public List<DemoTask> selectTasks(String taskParameter,
            String ownSign, int taskItemNum, List<TaskItemDefine> taskItemList,
            int eachFetchDataNum) throws Exception {
        List<DemoTask> taskList = new LinkedList<DemoTask>();
        //客戶端根據條件進行資料採集start

        //客戶端根據條件進行資料採集end
        return rt;
    }

/**
  * 資料處理
  */
    @Override
    public boolean execute(DemoTask task, String ownSign)
            throws Exception {
        //客戶端pop任務進行處理start

        //客戶端pop任務進行處理end
        return true;
    }
}

    其實我們看對於TBSchedule客戶端的使用非常簡單,初始化ZooKeeper、配置排程策略、配置排程任務,對排程任務進行具體實現,就這幾個步驟。現在可以慶祝下了,你又掌握了一個優秀開源框架的設計思想和使用方式。

四、TBSchedule挑戰

    任何事物都是沒有最好只有更好,TBSchedule也一樣,雖然它現在已經很完美了,我們不能放棄對更完美的追求。阿里團隊可以在下面幾個方面進行優化。

  1. 目前ScheduleConsole監控頁面過於簡單,需完善UI設計,提高使用者體驗。
  2. 支援Zookeeper叢集自動切換,避免ZooKeeper服務的叢集單點故障。
  3. 原生ZooKeeper操作替換為Curator,Curator對ZooKeeper進行了一次包裝,對原生ZooKeeper的操作做了大量優化,Client和Server之間的連線可能出現的問題處理等等,可以進一步提高TBSchedule的高可用。
  4. TBSchedule的幫助文件較少,網上的資料基本是千篇一律,希望有更多的愛好者加入進來。

    至此,我們已經完成了對TBSchedule的全部介紹,儘快使用起來吧!