1. 程式人生 > >【實戰Elastic-Job】--詳解當當網分散式作業框架

【實戰Elastic-Job】--詳解當當網分散式作業框架

         在做電商專案的時候,有很多地方需要作業來完成,通過對比在scheduler的地方用了當當網的分散式作業框架Elastic-Job而沒有選擇spring自帶的scheduler, elastic-job可以不依賴於Spring直接執行,但是也提供了自定義的名稱空間方便與Spring整合。下面小編帶領你詳細的瞭解一下Elastic-Job是什麼,怎麼用.一步步的學習ElastIc-Job的原理及應用.

什麼是作業?

作業即定時任務.無需做複雜的控制,在指定的時間執行指定的任務.

為什麼需要作業?

  時間驅動/事件驅動: 內部系統一般可以通過事件來驅動,但涉及到外部系統,則只能使用時間驅動.如:抓取外部系統價格.每小時抓取,由於是外部系統,不能像內部系統一樣傳送事件觸發事件.

批量處理/逐條處理:批量處理堆積的資料更加高效,在不需要實時性的情況下比訊息中介軟體更有優勢,而且有的業務邏輯只能批量處理.

系統內部/系統解耦:作業一般封裝在系統內部,而訊息中介軟體可用於系統間解耦.

Elastic-Job是什麼?

        elastic-job主要的設計理念是無中心化的分散式定時排程框架,思路來源於Quartz的基於資料庫的高可用方案。但資料庫沒有分散式協調功能,所以在高可用方案的基礎上增加了彈性擴容和資料分片的思路,以便於更大限度的利用分散式伺服器的資源。Elastic-Job是ddframe中dd-job的作業模組中分離出來的分散式彈性作業框架。去掉了和dd-job中的監控和ddframe接入規範部分。

怎麼用?

Elastic-Job常用的三種作業型別: SimpleJob, SequenceDataFlowJobThroughputDataFlowJob

SimpleJob型別作業:

SimpleJob是常用的實現方式,意為簡單實現.需要繼承AbstractSimpleElasticJob ,用於執行普通的任務. SimpleJob的任務測試類:
package test.scheduler.demojob;
import org.springframework.stereotype.Component;
import com.dangdang.ddframe.job.api.JobExecutionMultipleShardingContext;
import com.dangdang.ddframe.job.plugin.job.type.simple.AbstractSimpleElasticJob;

@Component
public class SimpleJobDemo extends AbstractSimpleElasticJob {

	@Override
	public void process(final JobExecutionMultipleShardingContext context) {
		System.out.println("#######################");
	}
}

ThroughputDataFlow型別作業

ThroughputDataFlow型別作業意為高吞吐的資料流作業。需要繼承AbstractIndividualThroughputDataFlowElasticJob並可以指定返回值泛型,該類提供3個方法可覆蓋,分別用於抓取資料,處理資料和指定是否流式處理資料。可以獲取資料處理成功失敗次數等輔助監控資訊。如果流式處理資料,fetchData方法的返回值只有為null或長度為空時,作業才會停止執行,否則作業會一直執行下去;非流式處理資料則只會在每次作業執行過程中執行一次fetchData方法和processData方法,即完成本次作業。流式資料處理參照TbSchedule設計,適用於不間歇的資料處理。

ThroughputDataFlow的任務測試類:

package test.scheduler.demojob;

import java.util.ArrayList;
import java.util.List;
import org.springframework.stereotype.Component;
import com.dangdang.ddframe.job.api.JobExecutionMultipleShardingContext;
import com.dangdang.ddframe.job.plugin.job.type.dataflow.AbstractIndividualThroughputDataFlowElasticJob;

@Component
public class ThroughputDataFlowJobDemo extends AbstractIndividualThroughputDataFlowElasticJob<String> {

	@Override
	public List<String> fetchData(final JobExecutionMultipleShardingContext context) {
		System.out.println("#####進入任務計劃#######");
		List<String> result = new ArrayList<String>();
		for (int i = 0; i < 10; i++) {
			result.add(String.valueOf(i));
		}
		return result;
	}

	@Override
	public boolean processData(JobExecutionMultipleShardingContext shardingContext, String data) {
		System.out.println(data);
		return true;
	}
}

