1. 程式人生 > >SpringBatch中的retry和skip機制實現分析

SpringBatch中的retry和skip機制實現分析

pub 限制次數 else boolean exceptio 2個 let move vat

本文主要分析SpringBatch中的retry和skip機制的實現。
先簡單說明下SpringBatch在SpringBoot中的使用。
如果要在springboot中使用batch的話,直接加入以下依賴即可:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>

然後使用註解開啟Batch模塊:

...
@EnableBatchProcessing
public class Application { ... }

之後就可以註入JobBuilderFactory和StepBuilderFactory:

@Autowired
private JobBuilderFactory jobs;

@Autowired
private StepBuilderFactory steps;

有了這2個factory之後,就可以build job。
SpringBatch中的相關基礎概念比如ItemReader、ItemWriter、Chunk等本文就不介紹了。

我們以FlatFileItemReader作為reader,一個自定義Writer用於打印reader中讀取出來的數據。

這個定義的writer遇到good job這條數據的時候會報錯,具體邏輯如下:

@Override
public void write(List<? extends String> items) throws Exception {
    System.out.println("handle start =====" + items);
    for(String a : items) {
        if(a.equals("good job")) {
            throw new Exception("custom exception");
        }
    }
    System.out.println("handle end.. -----" + items);
}

其中reader中讀取的文件中的數據如下:

hello world
hello coder
good job
cool
66666

我們使用StepBuilderFactory構造Step,chunkSize設置為2。然後在job1中使用並執行:

stepBuilderFactory.get("teststep").chunk(2).reader(reader).writer(writer).build();

執行job1後console打印如下:

handle start =====[hello world, hello coder]
handle end.. -----[hello world, hello coder]
handle start =====[good job, cool]

job1遇到了good job這條數據,writer拋出了異常,由於沒有使用skip或者retry機制,導致整個流程停止。job1的處理流程底層在SimpleChunkProcessor這個類中完成,包括processor、writer的使用。

接下裏我們構造一個job2,job2使用skip機制(其中skipLimit必須要和skip(Class<? extends Throwable> type)一起使用),skip機制可以防止writer發生異常後不停止整個job,但是需要同時滿足skip的限制次數和skip對應的Exception是發生異常的父類或自身關系條件才不會停止整個job,這裏我們使用Exception作為異常和Integer.MAX_VALUE作為skip的限制次數為例:
stepBuilderFactory.get.get("test-step").chunk(2).reader(reader).writer(writer).faultTolerant().skipLimit(Integer.MAX_VALUE).skip(Exception.class).build();
執行job2 後console打印如下:

stepBuilderFactory.get.get("teststep").chunk(2).reader(reader).writer(writer).faultTolerant().skipLimit(Integer.MAX_VALUE).skip(Exception.class).build();

執行job2 後console打印如下:

handle start =====[hello world, hello coder]
handle end.. -----[hello world, hello coder]
handle start =====[good job, cool]
handle start =====[good job]
handle start =====[cool]
handle end.. -----[cool]
handle start =====[66666]
handle end.. -----[66666]

我們看到good job這條數據發生的異常被skip掉了,job完整的執行。

但是發現了另外一個問題,那就是處理 [good job, cool] 這批數據的時候,發生了異常,但是接下來執行了 [good job] 和 [cool] 這兩批chunk為1的批次。這是在ItemWriter中執行的,它也會在ItemWriteListener中執行多次。

換句話說,如果使用了skip功能,那麽對於需要被skip的批次數據中會進行scan操作找出具體是哪1條數據的原因,這裏的scan操作指的是一條一條數據的遍歷。

這個過程為什麽叫scan呢? 在源碼中,FaultTolerantChunkProcessor類(處理帶有skip或者retry機制的處理器,跟SimpleChunkProcessor類似,只不過SimpleChunkProcessor處理簡單的Job)裏有個私有方法scan:
private void scan(final StepContribution contribution, final Chunk<I> inputs, final Chunk<O> outputs,
ChunkMonitor chunkMonitor, boolean recovery) throws Exception {

...

Chunk<I>.ChunkIterator inputIterator = inputs.iterator();
Chunk<O>.ChunkIterator outputIterator = outputs.iterator();

List<O> items = Collections.singletonList(outputIterator.next()); // 拿出需要寫的數據中的每一條數據
inputIterator.next();
try {
    writeItems(items); // 寫每條數據
    doAfterWrite(items);
    contribution.incrementWriteCount(1);
    inputIterator.remove();
    outputIterator.remove();
}
catch (Exception e) { // 寫的時候如果發生了異常
    doOnWriteError(e, items);
    if (!shouldSkip(itemWriteSkipPolicy, e, -1) && !rollbackClassifier.classify(e)) {
        inputIterator.remove();
        outputIterator.remove();
    }
    else {
        // 具體的skip策略
        checkSkipPolicy(inputIterator, outputIterator, e, contribution, recovery);
    }
    if (rollbackClassifier.classify(e)) {
        throw e;
    }
}
chunkMonitor.incrementOffset();
if (outputs.isEmpty()) { // 批次裏的所有數據處理完畢之後 scanning 設置為false
    data.scanning(false);
    inputs.setBusy(false);
    chunkMonitor.resetOffset();
}
       }

這個scan方法觸發的條件是UserData這個內部類裏的scanning被設置為true,這裏被設置為true是在處理批次數據出現異常後並且不能retry的情況下才會被設置的。

try {
    batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState(inputs,
            rollbackClassifier));
}
catch (Exception e) {
    RetryContext context = contextHolder.get();
     if (!batchRetryTemplate.canRetry(context)) {
         // 設置scanning為true
        data.scanning(true);
    }
    throw e;
}

這就是為什麽skip機制在skip數據的時候會去scan批次中的每條數據,然後並找出需要被skip的數據的原理。
job3帶有retry功能,retry的功能在於出現某個異常並且這個異常可以被retry所接受的話會進行retry,retry的次數可以進行配置,我們配置了3次retry:

stepBuilderFactory.get.get("teststep").chunk(2).reader(reader).writer(writer).faultTolerant().skipLimit(Integer.MAX_VALUE).skip(Exception.class).retryLimit(3).retry(Exception.class).build();

執行 job3後console打印如下:

handle start =====[hello world, hello coder]
handle end.. -----[hello world, hello coder]
handle start =====[good job, cool]
handle start =====[good job, cool]
handle start =====[good job, cool]
handle start =====[good job]
handle start =====[cool]
handle end.. -----[cool]
handle start =====[66666]
handle end.. -----[66666]

good job, cool] 這批數據retry了3次,而且都失敗了。失敗之後進行了skip操作。

SpringBatch中的retry和skip都有對應的policy實現,默認的retry policy是SimpleRetryPolicy,可以設置retry次數和接收的exception。比如可以使用NeverRetryPolicy:

.retryPolicy(new NeverRetryPolicy())

使用NeverRetryPolicy之後,便不再retry了,只會skip。SpringBatch內部的retry是使用Spring的retry模塊完成的。執行的時候可以設置RetryCallback和RecoveryCallback。

SpringBatch中默認的skip policy是LimitCheckingItemSkipPolicy。

SpringBatch中的retry和skip機制實現分析