1. 程式人生 > >Elastic-Job原理--任務失敗轉移(五)

Elastic-Job原理--任務失敗轉移(五)

        在上一篇部落格Elastic-Job原理--任務排程處理(四)我們已經瞭解到Elastic-Job依賴quartz定時任務執行分片任務的過程,這篇部落格我們簡單瞭解一下Elastic-Job中當某個伺服器節點與註冊中心斷開連線(無法進行任務執行)時其需要執行的任務轉移到其他節點的過程。

首先提供如下類圖,與節點任務失敗轉移相關主要類如下:

FailoverService,作業失效轉移服務。
FailoverNode,作業失效轉移資料儲存路徑。
FailoverListenerManager,作業失效轉移監聽管理器。

一、重新分片

        當伺服器節點從註冊中心zk斷開連線時,Elastic-job需要做的一件事情是需要在下次任務執行前進行重新分片,當zk節點數目發生變更時,會引發ListenServersChangedJobListener監聽器呼叫,此監聽器會呼叫shardingService的重新分片標誌設定方法,這樣再下次任務執行前會重新進行任務分片操作。

/**
 * 當例項節點變更時會呼叫此監聽器
 *
 */
class ListenServersChangedJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            //如果節點數目發生變更則設定重新分片標誌,下次任務執行前會進行重新分片
            if (!JobRegistry.getInstance().isShutdown(jobName) && (isInstanceChange(eventType, path) || isServerChange(path))) {
                shardingService.setReshardingFlag();
            }
        }
        
        private boolean isInstanceChange(final Type eventType, final String path) {
            return instanceNode.isInstancePath(path) && Type.NODE_UPDATED != eventType;
        }
        
        private boolean isServerChange(final String path) {
            return serverNode.isServerPath(path);
        }
    }

任務重新分片只是解決了下次任務執行時,所有的分片任務都是分佈到各個例項中,但是當前失效的任務是如何處理的。

二、任務失效轉移

     所謂失效轉移,就是在執行任務的過程中遇見異常的情況,這個分片任務可以在其他節點再次執行。這個和上面的HA不同,對於HA,上面如果任務終止,那麼不會在其他任務例項上再次重新執行。Job的失效轉移監聽來源於FailoverListenerManager中JobCrashedJobListener的dataChanged方法。FailoverListenerManager監聽的是zk的instance節點刪除事件。如果任務配置了failover等於true,其中某個instance與zk失去聯絡或被刪除,並且失效的節點又不是本身,就會觸發失效轉移邏輯。首先,在某個任務例項elastic-job會在leader節點下面建立failover節點以及items節點。items節點下會有失效任務例項的原本應該做的分片好。比如,失效的任務例項原來負責分片1和2。那麼items節點下就會有名字叫1的子節點,就代表分片1需要轉移到其他節點上去執行。如下圖:

當節點任務失效時會呼叫JobCrashedJobListener監聽器,此監聽器會根據例項id獲取所有的分片,然後呼叫FailoverService的setCrashedFailoverFlag方法,將每個分片id寫到/jobName/leader/failover/items下

/**
 * 任務失效時會呼叫這個監聽器
 */
class JobCrashedJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {
                String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
                if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
                    return;
                }
                //會將所有的分片初始化到註冊中心中
                List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
                if (!failoverItems.isEmpty()) {
                    for (int each : failoverItems) {
                        failoverService.setCrashedFailoverFlag(each);
                        failoverService.failoverIfNecessary();
                    }
                } else {
                    for (int each : shardingService.getShardingItems(jobInstanceId)) {
                        failoverService.setCrashedFailoverFlag(each);
                        failoverService.failoverIfNecessary();
                    }
                }
            }
        }
    }

在FailoverService方法中呼叫setCrashedFailoverFlag方法將需要任務轉移的分片id進行例項化。

    /**
     * 設定失效的分片項標記.
     * 
     * @param item 崩潰的作業項
     */
    public void setCrashedFailoverFlag(final int item) {
        if (!isFailoverAssigned(item)) {
            jobNodeStorage.createJobNodeIfNeeded(FailoverNode.getItemsNode(item));
        }
    }

