1. 程式人生 > >Elastic-Job原理--伺服器初始化、節點選舉與通知(二)

Elastic-Job原理--伺服器初始化、節點選舉與通知(二)

       在上一篇部落格Elastic-Job原理--簡介與示例(一)中我們簡單的介紹了一下Elastic-Job提供的功能,這篇部落格我們通過分析Elastic-Job的原始碼,瞭解學習一下Elastic-Job的初始化、節點選舉、配置變更通知等相關的流程。

     Elastic-Job依賴Zookeeper作為註冊中心,利用zk的功能完成節點選舉、分片和配置變更等相關的功能,接下來我們通過分析原始碼來了解一下Elastic-Job的節點選舉機制。

節點啟動註冊及選舉機制:

作業啟動

任務初始化

Elastic-Job在構造任務JobScheduler時會進行初始化後將任務名稱新增到zk的名稱空間中。

JobScheduler(regCenter, LiteJobConfiguration.newBuilder(simpleJobConfig).build(), jobEventConfig).init();

在init方法中會完成任務註冊和節點選舉操作。

public void init() {
        LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
        JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
        JobScheduleController jobScheduleController = new JobScheduleController(
                createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
        //註冊任務
        JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
        //新增任務資訊並進行節點選舉
        schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
        jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
    }

任務註冊:

呼叫JobRegistry的registerJob方法進行任務註冊

   /**
     * 新增作業排程控制器.
     * 
     * @param jobName 作業名稱
     * @param jobScheduleController 作業排程控制器
     * @param regCenter 註冊中心
     */
    public void registerJob(final String jobName, final JobScheduleController jobScheduleController, final CoordinatorRegistryCenter regCenter) {
        schedulerMap.put(jobName, jobScheduleController);
        regCenterMap.put(jobName, regCenter);
        regCenter.addCacheData("/" + jobName);
    }

在registerJob方法中會呼叫ZookeeperRegistryCenter的addCacheData方法將任務名稱作為節點名稱寫到zk中

    @Override
    public void addCacheData(final String cachePath) {
        TreeCache cache = new TreeCache(client, cachePath);
        try {
            //與zk建立連線並將cachePath寫到zk中
            cache.start();
        //CHECKSTYLE:OFF
        } catch (final Exception ex) {
        //CHECKSTYLE:ON
            RegExceptionHandler.handleException(ex);
        }
        caches.put(cachePath + "/", cache);
    }

節點選舉

在init初始化方法中呼叫如下方法進行節點選舉等操作。

schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());

在registerStartUpInfo中做了如下操作:

    /**
     * 註冊作業啟動資訊.
     * 
     * @param enabled 作業是否啟用
     */
    public void registerStartUpInfo(final boolean enabled) {
        //啟動所以的監聽器
        listenerManager.startAllListeners();
        //節點選舉
        leaderService.electLeader();
        //服務資訊持久化
        serverService.persistOnline(enabled);
        //例項資訊持久化
        instanceService.persistOnline();
        //重新分片
        shardingService.setReshardingFlag();
        //監控資訊監聽器
        monitorService.listen();
        if (!reconcileService.isRunning()) {
            reconcileService.startAsync();
        }
    }

(1)ListenerManager啟動所有監聽器

     /**
     * 開啟所有監聽器.
     */
    public void startAllListeners() {
    	//主節點選舉監聽管理器.
        electionListenerManager.start();
        //分片監聽管理器.
        shardingListenerManager.start();
        //失效轉移監聽管理器.
        failoverListenerManager.start();
        //冪等性監聽管理器.
        monitorExecutionListenerManager.start();
        //執行例項關閉監聽管理器.
        shutdownListenerManager.start();
        //作業觸發監聽管理器.
        triggerListenerManager.start();
        //重排程監聽管理器.
        rescheduleListenerManager.start();
        //保證分散式任務全部開始和結束狀態監聽管理器.保證分散式任務全部開始和結束狀態監聽管理器.
        guaranteeListenerManager.start();
        //註冊連線狀態監聽器.
        jobNodeStorage.addConnectionStateListener(regCenterConnectionStateListener);
    }

(2)LeaderService類中節點選舉

