1. 程式人生 > >Spring Batch(6)——資料庫批資料讀寫

Spring Batch(6)——資料庫批資料讀寫

前序文章陸續介紹了批處理的基本概念Job使用Step控制Item的結構以及扁平檔案的讀寫。本文將接著前面的內容說明資料庫如何進行批處理讀寫。

資料讀取

資料庫是絕大部分系統要用到的資料儲存工具,因此針對資料庫執行批量資料處理任務也是很常見的需求。資料的批量處理與常規業務開發不同,如果一次性讀取百萬條,對於任何系統而言肯定都是不可取的。為了解決這個問題Spring Batch提供了2套資料讀取方案:

  • 基於遊標讀取資料
  • 基於分頁讀取資料

遊標讀取資料

對於有經驗大資料工程師而言資料庫遊標的操作應該是非常熟悉的,因為這是從資料庫讀取資料流標準方法,而且在Java中也封裝了ResultSet

這種面向遊標操作的資料結構。

ResultSet一直都會指向結果集中的某一行資料,使用next方法可以讓遊標跳轉到下一行資料。Spring Batch同樣使用這個特性來控制資料的讀取:

  1. 在初始化時開啟遊標。
  2. 每一次呼叫ItemReader::read方法就從ResultSet獲取一行資料並執行next
  3. 返回可用於資料處理的對映結構(map、dict)。

在一切都執行完畢之後,框架會使用回撥過程呼叫ResultSet::close來關閉遊標。由於所有的業務過程都繫結在一個事物之上,所以知道到Step執行完畢或異常退出呼叫執行close。下圖展示了資料讀取的過程:

SQL語句的查詢結果稱為資料集

(對於大部分資料庫而言,其SQL執行結果會產生臨時的表空間索引來存放資料集)。遊標開始會停滯在ID=2的位置,一次ItemReader執行完畢後會產生對應的實體FOO2,然後遊標下移直到最後的ID=6。最後關閉遊標。

JdbcCursorItemReader

JdbcCursorItemReader是使用遊標讀取資料集的ItemReader實現類之一。它使用JdbcTemplate中的DataSource控制ResultSet,其過程是將ResultSet的每行資料轉換為所需要的實體類。

JdbcCursorItemReader的執行過程有三步:

  1. 通過DataSource建立JdbcTemplate
  2. 設定資料集的SQL語句。
  3. 建立ResultSet到實體類的對映。 大致如下:
//隨風溜達的向日葵 chkui.com
JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());

除了上面的程式碼,JdbcCursorItemReader還有其他屬性:

屬性名稱說明
ignoreWarnings標記當執行SQL語句出現警告時,是輸出日誌還是丟擲異常,預設為true——輸出日誌
fetchSize預通知JDBC驅動全量資料的個數
maxRows設定ResultSet從資料庫中一次讀取記錄的上限
queryTimeout設定執行SQL語句的等待超時時間,單位秒。當超過這個時間會丟擲DataAccessException
verifyCursorPosition對遊標位置進行校驗。由於在RowMapper::mapRow方法中ResultSet是直接暴露給使用者的,因此有可能在業務程式碼層面呼叫了ResultSet::next方法。將這個屬性設定為true,在框架中會有一個位置計數器與ResultSet保持一致,當執行完Reader後位置不一致會丟擲異常。
saveState標記讀取的狀態是否被存放到ExecutionContext中。預設為true
driverSupportsAbsolute告訴框架是指直接使用ResultSet::absolute方法來指定遊標位置,使用這個屬性需要資料庫驅動支援。建議在支援absolute特性的資料庫上開啟這個特性,能夠明顯的提升效能。預設為false
setUseSharedExtendedConnection標記讀取資料的遊標是否與Step其他過程繫結成同一個事物。預設為false,表示讀取資料的遊標是單獨建立連線的,具有自身獨立的事物。如果設定為true需要用ExtendedConnectionDataSourceProxy包裝DataSource用於管理事物過程。此時遊標的建立標記為'READ_ONLY'、'HOLD_CURSORS_OVER_COMMIT'。需要注意的是該屬性需要資料庫支援3.0以上的JDBC驅動。

可執行原始碼

原始碼在下列地址的items子專案:

執行JdbcCursorItemReader的程式碼在org.chenkui.spring.batch.sample.items.JdbcReader。啟動位置是org.chenkui.spring.batch.sample.database.cursor.JdbcCurosrApplication

在執行程式碼之前請先在資料庫中執行以下DDL語句,並新增部分測試資料。