然後接下來呼叫FailoverService的failoverIfNessary方法,首先判斷是否需要失敗轉移,如果可以需要則只需作業失敗轉移。

   /**
     * 如果需要失效轉移, 則執行作業失效轉移.
     */
    public void failoverIfNecessary() {
        if (needFailover()) {
            jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
        }
    }

在needFailover方法會對是否需要失效轉移進行判斷

private boolean needFailover() {
         // `${JOB_NAME}/leader/failover/items/${ITEM_ID}` 有失效轉移的作業分片項
        return jobNodeStorage.isJobNodeExisted(FailoverNode.ITEMS_ROOT) && !jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).isEmpty()
                // 當前作業不在執行中
                && !JobRegistry.getInstance().isJobRunning(jobName);
    }

條件一:${JOB_NAME}/leader/failover/items/${ITEM_ID} 有失效轉移的作業分片項。
條件二:當前作業不在執行中。此條件即是上文提交的作業節點空閒的定義。失效轉移: 執行中的作業伺服器崩潰不會導致重新分片,只會在下次作業啟動時分片。啟用失效轉移功能可以在本次作業執行過程中,監測其他作業伺服器【空閒】,抓取未完成的孤兒分片項執行

在FailoverLeaderExecutionCallback中回撥邏輯如下:

(1)也會首先判斷是否需要失效轉移,

(2)從註冊中心獲得一個 `${JOB_NAME}/leader/failover/items/${ITEM_ID}` 作業分片項,

(3)在註冊中心節點`${JOB_NAME}/sharding/${ITEM_ID}/failover` 作業分片項 為 當前作業節點,

(4)然後移除任務轉移分片項,

(5)最後呼叫執行,提交任務

class FailoverLeaderExecutionCallback implements LeaderExecutionCallback {
   
   @Override
   public void execute() {
       // 判斷需要失效轉移
       if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {
           return;
       }
       // 獲得一個 `${JOB_NAME}/leader/failover/items/${ITEM_ID}` 作業分片項
       int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
       log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem);
       // 設定這個 `${JOB_NAME}/sharding/${ITEM_ID}/failover` 作業分片項 為 當前作業節點
       jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
       // 移除這個 `${JOB_NAME}/leader/failover/items/${ITEM_ID}` 作業分片項
       jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
       // TODO 不應使用triggerJob, 而是使用executor統一排程 疑問:為什麼要用executor統一,後面研究下
       // 觸發作業執行
       JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
       if (null != jobScheduleController) {
           jobScheduleController.triggerJob();
       }
   }
}

呼叫 JobScheduleController#triggerJob() 方法,立即啟動作業。呼叫該方法,實際作業不會立即執行,而僅僅是進行觸發。如果有多個失效轉移的作業分片項,多次呼叫 JobScheduleController#triggerJob() 方法會不會導致作業是並行執行的?答案是不會,因為一個作業的 Quartz 執行緒數設定為 1。

同時可以結合部落格Elastic-Job原理--任務排程處理(四)任務排程處理流程中,LiteJobFacade有獲取分片操作的函式。

    @Override
    public ShardingContexts getShardingContexts() {
        //是否失敗轉移
        boolean isFailover = configService.load(true).isFailover();
        if (isFailover) {
            //獲取失敗分片
            List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
            if (!failoverShardingItems.isEmpty()) {
                //執行失敗分片任務
                return executionContextService.getJobShardingContext(failoverShardingItems);
            }
        }
        //重新分片
        shardingService.shardingIfNecessary();
        List<Integer> shardingItems = shardingService.getLocalShardingItems();
        //移除已經失敗轉移執行的任務
        if (isFailover) {
            shardingItems.removeAll(failoverService.getLocalTakeOffItems());
        }
        shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
        return executionContextService.getJobShardingContext(shardingItems);
    }

在getShardingContexts中有專門獲取所有的要失敗轉移需要執行的分片。