admin排程中心的對於job的排程過程, 排程策略以及rpc通訊

1.1 job排程過程

RemoteHttpJobBean類實現了QuartzJobBean, 當cron時間片到達時, 就會觸發一次quartz呼叫, 回撥executeInternal()方法, 而XxlJobTrigger.trigger(jobId)具體實現細節就是進行了一次網路請求.

// 當cron時間片到時, 就會呼叫RemoteHttpJobBean的executeInternal觸發一次網路請求給執行器
// @DisallowConcurrentExecution
public class RemoteHttpJobBean extends QuartzJobBean {

    private static Logger logger = LoggerFactory.getLogger(RemoteHttpJobBean.class);

    // 1 當quartz cron執行週期到達時, 就回調executeInternal()方法
    // 2 當點選"執行"時, 也會觸發executeInternal()回撥方法
    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {

        // load jobId
        JobKey jobKey = context.getTrigger().getJobKey();

        // 從xxl-job-qrtz-job-detail表中獲取, jobname的值, 即為jobId
        Integer jobId = Integer.valueOf(jobKey.getName());

        // 排程中心觸發, 發起一次rpc網路請求executor執行
        // trigger
        XxlJobTrigger.trigger(jobId);
    }

}

1.2 XxlJobTrigger.trigger()分析

該類的trigger()方法做的事情比較多, 所以程式碼量相對比較多一點, 咱們就來一行一行解析, 每段程式碼就做了什麼事情.

① XxlJobInfo jobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(jobId); // 該jobid就是quartz框架觸發一次排程時生成的, 這個jobid指的就是具體的哪一個job被執行.所以這行程式碼根據該jobId從xxl-job-qrtz-trigger-info表中查詢本次job執行的一些引數, 這些引數就是在前臺頁面上新增任務時的一些引數, 如下圖:比如路由策略, 執行模式, 阻塞處理策略, cron表示式等, 儲存時, 就向該job儲存到了xxl-job-qrtz-trigger-info表, 並生成了一個jobId, 當cron時間片到達時, 就會傳遞該jobId, 觸發一次任務排程.

②  XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup());// 根據配置, 獲取該型別的執行器資訊, 根據jobInfo中的job_group欄位值, 找到該型別執行器所屬的組
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(),
                                                                                  ExecutorBlockStrategyEnum.SERIAL_EXECUTION);// 根據配置, 匹配執行模式, 預設單機序列


        ExecutorFailStrategyEnum failStrategy = ExecutorFailStrategyEnum.match(jobInfo.getExecutorFailStrategy(),
                                                                               ExecutorFailStrategyEnum.FAIL_ALARM); //根據配置,  匹配失敗後的處理模式, 預設失敗告警
        ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(),// 根據配置, 設定路由的策略, 獲取路由策略, 必填, 沒有預設值, 根據配置獲取路由策略
        ArrayList<String> addressList = (ArrayList<String>) group.getRegistryList();// 獲取該執行器的叢集機器列表
        

if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum
            && CollectionUtils.isNotEmpty(addressList)) // 判斷是否需要驚醒分片廣播, 即使, 是否本次任務是否每一個executor執行器都執行一遍本次任務

這裡分為了兩種情況, 如果進行分片廣播的話, 就省去了選擇路由策略這一步, 否則下面就得需要根據前端的配置項, 選擇路由策略.

首先分析, 需要進行分片廣播的形式:

接著, 再分析不是分片廣播的情況:其實無非就是多了一個路由策略選擇的過程, 如下:

1.3 執行排程時的路由策略選擇分析

目前xxl-job支援, 第一個、最後一個、輪詢、隨機、一致性HASH、最不經常使用、最近最久未使用、故障轉移、忙碌轉移等這幾種路由策略, 以下就是排程時的路由策略類圖, 從圖中可以看出是一種典型的 策略模式

以上就是XxlJobTrigger類的trigger方法, 做的一些事情以及實現方式, 下面分析當執行分片排程時, 執行的runExecutor()方法, 和不執行分片策略, 而根據具體的路由策略執行的router.routeRun()方法時, 是如何完成rpc遠端通訊的.

其實router.routeRun()方法最終也是執行runExecutor()方法, 如下圖:

附上程式碼:

public static void trigger(int jobId) {

        // 通過JobId從資料庫中查詢該任務的具體資訊
        // load data
        XxlJobInfo jobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(jobId); // job info
        if (jobInfo == null) {
            logger.warn(">>>>>>>>>>>> xxl-job trigger fail, jobId invalid,jobId={}", jobId);
            return;
        }

        logger.info("==>XxlJobTrigger.trigger jobInfo:{}", JSON.toJSONString(jobInfo));

        // 獲取該型別的執行器資訊, 根據jobInfo中的job_group欄位值, 找到該型別執行器所屬的組
        XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup()); // group info

        logger.info("==>XxlJobTrigger.trigger group:{}", JSON.toJSONString(group));
        // 匹配執行模式, 預設單機序列
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(),
                                                                                  ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block
                                                                                                                               // strategy
        // 匹配失敗後的處理模式, 預設失敗告警
        ExecutorFailStrategyEnum failStrategy = ExecutorFailStrategyEnum.match(jobInfo.getExecutorFailStrategy(),
                                                                               ExecutorFailStrategyEnum.FAIL_ALARM); // fail
                                                                                                                     // strategy

        // 設定路由的策略
        // 獲取路由策略, 必填, 沒有預設值, 根據配置獲取路由策略
        ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(),
                                                                                              null); // route strategy
        // 獲取該執行器的叢集機器列表
        ArrayList<String> addressList = (ArrayList<String>) group.getRegistryList();

        // broadcast -- 判斷路由策略, 而且執行器的機器列表不能為空 // 判斷路由策略 是否為 分片廣播模式
        // 分片廣播 : for迴圈遍歷所有地址, 都發送一遍執行
        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum
            && CollectionUtils.isNotEmpty(addressList)) {
            for (int i = 0; i < addressList.size(); i++) {
                String address = addressList.get(i);

                // 1、save log-id
                XxlJobLog jobLog = new XxlJobLog();
                jobLog.setJobGroup(jobInfo.getJobGroup());
                jobLog.setJobId(jobInfo.getId());
                XxlJobDynamicScheduler.xxlJobLogDao.save(jobLog);
                logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

                // 2、prepare trigger-info
                // jobLog.setExecutorAddress(executorAddress);
                jobLog.setGlueType(jobInfo.getGlueType());
                jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
                jobLog.setExecutorParam(jobInfo.getExecutorParam());
                jobLog.setTriggerTime(new Date());

                ReturnT<String> triggerResult = new ReturnT<String>(null);
                StringBuffer triggerMsgSb = new StringBuffer();
                triggerMsgSb.append("註冊方式:").append((group.getAddressType() == 0) ? "自動註冊" : "手動錄入");
                triggerMsgSb.append("<br>阻塞處理策略:").append(blockStrategy.getTitle());
                triggerMsgSb.append("<br>失敗處理策略:").append(failStrategy.getTitle());
                triggerMsgSb.append("<br>地址列表:").append(group.getRegistryList());
                triggerMsgSb.append("<br>路由策略:").append(executorRouteStrategyEnum.getTitle()).append("(" + i + "/"
                                                                                                     + addressList.size()
                                                                                                     + ")"); // update01

                // 3、trigger-valid
                if (triggerResult.getCode() == ReturnT.SUCCESS_CODE && CollectionUtils.isEmpty(addressList)) {
                    triggerResult.setCode(ReturnT.FAIL_CODE);
                    triggerMsgSb.append("<br>----------------------<br>").append("排程失敗:").append("執行器地址為空");
                }

                if (triggerResult.getCode() == ReturnT.SUCCESS_CODE) {
                    // 4.1、trigger-param
                    TriggerParam triggerParam = new TriggerParam();
                    triggerParam.setJobId(jobInfo.getId());
                    triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
                    triggerParam.setExecutorParams(jobInfo.getExecutorParam());
                    triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
                    triggerParam.setLogId(jobLog.getId());
                    triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
                    triggerParam.setGlueType(jobInfo.getGlueType());
                    triggerParam.setGlueSource(jobInfo.getGlueSource());
                    triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
                    triggerParam.setBroadcastIndex(i);
                    triggerParam.setBroadcastTotal(addressList.size()); // update02

                    // 4.2、trigger-run (route run / trigger remote executor)
                    // 利用動態代理, admin排程中心發起一次rpc請求executor執行器端, 根據引數執行一次cron job任務
                    // 並且返回執行結果
                    triggerResult = runExecutor(triggerParam, address); // update03
                    triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>觸發排程<<<<<<<<<<< </span><br>").append(triggerResult.getMsg());

                    // 重試一次, 且失敗策略是配置的"失敗重試"
                    // 4.3、trigger (fail retry)
                    if (triggerResult.getCode() != ReturnT.SUCCESS_CODE
                        && failStrategy == ExecutorFailStrategyEnum.FAIL_RETRY) {
                        triggerResult = runExecutor(triggerParam, address); // update04
                        triggerMsgSb.append("<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>失敗重試<<<<<<<<<<< </span><br>").append(triggerResult.getMsg());
                    }
                }

                // 5、save trigger-info
                jobLog.setExecutorAddress(triggerResult.getContent());
                jobLog.setTriggerCode(triggerResult.getCode());
                jobLog.setTriggerMsg(triggerMsgSb.toString());
                XxlJobDynamicScheduler.xxlJobLogDao.updateTriggerInfo(jobLog);

                // 6、monitor triger
                //
                JobFailMonitorHelper.monitor(jobLog.getId());
                logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());

            }
            return;
        }

        // 否則除分片模式外,其他的路由策略均走這裡, 比如First--第一個
        // 每觸發一次job, 就記錄一次log表
        // 1、save log-id
        XxlJobLog jobLog = new XxlJobLog();
        jobLog.setJobGroup(jobInfo.getJobGroup());
        jobLog.setJobId(jobInfo.getId());

        // 將jobLog資訊首先進行儲存
        XxlJobDynamicScheduler.xxlJobLogDao.save(jobLog);
        logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

        // 2、prepare trigger-info
        // jobLog.setExecutorAddress(executorAddress);
        jobLog.setGlueType(jobInfo.getGlueType());
        jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
        jobLog.setExecutorParam(jobInfo.getExecutorParam());
        jobLog.setTriggerTime(new Date());

        ReturnT<String> triggerResult = new ReturnT<String>(null);
        StringBuffer triggerMsgSb = new StringBuffer();
        triggerMsgSb.append("註冊方式:").append((group.getAddressType() == 0) ? "自動註冊" : "手動錄入");
        triggerMsgSb.append("<br>阻塞處理策略:").append(blockStrategy.getTitle());
        triggerMsgSb.append("<br>失敗處理策略:").append(failStrategy.getTitle());
        triggerMsgSb.append("<br>地址列表:").append(group.getRegistryList());
        triggerMsgSb.append("<br>路由策略:").append(executorRouteStrategyEnum.getTitle());

        // 3、trigger-valid
        if (triggerResult.getCode() == ReturnT.SUCCESS_CODE && CollectionUtils.isEmpty(addressList)) {
            triggerResult.setCode(ReturnT.FAIL_CODE);
            triggerMsgSb.append("<br>----------------------<br>").append("排程失敗:").append("執行器地址為空");
        }

        if (triggerResult.getCode() == ReturnT.SUCCESS_CODE) {
            // 4.1、trigger-param
            TriggerParam triggerParam = new TriggerParam();
            triggerParam.setJobId(jobInfo.getId());
            triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
            triggerParam.setExecutorParams(jobInfo.getExecutorParam());
            triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
            triggerParam.setLogId(jobLog.getId());
            triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
            triggerParam.setGlueType(jobInfo.getGlueType());
            triggerParam.setGlueSource(jobInfo.getGlueSource());
            triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
            triggerParam.setBroadcastIndex(0);
            triggerParam.setBroadcastTotal(1);

            // 此處使用了策略模式, 根據不同的策略 使用不同的實現類,此處不再詳細說明
            // 4.2、trigger-run (route run / trigger remote executor)
            // 根據不同的路由模式, 解析addressList

            // 根據前臺的配置從jobInfo獲取的策略, 並放入executorRouteStrategyEnum例項中, 呼叫相應的策略類
            // 將下面程式碼拆分後, 便於debug除錯:
            // triggerResult = executorRouteStrategyEnum.getRouter().routeRun(triggerParam, addressList);

            ExecutorRouter router = executorRouteStrategyEnum.getRouter();

            // 利用動態代理, admin排程中心發起一次rpc請求executor執行器端, 根據引數執行一次cron job任務
            // 並且返回執行結果
            triggerResult = router.routeRun(triggerParam, addressList);

            triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>觸發排程<<<<<<<<<<< </span><br>").append(triggerResult.getMsg());

            // 失敗重試一次, 而且失敗處理策略為"失敗重試"
            // 4.3、trigger (fail retry)
            if (triggerResult.getCode() != ReturnT.SUCCESS_CODE
                && failStrategy == ExecutorFailStrategyEnum.FAIL_RETRY) {
                triggerResult = executorRouteStrategyEnum.getRouter().routeRun(triggerParam, addressList);
                triggerMsgSb.append("<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>失敗重試<<<<<<<<<<< </span><br>").append(triggerResult.getMsg());
            }
        }

        // 5、save trigger-info
        jobLog.setExecutorAddress(triggerResult.getContent());
        jobLog.setTriggerCode(triggerResult.getCode());
        jobLog.setTriggerMsg(triggerMsgSb.toString());

        // 更新操作, 將操作引數, 執行返回結果值寫入log表.
        XxlJobDynamicScheduler.xxlJobLogDao.updateTriggerInfo(jobLog);

        // 6、monitor triger
        // 將logid寫入快取佇列, JobFailMonitorHelper開啟一個執行緒從快取中取資料, 根據該id從資料庫中查執行結果
        JobFailMonitorHelper.monitor(jobLog.getId());
        logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
    }

