1. 程式人生 > >elastic-job詳解(一):數據分片

elastic-job詳解(一):數據分片

count 任務 不同的 應該 center shc 偶數 int ext

數據分片的目的在於把一個任務分散到不同的機器上運行,既可以解決單機計算能力上限的問題,也能降低部分任務失敗對整體系統的影響。elastic-job並不直接提供數據處理的功能,框架只會將分片項分配至各個運行中的作業服務器(其實是Job實例,部署在一臺機器上的多個Job實例也能分片),開發者需要自行處理分片項與真實數據的對應關系。框架也預置了一些分片策略:平均分配算法策略,作業名哈希值奇偶數算法策略,輪轉分片策略。同時也提供了自定義分片策略的接口。

分片原理

elastic-job的分片是通過zookeeper來實現的。分片的分片由主節點分配,如下兩種情況都會觸發主節點上的分片算法執行:

  • 新的Job實例加入集群
  • 現有的Job實例下線(如果下線的是leader節點,那麽先選舉然後觸發分片算法的執行)

上述兩種情況,會讓zookeeper上leader節點的sharding節點上多出來一個necessary的臨時節點,主節點每次執行Job前,都會去看一下這個節點,如果有則執行分片算法。

技術分享

分片的執行結果會存儲在zookeeper上,如下圖,5個分片,每個分片應該由哪個Job實例來運行都已經分配好。分配的過程就是上面觸發分片算法之後的操作。分配完成之後,各個Job實例就會在下次執行的時候使用上這個分配結果。

技術分享

每個job實例任務觸發前都會獲取本任務在本實例上的分片情況(直接和上圖zookeeper上instance節點比對某一個分片是否該有這個Job實例執行),然後封裝成shardingContext,傳遞給調用任務的實際執行方法:

/**
     * 執行作業.
     *
     * @param shardingContext 分片上下文
     */
void execute(ShardingContext shardingContext);

分片算法

所有的分片策略都繼承JobShardingStrategy接口。根據當前註冊到ZK的實例列表和在客戶端配置的分片數量來進行數據分片。最終將每個Job實例應該獲得的分片數字返回出去。 方法簽名如下:

/**
     * 作業分片.
     * 
     * @param jobInstances 所有參與分片的單元列表
     * 
@param jobName 作業名稱 * @param shardingTotalCount 分片總數 * @return 分片結果 */ Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName, int shardingTotalCount);

分片函數的觸發,只會在leader選舉的時候觸發,也就是說只會在剛啟動和leader節點離開的時候觸發,並且是在leader節點上觸發,而其他節點不會觸發。

1. 基於平均分配算法的分片策略

基於平均分配算法的分片策略對應的類是:AverageAllocationJobShardingStrategy。它是默認的分片策略。它的分片效果如下:

  • 如果有3個Job實例, 分成9片, 則每個Job實例分到的分片是: 1=[0,1,2], 2=[3,4,5], 3=[6,7,8].
  • 如果有3個Job實例, 分成8片, 則每個Job實例分到的分片是: 1=[0,1,6], 2=[2,3,7], 3=[4,5].
  • 如果有3個Job實例, 分成10片, 則個Job實例分到的分片是: 1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8].

2. 作業名的哈希值奇偶數決定IP升降序算法的分片策略

這個策略的對應的類是:OdevitySortByNameJobShardingStrategy,它內部其實也是使用AverageAllocationJobShardingStrategy實現,只是在傳入的節點實例順序不一樣,也就是上面接口參數的List<JobInstance>。AverageAllocationJobShardingStrategy的缺點是一旦分片數小於Job實例數,作業將永遠分配至IP地址靠前的Job實例上,導致IP地址靠後的Job實例空閑。而OdevitySortByNameJobShardingStrategy則可以根據作業名稱重新分配Job實例負載。如:

  • 如果有3個Job實例,分成2片,作業名稱的哈希值為奇數,則每個Job實例分到的分片是:1=[0], 2=[1], 3=[]
  • 如果有3個Job實例,分成2片,作業名稱的哈希值為偶數,則每個Job實例分到的分片是:3=[0], 2=[1], 1=[]

實現比較簡單:

long jobNameHash = jobName.hashCode();
if (0 == jobNameHash % 2) {
    Collections.reverse(jobInstances);
}
return averageAllocationJobShardingStrategy.sharding(jobInstances, jobName, shardingTotalCount);

3. 根據作業名的哈希值對Job實例列表進行輪轉的分片策略

這個策略的對應的類是:RotateServerByNameJobShardingStrategy,和上面介紹的策略一樣,內部同樣是用AverageAllocationJobShardingStrategy實現,也是在傳入的List<JobInstance>列表順序上做文章。

4. 自定義分片策略

除了可以使用上述分片策略之外,elastic-job還允許自定義分片策略。我們可以自己實現JobShardingStrategy接口,並且配置到分片方法上去,整個過程比較簡單,下面僅僅列出通過配置spring來切換自定義的分片算法的例子:

<job:simple id="MyShardingJob1" class="nick.test.elasticjob.MyShardingJob1" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="5" sharding-item-parameters="0=A,1=B,2=C,3=D,4=E" job-sharding-strategy-class="nick.test.elasticjob.MyJobShardingStrategy"/>

elastic-job詳解(一):數據分片