elastic-job2 專案接入(spring,maven)
阿新 • • 發佈:2019-01-11
最近系統在使用esjob進行定時任務管理,現將接入過程分享給大家:
引入依賴
新建spring-mvc-servlet.xml<dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-core</artifactId> <version>2.1.2</version> </dependency> <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-spring</artifactId> <version>2.1.2</version> </dependency>
簡單任務
指定分片數後,當分片數大於機器數量的時候,每臺機器分配到的片數會是平均的,例如:第一片是從0開始的,比如總共分6片,有兩臺機器,則第一臺機器會分得0,1,2三片,而第二臺機器會分得3,4,5三片;當有機器宕機了或者有新機器加入的時候都會觸發重新分片。如果有多臺機器,而分片總數是1的時候即相當於1主多從的配置。sharding-item-parameters用於指定與分片對應的別名。sharding-item-parameters="0=A,1=B,2=C,3=D,4=E"job-sharding-strategy-class:可以通過它來指定作業分片策略,可選策略可參考官方文件http://elasticjob.io/docs/elastic-job-lite/02-guide/config-manual/。編寫任務
package com.el.test.job; import com.dangdang.ddframe.job.api.ShardingContext; import com.dangdang.ddframe.job.api.simple.SimpleJob; import com.el.test.model.Name; import com.el.test.service.NameServiceBean; import org.springframework.beans.factory.annotation.Autowired; import java.util.HashMap; import java.util.List; import java.util.Map; /** * 簡單作業: * 指定分片數後,當分片數大於機器數量的時候,每臺機器分配到的片數會是平均的, * 例如:第一片是從0開始的,比如總共分6片,有兩臺機器, * 則第一臺機器會分得0,1,2三片,而第二臺機器會分得3,4,5三片; * 當有機器宕機了或者有新機器加入的時候都會觸發重新分片。 * 如果有多臺機器,而分片總數是1的時候即相當於1主多從的配置。 * sharding-item-parameters用於指定與分片對應的別名。sharding-item-parameters="0=A,1=B,2=C,3=D,4=E" * job-sharding-strategy-class:可以通過它來指定作業分片策略, * 可選策略可參考官方文件http://elasticjob.io/docs/elastic-job-lite/02-guide/config-manual/。 */ public class TestJob implements SimpleJob { @Autowired private NameServiceBean nameServiceBean; /** * 具體執行邏輯,包含根據分片資訊獲取資料與業務邏輯處理 * @param shardingContext 分片資訊 */ @Override public void execute(ShardingContext shardingContext) { System.out.println("SpringSimpleJob 簡單任務-------------任務名:"+shardingContext.getJobName()+"\n" +",---ShardingParameter:"+shardingContext.getShardingParameter()+"\n" +",----TaskId:"+shardingContext.getTaskId()+"\n" +",----JobParameter:"+shardingContext.getJobParameter()+"\n" +",----tShardingItem:"+shardingContext.getShardingItem()+"\n" +",----ShardingTotalCount:"+shardingContext.getShardingTotalCount()+"\n" ); HashMap parm = new HashMap(); List<Name> shardingList=nameServiceBean.list(parm); for(Name oneObj : shardingList){ System.out.println( "SpringSimpleJob 簡單任務-------------id為:"+oneObj.getId()+"\n" + ",----tShardingItem:"+shardingContext.getShardingItem()+"\n" ); } } }
配置xml
<reg:zookeeper id="regCenter" server-lists="192.168.0.79:2181"
namespace="el-esjob-test-xvshu"
base-sleep-time-milliseconds="1000"
max-sleep-time-milliseconds="2000"
max-retries="3"/>
<!--esjob 預設啟動系統cpu核心數*2的執行緒操作資料,允許系統自定義executor-service-handle來操作具體核心數
預設實現com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler
public ExecutorService createExecutorService(String jobName) {
return (new ExecutorServiceObject("inner-job-" + jobName, Runtime.getRuntime().availableProcessors() * 2)).createExecutorService();
}
-->
<!--job:simple
id: job的名稱,一旦定義完後不可更改,更改後會認為一個新的job
class: job的具體實現類
registry-center-ref: 使用的註冊中心,regCenter不用更改
sharding-total-count: 總的分片數(如果配置成1,則部署多個節點只有一個節點執行定時job,如果此節點出問題(非業務問題),則此次觸發會轉移到其他節點上)
sharding-item-parameters: 用於指定與分片對應的別名。sharding-item-parameters="0=A,1=B,2=C,3=D,4=E"
cron: job執行的時間表達式 ,Quartz格式
monitor-execution: 是否監控
failover: 是否失敗轉移
description: job的描述資訊
disabled: 是否禁用
overwrite: 是否覆蓋zk中的配置(以zk的為準還是以本地的為準)
-->
<job:simple id="TestJobS"
class="com.el.test.job.TestJob"
registry-center-ref="regCenter"
sharding-total-count="3"
cron="* 0/1 * * * ?"
failover="true"
description="測試的簡單job"
overwrite="true"
/>
流式任務
每次排程觸發的時候都會先調fetchData獲取資料,如果獲取到了資料再排程processData方法處理資料。DataflowJob在執行時有兩種方式,流式的和非流式的,通過屬性streamingProcess控制,如果是基於SpringXML的配置方式則是streaming-process屬性,boolean型別。當作業配置為流式的時候,每次觸發作業後會排程一次fetchData獲取資料,如果獲取到了資料會排程processData方法處理資料,處理完後又繼續調fetchData獲取資料,再調processData處理,如此迴圈,就像流水一樣。直到fetchData沒有獲取到資料或者發生了重新分片才會停止。
core code:com.dangdang.ddframe.job.executor.type.DataflowJobExecutor
DataflowJobExecutor
protected void process(ShardingContext shardingContext) {
DataflowJobConfiguration dataflowConfig = (DataflowJobConfiguration)this.getJobRootConfig().getTypeConfig();
if (dataflowConfig.isStreamingProcess()) {
this.streamingExecute(shardingContext);
} else {
this.oneOffExecute(shardingContext);
}
}
private void streamingExecute(ShardingContext shardingContext) {
for(List data = this.fetchData(shardingContext); null != data && !data.isEmpty(); data = this.fetchData(shardingContext)) {
this.processData(shardingContext, data);
if (!this.getJobFacade().isEligibleForJobRunning()) {
break;
}
}
}
private void oneOffExecute(ShardingContext shardingContext) {
List<Object> data = this.fetchData(shardingContext);
if (null != data && !data.isEmpty()) {
this.processData(shardingContext, data);
}
}
編寫任務
package com.el.test.job;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.el.test.model.Name;
import com.el.test.service.NameServiceBean;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.HashMap;
import java.util.List;
/**
* 流式作業:
* 每次排程觸發的時候都會先調fetchData獲取資料,
* 如果獲取到了資料再排程processData方法處理資料。
* DataflowJob在執行時有兩種方式,流式的和非流式的,
* 通過屬性streamingProcess控制,如果是基於Spring XML的配置方式則是streaming-process屬性,
* boolean型別。當作業配置為流式的時候,每次觸發作業後會排程一次fetchData獲取資料,
* 如果獲取到了資料會排程processData方法處理資料,處理完後又繼續調fetchData獲取資料,
* 再調processData處理,如此迴圈,就像流水一樣。直到fetchData沒有獲取到資料或者發生了
* 重新分片才會停止。
* core code:com.dangdang.ddframe.job.executor.type.DataflowJobExecutor
*
* protected void process(ShardingContext shardingContext) {
DataflowJobConfiguration dataflowConfig = (DataflowJobConfiguration)this.getJobRootConfig().getTypeConfig();
if (dataflowConfig.isStreamingProcess()) {
this.streamingExecute(shardingContext);
} else {
this.oneOffExecute(shardingContext);
}
}
private void streamingExecute(ShardingContext shardingContext) {
for(List data = this.fetchData(shardingContext); null != data && !data.isEmpty(); data = this.fetchData(shardingContext)) {
this.processData(shardingContext, data);
if (!this.getJobFacade().isEligibleForJobRunning()) {
break;
}
}
}
private void oneOffExecute(ShardingContext shardingContext) {
List<Object> data = this.fetchData(shardingContext);
if (null != data && !data.isEmpty()) {
this.processData(shardingContext, data);
}
}
*/
public class TestJobFlow implements DataflowJob<Name> {
@Autowired
private NameServiceBean nameServiceBean;
/**
* 獲取資料(可迴圈呼叫,直至獲取不到資料或重新分片)
* 例如:本片處理一萬條資料,每次獲取一千條,則或執行十次
* @param shardingContext 分片資訊
* @return 需要處理的資料
*/
@Override
public List<Name> fetchData(ShardingContext shardingContext) {
System.out.println("SpringDataflowJob Dataflow型別作業-------------任務名:"+shardingContext.getJobName()+"\n"
+",---ShardingParameter:"+shardingContext.getShardingParameter()+"\n"
+",----TaskId:"+shardingContext.getTaskId()+"\n"
+",----JobParameter:"+shardingContext.getJobParameter()+"\n"
+",----tShardingItem:"+shardingContext.getShardingItem()+"\n"
+",----ShardingTotalCount:"+shardingContext.getShardingTotalCount()+"\n"
);
HashMap parm = new HashMap();
List<Name> shardingList=nameServiceBean.list(parm);
return shardingList;
}
/**
* 拿到fetchData返回的list進行業務邏輯處理
* @param shardingContext 分片資訊
* @param list 需要處理的資料
*/
@Override
public void processData(ShardingContext shardingContext, List<Name> list) {
if(list!=null){
for(Name oneObj : list){
System.out.println(
"SpringDataflowJob Dataflow型別作業-------------id為:"+oneObj.getId()+"\n" +
",----tShardingItem:"+shardingContext.getShardingItem()+"\n"
);
}
}
}
}
配置xml
<reg:zookeeper id="regCenter" server-lists="192.168.0.79:2181"
namespace="el-esjob-test-xvshu"
base-sleep-time-milliseconds="1000"
max-sleep-time-milliseconds="2000"
max-retries="3"/>
<!--esjob 預設啟動系統cpu核心數*2的執行緒操作資料,允許系統自定義executor-service-handle來操作具體核心數
預設實現com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler
public ExecutorService createExecutorService(String jobName) {
return (new ExecutorServiceObject("inner-job-" + jobName, Runtime.getRuntime().availableProcessors() * 2)).createExecutorService();
}
-->
<!--job:dataflow
id: job的名稱,一旦定義完後不可更改,更改後會認為一個新的job
class: job的具體實現類
registry-center-ref: 使用的註冊中心,regCenter不用更改
sharding-total-count: 總的分片數(如果配置成1,則部署多個節點只有一個節點執行定時job,如果此節點出問題(非業務問題),則此次觸發會轉移到其他節點上)
cron: job執行的時間表達式 ,Quartz格式
monitor-execution: 是否監控
failover: 是否失敗轉移
description: job的描述資訊
disabled: 是否禁用
overwrite: 是否覆蓋zk中的配置(以zk的為準還是以本地的為準)
streaming-process: 是否迴圈流式處理任務
-->
<job:dataflow id="TestJobF"
class="com.el.test.job.TestJobFlow"
registry-center-ref="regCenter"
sharding-total-count="3"
cron="* 0/1 * * * ?"
failover="true"
streaming-process="true"
description="測試的流式的Job"
/>
搭建運維平臺
從git上down下程式碼,https://github.com/elasticjob/elastic-job-lite
解壓縮elastic-job-lite-console-2.1.2.tar.gz並執行bin\start.sh。開啟瀏覽器訪問http://localhost:8899/即可訪問控制檯。8899為預設埠號,可通過啟動指令碼輸入-p自定義埠號。
運維平臺提供兩種賬戶,管理員及訪客,管理員擁有全部操作許可權,訪客僅擁有察看許可權。預設管理員使用者名稱和密碼是root/root,訪客使用者名稱和密碼是guest/guest,可通過conf\auth.properties修改管理員及訪客使用者名稱及密碼。
主頁如下:
新增zk
附錄:
spring完整配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
xmlns:job="http://www.dangdang.com/schema/ddframe/job"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd
">
<reg:zookeeper id="regCenter" server-lists="192.168.0.79:2181"
namespace="el-esjob-test-xvshu"
base-sleep-time-milliseconds="1000"
max-sleep-time-milliseconds="2000"
max-retries="3"/>
<!--esjob 預設啟動系統cpu核心數*2的執行緒操作資料,允許系統自定義executor-service-handle來操作具體核心數
預設實現com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler
public ExecutorService createExecutorService(String jobName) {
return (new ExecutorServiceObject("inner-job-" + jobName, Runtime.getRuntime().availableProcessors() * 2)).createExecutorService();
}
-->
<!--job:simple
id: job的名稱,一旦定義完後不可更改,更改後會認為一個新的job
class: job的具體實現類
registry-center-ref: 使用的註冊中心,regCenter不用更改
sharding-total-count: 總的分片數(如果配置成1,則部署多個節點只有一個節點執行定時job,如果此節點出問題(非業務問題),則此次觸發會轉移到其他節點上)
sharding-item-parameters: 用於指定與分片對應的別名。sharding-item-parameters="0=A,1=B,2=C,3=D,4=E"
cron: job執行的時間表達式 ,Quartz格式
monitor-execution: 是否監控
failover: 是否失敗轉移
description: job的描述資訊
disabled: 是否禁用
overwrite: 是否覆蓋zk中的配置(以zk的為準還是以本地的為準)
-->
<job:simple id="TestJobS"
class="com.el.test.job.TestJob"
registry-center-ref="regCenter"
sharding-total-count="3"
cron="* 0/1 * * * ?"
failover="true"
description="測試的簡單job"
overwrite="true"
/>
<!--job:dataflow
id: job的名稱,一旦定義完後不可更改,更改後會認為一個新的job
class: job的具體實現類
registry-center-ref: 使用的註冊中心,regCenter不用更改
sharding-total-count: 總的分片數(如果配置成1,則部署多個節點只有一個節點執行定時job,如果此節點出問題(非業務問題),則此次觸發會轉移到其他節點上)
cron: job執行的時間表達式 ,Quartz格式
monitor-execution: 是否監控
failover: 是否失敗轉移
description: job的描述資訊
disabled: 是否禁用
overwrite: 是否覆蓋zk中的配置(以zk的為準還是以本地的為準)
streaming-process: 是否迴圈流式處理任務
-->
<job:dataflow id="TestJobF"
class="com.el.test.job.TestJobFlow"
registry-center-ref="regCenter"
sharding-total-count="3"
cron="* 0/1 * * * ?"
failover="true"
streaming-process="true"
description="測試的流式的Job"
/>
</beans>
到這一步,接入esjob的任務已經基本成功了,下一步就是部署服務,進行job管理。