1.4 admin排程中心和executor執行器之間的rpc遠端通訊, 是如何進行資料互動的

①首先從快取中根據address為key獲取動態代理類例項,

ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address);

jdk的方式實現動態代理:

所以當admin排程中心, 每觸發一次排程時, 最終呼叫executorBiz.run(triggerParam)方法, 其實底層就是利用了jdk的動態代理實現了一次rpc遠端呼叫, 並接收返回的執行結果.而executor執行器接受到admin排程中心傳遞過來的執行引數, 利用java的反射技術具體呼叫實現類的具體方法.下一篇將分析executor執行器端, 是如何接受admin排程中心的引數, 和利用該引數使用反射技術呼叫具體的實現類的.

附上程式碼:

 public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address) {
        ReturnT<String> runResult = null;
        try {
            // 返回一個ExcutorBiz動態代理類物件,重點在這個方法裡面
            // 建立執行器代理物件, 首先從executorBizRepository記憶體倉庫根據根據key=address地址, 獲取該executor執行器代理物件
            // 首次從executorBizRepository記憶體倉庫中沒有該key=address的代理物件, 就建立一個該key=address的executor執行器代理物件

            // 以ip地址作為key, 獲取executor執行器代理物件
            // 當根據ip作為key, 從本地快取中獲取不到executor執行器代理物件時, 就建立一個ip地址的executor執行器代理物件
            ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address);

            // 這個run 方法不會最終執行,僅僅只是為了觸發 proxy object 的 invoke方法,同時將目標的型別傳送給服務端,
            // 因為在代理物件的invoke的方法裡面沒有執行目標物件的方法

            // 通過網路, admin排程中心發起一次rpc請求executor執行器端的jetty server, 執行器端ExecutorBizImpl類執行run()方法
            runResult = executorBiz.run(triggerParam);

        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            runResult = new ReturnT<String>(ReturnT.FAIL_CODE, "" + e);
        }

        StringBuffer runResultSB = new StringBuffer("觸發排程:");
        runResultSB.append("<br>address:").append(address);
        runResultSB.append("<br>code:").append(runResult.getCode());
        runResultSB.append("<br>msg:").append(runResult.getMsg());

        runResult.setMsg(runResultSB.toString());
        runResult.setContent(address);
        return runResult;
    }

總結:

一,本章首先分析了job的排程過程, 觸發一次排程都經歷了什麼流程, 都幹了什麼事:

基於quartz框架在cron時間片到達後, 觸發一次排程, 從資料庫中獲取前臺頁面配置的job執行引數, 從而選擇排程的路由策略.

二 ,其次, 分析了排程策略都有哪些:

第一個、最後一個、輪詢、隨機、一致性HASH、最不經常使用、最近最久未使用、故障轉移、忙碌轉移等;

三, 接著分析了admin排程中心與executor執行器之間進行資料通訊的方式是什麼, 資料傳輸的形式:

admin排程中心和executor執行器通訊的方式是http+hessian的形式, admin排程中心這邊採用動態代理技術每觸發一次排程請求, 就會發起一次網路請求, 而executor執行器利用反射技術, 執行具體實現類的具體方法, 並返回執行結果.

下一篇將分析executor執行器端, 是如何接受admin排程中心的引數, 以及如何利用該引數, 使用反射技術呼叫具體實現類的具體方法返回結果的.