Elastic-Job原理--伺服器初始化、節點選舉與通知(二)
在上一篇部落格Elastic-Job原理--簡介與示例(一)中我們簡單的介紹了一下Elastic-Job提供的功能,這篇部落格我們通過分析Elastic-Job的原始碼,瞭解學習一下Elastic-Job的初始化、節點選舉、配置變更通知等相關的流程。
Elastic-Job依賴Zookeeper作為註冊中心,利用zk的功能完成節點選舉、分片和配置變更等相關的功能,接下來我們通過分析原始碼來了解一下Elastic-Job的節點選舉機制。
節點啟動註冊及選舉機制:
任務初始化
Elastic-Job在構造任務JobScheduler時會進行初始化後將任務名稱新增到zk的名稱空間中。
JobScheduler(regCenter, LiteJobConfiguration.newBuilder(simpleJobConfig).build(), jobEventConfig).init();
在init方法中會完成任務註冊和節點選舉操作。
public void init() { LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig); JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount()); JobScheduleController jobScheduleController = new JobScheduleController( createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName()); //註冊任務 JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter); //新增任務資訊並進行節點選舉 schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled()); jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron()); }
任務註冊:
呼叫JobRegistry的registerJob方法進行任務註冊
/** * 新增作業排程控制器. * * @param jobName 作業名稱 * @param jobScheduleController 作業排程控制器 * @param regCenter 註冊中心 */ public void registerJob(final String jobName, final JobScheduleController jobScheduleController, final CoordinatorRegistryCenter regCenter) { schedulerMap.put(jobName, jobScheduleController); regCenterMap.put(jobName, regCenter); regCenter.addCacheData("/" + jobName); }
在registerJob方法中會呼叫ZookeeperRegistryCenter的addCacheData方法將任務名稱作為節點名稱寫到zk中
@Override
public void addCacheData(final String cachePath) {
TreeCache cache = new TreeCache(client, cachePath);
try {
//與zk建立連線並將cachePath寫到zk中
cache.start();
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
}
caches.put(cachePath + "/", cache);
}
節點選舉
在init初始化方法中呼叫如下方法進行節點選舉等操作。
schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
在registerStartUpInfo中做了如下操作:
/**
* 註冊作業啟動資訊.
*
* @param enabled 作業是否啟用
*/
public void registerStartUpInfo(final boolean enabled) {
//啟動所以的監聽器
listenerManager.startAllListeners();
//節點選舉
leaderService.electLeader();
//服務資訊持久化
serverService.persistOnline(enabled);
//例項資訊持久化
instanceService.persistOnline();
//重新分片
shardingService.setReshardingFlag();
//監控資訊監聽器
monitorService.listen();
if (!reconcileService.isRunning()) {
reconcileService.startAsync();
}
}
(1)ListenerManager啟動所有監聽器
/**
* 開啟所有監聽器.
*/
public void startAllListeners() {
//主節點選舉監聽管理器.
electionListenerManager.start();
//分片監聽管理器.
shardingListenerManager.start();
//失效轉移監聽管理器.
failoverListenerManager.start();
//冪等性監聽管理器.
monitorExecutionListenerManager.start();
//執行例項關閉監聽管理器.
shutdownListenerManager.start();
//作業觸發監聽管理器.
triggerListenerManager.start();
//重排程監聽管理器.
rescheduleListenerManager.start();
//保證分散式任務全部開始和結束狀態監聽管理器.保證分散式任務全部開始和結束狀態監聽管理器.
guaranteeListenerManager.start();
//註冊連線狀態監聽器.
jobNodeStorage.addConnectionStateListener(regCenterConnectionStateListener);
}
(2)LeaderService類中節點選舉
在LeaderService方法中呼叫electLeader方法進行節點選舉,在路徑中寫入leader/election/latch,如果選舉成功在在leader/election/instance路徑中填寫伺服器資訊。
/**
* 選舉主節點.
*/
public void electLeader() {
log.debug("Elect a new leader now.");
//leader/election/latch
jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());
log.debug("Leader election completed.");
}
在JobNodeStoreage中呼叫executeInLeader方法,使用路徑leader/election/latch,如果獲取這個路徑則呼叫LeaderExecutionCallback回撥函式,執行execute方法。
/**
* 在主節點執行操作.
*
* @param latchNode 分散式鎖使用的作業節點名稱
* @param callback 執行操作的回撥
*/
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
latch.start();
latch.await();
callback.execute();
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
handleException(ex);
}
}
在LeaderExecutionCallback的execute方法中會判斷是否選舉為主節點,如果選舉為主節點則將伺服器資訊新增到leader/election/instace路徑中
@RequiredArgsConstructor
class LeaderElectionExecutionCallback implements LeaderExecutionCallback {
@Override
public void execute() {
if (!hasLeader()) {
//將伺服器資訊新增到leader/election/instance節點中
jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
}
}
}
/**
* 判斷是否已經有主節點.
*
* @return 是否已經有主節點
*/
public boolean hasLeader() {
return jobNodeStorage.isJobNodeExisted(LeaderNode.INSTANCE);
}
經過以上處理就完成了主節點選舉操作。
(3)將服務資訊新增到zk中
在ServerService方法中呼叫persistOnline將伺服器資訊新增到zk中,
/**
* 持久化作業伺服器上線資訊.
*
* @param enabled 作業是否啟用
*/
public void persistOnline(final boolean enabled) {
if (!JobRegistry.getInstance().isShutdown(jobName)) {
jobNodeStorage.fillJobNode(serverNode.getServerNode(JobRegistry.getInstance().getJobInstance(jobName).getIp()), enabled ? "" : ServerStatus.DISABLED.name());
}
}
(4)將例項資訊新增到zk的instances節點中
在InstanceService中呼叫persistOnline方法將例項的資訊初始化到zk的instances節點中
/**
* 持久化作業執行例項上線相關資訊.
*/
public void persistOnline() {
jobNodeStorage.fillEphemeralJobNode(instanceNode.getLocalInstanceNode(), "");
}
(5)設定節點任務重新分片
在ShardingService中呼叫setReshardingFlag方法,在節點sharding寫建立necessary節點,通知主節點進行任務分片處理。
/**
* 設定需要重新分片的標記.
*/
public void setReshardingFlag() {
jobNodeStorage.createJobNodeIfNeeded(ShardingNode.NECESSARY);
}
變更通知
Elastic-Job使用zk節點資訊變化通知的機制,建立了很多監聽器Listener,實現了介面TreeCacheListener建立抽象類AbstractJobListener,抽象類AbstractJobListener有很多實現類,分別進行不同的業務處理。
主要有以下實現類:
(1)CompletedNodeRemovedJobListener:保證分散式任務全部開始和結束狀態監聽管理器.
(2)CronSettingAndJobEventChangedJobListener:重排程監聽管理器.
(3)FailoverSettingsChangedJobListener:失效轉移監聽管理器.
(4)InstanceShutdownStatusJobListener:執行例項關閉監聽管理器.
(5)JobCrashedJobListener:失效轉移監聽管理器.
(6)JobTriggerStatusJobListener:作業觸發監聽管理器.
(7)LeaderElectionJobListener:主節點選舉監聽管理器.
(8)LeaderAbdicationJobListener:主節點選舉監聽管理器.
(9)ListenServersChangedJobListener:分片監聽管理器.
(10)MonitorExecutionSettingsChangedJobListener:冪等性監聽管理器.
(11)ShardingTotalCountChangedJobListener:分片監聽管理器.
(12)StartedNodeRemovedJobListener:保證分散式任務全部開始和結束狀態監聽管理器.