CREATE TABLE `tmp_test_weather` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主鍵',
  `siteid` varchar(64) NOT NULL COMMENT '業務主鍵',
  `month` varchar(64) NOT NULL COMMENT '日期',
  `type` varchar(64) NOT NULL COMMENT '氣象型別',
  `value` int(11) NOT NULL COMMENT '值',
  `ext` varchar(255) DEFAULT NULL COMMENT '擴充套件資料',
  PRIMARY KEY (`id`)
) ;

執行程式碼:

//隨風溜達的向日葵 chkui.com
public class JdbcReader {

    @Bean
    public RowMapper<WeatherEntity> weatherEntityRowMapper() {

        return new RowMapper<WeatherEntity>() {
            public static final String SITEID_COLUMN = "siteId"; // 設定對映欄位
            public static final String MONTH_COLUMN = "month";
            public static final String TYPE_COLUMN = "type";
            public static final String VALUE_COLUMN = "value";
            public static final String EXT_COLUMN = "ext";

            @Override
            // 資料轉換
            public WeatherEntity mapRow(ResultSet resultSet, int rowNum) throws SQLException {
                WeatherEntity weatherEntity = new WeatherEntity();
                weatherEntity.setSiteId(resultSet.getString(SITEID_COLUMN));
                weatherEntity.setMonth(resultSet.getString(MONTH_COLUMN));
                weatherEntity.setType(WeatherEntity.Type.valueOf(resultSet.getString(TYPE_COLUMN)));
                weatherEntity.setValue(resultSet.getInt(VALUE_COLUMN));
                weatherEntity.setExt(resultSet.getString(EXT_COLUMN));
                return weatherEntity;
            }
        };
    }

    @Bean
    public ItemReader<WeatherEntity> jdbcCursorItemReader(
        @Qualifier("weatherEntityRowMapper") RowMapper<WeatherEntity> rowMapper, DataSource datasource) {
        JdbcCursorItemReader<WeatherEntity> itemReader = new JdbcCursorItemReader<>();
        itemReader.setDataSource(datasource); //設定DataSource
        //設定讀取的SQL
        itemReader.setSql("SELECT siteId, month, type, value, ext from TMP_TEST_WEATHER"); 
        itemReader.setRowMapper(rowMapper); //設定轉換
        return itemReader;
    }
}

HibernateCursorItemReader

在Java體系中資料庫操作常見的規範有JPAORM,Spring Batch提供了HibernateCursorItemReader來實現HibernateTemplate,它可以通過Hibernate框架進行遊標的控制。

需要注意的是:使用Hibernate框架來處理批量資料到目前為止一直都有爭議,核心原因是Hibernate最初是為線上聯機事物型系統開發的。不過這並不意味著不能使用它來處理批資料,解決此問題就是讓Hibernate使用StatelessSession用來保持遊標,而不是standard session一次讀寫,這將導致Hibernate的快取機制和資料髒讀檢查失效,進而影響批處理的過程。關於Hibernate的狀態控制機制請閱讀官方文件。

HibernateCursorItemReader使用過程與JdbcCursorItemReader沒多大差異都是逐條讀取資料然後控制狀態連結關閉。只不過他提供了Hibernate所使用的HSQL方案。

@Bean
public ItemReader<WeatherEntity> hibernateCursorItemReader(SessionFactory sessionFactory) {
    HibernateCursorItemReader<WeatherEntity> itemReader = new HibernateCursorItemReader<>();
    itemReader.setName("hibernateCursorItemReader");
    itemReader.setQueryString("from WeatherEntity tmp_test_weather");
    itemReader.setSessionFactory(sessionFactory);
    return itemReader;
}

public ItemReader<WeatherEntity> hibernateCursorItemReader(SessionFactory sessionFactory) {
    return new HibernateCursorItemReaderBuilder<CustomerCredit>()
            .name("creditReader")
            .sessionFactory(sessionFactory)
            .queryString("from CustomerCredit")
            .build();
}

如果沒有特別的需要,不推薦使用Hibernate

StoredProcedureItemReader

儲存過程是在同一個資料庫中處理大量資料的常用方法。StoredProcedureItemReader的執行過程和JdbcCursorItemReader一致,但是底層邏輯是先執行儲存過程,然後返回儲存過程執行結果遊標。不同的資料庫儲存過程遊標返回會有一些差異:

  1. 作為一個ResultSet返回。(SQL Server, Sybase, DB2, Derby以及MySQL)
  2. 引數返回一個 ref-cursor例項。比如Oracle、PostgreSQL資料庫,這類資料庫儲存過程是不會直接return任何內容的,需要從傳參獲取。
  3. 返回儲存過程呼叫後的返回值。