SequenceDataFlowJob型別作業:
SequenceDataFlowJob和ThroughputDataFlowJob的區別是:獲取資料和處理資料使用相關的資料分配規則不同.由於ThroughputDataFlow作業可以使用多於分片項的任意執行緒數處理,所以效能調優的可能會優於SequenceDataFlow作業。
SequenceDataFlowJob的測試類:
package test.scheduler.demojob;
import java.util.List;
import org.springframework.stereotype.Component;
import com.dangdang.ddframe.job.api.JobExecutionSingleShardingContext;
import com.dangdang.ddframe.job.plugin.job.type.dataflow.AbstractBatchSequenceDataFlowElasticJob;
@Component
public class SequenceDataFlowJobDemo extends AbstractBatchSequenceDataFlowElasticJob<String> {

	@Override
	public List<String> fetchData(final JobExecutionSingleShardingContext context) {
		return null;
	}

	@Override
	public int processData(final JobExecutionSingleShardingContext context, final List<String> data) {
		return 1;
	}
}


如果使用spring管理的話,首先在applicationContext-scheduler的配置檔案中新增基本資訊:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
	xmlns:reg="http://www.dangdang.com/schema/ddframe/reg" xmlns:job="http://www.dangdang.com/schema/ddframe/job"
	xsi:schemaLocation="http://www.springframework.org/schema/beans 
                        http://www.springframework.org/schema/beans/spring-beans.xsd 
                        http://www.springframework.org/schema/context 
                        http://www.springframework.org/schema/context/spring-context.xsd 
                        http://www.dangdang.com/schema/ddframe/reg 
                        http://www.dangdang.com/schema/ddframe/reg/reg.xsd 
                        http://www.dangdang.com/schema/ddframe/job 
                        http://www.dangdang.com/schema/ddframe/job/job.xsd"
	default-lazy-init="true">
       <!--配置作業註冊中心-->
	<reg:zookeeper id="regCenter" server-lists="${JOB_URL}"
		namespace="${namespace}" base-sleep-time-milliseconds="1000"
		max-sleep-time-milliseconds="3000" max-retries="3" />
</beans>
裡面用到了zookeeper的東西,這篇部落格不做具體介紹,上面的部分都是基本的配置,新增需要執行的job節點
<!--配置作業A-->
<job:dataflow id="sequenceDataFlowJob"
		class="com.dangdang.example.elasticjob.spring.job.SequenceDataFlowJobDemo"
		registry-center-ref="regCenter" sharding-total-count="${sequenceDataFlowJob.shardingTotalCount}"
		cron="${sequenceDataFlowJob.cron}" sharding-item-parameters="${sequenceDataFlowJob.shardingItemParameters}"
		monitor-execution="${sequenceDataFlowJob.monitorExecution}" failover="${sequenceDataFlowJob.failover}"
		process-count-interval-seconds="${sequenceDataFlowJob.processCountIntervalSeconds}"
		max-time-diff-seconds="${sequenceDataFlowJob.maxTimeDiffSeconds}"
		description="${sequenceDataFlowJob.description}" disabled="${sequenceDataFlowJob.disabled}"
		overwrite="${sequenceDataFlowJob.overwrite}" />
節點屬性的含義: 1.id:作業名稱 2.class:作業實現類,需要實現ElasticJob介面,指令碼型作業不需要配置 3.registry-center-ref:註冊中心Bean的引用,需引用reg:zookeeper的宣告


        job節點的各個屬性可以放在scheduler.properties的配置檔案中,如果有多個作業,可以按照這個節奏繼續新增相應的任務.
sequenceDataFlowJob.cron=0/5 * * * * ?
sequenceDataFlowJob.shardingTotalCount=10
sequenceDataFlowJob.shardingItemParameters=0=A,1=B,2=C,3=D,4=E,5=F,6=G,7=H,8=I,9=J
sequenceDataFlowJob.maxTimeDiffSeconds=-1
sequenceDataFlowJob.monitorExecution=true
sequenceDataFlowJob.failover=true
sequenceDataFlowJob.processCountIntervalSeconds=10
sequenceDataFlowJob.description=\u6309\u987A\u5E8F\u4E0D\u505C\u6B62\u8FD0\u884C\u7684\u4F5C\u4E1A\u793A\u4F8B
sequenceDataFlowJob.disabled=false
sequenceDataFlowJob.overwrite=true

