1. 程式人生 > >原始碼分析ElasticJob任務錯過機制(misfire)與冪等性

原始碼分析ElasticJob任務錯過機制(misfire)與冪等性

任務在排程執行中,由於某種原因未執行完畢,下一次排程任務觸發後,在同一個Job例項中,會出現兩個執行緒處理同一個分片上的資料,這樣就會造成兩個執行緒可能處理到相同的資料。為了避免同一條資料可能會被多次執行的問題,ElasticJob引入冪等機制,確保同一條資料不會再被多個Job同時處理,也避免同一條資料在同一個Job例項的多個執行緒處理。再重申一次ElastciJob的分散式是資料的分散式,一個任務在多個Job例項上執行,每個Job例項處理該Job的部分資料(資料分片)。
本文重點分析ElasticJob是如何做到如下兩點的。
1)ElasticJob如何確保在同一個Job例項中多個執行緒不會處理相同的資料。
2)ElasticJob如何確保資料不會被多個Job例項處理。
為了解決上述這種情況,ElasticJob引入任務錯過補償執行(misfire)與冪等機制(monitorExecution)

1、ElasticJob如何確保在同一個Job例項中多個執行緒不會處理相同的資料。
場景:例如任務排程週期為每5s執行一次,正常每次排程任務處理需要耗時2s,如果在某一段時間由於資料庫壓力變大,導致原本只需要2s就能處理完成的任務,現在需要16s才能執行,在這個資料處理的過程中,每5s又會觸發一次排程(任務處理),如果不加以控制的話,在同一個例項上根據分片條件去查詢資料庫,查詢到的資料有可能相同(部分相同),這樣同一條任務資料將被多次執行,如果這個任務時處理轉賬業務,如果在業務方法不實現冪等,則會引發非常嚴重的問題,那ElasticJob是否可以避免這個問題呢?
答案是肯定。elasticJob提供了一個配置引數:monitorExecution=true,開啟冪等性。
一個任務觸發後,將執行任務處理邏輯,其入口:AbstractElasticJobExecutor#misfireIfRunning

if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {  // @1
       if (shardingContexts.isAllowSendJobEvent()) {  // @2
             jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
                    "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed."
, jobName, shardingContexts.getShardingItemParameters().keySet())); } return; }

程式碼@1:在一個排程任務觸發後如果上一次任務還未執行,則需要設定該分片狀態為mirefire,表示錯失了一次任務執行。
程式碼@2:如果該分片被設定為mirefire並開啟了事件跟蹤,將事件跟蹤儲存在資料庫中。
接下來詳細分析JobFacade.misfireIfRunning的實現邏輯:

/**
     * 如果當前分片項仍在執行則設定任務被錯過執行的標記.
     * 
     * @param items 需要設定錯過執行的任務分片項
     * @return 是否錯過本次執行
     */
    public boolean misfireIfHasRunningItems(final Collection<Integer> items) {
        if (!hasRunningItems(items)) {
            return false;
        }
        setMisfire(items);
        return true;
    }

如果存在未完成的分片,則呼叫setMisfire(items)方法,ElasticJob在開啟monitorExecution(true)【冪等機制】機制的情況下,在分片任務開始時會建立${namespace}/jobname/sharding/{item}/running節點,在任務結束後會刪除該目錄,所以在判斷是否有分片正在執行時,只需判斷是否存在上述節點即可。如果存在,呼叫setMisfire方法。
PS:如果ElasticJob為開啟冪等(monitorExecution)的情況下,才會建立\${namespace}/jobname/sharding
/{item}/running,misfire機制才能生效。
ExecutionService#setMisfire

/**
     * 設定任務被錯過執行的標記.
     *
     * @param items 需要設定錯過執行的任務分片項
     */
    public void setMisfire(final Collection<Integer> items) {
        for (int each : items) {
            jobNodeStorage.createJobNodeIfNeeded(ShardingNode.getMisfireNode(each));
        }
    }

設定misfire的方法為分配給該例項下的所有分片建立持久節點${namespace}/jobname/shading/{item}/misfire節點,注意,只要分配給該例項的任何一分片未執行完畢,則在該例項下的所有分片都增加misfire節點,然後忽略本次任務觸發執行,等待任務結束後再執行。
AbstractElasticJobExecutor#execute

execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
     while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
         jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
        execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
}

在任務執行完成後檢查是否存在${namespace}/jobname/sharding/{item}/misfire節點,如果存在,則首先清除misfie相關的檔案,然後執行任務。
ElasticJob的misfire實現方案總結:
在下一個排程週期到達之後,只要發現這個分片的任何一個分片正在執行,則為該例項分片的所有分片都設定為misfire,等任務執行完畢後,再統一執行下一次任務排程。

2、ElasticJob如何確保資料不會被多個Job例項處理
ElasticJob基於資料分片,不同分片根據分片引數(人為配置),從資料庫中查詢各自資料(任務資料分片),如果當節點宕機,資料會重新分片,如果任務未執行完成,然後執行分片,資料是否會被不同的任務同時處理呢?
答案是不會,因為當節點宕機後,是否需要重新分片事件監聽器會監聽到Job例項代表的節點刪除,設定重新分片,在任務被排程執行具體處理邏輯之前,需要重新分片,重新分片的前提又是要所有的分片的任務全部執行完畢,這也依賴是否開啟冪等控制(monitorExecution),如果開啟,ElasticJob能感知正在執行處理邏輯的分片,重新分片需要等待當前所有任務全部執行完畢後才會觸發,故不會存在不同節點處理相同資料的問題。

問答:
1、如果一個任務JOB的排程頻率為每10s一次,在某個時間,該job執行耗時用了33s(平時只需執行5s),按照正常排程,應該後續會觸發3次排程,那該job後執行完,會連續執行3次排程嗎?
答案:在33s這次任務執行完成後,如果後面的任務執行在10s內執行完畢的話,只會觸發一次,不會補償3次,因為ElasticJob記錄任務錯失執行,只是建立了misfire節點,並不會記錄錯失的此時,因為也沒這個必要。