1. 程式人生 > >LTS分散式任務排程在專案中的實際應用

LTS分散式任務排程在專案中的實際應用

公司專案是分散式的,所以定時任務用lts框架,簡單的看看程式碼,分析了一下,找到呼叫流程。

注意:不瞭解lts的先看下這個文件,我也是看完才看懂程式碼
文件地址:https://www.cnblogs.com/dion-90/articles/8674591.html

lts的工作流程

  • JobClient 提交一個 任務 給 JobTracker, 這裡我提供了兩種客戶端API, 一種是如果JobTracker 不存在或者提交失敗,直接返回提交失敗。另一種客戶端是重試客戶端, 如果提交失敗,先儲存到本地leveldb(可以使用NFS來達到同個節點組共享leveldb檔案的目的,多執行緒訪問,做了檔案鎖處理),返回給客戶端提交成功的資訊,待JobTracker可用的時候,再將任務提交。
  • JobTracker 收到JobClient提交來的任務,先生成一個唯一的JobID。然後將任務儲存在Mongo叢集中。JobTracker 發現有(任務執行的)可用的TaskTracker節點(組) 之後,將優先順序最大,最先提交的任務分發給TaskTracker。這裡JobTracker會優先分配給比較空閒的TaskTracker節點,達到負載均衡。
  • TaskTracker 收到JobTracker分發來的任務之後,執行。執行完畢之後,再反饋任務執行結果給JobTracker(成功or 失敗[失敗有失敗錯誤資訊]),如果發現JobTacker不可用,那麼儲存本地leveldb,等待TaskTracker可用的時候再反饋。反饋結果的同時,詢問JobTacker有沒有新的任務要執行。
  • JobTacker收到TaskTracker節點的任務結果資訊,生成並插入(mongo)任務執行日誌。根據任務資訊決定要不要反饋給客戶端。不需要反饋的直接刪除, 需要反饋的(同樣JobClient不可用儲存檔案,等待可用重發)。
  • JobClient 收到任務執行結果,進行自己想要的邏輯處理。

程式碼分析

以我公司為例, spring cloud裡有兩個服務分別是JobClientJobTracker

其中task相當於JobClient, job相當於JobTracker

1.task中的程式碼, 專案啟動執行這個方法, 這相當是定製任務的基本資訊:

// task中的程式碼, 專案啟動執行這個方法
/**
 * 定時生成  ...資料
 */
public void jobMethod()
{
    // 定義個bean, 裡面可以定義屬性
    SynMainTainMsgCommand command = new SynMainTainMsgCommand();

    // 定義任務物件
    Job job = new Job();
    // taskId自定義
    job.setTaskId("doMainTainProdcut");
    // 自定義引數,實際就是HashMap
    job.setParam("description", "定時生成 車輛 保養資料");
    // 注意這裡傳的commond物件的類全名
    job.setParam("command", command.getClass().getName());
    job.setTaskTrackerNodeGroup("組名");
    job.setNeedFeedback(true);
    job.setReplaceOnExist(true);      // 當任務佇列中存在這個任務的時候,是否替換更新
    job.setCronExpression("cron表示式");
    // 提交任務
    Response response = jobClient.submitJob(job);
}

2.先把job任務寫了,一會再說怎麼呼叫的
在job服務裡實現相應的任務:

@Component
public class SynMainTainMsgHandler extends BaseCommandHandler<SynMainTainMsgCommand, HttpCommandResultWithData> {

    // ...略

    public HttpCommandResultWithData handler(SynMainTainMsgCommand command) {
        // 具體的任務實現...
    }

}

3.關鍵是下一步,job怎麼收到並且執行的


// 1.實現JobRunner的run方法
@JobRunner4TaskTracker
public class JobRunnerImpl implements JobRunner {

    @Override
    public Result run(JobContext jobContext) throws Throwable {
        String successjson;
        try {
        // 接到引數
        String command = jobContext.getJob().getParam("command");
        String commandParam = jobContext.getJob().getParam("commandParam");

        // 反射找到上面定義的bean
        Class clazz = Class.forName(command);

        // ...略部分程式碼
        Object o = clazz.newInstance();

        // 這裡想通過command找handler,並執行
        Command.Result handlerresult = dispatch((AbstractCommand)o);
        // 這裡只要找到 BaseCommandHandler的handler方法執行就行了,因為上面的SynMainTainMsgHandler繼承了BaseCommandHandler

            successjson = JsonUtil.toJson(handlerresult);


        return new Result(Action.EXECUTE_SUCCESS, successjson);
    }
}
  1. 這裡只要找到 BaseCommandHandler的handler方法執行就行了,因為上面的SynMainTainMsgHandler繼承了BaseCommandHandler,
    BaseCommandHandler繼承CommandHandler。

private <C extends Command<?>, CR extends Command.Result> CR dispatch(C command) {
        CommandHandler<C, CR> commandHandler = (CommandHandler)this.handlerByType.get(command.getClass());
        return commandHandler.handle(command);
    }

省略部分程式碼,實現思路就是通過傳進來的command.className獲取到具體的handler。