Spring Batch(4)——Item概念及使用程式碼
Reader
Reader
是指從各種各樣的外部輸入中獲取資料,框架為獲取各種型別的檔案已經預定義了常規的Reader
實現類。Reader
通過ItemReader
介面實現:
public interface ItemReader<T> {
T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}
read
方法的作用就是讀取一條資料,資料以泛型T的實體結構返回,當read返回null時表示所有資料讀取完畢。返回的資料可以是任何結構,比如檔案中的一行字串,資料庫的一行資料,或者xml檔案中的一系列元素,只要是一個Java物件即可。
Writer
Writer
通過ItemWriter
介面實現:
public interface ItemWriter<T> {
void write(List<? extends T> items) throws Exception;
}
Writer
是Reader
的反向操作,是將資料寫入到特定的資料來源中。在Step控制一文已經介紹Writer
是根據chunk
屬性設定的值按列表進行操作的,所以傳入的是一個List
結構。chunk
用於表示批處理的事物分片,因此需要注意的是,在writer
方法中進行完整資料寫入事物操作。例如向資料庫寫入List
中的資料,在寫入完成之後再提交事物。
讀寫的組合模式
無論是讀還是寫,有時會需要從多個不同的來源獲取檔案,或者寫入到不同的資料來源,或者是需要在讀和寫之間處理一些業務。可以使用組合模式來實現這個目的:
public class CompositeItemWriter<T> implements ItemWriter<T> { ItemWriter<T> itemWriter; public CompositeItemWriter(ItemWriter<T> itemWriter) { this.itemWriter = itemWriter; } public void write(List<? extends T> items) throws Exception { //Add business logic here itemWriter.write(items); } public void setDelegate(ItemWriter<T> itemWriter){ this.itemWriter = itemWriter; } }
Processor
除了使用組合模式,直接使用Processor
是一種更優雅的方法。Processor
是Step
中的可選項,但是批處理大部分時候都需要對資料進行處理,因此框架提供了ItemProcessor
介面來滿足Processor
過程:
public interface ItemProcessor<I, O> {
O process(I item) throws Exception;
}
Processor
的結構非常簡單也是否易於理解。傳入一個型別I,然後由Processor
處理成為O。
Processor鏈
在一個Step中可以使用多個Processor
來按照順序處理業務,此時同樣可以使用CompositeItem
模式來實現:
@Bean
public CompositeItemProcessor compositeProcessor() {
//建立 CompositeItemProcessor
CompositeItemProcessor<Foo,Foobar> compositeProcessor = new CompositeItemProcessor<Foo,Foobar>();
List itemProcessors = new ArrayList();
//新增第一個 Processor
itemProcessors.add(new FooTransformer());
//新增第二個 Processor
itemProcessors.add(new BarTransformer());
//新增連結串列
compositeProcessor.setDelegates(itemProcessors);
return processor;
}
過濾記錄
在Reader
讀取資料的過程中,並不是所有的資料都可以使用,此時Processor
還可以用於過濾非必要的資料,同時不會影響Step
的處理過程。只要ItemProcesspr
的實現類在procss
方法中返回null
即表示改行資料被過濾掉了。
ItemStream
在Step控制一文中已經提到了ItemStream
。在資料批處理概念中提到過,Spring Batch的每一步都是無狀態的,進而Reader
和Writer
也是無狀態的,這種方式能夠很好的隔離每行資料的處理,也能將容錯的範圍收窄到可以空子的範圍。但是這並不意味著整個批處理的過程中並不需要控制狀態。例如從資料庫持續讀入或寫入資料,每次Reader
和Writer
都單獨去申請資料來源的連結、維護資料來源的狀態(開啟、關閉等)。因此框架提供了ItemStream
介面來完善這些操作:
public interface ItemStream {
void open(ExecutionContext executionContext) throws ItemStreamException;
void update(ExecutionContext executionContext) throws ItemStreamException;
void close() throws ItemStreamException;
}
持久化資料
在使用Spring Batch之前需要初始化他的元資料儲存(Meta-Data Schema),也就是要將需要用到的表匯入到對應的資料庫中。當然,Spring Batch支援不使用任何持久化資料庫,僅僅將資料放到記憶體中,不設定DataSource
即可。
初始化序列
Spring Batch相關的工作需要使用序列SEQUENCE
:
CREATE SEQUENCE BATCH_STEP_EXECUTION_SEQ;
CREATE SEQUENCE BATCH_JOB_EXECUTION_SEQ;
CREATE SEQUENCE BATCH_JOB_SEQ;
有些資料庫不支援SEQUENCE
,可以通過表代理,比如在MySql(InnoDB資料庫)中:
CREATE TABLE BATCH_STEP_EXECUTION_SEQ (ID BIGINT NOT NULL);
INSERT INTO BATCH_STEP_EXECUTION_SEQ values(0);
CREATE TABLE BATCH_JOB_EXECUTION_SEQ (ID BIGINT NOT NULL);
INSERT INTO BATCH_JOB_EXECUTION_SEQ values(0);
CREATE TABLE BATCH_JOB_SEQ (ID BIGINT NOT NULL);
INSERT INTO BATCH_JOB_SEQ values(0);
關於Version欄位
某些表中都有Version
欄位。因為Spring的更新策略是樂觀鎖,因此在進行資料更新之後都會對錶的Version
欄位進行+1處理。在記憶體與資料庫互動的過程中,會使用採用getVersion、increaseVersion(+1)、updateDataAndVersion的過程,如果在update
的時候發現Version不是預計的數值(+1),則會丟擲OptimisticLockingFailureException
的異常。當同一個Job
在進群中不同服務上執行時,需要注意這個問題。
BATCH_JOB_INSTANCE
BATCH_JOB_INSTANCE
用於記錄JobInstance,在資料批處理概念中介紹了他的工作方式,其結構為:
CREATE TABLE BATCH_JOB_INSTANCE (
JOB_INSTANCE_ID BIGINT PRIMARY KEY ,
VERSION BIGINT,
JOB_NAME VARCHAR(100) NOT NULL ,
JOB_KEY VARCHAR(2500)
);
欄位 | 說明 |
---|---|
JOB_INSTANCE_ID | 主鍵,主鍵與單個JobInstance 相關。當獲取到某個JobInstance 例項後,通過getId 方法可以獲取到此資料 |
VERSION | |
JOB_NAME | Job的名稱,用於標記執行的Job,在建立Job時候指定 |
JOB_KEY | JobParameters的序列化數值。在資料批處理概念中介紹了一個JobInstance 相當於Job+JobParameters。他用於標記同一個Job 不同的例項 |
BATCH_JOB_EXECUTION_PARAMS
BATCH_JOB_EXECUTION_PARAMS
對應的是JobParameters
物件。其核心功能是儲存Key-Value結構的各種狀態數值。欄位中IDENTIFYING=true
用於標記那些執行過程中必須的資料(可以理解是框架需要用到的資料),為了儲存key-value結構該表一個列資料格式:
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (
JOB_EXECUTION_ID BIGINT NOT NULL ,
TYPE_CD VARCHAR(6) NOT NULL ,
KEY_NAME VARCHAR(100) NOT NULL ,
STRING_VAL VARCHAR(250) ,
DATE_VAL DATETIME DEFAULT NULL ,
LONG_VAL BIGINT ,
DOUBLE_VAL DOUBLE PRECISION ,
IDENTIFYING CHAR(1) NOT NULL ,
constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
);
欄位 | 說明 |
---|---|
JOB_EXECUTION_ID | 與BATCH_JOB_EXECUTION表關聯的外來鍵,詳見資料批處理概念中Job、JobInstance、JobExecute的關係 |
TYPE_CD | 用於標記資料的物件型別,例如 string、date、long、double,非空 |
KEY_NAME | key的值 |
STRING_VAL | string型別的數值 |
DATE_VAL | date型別的數值 |
LONG_VAL | long型別的數值 |
DOUBLE_VAL | double型別的數值 |
IDENTIFYING | 標記這對key-valuse是否來自於JobInstace自身 |
BATCH_JOB_EXECUTION
關聯JobExecution
,每當執行一個Job
都會產生一個新的JobExecution
,對應的在表中都會新增一行資料。
CREATE TABLE BATCH_JOB_EXECUTION (
JOB_EXECUTION_ID BIGINT PRIMARY KEY ,
VERSION BIGINT,
JOB_INSTANCE_ID BIGINT NOT NULL,
CREATE_TIME TIMESTAMP NOT NULL,
START_TIME TIMESTAMP DEFAULT NULL,
END_TIME TIMESTAMP DEFAULT NULL,
STATUS VARCHAR(10),
EXIT_CODE VARCHAR(20),
EXIT_MESSAGE VARCHAR(2500),
LAST_UPDATED TIMESTAMP,
JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
constraint JOB_INSTANCE_EXECUTION_FK foreign key (JOB_INSTANCE_ID)
references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ;
欄位 | 說明 |
---|---|
JOB_EXECUTION_ID | JobExecution的主鍵,JobExecution::getId方法可以獲取到該值 |
VERSION | |
JOB_INSTANCE_ID | 關聯到JobInstace的外來鍵,詳見資料批處理概念中Job、JobInstance、JobExecute的關係 |
CREATE_TIME | 建立時間戳 |
START_TIME | 開始時間戳 |
END_TIME | 結束時間戳,無論成功或失敗都會更新這一項資料。如果某行資料該值為空表示執行期間出現錯誤,並且框架無法更新該值 |
STATUS | JobExecute的執行狀態:COMPLETED、STARTED或者其他狀態。此數值對應Java中BatchStatus列舉值 |
EXIT_CODE | JobExecute執行完畢之後的退出返回值 |
EXIT_MESSAGE | JobExecute退出的詳細內容,如果是異常退出可能會包括異常堆疊的內容 |
LAST_UPDATED | 最後一次更新的時間戳 |
BATCH_STEP_EXECUTION
該表對應的是StepExecution
,其結構和BATCH_JOB_EXECUTION
基本相似,只是對應的物件是Step
,增加了與之相對的一些欄位數值:
CREATE TABLE BATCH_STEP_EXECUTION (
STEP_EXECUTION_ID BIGINT PRIMARY KEY ,
VERSION BIGINT NOT NULL,
STEP_NAME VARCHAR(100) NOT NULL,
JOB_EXECUTION_ID BIGINT NOT NULL,
START_TIME TIMESTAMP NOT NULL ,
END_TIME TIMESTAMP DEFAULT NULL,
STATUS VARCHAR(10),
COMMIT_COUNT BIGINT ,
READ_COUNT BIGINT ,
FILTER_COUNT BIGINT ,
WRITE_COUNT BIGINT ,
READ_SKIP_COUNT BIGINT ,
WRITE_SKIP_COUNT BIGINT ,
PROCESS_SKIP_COUNT BIGINT ,
ROLLBACK_COUNT BIGINT ,
EXIT_CODE VARCHAR(20) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED TIMESTAMP,
constraint JOB_EXECUTION_STEP_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;
未填入內容部分見BATCH_JOB_EXECUTION
說明。
欄位 | 說明 |
---|---|
STEP_EXECUTION_ID | StepExecute對應的主鍵 |
VERSION | |
STEP_NAME | Step名稱 |
JOB_EXECUTION_ID | 關聯到BATCH_JOB_EXECUTION表的外來鍵,標記該StepExecute所屬的JobExecute |
START_TIME | |
END_TIME | |
STATUS | |
COMMIT_COUNT | 執行過程中,事物提交的次數,該值與資料的規模以及chunk的設定有關 |
READ_COUNT | 讀取資料的次數 |
FILTER_COUNT | Processor中過濾記錄的次數 |
WRITE_COUNT | 吸入資料的次數 |
READ_SKIP_COUNT | 讀資料的跳過次數 |
WRITE_SKIP_COUNT | 寫資料的跳過次數 |
PROCESS_SKIP_COUNT | Processor跳過的次數 |
ROLLBACK_COUNT | 回滾的次數 |
EXIT_CODE | |
EXIT_MESSAGE | |
LAST_UPDATED |
BATCH_JOB_EXECUTION_CONTEXT
該表會記錄所有與Job
相關的ExecutionContext
資訊。每個ExecutionContext
都對應一個JobExecution
,在執行的過程中它包含了所有Job
範疇的狀態資料,這些資料在執行失敗後對於後續處理有中重大意義。
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (
JOB_EXECUTION_ID BIGINT PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT CLOB,
constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;
欄位 | 說明 |
---|---|
JOB_EXECUTION_ID | 關聯到JobExecution的外來鍵,建立JobExecution和ExecutionContext的關係。 |
SHORT_CONTEXT | 標記SERIALIZED_CONTEXT的版本號 |
SERIALIZED_CONTEXT | 序列化的ExecutionContext |
BATCH_STEP_EXECUTION_CONTEXT
Step
中ExecutionContext
相關的資料表,結構與BATCH_JOB_EXECUTION_CONTEXT
完全一樣。
表索引建議
上面的所有建表語句都沒有提供索引,但是並不代表索引沒有價值。當感覺到SQL語句的執行有效率問題時候,可以增加索引。
索引帶來的價值取決於SQL查詢的頻率以及關聯關係,下面是Spring Batch框架在執行過程中會用到的一些查詢條件語句,用於參考優化索引:
表 | Where條件 | 執行頻率 |
---|---|---|
BATCH_JOB_INSTANCE | JOB_NAME = ? and JOB_KEY = ? | 每次Job啟動執時 |
BATCH_JOB_EXECUTION | JOB_INSTANCE_ID = ? | 每次Job重啟時 |
BATCH_EXECUTION_CONTEXT | EXECUTION_ID = ? and KEY_NAME = ? | 視chunk的大小而定 |
BATCH_STEP_EXECUTION | VERSION = ? | 視chunk的大小而定 |
BATCH_STEP_EXECUTION | STEP_NAME = ? and JOB_EXECUTION_ID = ? | 每一個Step執行之前 |
使用案例
下面是Spring Batch一些簡單的應用,原始碼在下列地址的simple工程:
- Gitee:https://gitee.com/chkui-com/spring-batch-sample
- Github:https://github.com/chkui/spring-batch-sample
Spring Batch提供了2種執行方式:命令列方式或Java內嵌方式。命令列方式是直到需要執行批處理任務的時候才啟動程式,內嵌方式是結合Web工程或其他外部化框架來使用。2者最大的差別就是是否直接向IoCs注入一個Job
例項。
通用基本配置
兩種方式的基本配置都是一樣的,通過Reader
、Processor
、Writer
來組裝一個Step
。程式碼中Item
並不涉及檔案或資料庫的操作,只是簡單的模擬資料讀取、處理、寫入的過程。實體Record
和Msg
用於模擬資料轉換,基本配置如下:
public class BatchDefaultConfig {
@Bean
//配置Step
public Step simpleStep(StepBuilderFactory builder, ItemReader<Record> reader, ItemProcessor<Record, Msg> processor,
ItemWriter<Msg> writer) {
return builder.get("SimpleStep").<Record, Msg>chunk(10).reader(reader).processor(processor).writer(writer)
.build();
}
@Bean
//配置 Reader
public ItemReader<Record> reader() {
return new ItemReader<Record>() {
private int count = 0;
public Record read()
throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
return ++this.count < 100 ? new Record().setId(this.count).setMsg("Read Number:" + this.count) : null;
}
};
}
@Bean
//配置 Processor
public ItemProcessor<Record, Msg> processor() {
return new ItemProcessor<Record, Msg>() {
public Msg process(Record item) throws Exception {
return new Msg("MSG GET INFO = " + item.getMsg());
}
};
}
@Bean
//配置 Writer
public ItemWriter<Msg> writer() {
return new ItemWriter<Msg>() {
private int batchCount = 0;
public void write(List<? extends Msg> items) throws Exception {
System.out.println("Batch Count : " + ++batchCount + ". Data:");
for (Msg msg : items) {
System.out.println(msg.getInfo());
}
}
};
}
}
命令列方式執行
有了基本配置之後,命令列執行的方式僅僅是向容器新增一個Job
:
@Configuration
//匯入依賴配置
@Import({ BatchDefaultConfig.class })
public class BatchCommondConfig {
@Bean
public Job simpleJob(Step step, JobBuilderFactory builder) {
return builder.get("SimpleJob").start(step).build(); //向容器返回一個Job的Bean
}
}
然後啟動Spring Framework則會自動啟用Command Runner執行方式執行——先呼叫SpringApplication::callRunner
方法,然後使用JobLauncherCommandLineRunner::execute
執行:
public class CommondSample {
public static void main(String[] args) throws DuplicateJobException {
//模擬測試引數, 這些引數值在執行Java時從外部傳入的,比如-Dkey=value
String[] argsExt = new String[2];
argsExt[0] = "BuilderParam1=Value1";
argsExt[1] = "BuilderParam2=Value2";
//執行Spring Framework
SpringApplication.run(CommondSample.class, argsExt);
}
}
啟用之後觀察資料庫已經發生了變更。使用命令列需要通過 Java執行引數(-Dkey=value)傳遞JobParameters
的資料,上面的程式碼模擬實現了相關的過程。
Java內嵌執行
Java內嵌的方式主要是用於搭配外部工程化使用,比如使用Web框架或則統一排程平臺管之類的結構化框架來統一管理批處理任務。與命令列執行最大的區別就是不向容器注入Job
:
@Configuration
//匯入進出配置
@Import({BatchDefaultConfig.class})
public class BatchOperatoConfig {
@Bean
//返回JobFactory
public JobFactory simpleJob(Step step, JobBuilderFactory builder) {
SimpleJobFactory sampleJobFactory = new SimpleJobFactory();
sampleJobFactory.setJob(builder.get("SimpleJob").start(step).build());
return sampleJobFactory;
}
}
配置程式碼向容器添加了一個JobFactory
的實現類,JobFactory
的兩個介面一個是獲取Job
一個是獲取Job
的名稱,SimpleJobFactory
實現了JobFactory
:
public class SimpleJobFactory implements JobFactory {
private Job job;
public void setJob(Job job) {
this.job = job;
}
@Override
public Job createJob() {
return job;
}
@Override
public String getJobName() {
return job.getName();
}
}
最後通過SimpleJobFactory
來啟動一個Job
:
@SpringBootApplication
@EnableBatchProcessing
@EnableScheduling
public class OperatorSample {
public static void main(String[] args) throws DuplicateJobException {
new SuspendThread().run(); //掛起系統一直執行
ConfigurableApplicationContext ctx = SpringApplication.run(OperatorSample.class);
Cron cron = ctx.getBean(Cron.class);
cron.register(); //註冊JobFactory
cron.runJobLaunch();
}
}
@Service
class Cron {
@Autowired
JobLauncher jobLauncher;
@Autowired
private JobOperator jobOperator;
@Autowired
private JobRegistry jobRegistry;
@Autowired
private JobFactory jobFactory;
//註冊JobFactory
void register() throws DuplicateJobException {
jobRegistry.register(jobFactory);
}
//使用JobLaunch執行
void runJobLaunch() {
Map<String, JobParameter> map = new HashMap<>();
map.put("Builder", new JobParameter("1"));
map.put("Timer", new JobParameter("2"));
jobLauncher.run(jobFactory.createJob(), new JobParameters(map));
}
@Scheduled(cron = "30 * * * * ? ")
void task1() {
System.out.println("1");
runOperator();
}
//定時任務使用 JobOperator執行
private void runOperator() {
jobOperator.start("SimpleJob", "Builder=1,Timer=2");
}
}
這裡使用了2種執行方式:JobLauncher
和JobOperator
。JobLauncher
簡單明瞭的啟動一個批處理任務。而JobOperator
擴充套件了一些用於Job
管理的介面方法,觀察JobOperator
的原始碼可以發現它提供了獲取ExecuteContext
、檢查JobInstance
等功能,如果需要定製開發一個基於Web或者JMX管理批處理任務的系統,JobOperator
更合適。JobOperator
的第二個引數用於傳遞JobParameters
,等號兩端分別是key
和value
,逗號用於分割多行資料。
在Job配置與執行提及過一個JobInstance
相當於Job
+JobParameters
,因此雖然上面的程式碼使用了兩種不同的執行方式,但是Job
和JobParameters
是一樣的。在執行被定時任務包裹的runOperator
方法時,會一直丟擲JobInstanceAlreadyExistsException
異常,因為同一個例項不能執行2次。如果執行失敗可以使用對應的restart
方法。
後續會介紹各種Reader
和Writ