SpringBoot + SpringBatch: A Practical Minimal Example
Spring Batch is a lightweight framework for batch (parallel) processing of large volumes of data.
Its role is similar to Hadoop's, but Hadoop is built on a heavyweight distributed environment (for massive data sets), while Spring Batch is a lightweight application framework (for small to medium data sets).
This post builds the simplest runnable Spring Batch example to show what the framework does.
For deeper study, see the official reference documentation at https://docs.spring.io/spring-batch/4.0.x/reference/html/index.html (if your English is shaky, do what I do and right-click to translate the page into your language).
Tech stack: SpringBoot + SpringBatch + JPA. Full demo project: https://github.com/EalenXie/springboot-batch
1 . Create a new project, springboot-batch, with the basic pom.xml dependencies:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>name.ealen</groupId>
    <artifactId>springboot-batch</artifactId>
    <version>1.0</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
    </dependencies>
</project>
2 . Spring Batch stores its metadata in a set of database tables, so first run the following script, taken from the official meta-data schema, against your database.
-- do not edit this file

-- BATCH_JOB_INSTANCE holds all information related to a JobInstance.
-- JOB_INSTANCE_ID is assigned from BATCH_JOB_SEQ.
-- JOB_NAME is the job name, matching the Spring configuration.
-- JOB_KEY is an MD5 hash of the job parameters; because of this column, a job that has
-- completed successfully once will throw JobInstanceAlreadyCompleteException if it is
-- run again with the same parameters.
CREATE TABLE BATCH_JOB_INSTANCE (
    JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY,
    VERSION BIGINT,
    JOB_NAME VARCHAR(100) NOT NULL,
    JOB_KEY VARCHAR(32) NOT NULL,
    constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ENGINE=InnoDB;

-- BATCH_JOB_EXECUTION holds all information related to a JobExecution object.
CREATE TABLE BATCH_JOB_EXECUTION (
    JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
    VERSION BIGINT,
    JOB_INSTANCE_ID BIGINT NOT NULL,
    CREATE_TIME DATETIME NOT NULL,
    START_TIME DATETIME DEFAULT NULL,
    END_TIME DATETIME DEFAULT NULL,
    STATUS VARCHAR(10),
    EXIT_CODE VARCHAR(2500),
    EXIT_MESSAGE VARCHAR(2500),
    LAST_UPDATED DATETIME,
    JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
    constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
        references BATCH_JOB_INSTANCE (JOB_INSTANCE_ID)
) ENGINE=InnoDB;

-- BATCH_JOB_EXECUTION_PARAMS holds all information related to the JobParameters object.
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (
    JOB_EXECUTION_ID BIGINT NOT NULL,
    TYPE_CD VARCHAR(6) NOT NULL,
    KEY_NAME VARCHAR(100) NOT NULL,
    STRING_VAL VARCHAR(250),
    DATE_VAL DATETIME DEFAULT NULL,
    LONG_VAL BIGINT,
    DOUBLE_VAL DOUBLE PRECISION,
    IDENTIFYING CHAR(1) NOT NULL,
    constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
        references BATCH_JOB_EXECUTION (JOB_EXECUTION_ID)
) ENGINE=InnoDB;

-- BATCH_STEP_EXECUTION holds all information related to a StepExecution object.
CREATE TABLE BATCH_STEP_EXECUTION (
    STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
    VERSION BIGINT NOT NULL,
    STEP_NAME VARCHAR(100) NOT NULL,
    JOB_EXECUTION_ID BIGINT NOT NULL,
    START_TIME DATETIME NOT NULL,
    END_TIME DATETIME DEFAULT NULL,
    STATUS VARCHAR(10),
    COMMIT_COUNT BIGINT,
    READ_COUNT BIGINT,
    FILTER_COUNT BIGINT,
    WRITE_COUNT BIGINT,
    READ_SKIP_COUNT BIGINT,
    WRITE_SKIP_COUNT BIGINT,
    PROCESS_SKIP_COUNT BIGINT,
    ROLLBACK_COUNT BIGINT,
    EXIT_CODE VARCHAR(2500),
    EXIT_MESSAGE VARCHAR(2500),
    LAST_UPDATED DATETIME,
    constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
        references BATCH_JOB_EXECUTION (JOB_EXECUTION_ID)
) ENGINE=InnoDB;

-- BATCH_STEP_EXECUTION_CONTEXT holds the ExecutionContext of each Step.
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (
    STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
    SHORT_CONTEXT VARCHAR(2500) NOT NULL,
    SERIALIZED_CONTEXT TEXT,
    constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
        references BATCH_STEP_EXECUTION (STEP_EXECUTION_ID)
) ENGINE=InnoDB;

-- BATCH_JOB_EXECUTION_CONTEXT holds the ExecutionContext of each Job.
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (
    JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
    SHORT_CONTEXT VARCHAR(2500) NOT NULL,
    SERIALIZED_CONTEXT TEXT,
    constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
        references BATCH_JOB_EXECUTION (JOB_EXECUTION_ID)
) ENGINE=InnoDB;

CREATE TABLE BATCH_STEP_EXECUTION_SEQ (
    ID BIGINT NOT NULL,
    UNIQUE_KEY CHAR(1) NOT NULL,
    constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;

INSERT INTO BATCH_STEP_EXECUTION_SEQ (ID, UNIQUE_KEY)
select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp
where not exists (select * from BATCH_STEP_EXECUTION_SEQ);

CREATE TABLE BATCH_JOB_EXECUTION_SEQ (
    ID BIGINT NOT NULL,
    UNIQUE_KEY CHAR(1) NOT NULL,
    constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;

INSERT INTO BATCH_JOB_EXECUTION_SEQ (ID, UNIQUE_KEY)
select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp
where not exists (select * from BATCH_JOB_EXECUTION_SEQ);

CREATE TABLE BATCH_JOB_SEQ (
    ID BIGINT NOT NULL,
    UNIQUE_KEY CHAR(1) NOT NULL,
    constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;

INSERT INTO BATCH_JOB_SEQ (ID, UNIQUE_KEY)
select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp
where not exists (select * from BATCH_JOB_SEQ);
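As an aside (not part of the original demo): with Spring Boot 2.0.x you can also let the framework create these tables automatically at startup via the spring.batch.initialize-schema property in application.properties, instead of running the script by hand. Note that newer Spring Boot versions renamed this property to spring.batch.jdbc.initialize-schema.

```properties
# Let Spring Boot run the official Spring Batch schema script on startup.
# Values: always | embedded (default) | never
spring.batch.initialize-schema=always
```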
3 . The entity class for the test data, Access.java:
package name.ealen.model;

import javax.persistence.*;

/**
 * Created by EalenXie on 2018/9/10 16:17.
 */
@Entity
@Table
public class Access {

    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    private Integer id;
    private String username;
    private String shopName;
    private String categoryName;
    private String brandName;
    private String shopId;
    private String omit;
    private String updateTime;
    private boolean deleteStatus;
    private String createTime;
    private String description;

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }

    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }

    public String getShopName() { return shopName; }
    public void setShopName(String shopName) { this.shopName = shopName; }

    public String getCategoryName() { return categoryName; }
    public void setCategoryName(String categoryName) { this.categoryName = categoryName; }

    public String getBrandName() { return brandName; }
    public void setBrandName(String brandName) { this.brandName = brandName; }

    public String getShopId() { return shopId; }
    public void setShopId(String shopId) { this.shopId = shopId; }

    public String getOmit() { return omit; }
    public void setOmit(String omit) { this.omit = omit; }

    public String getUpdateTime() { return updateTime; }
    public void setUpdateTime(String updateTime) { this.updateTime = updateTime; }

    public boolean isDeleteStatus() { return deleteStatus; }
    public void setDeleteStatus(boolean deleteStatus) { this.deleteStatus = deleteStatus; }

    public String getCreateTime() { return createTime; }
    public void setCreateTime(String createTime) { this.createTime = createTime; }

    public String getDescription() { return description; }
    public void setDescription(String description) { this.description = description; }

    @Override
    public String toString() {
        return "Access{" +
                "id=" + id +
                ", username='" + username + '\'' +
                ", shopName='" + shopName + '\'' +
                ", categoryName='" + categoryName + '\'' +
                ", brandName='" + brandName + '\'' +
                ", shopId='" + shopId + '\'' +
                ", omit='" + omit + '\'' +
                ", updateTime='" + updateTime + '\'' +
                ", deleteStatus=" + deleteStatus +
                ", createTime='" + createTime + '\'' +
                ", description='" + description + '\'' +
                '}';
    }
}
4 . Before configuring the simplest Job, prepare some basics, such as a listener for the Job.
Configure a TaskExecutor in ExecutorConfiguration.java:
package name.ealen.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * TaskExecutor configuration.
 */
@Configuration
public class ExecutorConfiguration {

    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(50);
        threadPoolTaskExecutor.setMaxPoolSize(200);
        threadPoolTaskExecutor.setQueueCapacity(1000);
        threadPoolTaskExecutor.setThreadNamePrefix("Data-Job");
        return threadPoolTaskExecutor;
    }
}
Then prepare a simple listener for the Job by implementing JobExecutionListener:
package name.ealen.listener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * Created by EalenXie on 2018/9/10 15:09.
 * A simple Job listener.
 */
@Component
public class JobListener implements JobExecutionListener {

    private static final Logger log = LoggerFactory.getLogger(JobListener.class);

    @Resource
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    private long startTime;

    @Override
    public void beforeJob(JobExecution jobExecution) {
        startTime = System.currentTimeMillis();
        log.info("job before " + jobExecution.getJobParameters());
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        log.info("JOB STATUS : {}", jobExecution.getStatus());
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("JOB FINISHED");
            threadPoolTaskExecutor.destroy();
        } else if (jobExecution.getStatus() == BatchStatus.FAILED) {
            log.info("JOB FAILED");
        }
        log.info("Job Cost Time : {}ms", (System.currentTimeMillis() - startTime));
    }
}
5 . Configure a basic Job. A Job usually consists of one or more Steps (essentially a small workflow), and a Step usually has three parts: reading data (ItemReader), processing data (ItemProcessor), and writing data (ItemWriter).
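To make the chunk-oriented model concrete before diving into the Spring configuration, here is a plain-Java sketch (deliberately not the Spring Batch API, just an illustration) of what a chunk-oriented Step does: read items one at a time, process each, and hand them to the writer in chunks of a fixed size:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class ChunkModel {
    /**
     * Toy model of a chunk-oriented step: items are read and processed
     * one at a time, but written in batches of chunkSize (much like a
     * transaction commit). Returns the chunks handed to the "writer".
     */
    public static <I, O> List<List<O>> runStep(Iterator<I> reader,
                                               Function<I, O> processor,
                                               int chunkSize) {
        List<List<O>> writes = new ArrayList<>();
        List<O> chunk = new ArrayList<>();
        while (reader.hasNext()) {
            chunk.add(processor.apply(reader.next())); // process one item
            if (chunk.size() == chunkSize) {           // chunk full -> "write"
                writes.add(new ArrayList<>(chunk));
                chunk.clear();
            }
        }
        if (!chunk.isEmpty()) writes.add(chunk);       // flush the tail
        return writes;
    }
}
```

With chunk size 2 and five items, the writer is invoked three times: twice with full chunks and once with the single leftover item, which mirrors how chunk(100) in the real Step commits every 100 processed items.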
package name.ealen.batch;

import name.ealen.listener.JobListener;
import name.ealen.model.Access;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.orm.JpaNativeQueryProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;
import javax.persistence.EntityManagerFactory;

/**
 * Created by EalenXie on 2018/9/10 14:50.
 * @EnableBatchProcessing provides the base configuration for building batch jobs.
 */
@Configuration
@EnableBatchProcessing
public class DataBatchConfiguration {

    private static final Logger log = LoggerFactory.getLogger(DataBatchConfiguration.class);

    @Resource
    private JobBuilderFactory jobBuilderFactory;   // builds Jobs
    @Resource
    private StepBuilderFactory stepBuilderFactory; // builds Steps
    @Resource
    private EntityManagerFactory emf;              // injected to access data via JPA
    @Resource
    private JobListener jobListener;               // the simple Job listener

    /**
     * A simple Job usually consists of one or more Steps.
     */
    @Bean
    public Job dataHandleJob() {
        return jobBuilderFactory.get("dataHandleJob").
                incrementer(new RunIdIncrementer()).
                start(handleDataStep()). // start() sets the first Step the Job runs
                //next(xxxStep()).
                //next(xxxStep()).
                //...
                listener(jobListener).   // register the simple JobListener
                build();
    }

    /**
     * A simple Step has three main parts:
     * ItemReader    : reads the data
     * ItemProcessor : processes the data
     * ItemWriter    : writes the data
     */
    @Bean
    public Step handleDataStep() {
        return stepBuilderFactory.get("getData").
                <Access, Access>chunk(100). // <input, output> types. A chunk is roughly like a SQL commit: process 100 items, then write them in one go.
                faultTolerant().retryLimit(3).retry(Exception.class).skipLimit(100).skip(Exception.class).
                // on an exception the item is retried up to 3 times; if it still fails, it is skipped, up to 100 skips; beyond that the Step stops and is marked FAILED
                reader(getDataReader()).       // the ItemReader
                processor(getDataProcessor()). // the ItemProcessor
                writer(getDataWriter()).       // the ItemWriter
                build();
    }

    @Bean
    public ItemReader<? extends Access> getDataReader() {
        // Data could be read via JPA, JDBC, JMS, etc. Here we use JPA with a simple native SQL query.
        JpaPagingItemReader<Access> reader = new JpaPagingItemReader<>();
        String sqlQuery = "SELECT * FROM access";
        try {
            JpaNativeQueryProvider<Access> queryProvider = new JpaNativeQueryProvider<>();
            queryProvider.setSqlQuery(sqlQuery);
            queryProvider.setEntityClass(Access.class);
            queryProvider.afterPropertiesSet();
            reader.setEntityManagerFactory(emf);
            reader.setPageSize(3);
            reader.setQueryProvider(queryProvider);
            reader.afterPropertiesSet();
            // Every ItemReader and ItemWriter saves its current state in the ExecutionContext
            // before a commit; if you do not want that, call setSaveState(false).
            reader.setSaveState(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return reader;
    }

    @Bean
    public ItemProcessor<Access, Access> getDataProcessor() {
        return new ItemProcessor<Access, Access>() {
            @Override
            public Access process(Access access) throws Exception {
                log.info("processor data : " + access.toString()); // simulated processing: just log the item
                return access;
            }
        };
        // or, as a lambda:
        //return access -> {
        //    log.info("processor data : " + access.toString());
        //    return access;
        //};
    }

    @Bean
    public ItemWriter<Access> getDataWriter() {
        return list -> {
            for (Access access : list) {
                log.info("write data : " + access); // simulated write: put the real write logic here
            }
        };
    }
}
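The faultTolerant() settings are easy to misread, so here is a small plain-Java simulation (again not the Spring Batch API, just a model of the semantics): each failing item gets up to retryLimit attempts; an item that still fails is skipped; and once more than skipLimit items have been skipped, the Step fails:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class RetrySkipModel {
    /**
     * Toy model of faultTolerant().retryLimit(n).skipLimit(m):
     * each item gets up to retryLimit processing attempts; an item that
     * still fails is skipped; exceeding skipLimit skips fails the step.
     * Returns the successfully processed items, or throws on step failure.
     */
    public static List<Integer> runStep(List<Integer> items,
                                        Predicate<Integer> attemptSucceeds,
                                        int retryLimit, int skipLimit) {
        List<Integer> processed = new ArrayList<>();
        int skips = 0;
        for (Integer item : items) {
            boolean ok = false;
            for (int attempt = 1; attempt <= retryLimit && !ok; attempt++) {
                ok = attemptSucceeds.test(item);  // one processing attempt
            }
            if (ok) {
                processed.add(item);
            } else if (++skips > skipLimit) {     // too many skips -> step fails
                throw new IllegalStateException("skip limit exceeded, step failed");
            }
        }
        return processed;
    }
}
```

So with retryLimit(3).skipLimit(100) as in the Step above, the job tolerates up to 100 bad items (each tried 3 times) before giving up, rather than "retrying 100 times" as one might guess.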
6 . With the basic Job configured, import some sample data into the Access table (demo data, access.sql, is in the git repo) and write a SpringBoot application class to test it.
Note: declare each component of the Job with the @Bean annotation; otherwise the metadata tables will not record its operations correctly:
package name.ealen;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Created by EalenXie on 2018/9/10 14:41.
 */
@SpringBootApplication
public class SpringBatchApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringBatchApplication.class, args);
    }
}
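To actually start the application you also need datasource and JPA settings, which the post does not show; a minimal application.properties sketch is below (the URL, schema name, and credentials are placeholders; adjust them to your environment):

```properties
# MySQL datasource (placeholder values)
spring.datasource.url=jdbc:mysql://localhost:3306/springbatch?useSSL=false
spring.datasource.username=root
spring.datasource.password=change-me
# let Hibernate create/update the access table for the demo entity
spring.jpa.hibernate.ddl-auto=update
```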
7 . Run it and you can see the basic data handling take place (here both the processing and the writing are simulated):
8 . Check the metadata tables to verify how the Job ran:
One more note: I previously wrote about integrating SpringBoot with Quartz, and you can probably see where this is going. A Spring Batch Job is a natural unit of batch work, and Quartz works perfectly well as its scheduler; combining the two gives very good results.
Thanks to everyone for your feedback and support.