1. 程式人生 > >LTS 輕量級分散式任務排程框架(Light Task Scheduler)

LTS 輕量級分散式任務排程框架(Light Task Scheduler)

框架概況

LTS是一個輕量級分散式任務排程框架。有三種角色, JobClient, JobTracker, TaskTracker。各個節點都是無狀態的,可以部署多個,來實現負載均衡,實現更大的負載量, 並且框架具有很好的容錯能力。 採用多種註冊中心(Zookeeper,redis等)進行節點資訊暴露,master選舉。(Mongo or Mysql)儲存任務佇列和任務執行日誌, netty做底層通訊。

JobClient : 主要負責提交任務, 和 接收任務執行反饋結果。
JobTracker : 負責接收並分配任務,任務排程。
TaskTracker: 負責執行任務,執行完反饋給JobTracker。
支援任務型別:

實時任務
也支援定時任務 (如:3天之後執行)
CronExpression (如:0 0/1 * * * ?)
感興趣,請加QQ群:109500214 一起探討、完善。並且記得star一下哈,3Q

架構圖

這裡寫圖片描述

節點組:

  1. 一個節點組等同於一個叢集,同一個節點組中的各個節點是對等的,外界無論連線節點組中的任務一個節點都是可以的。
  2. 每個節點組中都有一個master節點(master宕機,會自動選舉出新的master節點),框架會提供介面API來監聽master節點的變化,使用者可以自己使用master節點做自己想做的事情。
  3. JobClient和TaskTracker都可以存在多個節點組。譬如 JobClient 可以存在多個節點組。 譬如:JobClient 節點組為 ‘lts_WEB’ 中的一個節點提交提交一個 只有節點組為’lts_TRADE’的 TaskTracker 才能執行的任務。
  4. (每個叢集中)JobTacker只有一個節點組。
  5. 多個JobClient節點組和多個TaskTracker節點組再加上一個JobTacker節點組, 組成一個大的叢集。

工作流程:

  1. JobClient 提交一個 任務 給 JobTracker, 這裡我提供了兩種客戶端API, 一種是如果JobTracker 不存在或者提交失敗,直接返回提交失敗。另一種客戶端是重試客戶端, 如果提交失敗,先儲存到本地FailStore(可以使用NFS來達到同個節點組共享leveldb檔案的目的,多執行緒訪問,已經做了檔案鎖處理),返回 給客戶端提交成功的資訊,待JobTracker可用的時候,再將任務提交。
  2. JobTracker收到JobClient提交來的任務,將任務存入任務佇列。JobTracker等待TaskTracker的Pull請求,然後將任務Push給TaskTracker去執行。
  3. TaskTracker收到JobTracker分發來的任務之後,然後從執行緒池中拿到一個執行緒去執行。執行完畢之後,再反饋任務執行結果給 JobTracker(成功or 失敗[失敗有失敗錯誤資訊]),如果發現JobTacker不可用,那麼儲存本地FailStore,等待TaskTracker可用的時候再反饋。反饋 結果的同時,詢問JobTacker有沒有新的任務要執行。
  4. JobTacker收到TaskTracker節點的任務結果資訊。根據任務資訊決定要不要反饋給客戶端。不需要反饋的直接刪除,需要反饋的,直接反饋,反饋失敗進入FeedbackQueue, 等待重新反饋。
  5. JobClient收到任務執行結果,進行自己想要的邏輯處理。

特性

負載均衡:

JobClient和TaskTracker可是根據自己設定的負載均衡策略來請求JobTracker節點組中的一個節點。當連線上後將一直保持連線這個節點,保持連線通道,直到這個節點不可用,減少每次都重新連線一個節點帶來的效能開銷。

健壯性:

當節點組中的一個節點當機之後,自動轉到其他節點工作。當整個節點組當機之後,將會採用儲存檔案的方式,待節點組可用的時候進行重發。
當執行任務的TaskTracker節點當機之後,JobTracker會將這個TaskTracker上的未完成的任務(死任務),重新分配給節點組中其他節點執行。

伸縮性

因為各個節點都是無狀態的,可以動態增加機器部署例項, 節點關注者會自動發現。
擴充套件性:
採用和dubbo一樣的SPI擴充套件方式,可以實現任務佇列擴充套件,日誌記錄器擴充套件等
日誌記錄
對於任務的分發,執行,還有使用者通過 (BizLogger) 【LtsLoggerFactory.getBizLogger()】 輸入的業務日誌,LTS都有記錄,使用者可以在LTS Admin 後臺介面檢視某個任務的所有日誌,可以實時檢視這個任務的執行情況。

開發計劃

WEB後臺管理:效能統計分析,預警等
實現LTS的分散式佇列儲存
LTS Admin
這裡寫圖片描述

呼叫示例
下面提供的是最簡單的配置方式。更多配置請檢視 lts-example 模組下的 API 呼叫方式例子.

JobTracker 端

    final JobTracker jobTracker = new JobTracker();
    // 節點資訊配置
    jobTracker.setRegistryAddress("zookeeper://127.0.0.1:2181");
    // 1. 任務佇列用mongo
    jobTracker.addConfig("job.queue", "mongo");
    // mongo 配置
    jobTracker.addConfig("mongo.addresses", "127.0.0.1:27017"); 
    jobTracker.addConfig("mongo.database", "lts");
    jobTracker.setOldDataHandler(new OldDataDeletePolicy());
    // 啟動節點
    jobTracker.start();

TaskTracker端

    TaskTracker taskTracker = new TaskTracker();
    taskTracker.setJobRunnerClass(TestJobRunner.class);
    taskTracker.setRegistryAddress("zookeeper://127.0.0.1:2181");
    taskTracker.setNodeGroup("test_trade_TaskTracker");
    taskTracker.setWorkThreads(20);
    taskTracker.start();
    // 任務執行類
    public class TestJobRunner implements JobRunner {
        @Override
        public void run(Job job) throws Throwable {
            System.out.println("我要執行"+ job);
            System.out.println(job.getParam("shopId"));
            // TODO 使用者自己的業務邏輯, 應該保證冪等
            try {
                Thread.sleep(5*1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

JobClient端

 JobClient jobClient = new RetryJobClient();
    // final JobClient jobClient = new JobClient();
    jobClient.setNodeGroup("test_jobClient");
    jobClient.setRegistryAddress("zookeeper://127.0.0.1:2181");
    jobClient.start();

    // 提交任務
    Job job = new Job();
    job.setTaskId("3213213123");
    job.setParam("shopId", "11111");
    job.setTaskTrackerNodeGroup("test_trade_TaskTracker");
    // job.setCronExpression("0 0/1 * * * ?");  // 支援 cronExpression表示式
    // job.setTriggerTime(new Date()); // 支援指定時間執行
    Response response = jobClient.submitJob(job);