1. 程式人生 > >陪你解讀Spring Batch(二)帶你入手Spring Batch

陪你解讀Spring Batch(二)帶你入手Spring Batch

width sync launch 3.0 edi 抽象 override ride 批次

前言

  說得多不如show code。上一章簡單介紹了一下Spring Batch。本章將從頭到尾搭建一套基於Spring Batch(2.1.9)、Spring(3.0.5)、mybatis(3.4.5)、mysql、gradle的批處理簡單應用來處理文件中大量交易數據的寫入。

  那麽這裏簡單定義以下交易文件的格式,一個txnId交易Id,一個amt交易金額。一天比如有100w交易數據過來要落表。文件大概長這樣,只是簡單定義以下,實際開發肯定不會那麽少。

技術分享圖片

  因工作需求沒有使用最新版本的Spring Batch,所以本章是基於XML config的例子。最新版本支持用Java Config配置Spring Batch Job、Job Scope等。有興趣的同學可以自行研究一下。本人技術有限,本章講的如有錯誤希望請指正。

2.1 項目依賴

  首先我們要引入Spring Batch的依賴,這裏的版本是2.1.9

springbatch = ["org.springframework.batch:spring-batch-core:2.1.9.RELEASE",
                   "org.springframework.batch:spring-batch-infrastructure:2.1.9.RELEASE"]

  批量處理的過程中,我們都需要數據持久化。這裏我用的數據庫是mysql,ORM框架是mybatis。所以還要添加mysql-connect和mybatis的依賴

mybatis = "org.mybatis:mybatis:3.4.5"
mysqlconnect 
= "mysql:mysql-connector-java:5.1.25" dbcp = "commons-dbcp:commons-dbcp:1.4"

  事務和數據庫的配置就不用說了,必須的。

<!-- transaction config -->
    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource"/>
    </bean>

    <bean id="dataSource" class
="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost/m_test_db"/> <property name="username" value="root"/> <property name="password" value="root"/> </bean>

2.2 配置Job

  上一章節說過,其實文件批處理的場景,抽象的處理三大步驟分為,讀,處理,寫。那麽我們就依照這張圖來開始

技術分享圖片

  上圖如果看不懂的請看上一章節來理解。那麽我們先建立一個Spring Batch任務的xml文件,然後定義Job

<!--data-source,transaction-manager 數據源以及事務管理器-->
<batch:job-repository id="jobRepository"
data-source="dataSource" transaction-manager="transactionManager"
isolation-level-for-create="SERIALIZABLE"
table-prefix="DPL_" max-varchar-length="1000"/>

<batch:job id="investmentMatchFileJob"
               job-repository="jobRepository">
        <batch:step id="investmentMatchFileToDb">
            <batch:tasklet>
                <batch:chunk reader="txnListFileReader" writer="txnListResultWriter"
                             commit-interval="300"/>
            </batch:tasklet>
        </batch:step>
</batch:job>

  結合上面的圖看,是不是找到點感覺了?一個Job可以有多個Step組合,每一個Step由開發者自己編寫,可一把一個大Step分成多個小Step,完全看開發者意願。每一個Step對應一個ItemReader、ItemProcessor和ItemWriter。所有的批處理框架都可以抽象成最簡單的過程,讀取數據,處理數據,寫數據。所以Spring Batch提供了3個接口,ItemReader、ItemProcessor和ItemWriter。JobRepository則是記錄Job、Step和發起Job的執行信息等。

  xml配置Job必須依賴的有三項,名稱,JobRepository和Step列表。還有一個沒介紹就是commit-interval屬性,這就是控制讀了多少行進行一次寫。總不可能讀一行寫一行對吧?這裏配置多少,那麽Writer的入參list的size就是多少。

2.2.1 JobRepository

  JobRepository是記錄Job、Step和發起Job的執行信息,SpringBatch一共會讓你導入9張表,具體哪9張表請導入依賴然後查看schema-mysql.sql文件。

  這裏要說明的一點是table-prefix屬性,默認是以BATCH_開頭的,你可以改變前綴,當然你的sql腳本的表名前綴也要改動。註意,這裏只能改前綴,不可以改表的全名。表的列可以增加,比如說你的公司建表必須要有id,created_at,xxxx等字段的話,可以增加列,沒有問題。但是原有列的名稱不可以修改。腳本會在3張以SEQ結尾的表插入0,必須要先插入。

2.3 Step

2.3.1 Reader

  上面配置的reader是以下這個bean,value="file:#{jobParameters[‘txnListFile‘]}"。這裏用到SPEL表達式,傳入文件路徑參數。FlatFileItemReader只能處理一個文件,實際使用中不可能只處理一個文件,所以你也可以導入下面那個叫MultiResourceItemReader類,通過給MultiResourceItemReader設置Resource數組可以實現一個Job讀取一個目錄下多個文件。但是這裏註意,JobRepository不會記錄每個文件的處理情況。

