1. 程式人生 > >Spring Batch 如何健壯可重啟可追溯 SKIP/RETRY/RESTART策略的應用

Spring Batch 如何健壯可重啟可追溯 SKIP/RETRY/RESTART策略的應用

前提:你已經有了一定的Spring基礎

你已經可以跑動一個簡單的Spring batch 的例項

參考:http://www.cnblogs.com/gulvzhe/archive/2011/10/25/2224249.html

http://www.cnblogs.com/cdutedu/p/3789396.html

先盜幾個圖



JobLauncher 指定一個 JobRepository

JobRepository包含了一些傳入JOB的引數,主要有六個表去儲存

每個JOB可以對應多個Step

		 ...<batch:step id="aStep" next="bStep">
			<batch:tasklet>
				<batch:chunk reader="aReader" writer="aWriter" processor="aProcessor" commit-interval="1000" />
			</batch:tasklet>
		</batch:step>...

tesklet裡面的工作塊為chunk,每個chunk執行完commit-interval,即提交一次,可以同時開多個chunk


每個STEP 這樣執行


(以下一段摘抄)

從DB或是檔案中取出資料的時候,read()操作每次只讀取一條記錄,之後將讀取的這條資料傳遞給processor(item)處理,框架將重複做這兩步操作,直到讀取記錄的件數達到batch配置資訊中”commin-interval”設定值的時候,就會呼叫一次write操作。然後再重複上圖的處理,直到處理完所有的資料。當這個Step的工作完成以後,或是跳到其他Step,或是結束處理。

那麼問題來了?讀取資料到處理的時候發生了異常如何處理?會不會導致整個批處理中斷呢?

有異常沒有捕獲,到最上層也沒有的話整個程序會掛掉,導致整個批處理中斷。

顯然,為了不影響後面的處理,要麼捕獲異常,打出日誌,跳過。要麼,重試(在IO超時或者表鎖定的情況下是很有效的-瞬態情況)。

即使有中斷,我們也需要重啟JOB

以上幾種正好對應Spring-Batch中的 SKIP\RETYR\RESTART

一、SKIP

<job id="importProductsJob">
<step id="importProductsStep">
<tasklet>
<chunk reader="reader" writer="writer" commit-interval="100"
skip-limit="10">
<skippable-exception-classes>
<include class="org.springframework.batch
➥ .item.file.FlatFileParseException" />
</skippable-exception-classes>
</chunk>
</tasklet>
</step>
</job>

skippable-exception-classes 裡面配需要SKIP的異常型別

skip-limit 最多可以容錯次數,超過這個數,該STEP中斷


也可以新增listener去打日誌

<bean id="skipListener" class="com.manning
➥ .sbia.ch08.skip.DatabaseSkipListener">
<constructor-arg ref="dataSource" />
</bean>
<job id="importProductsJob"
xmlns="http://www.springframework.org/schema/batch">
<step id="importProductsStep">
<tasklet>
<chunk reader="reader" writer="writer"
commit-interval="100" skip-limit="10">
<skippable-exception-classes>
<include class="org.springframework.batch.item.file
➥ .FlatFileParseException" />
</skippable-exception-classes>
</chunk>
<listeners>
<listener ref="skipListener" />
</listeners>
</tasklet>
</step>
</job>

二、Retrying on error

主要針對於IO操作的、併發等,瞬態的錯誤

類似於SKIP的配置

<job id="importProducsJob">
<step id="importProductsStep">
<tasklet>
<chunk reader="reader" writer="writer" commit-interval="100"
retry-limit="3">
<retryable-exception-classes>
<include class="org.springframework.dao
➥.OptimisticLockingFailureException" />
</retryable-exception-classes>
</chunk>
</tasklet>
</step>
</job>

如果你不想重試次數達到後,由於這些錯誤導致STEP的中斷退出,可以混合RETRY和SKIP兩者。
<job id="job">
<step id="step">
<tasklet>
<chunk reader="reader" writer="writer" commit-interval="100"
retry-limit="3" skip-limit="10">
<retryable-exception-classes>
<include class="org.springframework.dao
➥ .DeadlockLoserDataAccessException" />
</retryable-exception-classes>
<skippable-exception-classes>
<include class="org.springframework.dao
➥ .DeadlockLoserDataAccessException" />
</skippable-exception-classes>
</chunk>
</tasklet>
</step>
</job>


你也可以通過自定義的策略來控制重試

<job id="retryPolicyJob"
xmlns="http://www.springframework.org/schema/batch">
<step id="retryPolicyStep">
<tasklet>
<chunk reader="reader" writer="writer" commit-interval="100"
retry-policy="retryPolicy" />
</tasklet>
</step>
</job>
<bean id="retryPolicy" class="org.springframework
➥.batch.retry.policy.ExceptionClassifierRetryPolicy">
<property name="policyMap">
<map>
<entry key="org.springframework.dao.ConcurrencyFailureException">
<bean class="org.springframework.batch.retry
➥.policy.SimpleRetryPolicy">
<property name="maxAttempts" value="3" />
</bean>
</entry>
<entry key="org.springframework.dao
➥ .DeadlockLoserDataAccessException">
<bean class="org.springframework.batch.retry
➥ .policy.SimpleRetryPolicy">
<property name="maxAttempts" value="5" />
</bean>
</entry>
</map>
</property>
</bean>

你也可以新增Listener

配置方法類似同SKIP,繼承RetryListenerSupport,配置在retry-listeners

你還可以通過RetryTemplate來重試。RetryTemplate配置屬性retryPolicy

三、重啟

重啟的重點是能夠儲存之前的job repository,自帶的reader,都可以,自己寫的Reader需要繼承介面

Spring batch 預設是會重啟的

allow-start-if-complete配置在tasklet上,決定了tasklet會不會在JOB重試的時候,重試該STEP(可能是下個STEP發生了異常)。

start-limit用來控制STEP級別的重試次數,重試次數結束後,JOB中斷退出。



處理在STEP中的狀態,已經處理了很多ITEM,失敗了? middle of a chunk-oriented step,item級別的重啟。首先是reader的重啟。 如果你需要你自己寫的ITEM的reader,也可以實現重啟的話,需要實現ItemStram的介面,將對於item 計數的值,儲存到executionContext,重啟的時候去讀取。