針對以上3個型別,配置上有一些差異:

//隨風溜達的向日葵 chkui.com
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
    StoredProcedureItemReader reader = new StoredProcedureItemReader();

    reader.setDataSource(dataSource);
    reader.setProcedureName("sp_processor_weather");
    reader.setRowMapper(new weatherEntityRowMapper());
	
    reader.setRefCursorPosition(1);//第二種型別需要指定ref-cursor的引數位置

    reader.setFunction(true);//第三種類型需要明確的告知reader通過返回獲取

    return reader;
}

使用儲存過程處理資料的好處是可以實現針對庫內的資料進行合併、分割、排序等處理。如果資料在同一個資料庫,效能也明顯好於通過Java處理。

分頁讀取資料

相對於遊標,還有一個辦法是進行分頁查詢。分頁查詢意味著再進行批處理的過程中同一個SQL會多次執行。在聯機型事物系統中分頁查詢常用於列表功能,每一次查詢需要指定開始位置和結束位置。

JdbcPagingItemReader

分頁查詢的預設實現類是JdbcPagingItemReader,它的核心功能是用分頁器PagingQueryProvider進行分頁控制。由於不同的資料庫分頁方法差別很大,所以針對不同的資料庫有不同的實現類。框架提供了SqlPagingQueryProviderFactoryBean用於檢查當前資料庫並自動注入對應的PagingQueryProvider

JdbcPagingItemReader會從資料庫中一次性讀取一整頁的資料,但是呼叫Reader的時候還是會一行一行的返回資料。框架會自行根據執行情況確定什麼時候需要執行下一個分頁的查詢。

分頁讀取資料執行原始碼

執行JdbcPagingItemReader的程式碼在org.chenkui.spring.batch.sample.items.pageReader。啟動位置是org.chenkui.spring.batch.sample.database.paging.JdbcPagingApplication

//隨風溜達的向日葵 chkui.com
public class pageReader {
    final private boolean wrapperBuilder = false;
    @Bean
    //設定 queryProvider
    public SqlPagingQueryProviderFactoryBean queryProvider(DataSource dataSource) {
        SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();

        provider.setDataSource(dataSource);
        provider.setSelectClause("select id, siteid, month, type, value, ext");
        provider.setFromClause("from tmp_test_weather");
        provider.setWhereClause("where id>:start");
        provider.setSortKey("id");

        return provider;
    }

    @Bean
    public ItemReader<WeatherEntity> jdbcPagingItemReader(DataSource dataSource,
            PagingQueryProvider queryProvider,
            RowMapper<WeatherEntity> rowMapper) {

        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("start", "1");
        JdbcPagingItemReader<WeatherEntity> itemReader;
        if (wrapperBuilder) {
            itemReader = new JdbcPagingItemReaderBuilder<WeatherEntity>()
                    .name("creditReader")
                    .dataSource(dataSource)
                    .queryProvider(queryProvider)
                    .parameterValues(parameterValues)
                    .rowMapper(rowMapper)
                    .pageSize(1000)
                    .build();
        } else {
            itemReader = new JdbcPagingItemReader<>();
            itemReader.setName("weatherEntityJdbcPagingItemReader");
            itemReader.setDataSource(dataSource);
            itemReader.setQueryProvider(queryProvider);
            itemReader.setParameterValues(parameterValues);
            itemReader.setRowMapper(rowMapper);
            itemReader.setPageSize(1000);
        }
        return itemReader;
    }
}

資料寫入

Spring Batch為不同型別的檔案的寫入提供了多個實現類,但並沒有為資料庫的寫入提供任何實現類,而是交由開發者自己去實現介面。理由是:

  1. 資料庫的寫入與檔案寫入有巨大的差別。對於一個Step而言,在寫入一份檔案時需要保持對檔案的開啟狀態從而能夠高效的向隊尾新增資料。如果每次都重新開啟檔案,從開始位置移動到隊尾會耗費大量的時間(很多檔案流無法在open時就知道長度)。當整個Step結束時才能關閉檔案的開啟狀態,框架提供的檔案讀寫類都實現了這個控制過程。

  2. 另外無論使用何種方式將資料寫入檔案都是"逐行進行"的(流資料寫入、字串逐行寫入)。因此當資料寫入與整個Step繫結為事物時還需要實現一個控制過程是:在寫入資料的過程中出現異常時要擦除本次事物已經寫入的資料,這樣才能和整個Step的狀態保持一致。框架中的類同樣實現了這個過程。

  3. 但是向資料庫寫入資料並不需要類似於檔案的尾部寫入控制,因為資料庫的各種連結池本身就保證了連結->寫入->釋放的高效執行,也不存在向隊尾新增資料的問題。而且幾乎所有的資料庫驅動都提供了事物能力,在任何時候出現異常都會自動回退,不存在擦除資料的問題。

