分散式任務排程平臺XXL-JOB--原始碼解析四:xxl-job-admin排程中心原始碼解析之job排程過程,排程策略以及rpc通訊
admin排程中心的對於job的排程過程, 排程策略以及rpc通訊
1.1 job排程過程
RemoteHttpJobBean類實現了QuartzJobBean, 當cron時間片到達時, 就會觸發一次quartz呼叫, 回撥executeInternal()方法, 而XxlJobTrigger.trigger(jobId)具體實現細節就是進行了一次網路請求.
// 當cron時間片到時, 就會呼叫RemoteHttpJobBean的executeInternal觸發一次網路請求給執行器 // @DisallowConcurrentExecution public class RemoteHttpJobBean extends QuartzJobBean { private static Logger logger = LoggerFactory.getLogger(RemoteHttpJobBean.class); // 1 當quartz cron執行週期到達時, 就回調executeInternal()方法 // 2 當點選"執行"時, 也會觸發executeInternal()回撥方法 @Override protected void executeInternal(JobExecutionContext context) throws JobExecutionException { // load jobId JobKey jobKey = context.getTrigger().getJobKey(); // 從xxl-job-qrtz-job-detail表中獲取, jobname的值, 即為jobId Integer jobId = Integer.valueOf(jobKey.getName()); // 排程中心觸發, 發起一次rpc網路請求executor執行 // trigger XxlJobTrigger.trigger(jobId); } }
1.2 XxlJobTrigger.trigger()分析
該類的trigger()方法做的事情比較多, 所以程式碼量相對比較多一點, 咱們就來一行一行解析, 每段程式碼就做了什麼事情.
① XxlJobInfo jobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(jobId); // 該jobid就是quartz框架觸發一次排程時生成的, 這個jobid指的就是具體的哪一個job被執行.所以這行程式碼根據該jobId從xxl-job-qrtz-trigger-info表中查詢本次job執行的一些引數, 這些引數就是在前臺頁面上新增任務時的一些引數, 如下圖:比如路由策略, 執行模式, 阻塞處理策略, cron表示式等, 儲存時, 就向該job儲存到了xxl-job-qrtz-trigger-info表, 並生成了一個jobId, 當cron時間片到達時, 就會傳遞該jobId, 觸發一次任務排程.
② XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup());// 根據配置, 獲取該型別的執行器資訊, 根據jobInfo中的job_group欄位值, 找到該型別執行器所屬的組
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(),
ExecutorBlockStrategyEnum.SERIAL_EXECUTION);// 根據配置, 匹配執行模式, 預設單機序列
ExecutorFailStrategyEnum failStrategy = ExecutorFailStrategyEnum.match(jobInfo.getExecutorFailStrategy(),
ExecutorFailStrategyEnum.FAIL_ALARM); //根據配置, 匹配失敗後的處理模式, 預設失敗告警
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(),// 根據配置, 設定路由的策略, 獲取路由策略, 必填, 沒有預設值, 根據配置獲取路由策略
ArrayList<String> addressList = (ArrayList<String>) group.getRegistryList();// 獲取該執行器的叢集機器列表
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum
&& CollectionUtils.isNotEmpty(addressList)) // 判斷是否需要驚醒分片廣播, 即使, 是否本次任務是否每一個executor執行器都執行一遍本次任務
這裡分為了兩種情況, 如果進行分片廣播的話, 就省去了選擇路由策略這一步, 否則下面就得需要根據前端的配置項, 選擇路由策略.
首先分析, 需要進行分片廣播的形式:
接著, 再分析不是分片廣播的情況:其實無非就是多了一個路由策略選擇的過程, 如下:
1.3 執行排程時的路由策略選擇分析
目前xxl-job支援, 第一個、最後一個、輪詢、隨機、一致性HASH、最不經常使用、最近最久未使用、故障轉移、忙碌轉移等這幾種路由策略, 以下就是排程時的路由策略類圖, 從圖中可以看出是一種典型的 策略模式
以上就是XxlJobTrigger類的trigger方法, 做的一些事情以及實現方式, 下面分析當執行分片排程時, 執行的runExecutor()方法, 和不執行分片策略, 而根據具體的路由策略執行的router.routeRun()方法時, 是如何完成rpc遠端通訊的.
其實router.routeRun()方法最終也是執行runExecutor()方法, 如下圖:
附上程式碼:
public static void trigger(int jobId) {
// 通過JobId從資料庫中查詢該任務的具體資訊
// load data
XxlJobInfo jobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(jobId); // job info
if (jobInfo == null) {
logger.warn(">>>>>>>>>>>> xxl-job trigger fail, jobId invalid,jobId={}", jobId);
return;
}
logger.info("==>XxlJobTrigger.trigger jobInfo:{}", JSON.toJSONString(jobInfo));
// 獲取該型別的執行器資訊, 根據jobInfo中的job_group欄位值, 找到該型別執行器所屬的組
XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup()); // group info
logger.info("==>XxlJobTrigger.trigger group:{}", JSON.toJSONString(group));
// 匹配執行模式, 預設單機序列
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(),
ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block
// strategy
// 匹配失敗後的處理模式, 預設失敗告警
ExecutorFailStrategyEnum failStrategy = ExecutorFailStrategyEnum.match(jobInfo.getExecutorFailStrategy(),
ExecutorFailStrategyEnum.FAIL_ALARM); // fail
// strategy
// 設定路由的策略
// 獲取路由策略, 必填, 沒有預設值, 根據配置獲取路由策略
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(),
null); // route strategy
// 獲取該執行器的叢集機器列表
ArrayList<String> addressList = (ArrayList<String>) group.getRegistryList();
// broadcast -- 判斷路由策略, 而且執行器的機器列表不能為空 // 判斷路由策略 是否為 分片廣播模式
// 分片廣播 : for迴圈遍歷所有地址, 都發送一遍執行
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum
&& CollectionUtils.isNotEmpty(addressList)) {
for (int i = 0; i < addressList.size(); i++) {
String address = addressList.get(i);
// 1、save log-id
XxlJobLog jobLog = new XxlJobLog();
jobLog.setJobGroup(jobInfo.getJobGroup());
jobLog.setJobId(jobInfo.getId());
XxlJobDynamicScheduler.xxlJobLogDao.save(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
// 2、prepare trigger-info
// jobLog.setExecutorAddress(executorAddress);
jobLog.setGlueType(jobInfo.getGlueType());
jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
jobLog.setExecutorParam(jobInfo.getExecutorParam());
jobLog.setTriggerTime(new Date());
ReturnT<String> triggerResult = new ReturnT<String>(null);
StringBuffer triggerMsgSb = new StringBuffer();
triggerMsgSb.append("註冊方式:").append((group.getAddressType() == 0) ? "自動註冊" : "手動錄入");
triggerMsgSb.append("<br>阻塞處理策略:").append(blockStrategy.getTitle());
triggerMsgSb.append("<br>失敗處理策略:").append(failStrategy.getTitle());
triggerMsgSb.append("<br>地址列表:").append(group.getRegistryList());
triggerMsgSb.append("<br>路由策略:").append(executorRouteStrategyEnum.getTitle()).append("(" + i + "/"
+ addressList.size()
+ ")"); // update01
// 3、trigger-valid
if (triggerResult.getCode() == ReturnT.SUCCESS_CODE && CollectionUtils.isEmpty(addressList)) {
triggerResult.setCode(ReturnT.FAIL_CODE);
triggerMsgSb.append("<br>----------------------<br>").append("排程失敗:").append("執行器地址為空");
}
if (triggerResult.getCode() == ReturnT.SUCCESS_CODE) {
// 4.1、trigger-param
TriggerParam triggerParam = new TriggerParam();
triggerParam.setJobId(jobInfo.getId());
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
triggerParam.setExecutorParams(jobInfo.getExecutorParam());
triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
triggerParam.setLogId(jobLog.getId());
triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
triggerParam.setGlueType(jobInfo.getGlueType());
triggerParam.setGlueSource(jobInfo.getGlueSource());
triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
triggerParam.setBroadcastIndex(i);
triggerParam.setBroadcastTotal(addressList.size()); // update02
// 4.2、trigger-run (route run / trigger remote executor)
// 利用動態代理, admin排程中心發起一次rpc請求executor執行器端, 根據引數執行一次cron job任務
// 並且返回執行結果
triggerResult = runExecutor(triggerParam, address); // update03
triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>觸發排程<<<<<<<<<<< </span><br>").append(triggerResult.getMsg());
// 重試一次, 且失敗策略是配置的"失敗重試"
// 4.3、trigger (fail retry)
if (triggerResult.getCode() != ReturnT.SUCCESS_CODE
&& failStrategy == ExecutorFailStrategyEnum.FAIL_RETRY) {
triggerResult = runExecutor(triggerParam, address); // update04
triggerMsgSb.append("<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>失敗重試<<<<<<<<<<< </span><br>").append(triggerResult.getMsg());
}
}
// 5、save trigger-info
jobLog.setExecutorAddress(triggerResult.getContent());
jobLog.setTriggerCode(triggerResult.getCode());
jobLog.setTriggerMsg(triggerMsgSb.toString());
XxlJobDynamicScheduler.xxlJobLogDao.updateTriggerInfo(jobLog);
// 6、monitor triger
//
JobFailMonitorHelper.monitor(jobLog.getId());
logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
return;
}
// 否則除分片模式外,其他的路由策略均走這裡, 比如First--第一個
// 每觸發一次job, 就記錄一次log表
// 1、save log-id
XxlJobLog jobLog = new XxlJobLog();
jobLog.setJobGroup(jobInfo.getJobGroup());
jobLog.setJobId(jobInfo.getId());
// 將jobLog資訊首先進行儲存
XxlJobDynamicScheduler.xxlJobLogDao.save(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
// 2、prepare trigger-info
// jobLog.setExecutorAddress(executorAddress);
jobLog.setGlueType(jobInfo.getGlueType());
jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
jobLog.setExecutorParam(jobInfo.getExecutorParam());
jobLog.setTriggerTime(new Date());
ReturnT<String> triggerResult = new ReturnT<String>(null);
StringBuffer triggerMsgSb = new StringBuffer();
triggerMsgSb.append("註冊方式:").append((group.getAddressType() == 0) ? "自動註冊" : "手動錄入");
triggerMsgSb.append("<br>阻塞處理策略:").append(blockStrategy.getTitle());
triggerMsgSb.append("<br>失敗處理策略:").append(failStrategy.getTitle());
triggerMsgSb.append("<br>地址列表:").append(group.getRegistryList());
triggerMsgSb.append("<br>路由策略:").append(executorRouteStrategyEnum.getTitle());
// 3、trigger-valid
if (triggerResult.getCode() == ReturnT.SUCCESS_CODE && CollectionUtils.isEmpty(addressList)) {
triggerResult.setCode(ReturnT.FAIL_CODE);
triggerMsgSb.append("<br>----------------------<br>").append("排程失敗:").append("執行器地址為空");
}
if (triggerResult.getCode() == ReturnT.SUCCESS_CODE) {
// 4.1、trigger-param
TriggerParam triggerParam = new TriggerParam();
triggerParam.setJobId(jobInfo.getId());
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
triggerParam.setExecutorParams(jobInfo.getExecutorParam());
triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
triggerParam.setLogId(jobLog.getId());
triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
triggerParam.setGlueType(jobInfo.getGlueType());
triggerParam.setGlueSource(jobInfo.getGlueSource());
triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
triggerParam.setBroadcastIndex(0);
triggerParam.setBroadcastTotal(1);
// 此處使用了策略模式, 根據不同的策略 使用不同的實現類,此處不再詳細說明
// 4.2、trigger-run (route run / trigger remote executor)
// 根據不同的路由模式, 解析addressList
// 根據前臺的配置從jobInfo獲取的策略, 並放入executorRouteStrategyEnum例項中, 呼叫相應的策略類
// 將下面程式碼拆分後, 便於debug除錯:
// triggerResult = executorRouteStrategyEnum.getRouter().routeRun(triggerParam, addressList);
ExecutorRouter router = executorRouteStrategyEnum.getRouter();
// 利用動態代理, admin排程中心發起一次rpc請求executor執行器端, 根據引數執行一次cron job任務
// 並且返回執行結果
triggerResult = router.routeRun(triggerParam, addressList);
triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>觸發排程<<<<<<<<<<< </span><br>").append(triggerResult.getMsg());
// 失敗重試一次, 而且失敗處理策略為"失敗重試"
// 4.3、trigger (fail retry)
if (triggerResult.getCode() != ReturnT.SUCCESS_CODE
&& failStrategy == ExecutorFailStrategyEnum.FAIL_RETRY) {
triggerResult = executorRouteStrategyEnum.getRouter().routeRun(triggerParam, addressList);
triggerMsgSb.append("<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>失敗重試<<<<<<<<<<< </span><br>").append(triggerResult.getMsg());
}
}
// 5、save trigger-info
jobLog.setExecutorAddress(triggerResult.getContent());
jobLog.setTriggerCode(triggerResult.getCode());
jobLog.setTriggerMsg(triggerMsgSb.toString());
// 更新操作, 將操作引數, 執行返回結果值寫入log表.
XxlJobDynamicScheduler.xxlJobLogDao.updateTriggerInfo(jobLog);
// 6、monitor triger
// 將logid寫入快取佇列, JobFailMonitorHelper開啟一個執行緒從快取中取資料, 根據該id從資料庫中查執行結果
JobFailMonitorHelper.monitor(jobLog.getId());
logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
1.4 admin排程中心和executor執行器之間的rpc遠端通訊, 是如何進行資料互動的
①首先從快取中根據address為key獲取動態代理類例項,
ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address);
jdk的方式實現動態代理:
所以當admin排程中心, 每觸發一次排程時, 最終呼叫executorBiz.run(triggerParam)方法, 其實底層就是利用了jdk的動態代理實現了一次rpc遠端呼叫, 並接收返回的執行結果.而executor執行器接受到admin排程中心傳遞過來的執行引數, 利用java的反射技術具體呼叫實現類的具體方法.下一篇將分析executor執行器端, 是如何接受admin排程中心的引數, 和利用該引數使用反射技術呼叫具體的實現類的.
附上程式碼:
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address) {
ReturnT<String> runResult = null;
try {
// 返回一個ExcutorBiz動態代理類物件,重點在這個方法裡面
// 建立執行器代理物件, 首先從executorBizRepository記憶體倉庫根據根據key=address地址, 獲取該executor執行器代理物件
// 首次從executorBizRepository記憶體倉庫中沒有該key=address的代理物件, 就建立一個該key=address的executor執行器代理物件
// 以ip地址作為key, 獲取executor執行器代理物件
// 當根據ip作為key, 從本地快取中獲取不到executor執行器代理物件時, 就建立一個ip地址的executor執行器代理物件
ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address);
// 這個run 方法不會最終執行,僅僅只是為了觸發 proxy object 的 invoke方法,同時將目標的型別傳送給服務端,
// 因為在代理物件的invoke的方法裡面沒有執行目標物件的方法
// 通過網路, admin排程中心發起一次rpc請求executor執行器端的jetty server, 執行器端ExecutorBizImpl類執行run()方法
runResult = executorBiz.run(triggerParam);
} catch (Exception e) {
logger.error(e.getMessage(), e);
runResult = new ReturnT<String>(ReturnT.FAIL_CODE, "" + e);
}
StringBuffer runResultSB = new StringBuffer("觸發排程:");
runResultSB.append("<br>address:").append(address);
runResultSB.append("<br>code:").append(runResult.getCode());
runResultSB.append("<br>msg:").append(runResult.getMsg());
runResult.setMsg(runResultSB.toString());
runResult.setContent(address);
return runResult;
}
總結:
一,本章首先分析了job的排程過程, 觸發一次排程都經歷了什麼流程, 都幹了什麼事:
基於quartz框架在cron時間片到達後, 觸發一次排程, 從資料庫中獲取前臺頁面配置的job執行引數, 從而選擇排程的路由策略.
二 ,其次, 分析了排程策略都有哪些:
第一個、最後一個、輪詢、隨機、一致性HASH、最不經常使用、最近最久未使用、故障轉移、忙碌轉移等;
三, 接著分析了admin排程中心與executor執行器之間進行資料通訊的方式是什麼, 資料傳輸的形式:
admin排程中心和executor執行器通訊的方式是http+hessian的形式, admin排程中心這邊採用動態代理技術每觸發一次排程請求, 就會發起一次網路請求, 而executor執行器利用反射技術, 執行具體實現類的具體方法, 並返回執行結果.
下一篇將分析executor執行器端, 是如何接受admin排程中心的引數, 以及如何利用該引數, 使用反射技術呼叫具體實現類的具體方法返回結果的.