節點屬性的含義: 1.cron:cron表示式,用於配置作業觸發時間 2.shardingTotalCount:作業分片總數 3.shardingItemParameters:分片序列號和引數用等號分隔,多個鍵值對用逗號分隔分片序列號從0開始,不可大於或等於作業分片總數如:0=a,1=b,2=c 4.maxTimeDiffSeconds:最大允許的本機與註冊中心的時間誤差秒數如果時間誤差超過配置秒數則作業啟動時將拋異常配置為-1表示不校驗時間誤差 5.failover:是否開啟失效轉移僅monitorExecution開啟,失效轉移才有效 6.processCountIntervalSeconds:統計作業處理資料數量的間隔時間,單位:秒 7.description:作業描述資訊 8.disabled:作業是否禁止啟動,可用於部署作業時,先禁用啟動,部署結束後統一啟動 9.overwrite:本地配置是否可覆蓋註冊中心配置,如果可覆蓋,每次啟動作業都以本地配置為準.

為什麼不用spring自帶的定時器任務功能而選擇Elastic-Job:

這就需要了解一下Elastic-Job的功能了.

1. 主要功能

a) 分散式:重寫Quartz基於資料庫的分散式功能,改用Zookeeper實現註冊中心。

b) 並行排程:採用任務分片方式實現。將一個任務拆分為n個獨立的任務項,由分散式的伺服器並行執行各自分配到的分片項。

c) 彈性擴容縮容:將任務拆分為n個任務項後,各個伺服器分別執行各自分配到的任務項。一旦有新的伺服器加入叢集,或現有伺服器下線,elastic-job將在保留本次任務執行不變的情況下,下次任務開始前觸發任務重分片。

d) 集中管理:採用基於Zookeeper的註冊中心,集中管理和協調分散式作業的狀態,分配和監聽。外部系統可直接根據Zookeeper的資料管理和監控elastic-job。

e) 定製化流程型任務:作業可分為簡單和資料流處理兩種模式,資料流又分為高吞吐處理模式和順序性處理模式,其中高吞吐處理模式可以開啟足夠多的執行緒快速的處理資料,而順序性處理模式將每個分片項分配到一個獨立執行緒,用於保證同一分片的順序性,這點類似於kafka的分割槽順序性。

2. 其他功能

a) 失效轉移:彈性擴容縮容在下次作業執行前重分片,但本次作業執行的過程中,下線的伺服器所分配的作業將不會重新被分配。失效轉移功能可以在本次作業執行中用空閒伺服器抓取孤兒作業分片執行。同樣失效轉移功能也會犧牲部分效能。

b) Spring名稱空間支援:elastic-job可以不依賴於spring直接執行,但是也提供了自定義的名稱空間方便與spring整合。

c) 運維平臺:提供web控制檯用於管理作業。

3. 非功能需求

a) 穩定性:在伺服器無波動的情況下,並不會重新分片;即使伺服器有波動,下次分片的結果也會根據伺服器IP和作業名稱雜湊值算出穩定的分片順序,儘量不做大的變動。

b) 高效能:同一伺服器的批量資料處理採用自動切割並多執行緒並行處理。

c) 靈活性:所有在功能和效能之間的權衡,都可通過配置開啟/關閉。如:elastic-job會將作業執行狀態的必要資訊更新到註冊中心。如果作業執行頻度很高,會造成大量Zookeeper寫操作,而分散式Zookeeper同步資料可能引起網路風暴。因此為了考慮效能問題,可以犧牲一些功能,而換取效能的提升。

d) 冪等性:elastic-job可犧牲部分效能用以保證同一分片項不會同時在兩個伺服器上執行。

e) 容錯性:作業伺服器和Zookeeper斷開連線則立即停止作業執行,用於防止分片已經重新分配,而腦裂的伺服器仍在繼續執行,導致重複執行。

訊息中介軟體和作業的區別:

訊息中介軟體也可以做到實時處理資料, 兩者確有相似之處。可互相替換的場景,如隊列表。將待處理的資料放入隊列表,然後使用頻率極短的定時任務拉取隊列表的資料並處理。這種情況使用訊息中介軟體的推送模式可更好的處理實時性資料。而且基於資料庫的訊息儲存吞吐量遠遠小於基於檔案的順序追加訊息儲存。