因此,對於資料庫的寫入操作只要按照常規的批量資料寫入的方式即可,開發者使用任何工具都可以完成這個過程。

寫入資料一個簡單的實現

實現資料寫入方法很多,這和常規的聯機事務系統沒任何區別。下面直接用JdbcTemplate實現了一個簡單的資料庫寫入過程。

執行資料庫寫入的核心程式碼在org.chenkui.spring.batch.sample.items.JdbcWriter。啟動位置是org.chenkui.spring.batch.sample.database.output.JdbcWriterApplication

//隨風溜達的向日葵 chkui.com
public class JdbcWriter {

    @Bean
    public ItemWriter<WeatherEntity> jdbcBatchWriter(JdbcTemplate template) {

        return new ItemWriter<WeatherEntity>() {
            final private static String INSERt_SQL = 
                      "INSERT INTO tmp_test_weather(siteid, month, type, value, ext) VALUES(?,?,?,?,?)";
            @Override
            public void write(List<? extends WeatherEntity> items) throws Exception {
                List<Object[]> batchArgs = new ArrayList<>();
                for (WeatherEntity entity : items) {
                    Object[] objects = new Object[5];
                    objects[0] = entity.getSiteId();
                    objects[1] = entity.getMonth();
                    objects[2] = entity.getType().name();
                    objects[3] = entity.getValue();
                    objects[4] = entity.getExt();
                    batchArgs.add(objects);
                }
                template.batchUpdate(INSERt_SQL, batchArgs);
            }
        };
    }
}

組合使用案例

下面是一些組合使用過程,簡單實現了檔案到資料庫、資料庫到檔案的過程。檔案讀寫的過程已經在檔案讀寫中介紹過,這裡會重複使用之前介紹的檔案讀寫的功能。

下面的案例是將data.csv中的資料寫入到資料庫,然後再將資料寫入到out-data.csv。案例組合使用已有的item完成任務:flatFileReaderjdbcBatchWriterjdbcCursorItemReadersimpleProcessorflatFileWriter。這種ReaderProcessorWriter組合的方式也是完成一個批處理工程的常見開發方式。

案例的執行程式碼在org.chenkui.spring.batch.sample.database.complex包中,使用了2個Step來完成任務,一個將資料讀取到資料庫,一個將資料進行過濾,然後再寫入到檔案:

//隨風溜達的向日葵 chkui.com
public class FileComplexProcessConfig {
    @Bean
    // 配置Step1
    public Step file2DatabaseStep(StepBuilderFactory builder,
            @Qualifier("flatFileReader") ItemReader<WeatherEntity> reader,
            @Qualifier("jdbcBatchWriter") ItemWriter<WeatherEntity> writer) {
        return builder.get("file2DatabaseStep") // 建立
                .<WeatherEntity, WeatherEntity>chunk(50) // 分片
                .reader(reader) // 讀取
                .writer(writer) // 寫入
                .faultTolerant() // 開啟容錯處理
                .skipLimit(20) // 跳過設定
                .skip(Exception.class) // 跳過異常
                .build();
    }

    @Bean
    // 配置Step2
    public Step database2FileStep(StepBuilderFactory builder, 
            @Qualifier("jdbcCursorItemReader") ItemReader<WeatherEntity> reader,
            @Qualifier("simpleProcessor") ItemProcessor<WeatherEntity, MaxTemperatureEntiry> processor,
            @Qualifier("flatFileWriter") ItemWriter<MaxTemperatureEntiry> writer) {
        return builder.get("database2FileStep") // 建立
                .<WeatherEntity, MaxTemperatureEntiry>chunk(50) // 分片
                .reader(reader) // 讀取
                .processor(processor) //
                .writer(writer) // 寫入
                .faultTolerant() // 開啟容錯處理
                .skipLimit(20) // 跳過設定
                .skip(Exception.class) // 跳過異常
                .build();
    }

    @Bean
    public Job file2DatabaseJob(@Qualifier("file2DatabaseStep") Step step2Database,
            @Qualifier("database2FileStep") Step step2File, JobBuilderFactory builder) {
        return builder.get("File2Database").start(step2Database).next(step2File).build