在LeaderService方法中呼叫electLeader方法進行節點選舉,在路徑中寫入leader/election/latch,如果選舉成功在在leader/election/instance路徑中填寫伺服器資訊。

    /**
     * 選舉主節點.
     */
    public void electLeader() {
        log.debug("Elect a new leader now.");
        //leader/election/latch
        jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());
        log.debug("Leader election completed.");
    }

在JobNodeStoreage中呼叫executeInLeader方法,使用路徑leader/election/latch,如果獲取這個路徑則呼叫LeaderExecutionCallback回撥函式,執行execute方法。

     /**
     * 在主節點執行操作.
     * 
     * @param latchNode 分散式鎖使用的作業節點名稱
     * @param callback 執行操作的回撥
     */
    public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
        try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
            latch.start();
            latch.await();
            callback.execute();
        //CHECKSTYLE:OFF
        } catch (final Exception ex) {
        //CHECKSTYLE:ON
            handleException(ex);
        }
    }

在LeaderExecutionCallback的execute方法中會判斷是否選舉為主節點,如果選舉為主節點則將伺服器資訊新增到leader/election/instace路徑中

    @RequiredArgsConstructor
    class LeaderElectionExecutionCallback implements LeaderExecutionCallback {
        
        @Override
        public void execute() {
            if (!hasLeader()) {
                //將伺服器資訊新增到leader/election/instance節點中
                jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
            }
        }
    }

    /**
     * 判斷是否已經有主節點.
     * 
     * @return 是否已經有主節點
     */
    public boolean hasLeader() {
        return jobNodeStorage.isJobNodeExisted(LeaderNode.INSTANCE);
    }

經過以上處理就完成了主節點選舉操作。

(3)將服務資訊新增到zk中

在ServerService方法中呼叫persistOnline將伺服器資訊新增到zk中,

    /**
     * 持久化作業伺服器上線資訊.
     * 
     * @param enabled 作業是否啟用
     */
    public void persistOnline(final boolean enabled) {
        if (!JobRegistry.getInstance().isShutdown(jobName)) {
            jobNodeStorage.fillJobNode(serverNode.getServerNode(JobRegistry.getInstance().getJobInstance(jobName).getIp()), enabled ? "" : ServerStatus.DISABLED.name());
        }
    }

(4)將例項資訊新增到zk的instances節點中

在InstanceService中呼叫persistOnline方法將例項的資訊初始化到zk的instances節點中

    /**
     * 持久化作業執行例項上線相關資訊.
     */
    public void persistOnline() {
        jobNodeStorage.fillEphemeralJobNode(instanceNode.getLocalInstanceNode(), "");
    }

(5)設定節點任務重新分片

在ShardingService中呼叫setReshardingFlag方法,在節點sharding寫建立necessary節點,通知主節點進行任務分片處理。

    /**
     * 設定需要重新分片的標記.
     */
    public void setReshardingFlag() {
        jobNodeStorage.createJobNodeIfNeeded(ShardingNode.NECESSARY);
    }

變更通知

Elastic-Job使用zk節點資訊變化通知的機制,建立了很多監聽器Listener,實現了介面TreeCacheListener建立抽象類AbstractJobListener,抽象類AbstractJobListener有很多實現類,分別進行不同的業務處理。

主要有以下實現類:

(1)CompletedNodeRemovedJobListener:保證分散式任務全部開始和結束狀態監聽管理器.

(2)CronSettingAndJobEventChangedJobListener:重排程監聽管理器.

(3)FailoverSettingsChangedJobListener:失效轉移監聽管理器.

(4)InstanceShutdownStatusJobListener:執行例項關閉監聽管理器.

(5)JobCrashedJobListener:失效轉移監聽管理器.

(6)JobTriggerStatusJobListener:作業觸發監聽管理器.

(7)LeaderElectionJobListener:主節點選舉監聽管理器.

(8)LeaderAbdicationJobListener:主節點選舉監聽管理器.

(9)ListenServersChangedJobListener:分片監聽管理器.

(10)MonitorExecutionSettingsChangedJobListener:冪等性監聽管理器.

(11)ShardingTotalCountChangedJobListener:分片監聽管理器.

(12)StartedNodeRemovedJobListener:保證分散式任務全部開始和結束狀態監聽管理器.