1. 程式人生 > >spring-batch (ItemProcessor) 數據處理過程

spring-batch (ItemProcessor) 數據處理過程

scope 完成 delegate lex super figure rri tin 方法

Spring-batch學習總結(五)
學習目標:掌握ItemProcessor
1.ItemProcessor:spring-batch中數據處理的過程
2.ItemProcessor主要用於實現業務邏輯,驗證,過濾,等
3.Spring-batch為我們提供ItemProcessor<I,O>這個接口,它包含一個方法O process(I item
4.我們用代碼進行演示:
例:我們讀取數據庫表person_buf中的數據,將其id為奇數的數據剔除,將讀出name進行字母大寫轉換
首先觀察數據庫表數據結構:
技術分享圖片

代碼:
Person

package com.dhcc.batch.batchDemo.processor;

import java.util.Date;

public class Person {
    private Integer id;
    private String name;
    private String perDesc;
    private Date createTime;
    private Date updateTime;
    private String sex;
    private Float score;
    private Double price;

    public Person() {
        super();
    }

    public Person(Integer id, String name, String perDesc, Date createTime, Date updateTime, String sex, Float score,
            Double price) {
        super();
        this.id = id;
        this.name = name;
        this.perDesc = perDesc;
        this.createTime = createTime;
        this.updateTime = updateTime;
        this.sex = sex;
        this.score = score;
        this.price = price;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Date getCreateTime() {
        return createTime;
    }

    public String getPerDesc() {
        return perDesc;
    }

    public void setPerDesc(String perDesc) {
        this.perDesc = perDesc;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    public Date getUpdateTime() {
        return updateTime;
    }

    public void setUpdateTime(Date updateTime) {
        this.updateTime = updateTime;
    }

    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public Float getScore() {
        return score;
    }

    public void setScore(Float score) {
        this.score = score;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

    @Override
    public String toString() {
        return "Person [id=" + id + ", name=" + name + ", perDesc=" + perDesc + ", createTime=" + createTime + ", updateTime="
                + updateTime + ", sex=" + sex + ", score=" + score + ", price=" + price + "]";
    }

}

PersonLineAggregator

package com.dhcc.batch.batchDemo.processor;

import org.springframework.batch.item.file.transform.LineAggregator;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class PersonLineAggregator implements LineAggregator<Person> {
    //JSON
    private ObjectMapper mapper=new ObjectMapper();

    @Override
    public String aggregate(Person person) {
        try {
            return mapper.writeValueAsString(person);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("unable to writer...",e);
        }
    }

}

PersonRowMapper

package com.dhcc.batch.batchDemo.processor;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;
/**
 * 實現將數據庫中的每條數據映射到Person對象中
 * @author Administrator
 *
 */
public class PersonRowMapper implements RowMapper<Person> {

    /**
     * rs一條結果集,rowNum代表當前行
     */
    @Override
    public Person mapRow(ResultSet rs, int rowNum) throws SQLException {
        return new Person(rs.getInt("id")
                ,rs.getString("name")
                ,rs.getString("per_desc")
                ,rs.getDate("create_time")
                ,rs.getDate("update_time")
                ,rs.getString("sex")
                ,rs.getFloat("score")
                ,rs.getDouble("price"));
    }

}

ProcessorFileApplication

package com.dhcc.batch.batchDemo.processor;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class ProcessorFileApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProcessorFileApplication.class, args);

    }
}

ProcessorFileOutputFromDBConfiguration

package com.dhcc.batch.batchDemo.processor;

import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;

@Configuration
public class ProcessorFileOutputFromDBConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    @Autowired
    private ItemProcessor<Person, Person> fristNameUpperCaseProcessor;

    @Autowired
    private ItemProcessor<Person, Person> idFilterProcessor;

    @Bean
    public Job ProcessorFileOutputFromDBJob() {
        return jobBuilderFactory.get("ProcessorFileOutputFromDBJob")
                .start(ProcessorFileOutputFromDBStep())
                .build();

    }

    @Bean
    public Step ProcessorFileOutputFromDBStep() {
        return stepBuilderFactory.get("ProcessorFileOutputFromDBStep")
                .<Person, Person>chunk(100)
                .reader(ProcessorFileOutputFromItemWriter())
                .processor(personDataProcessor())
                .writer(ProcessorFileOutputFromItemReader())
                .build();
    }

    @Bean
    @StepScope
    public JdbcPagingItemReader<Person> ProcessorFileOutputFromItemWriter() {
        JdbcPagingItemReader<Person> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource); // 設置數據源
        reader.setFetchSize(100); // 設置一次最大讀取條數
        reader.setRowMapper(new PersonRowMapper()); // 把數據庫中的每條數據映射到AlipaytranDo對像中
        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id,name,per_desc,create_time,update_time,sex,score,price"); // 設置查詢的列
        queryProvider.setFromClause("from person_buf"); // 設置要查詢的表
        Map<String, Order> sortKeys = new HashMap<String, Order>();// 定義一個集合用於存放排序列
        sortKeys.put("id", Order.ASCENDING);// 按照升序排序
        queryProvider.setSortKeys(sortKeys);
        reader.setQueryProvider(queryProvider);// 設置排序列
        return reader;
    }

    @Bean
    public CompositeItemProcessor<Person, Person> personDataProcessor(){
        CompositeItemProcessor<Person, Person> processor=new CompositeItemProcessor<>();
        List<ItemProcessor<Person, Person>> listProcessor=new ArrayList<>();
        listProcessor.add(fristNameUpperCaseProcessor);
        listProcessor.add(idFilterProcessor);
        processor.setDelegates(listProcessor);
        return processor;

    }

    @Bean
    @StepScope
    public FlatFileItemWriter<Person> ProcessorFileOutputFromItemReader() {
        FlatFileItemWriter<Person> writer = new FlatFileItemWriter<Person>();
        try {
            File path = new File("D:" + File.separator + "newPerson.json").getAbsoluteFile();
            System.out.println("file is create in :" + path);
            writer.setResource(new FileSystemResource(path));
            writer.setLineAggregator(new PersonLineAggregator());
            writer.afterPropertiesSet();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return writer;
    }

}

FristNameUpperCaseProcessor

package com.dhcc.batch.batchDemo.processor;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

@Component
public class FristNameUpperCaseProcessor implements ItemProcessor<Person, Person> {

    @Override
    public Person process(Person item) throws Exception {
        return new Person(item.getId(), item.getName().toUpperCase(), item.getPerDesc(), item.getCreateTime(),
                item.getUpdateTime(), item.getSex(), item.getScore(), item.getPrice());
    }

}

IdFilterProcessor

package com.dhcc.batch.batchDemo.processor;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

@Component
public class IdFilterProcessor implements ItemProcessor<Person, Person> {

    @Override
    public Person process(Person item) throws Exception {
        if (item.getId() % 2 == 0) {
            return item;
        } else {
            return null;
        }
    }

}

運行結果:
技術分享圖片
觀察寫入完成後的文件:
技術分享圖片
可以看出我們已經完成了我們的目標

spring-batch (ItemProcessor) 數據處理過程