<bean id="txnListFileReader"
      class="org.springframework.batch.item.file.FlatFileItemReader"
      scope="step">
    <!--輸入文件-->
    <property name="resource" value="file:#{jobParameters[‘txnListFile‘]}"/>
    <!--將每行映射為一個對象-->
    <property name="lineMapper">
        <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
            <!--從劃分的字段中構建一個對象-->
            <property name="fieldSetMapper" ref="InvestMatchItemMapper"/>
            <!--根據某種分隔符來分-->
            <property name="lineTokenizer" ref="TxnListItemMapperFileLineTokenizer"/>
        </bean>
    </property>
    <!--跳過開頭的的一些行-->
    <property name="linesToSkip" value="1"/>
    <property name="encoding" value="UTF-8"/>
</bean>
<bean id="InvestMatchItemMapper" class="me.grimmjx.sync.TxnListItemMapper"/>
<bean id="TxnListItemMapperFileLineTokenizer"
      class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
    <property name="delimiter" value="|"/>
    <property name="names">
        <list>
            <value>txnId</value>
            <value>amt</value>
        </list>
    </property>
</bean>



<!--以下的內容是對一個目錄下多個文件進行批處理的樣例-->
<bean id="txnListFileReader"
      class="org.springframework.batch.item.file.MultiResourceItemReader"
      scope="step">
    <property name="resources" value="file:#{jobParameters[‘txnListFile‘]}/*.txt"/>
    <property name="delegate">
        <bean class="org.springframework.batch.item.file.FlatFileItemReader"
              scope="step">
            <property name="lineMapper">
                <bean
                        class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                    <property name="fieldSetMapper" ref="InvestMatchItemMapper"/>
                    <property name="lineTokenizer"
                              ref="TxnListItemMapperFileLineTokenizer"/>
                </bean>
            </property>
            <property name="linesToSkip" value="1"/>
            <property name="encoding" value="UTF-8"/>
        </bean>
    </property>
</bean>

  以下圖來理解比較方便

技術分享圖片

  從xml配置來看,delimiter控制如何分割,names就是文件每一列的名字。在這麽多配置裏,我們只需要寫一個Java類。這裏就是從一行數據,轉換成一個對象。

/**
 * @author GrimMjx
 * 交易記錄匹配器類。
 */
public class TxnListItemMapper implements FieldSetMapper<TxnList>{

    @Override
    public TxnList mapFieldSet(FieldSet fieldSet) throws BindException {
        TxnList txnList = new TxnList();
        txnList.setTxnId(fieldSet.readString("txnId"));
        txnList.setAmt(fieldSet.readBigDecimal("amt"));

        return txnList;
    }
}

2.3.2 Writer

  writer的bean為

<bean id="txnListResultWriter" class="me.grimmjx.sync.TxnListResultWriter" scope="step"/>

  writer執行的是寫入操作,我們要實現ItemWriter<T>接口,以下為這個類的Java代碼。這裏的操作很簡單,將構建好的對象集合直接寫入庫。註意了,外面沒有冪等的話,最好這裏先判斷庫裏有沒有,不要無腦寫入。

/**
 * @author GrimMjx
 * 交易數據寫入類。
 */
public class TxnListResultWriter implements ItemWriter<TxnList> {
    @Autowired
    private TxnListMapper txnListMapper;

    @Override
    public void write(List<? extends TxnList> items) throws Exception {
        List<TxnList> txnLists = Lists.newArrayList();
        for (TxnList item : items) {
            txnLists.add(item);
        }
        txnListMapper.insertBatch(txnLists);
    }
}

2.4 啟動Job

  這裏先定義一個bean,與之前的Job相關聯。

<bean id="DefaultFileProcessor" class="me.grimmjx.processor.DefaultFileProcessor">
        <property name="job" ref="investmentMatchFileJob"/>
        <property name="jobLauncher" ref="jobLauncher"/>
</bean>

  以下為這個processor的Java代碼

/**
 * 默認文件處理器類。
 *
 * @author GrimMjx
 */
public class DefaultFileProcessor {
    /**
     * 批次job
     */
    protected Job job;

    /**
     * 任務啟動器
     */
    protected JobLauncher jobLauncher;


    public void process() {
        String baseDir = "/Users/miaojiaxing/test/2019.01.31.txt";

        JobParametersBuilder builder = new JobParametersBuilder();
        builder.addString("txnListFile", baseDir);
        // 攜帶參數
//        builder.addString("packageCode", "12345");
        builder.addString("dateTime", System.currentTimeMillis() + "");
        JobParameters jobParas = builder.toJobParameters();

        try {
            jobLauncher.run(job, jobParas);
        } catch (Exception e) {
            throw new RuntimeException("Run springBatchJob meet error", e);
        }
    }

    public void setJob(Job job) {
        this.job = job;
    }

    public void setJobLauncher(JobLauncher jobLauncher) {
        this.jobLauncher = jobLauncher;
    }

}

  最後我們試試

/**
 * @author GrimMjx
 * <p>
 * 測試類。
 */
public class MainTest {

    public static void main(String[] args) {
        ApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext.xml");
        DefaultFileProcessor bean = ctx.getBean("DefaultFileProcessor", DefaultFileProcessor.class);
        bean.process();
        DefaultFileProcessor rereadProcessor = ctx.getBean("rereadProcessor", DefaultFileProcessor.class);
        rereadProcessor.process();
    }
}

技術分享圖片

  沒有問題。

  下一章節將結合校驗清洗、異常彈性處理、並行配置附上代碼。

陪你解讀Spring Batch(二)帶你入手Spring Batch