1. 程式人生 > >elastic-job詳解(一):資料分片

elastic-job詳解(一):資料分片

資料分片的目的在於把一個任務分散到不同的機器上執行,既可以解決單機計算能力上限的問題,也能降低部分任務失敗對整體系統的影響。elastic-job並不直接提供資料處理的功能,框架只會將分片項分配至各個執行中的作業伺服器(其實是Job例項,部署在一臺機器上的多個Job例項也能分片),開發者需要自行處理分片項與真實資料的對應關係。框架也預置了一些分片策略:平均分配演算法策略,作業名雜湊值奇偶數演算法策略,輪轉分片策略。同時也提供了自定義分片策略的介面。

分片原理

elastic-job的分片是通過zookeeper來實現的。分片的分片由主節點分配,如下三種情況都會觸發主節點上的分片演算法執行:

  • 新的Job例項加入叢集
  • 現有的Job例項下線(如果下線的是leader節點,那麼先選舉然後觸發分片演算法的執行)
  • 主節點選舉

上述三種情況,會讓zookeeper上leader節點的sharding節點上多出來一個necessary的臨時節點,主節點每次執行Job前,都會去看一下這個節點,如果有則執行分片演算法。

image

分片的執行結果會儲存在zookeeper上,如下圖,5個分片,每個分片應該由哪個Job例項來執行都已經分配好。分配的過程就是上面觸發分片演算法之後的操作。分配完成之後,各個Job例項就會在下次執行的時候使用上這個分配結果。

image

每個job例項任務觸發前都會獲取本任務在本例項上的分片情況(直接和上圖zookeeper上instance節點比對某一個分片是否該有這個Job例項執行),然後封裝成shardingContext,傳遞給呼叫任務的實際執行方法:

/**
     * 執行作業.
     *
     * @param shardingContext 分片上下文
     */
void execute(ShardingContext shardingContext);

分片演算法

所有的分片策略都繼承JobShardingStrategy介面。根據當前註冊到ZK的例項列表和在客戶端配置的分片數量來進行資料分片。最終將每個Job例項應該獲得的分片數字返回出去。 方法簽名如下:

複製程式碼
/**
     * 作業分片.
     * 
     * @param jobInstances 所有參與分片的單元列表
     * @param
jobName 作業名稱 * @param shardingTotalCount 分片總數 * @return 分片結果 */ Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName, int shardingTotalCount);
複製程式碼

分片函式的觸發,只會在leader選舉的時候觸發,也就是說只會在剛啟動和leader節點離開的時候觸發,並且是在leader節點上觸發,而其他節點不會觸發。

1. 基於平均分配演算法的分片策略

基於平均分配演算法的分片策略對應的類是:AverageAllocationJobShardingStrategy。它是預設的分片策略。它的分片效果如下:

  • 如果有3個Job例項, 分成9片, 則每個Job例項分到的分片是: 1=[0,1,2], 2=[3,4,5], 3=[6,7,8].
  • 如果有3個Job例項, 分成8片, 則每個Job例項分到的分片是: 1=[0,1,6], 2=[2,3,7], 3=[4,5].
  • 如果有3個Job例項, 分成10片, 則個Job例項分到的分片是: 1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8].

2. 作業名的雜湊值奇偶數決定IP升降序演算法的分片策略

這個策略的對應的類是:OdevitySortByNameJobShardingStrategy,它內部其實也是使用AverageAllocationJobShardingStrategy實現,只是在傳入的節點例項順序不一樣,也就是上面介面引數的List<JobInstance>。AverageAllocationJobShardingStrategy的缺點是一旦分片數小於Job例項數,作業將永遠分配至IP地址靠前的Job例項上,導致IP地址靠後的Job例項空閒。而OdevitySortByNameJobShardingStrategy則可以根據作業名稱重新分配Job例項負載。如:

  • 如果有3個Job例項,分成2片,作業名稱的雜湊值為奇數,則每個Job例項分到的分片是:1=[0], 2=[1], 3=[]
  • 如果有3個Job例項,分成2片,作業名稱的雜湊值為偶數,則每個Job例項分到的分片是:3=[0], 2=[1], 1=[]

實現比較簡單:

long jobNameHash = jobName.hashCode();
if (0 == jobNameHash % 2) {
    Collections.reverse(jobInstances);
}
return averageAllocationJobShardingStrategy.sharding(jobInstances, jobName, shardingTotalCount);

3. 根據作業名的雜湊值對Job例項列表進行輪轉的分片策略

這個策略的對應的類是:RotateServerByNameJobShardingStrategy,和上面介紹的策略一樣,內部同樣是用AverageAllocationJobShardingStrategy實現,也是在傳入的List<JobInstance>列表順序上做文章。

4. 自定義分片策略

除了可以使用上述分片策略之外,elastic-job還允許自定義分片策略。我們可以自己實現JobShardingStrategy介面,並且配置到分片方法上去,整個過程比較簡單,下面僅僅列出通過配置spring來切換自定義的分片演算法的例子:

<job:simple id="MyShardingJob1" class="nick.test.elasticjob.MyShardingJob1" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="5" sharding-item-parameters="0=A,1=B,2=C,3=D,4=E" job-sharding-strategy-class="nick.test.elasticjob.MyJobShardingStrategy"/>