1. 程式人生 > >Spring-batch(ItemWriter)數據寫入數據庫,普通文件,xml文件,多文件分類寫入

Spring-batch(ItemWriter)數據寫入數據庫,普通文件,xml文件,多文件分類寫入

writev 奇數 component dbi database RoCE ava 顯示 ==

Spring-batch學習總結(四)
一.ItemWriter簡介
1.read讀取數據時是以一個item為單位循環讀取,而writer寫入數據則是以chunk為單位,一塊一塊地進行寫入
2.例(我們舉一個小例子來認識其writer原理):
代碼:
OutOverViewApplication

package com.dhcc.batch.batchDemo.output.outview;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point for the ItemWriter overview demo.
 *
 * <p>Carries {@code @EnableBatchProcessing} so the batch job beans declared in this
 * package can be built.
 */
@SpringBootApplication
@EnableBatchProcessing
public class OutOverViewApplication {

    /**
     * Starts the application.
     *
     * @param args command-line arguments, forwarded unchanged to Spring Boot
     */
    public static void main(String[] args) {
        final Class<?> bootClass = OutOverViewApplication.class;
        SpringApplication.run(bootClass, args);
    }
}

OutputViewItemWriterConfiguration

package com.dhcc.batch.batchDemo.output.outview;

import java.util.ArrayList;
import java.util.List;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares a one-step job that reads 100 generated strings and passes them, in chunks
 * of 10, to the writer bean qualified as {@code OutputViewItemWriter}.
 */
@Configuration
public class OutputViewItemWriterConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("OutputViewItemWriter")
    private ItemWriter<? super String> outputViewItemWriter;

    /**
     * @return the job named {@code OutputViewItemWriterJob3}, made of a single step
     */
    @Bean
    public Job OutputViewItemWriterJob3() {
        final Step onlyStep = OutputViewItemWriterStep3();
        return jobBuilderFactory.get("OutputViewItemWriterJob3").start(onlyStep).build();
    }

    /**
     * @return a chunk-oriented step (chunk size 10) wiring the list reader to the injected writer
     */
    @Bean
    public Step OutputViewItemWriterStep3() {
        return stepBuilderFactory
                .get("OutputViewItemWriterStep3")
                .<String, String>chunk(10)
                .reader(listViewItemRead())
                .writer(outputViewItemWriter)
                .build();
    }

    /**
     * @return a step-scoped reader over 100 strings of the form
     *         {@code "my name is zhongqiujie" + index}, index running from 0 to 99
     */
    @Bean
    @StepScope
    public ListItemViewReader<String> listViewItemRead() {
        final int total = 100;
        final List<String> lines = new ArrayList<>();
        int index = 0;
        while (index < total) {
            lines.add("my name is zhongqiujie" + index);
            index++;
        }
        return new ListItemViewReader<String>(lines);
    }

}

ListItemViewReader

package com.dhcc.batch.batchDemo.output.outview;

import java.util.Iterator;
import java.util.List;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

/**
 * Minimal {@link ItemReader} that hands out the elements of an in-memory list one by one.
 *
 * <p>The list's iterator is obtained once, in the constructor, so every element is
 * returned at most once per reader instance.
 *
 * @param <T> type of the items produced (previously this parameter was named
 *            {@code String}, which shadowed {@code java.lang.String} inside the class)
 */
public class ListItemViewReader<T> implements ItemReader<T> {

    private final Iterator<T> iterator;

    /**
     * @param data the items to serve, in iteration order
     */
    public ListItemViewReader(List<T> data) {
        this.iterator = data.iterator();
    }

    /**
     * @return the next item, or {@code null} once the list is exhausted
     */
    @Override
    public T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        return iterator.hasNext() ? iterator.next() : null;
    }
}

OutputViewItemWriter

package com.dhcc.batch.batchDemo.output.outview;

import java.util.List;

import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;

/**
 * Demo writer that prints each chunk to standard output: first the chunk size,
 * then one line per item.
 */
@Component("OutputViewItemWriter")
public class OutputViewItemWriter implements ItemWriter<String> {

    /**
     * @param items the items of the current chunk
     */
    @Override
    public void write(List<? extends String> items) throws Exception {
        final int chunkSize = items.size();
        System.out.println("writer chunk size is :" + chunkSize);
        for (int i = 0; i < chunkSize; i++) {
            System.out.println("writer data is:" + items.get(i));
        }
    }

}

運行結果:
技術分享圖片

二.將數據寫入到數據庫
1.在spring batch中為我們提供了許多將數據寫入到數據庫中的writer
(1)Neo4jItemWriter;
(2)MongoItemWriter;
..........
2.此處我們只學習JdbcBatchItemWriter
例:我們先在數據庫中建立數據表alipaytrando,結構如下:
技術分享圖片
接下來我們將項目中的springbatchtest2文件讀出並寫入到數據庫表alipaytrando中
Springbatchtest2文件結構如下:
技術分享圖片
開始寫代碼:
AlipayTranDo

package com.dhcc.batch.batchDemo.output.db.entity;

/**
 * Mutable bean holding one Alipay transaction record; every property is kept as a
 * {@code String}.
 */
public class AlipayTranDo {
    private String tranId;
    private String channel;
    private String tranType;
    private String counterparty;
    private String goods;
    private String amount;
    private String isDebitCredit;
    private String state;

    /**
     * Creates a fully populated record; the arguments are stored as given.
     */
    public AlipayTranDo(String tranId, String channel, String tranType, String counterparty, String goods,
            String amount, String isDebitCredit, String state) {
        this.tranId = tranId;
        this.channel = channel;
        this.tranType = tranType;
        this.counterparty = counterparty;
        this.goods = goods;
        this.amount = amount;
        this.isDebitCredit = isDebitCredit;
        this.state = state;
    }

    public String getTranId() {
        return tranId;
    }

    public void setTranId(String tranId) {
        this.tranId = tranId;
    }

    public String getChannel() {
        return channel;
    }

    public void setChannel(String channel) {
        this.channel = channel;
    }

    public String getTranType() {
        return tranType;
    }

    public void setTranType(String tranType) {
        this.tranType = tranType;
    }

    public String getCounterparty() {
        return counterparty;
    }

    public void setCounterparty(String counterparty) {
        this.counterparty = counterparty;
    }

    public String getGoods() {
        return goods;
    }

    public void setGoods(String goods) {
        this.goods = goods;
    }

    public String getAmount() {
        return amount;
    }

    public void setAmount(String amount) {
        this.amount = amount;
    }

    public String getIsDebitCredit() {
        return isDebitCredit;
    }

    public void setIsDebitCredit(String isDebitCredit) {
        this.isDebitCredit = isDebitCredit;
    }

    public String getState() {
        return state;
    }

    public void setState(String state) {
        this.state = state;
    }

    /**
     * @return all eight properties as {@code name='value'} pairs inside
     *         {@code AlipayTranDO{...}}
     */
    @Override
    public String toString() {
        // The published listing had typographic quotes here, which do not compile;
        // they are restored to ordinary string and char literals.
        return "AlipayTranDO{" +
                "tranId='" + tranId + '\'' +
                ", channel='" + channel + '\'' +
                ", tranType='" + tranType + '\'' +
                ", counterparty='" + counterparty + '\'' +
                ", goods='" + goods + '\'' +
                ", amount='" + amount + '\'' +
                ", isDebitCredit='" + isDebitCredit + '\'' +
                ", state='" + state + '\'' +
                '}';
    }
}

AlipayTranDoFileMapper

package com.dhcc.batch.batchDemo.output.db.util;

import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

import com.dhcc.batch.batchDemo.output.db.entity.AlipayTranDo;

/**
 * Turns one tokenized line of the input file into an {@link AlipayTranDo}, reading
 * each property from the field of the same name.
 */
public class AlipayTranDoFileMapper implements FieldSetMapper<AlipayTranDo> {

    /**
     * @param fieldSet the named fields of a single line
     * @return a new record populated from the eight named fields
     */
    @Override
    public AlipayTranDo mapFieldSet(FieldSet fieldSet) throws BindException {
        final String tranId = fieldSet.readString("tranId");
        final String channel = fieldSet.readString("channel");
        final String tranType = fieldSet.readString("tranType");
        final String counterparty = fieldSet.readString("counterparty");
        final String goods = fieldSet.readString("goods");
        final String amount = fieldSet.readString("amount");
        final String isDebitCredit = fieldSet.readString("isDebitCredit");
        final String state = fieldSet.readString("state");
        return new AlipayTranDo(tranId, channel, tranType, counterparty, goods, amount, isDebitCredit, state);
    }

}

OutputItemWriterDBApplication

package com.dhcc.batch.batchDemo.output.db.jdbcout;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point for the "write to database" demo.
 */
@SpringBootApplication
@EnableBatchProcessing
public class OutputItemWriterDBApplication {

    /**
     * Starts the application.
     *
     * @param args command-line arguments, forwarded unchanged to Spring Boot
     */
    public static void main(String[] args) {
        final Class<?> bootClass = OutputItemWriterDBApplication.class;
        SpringApplication.run(bootClass, args);
    }
}

OutputItemWriterDBConfiguration

package com.dhcc.batch.batchDemo.output.db.jdbcout;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.dhcc.batch.batchDemo.output.db.entity.AlipayTranDo;

/**
 * Declares the job {@code OutputItemWriterDBJob2}: a single chunk-oriented step that takes
 * {@link AlipayTranDo} items from the {@code outputDBItemReader} bean, passes them through
 * {@link MyProcess} and hands them to the {@code outputDBItemWriter} bean.
 */
@Configuration
public class OutputItemWriterDBConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("outputDBItemReader")
    private ItemReader<? extends AlipayTranDo> outputDBItemReader;

    @Autowired
    @Qualifier("outputDBItemWriter")
    private ItemWriter<? super AlipayTranDo> outputDBItemWriter;

    @Autowired
    private MyProcess myProcess;

    /**
     * @return the job, consisting only of {@link #OutputItemWriterDBStep2()}
     */
    @Bean
    public Job OutputItemWriterDBJob2() {
        final Step onlyStep = OutputItemWriterDBStep2();
        return jobBuilderFactory
                .get("OutputItemWriterDBJob2")
                .start(onlyStep)
                .build();
    }

    /**
     * @return the reader → processor → writer step, committing every 50 items
     */
    @Bean
    public Step OutputItemWriterDBStep2() {
        return stepBuilderFactory
                .get("OutputItemWriterDBStep2")
                .<AlipayTranDo, AlipayTranDo>chunk(50)
                .reader(outputDBItemReader)
                .processor(myProcess)
                .writer(outputDBItemWriter)
                .build();
    }

}

OutputItemWriterDBItemReaderConfiguration

package com.dhcc.batch.batchDemo.output.db.jdbcout;

import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

import com.dhcc.batch.batchDemo.output.db.entity.AlipayTranDo;
import com.dhcc.batch.batchDemo.output.db.util.AlipayTranDoFileMapper;
/**
 * Provides the flat-file reader that parses {@code /data/init/springbatchtest2.csv}
 * from the classpath into {@link AlipayTranDo} items.
 */
@Configuration
public class OutputItemWriterDBItemReaderConfiguration {

    /**
     * @return a UTF-8 reader that skips the first five lines and maps every following
     *         delimited line through {@link AlipayTranDoFileMapper}
     */
    @Bean
    public FlatFileItemReader<AlipayTranDo> outputDBItemReader(){
        // Names given, in order, to the fields of each delimited line.
        final String[] columns =
                {"tranId","channel","tranType","counterparty","goods","amount","isDebitCredit","state"};

        DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
        lineTokenizer.setNames(columns);

        DefaultLineMapper<AlipayTranDo> mapper = new DefaultLineMapper<AlipayTranDo>();
        mapper.setLineTokenizer(lineTokenizer);
        mapper.setFieldSetMapper(new AlipayTranDoFileMapper());
        mapper.afterPropertiesSet();

        FlatFileItemReader<AlipayTranDo> csvReader = new FlatFileItemReader<AlipayTranDo>();
        csvReader.setEncoding("UTF-8");
        csvReader.setResource(new ClassPathResource("/data/init/springbatchtest2.csv"));
        csvReader.setLinesToSkip(5);
        csvReader.setLineMapper(mapper);
        return csvReader;
    }

}

MyProcess

package com.dhcc.batch.batchDemo.output.db.jdbcout;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

import com.dhcc.batch.batchDemo.output.db.entity.AlipayTranDo;

/**
 * Pass-through processor: prints each item to standard output and returns it unchanged.
 */
@Component
public class MyProcess implements ItemProcessor<AlipayTranDo, AlipayTranDo> {

    /**
     * @param item the record read by the step
     * @return the very same instance
     */
    @Override
    public AlipayTranDo process(AlipayTranDo item) throws Exception {
        final String rendered = String.valueOf(item);
        System.out.println(rendered);
        return item;
    }

}

OutputItemWriterDBItemWriterConfiguration

package com.dhcc.batch.batchDemo.output.db.jdbcout;

import javax.sql.DataSource;

import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.dhcc.batch.batchDemo.output.db.entity.AlipayTranDo;

/**
 * Provides the JDBC batch writer that inserts {@link AlipayTranDo} items into the
 * {@code alipaytrando} table.
 */
@Configuration
public class OutputItemWriterDBItemWriterConfiguration {

    @Autowired
    private DataSource dataSource;

    /**
     * @return a writer whose named SQL parameters ({@code :tranId}, {@code :channel}, ...)
     *         are resolved from the bean properties of each item
     */
    @Bean
    public JdbcBatchItemWriter<AlipayTranDo> outputDBItemWriter() {
        System.out.println();

        final String insertSql =
                "insert into alipaytrando"
                + "(tranId,channel,tranType,counterparty,goods,amount,isDebitCredit,state) values"
                + "(:tranId,:channel,:tranType,:counterparty,:goods,:amount,:isDebitCredit,:state) ";

        JdbcBatchItemWriter<AlipayTranDo> jdbcWriter = new JdbcBatchItemWriter<>();
        jdbcWriter.setDataSource(dataSource);
        jdbcWriter.setSql(insertSql);
        jdbcWriter.setItemSqlParameterSourceProvider(
                new BeanPropertyItemSqlParameterSourceProvider<AlipayTranDo>());
        return jdbcWriter;
    }

}

運行結果:
技術分享圖片
觀察控制臺可得我們的項目運行成功,接下來我們再到數據庫中觀察數據是否成功插入
技術分享圖片
發現表中數據已經插入成功
技術分享圖片
三.將數據寫入到普通文件中
1.FlatFileItemWriter可以將任何一個類型為T的對象數據寫入到普通文件中
2.例:我們將數據庫表alipaytrando中的數據讀出並且寫入到普通文件中,接下來我們開始編寫代碼:
實體類AlipayTranDo與上一個例子一樣,我們不再重複展示
AlipayTranDoFileMapper

package com.dhcc.batch.batchDemo.output.flatfile;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

/**
 * Maps the current row of a result set to an {@link AlipayTranDo}, reading each
 * property from the column of the same name.
 */
public class AlipayTranDoFileMapper implements RowMapper<AlipayTranDo> {

    /**
     * @param rs     result set positioned on the row to convert
     * @param rowNum index of the current row (not used)
     * @return a new record populated from the eight columns
     */
    @Override
    public AlipayTranDo mapRow(ResultSet rs, int rowNum) throws SQLException {
        final String tranId = rs.getString("tranId");
        final String channel = rs.getString("channel");
        final String tranType = rs.getString("tranType");
        final String counterparty = rs.getString("counterparty");
        final String goods = rs.getString("goods");
        final String amount = rs.getString("amount");
        final String isDebitCredit = rs.getString("isDebitCredit");
        final String state = rs.getString("state");
        return new AlipayTranDo(tranId, channel, tranType, counterparty, goods, amount, isDebitCredit, state);
    }

}

AlipayTranDoLineAggregator

package com.dhcc.batch.batchDemo.output.flatfile;

import org.springframework.batch.item.file.transform.LineAggregator;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Renders each {@link AlipayTranDo} as one line of JSON using Jackson.
 */
public class AlipayTranDoLineAggregator implements LineAggregator<AlipayTranDo> {

    // Jackson mapper used for every item.
    private final ObjectMapper mapper = new ObjectMapper();

    /**
     * @param alipayTranDo the item to serialise
     * @return the JSON text for the item
     * @throws RuntimeException wrapping the Jackson failure if the item cannot be serialised
     */
    @Override
    public String aggregate(AlipayTranDo alipayTranDo) {
        final String json;
        try {
            json = mapper.writeValueAsString(alipayTranDo);
        } catch (JsonProcessingException ex) {
            throw new RuntimeException("unable to writer...", ex);
        }
        return json;
    }

}

FlatFileOutputFromDBConfiguration

package com.dhcc.batch.batchDemo.output.flatfile;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the job {@code FlatFileOutputFromDBJob}: one chunk-oriented step moving
 * {@link AlipayTranDo} items from the {@code flatFileOutputFromDBItemReader} bean to the
 * {@code flatFileOutputFromDBItemWriter} bean.
 */
@Configuration
public class FlatFileOutputFromDBConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("flatFileOutputFromDBItemReader")
    private ItemReader<? extends AlipayTranDo> flatFileOutputFromDBItemReader;

    @Autowired
    @Qualifier("flatFileOutputFromDBItemWriter")
    private ItemWriter<? super AlipayTranDo> flatFileOutputFromDBItemWriter;

    /**
     * @return the job, consisting only of {@link #FlatFileOutputFromDBStep()}
     */
    @Bean
    public Job FlatFileOutputFromDBJob() {
        final Step onlyStep = FlatFileOutputFromDBStep();
        return jobBuilderFactory
                .get("FlatFileOutputFromDBJob")
                .start(onlyStep)
                .build();
    }

    /**
     * @return the reader → writer step, committing every 100 items
     */
    @Bean
    public Step FlatFileOutputFromDBStep() {
        return stepBuilderFactory
                .get("FlatFileOutputFromDBStep")
                .<AlipayTranDo, AlipayTranDo>chunk(100)
                .reader(flatFileOutputFromDBItemReader)
                .writer(flatFileOutputFromDBItemWriter)
                .build();
    }

}

FlatFileOutputFromDBItemReaderConfiguration

package com.dhcc.batch.batchDemo.output.flatfile;

import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Provides the paging JDBC reader that loads {@link AlipayTranDo} rows from the
 * {@code alipaytrando} table, ordered by {@code tranId} ascending.
 */
@Configuration
public class FlatFileOutputFromDBItemReaderConfiguration {

    @Autowired
    private DataSource dataSource;

    /**
     * @return a reader using a MySQL paging query over the eight record columns
     */
    @Bean
    public JdbcPagingItemReader<AlipayTranDo> flatFileOutputFromDBItemReader() {
        // Sort key handed to the paging query provider.
        Map<String, Order> ordering = new HashMap<String, Order>();
        ordering.put("tranId", Order.ASCENDING);

        // Which columns to select, from which table, and in what order.
        MySqlPagingQueryProvider pagingQuery = new MySqlPagingQueryProvider();
        pagingQuery.setSelectClause("tranId,channel,tranType,counterparty,goods,amount,isDebitCredit,state");
        pagingQuery.setFromClause("from alipaytrando");
        pagingQuery.setSortKeys(ordering);

        JdbcPagingItemReader<AlipayTranDo> pagingReader = new JdbcPagingItemReader<>();
        pagingReader.setDataSource(this.dataSource);
        pagingReader.setFetchSize(100);
        // Each row becomes an AlipayTranDo.
        pagingReader.setRowMapper(new AlipayTranDoFileMapper());
        pagingReader.setQueryProvider(pagingQuery);
        return pagingReader;
    }
}

FlatFileOutputFromDBItemWriterConfiguration

package com.dhcc.batch.batchDemo.output.flatfile;

import java.io.File;

import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;

/**
 * Provides the flat-file writer that stores {@link AlipayTranDo} items, one JSON line
 * each, in {@code alipayTranDo.data} on drive {@code D:}.
 */
@Configuration
public class FlatFileOutputFromDBItemWriterConfiguration {

    /**
     * @return a writer targeting the output file and using {@link AlipayTranDoLineAggregator}
     * @throws IllegalStateException if the writer rejects its configuration; the original
     *         cause is preserved
     */
    @Bean
    public FlatFileItemWriter<AlipayTranDo> flatFileOutputFromDBItemWriter(){
        File path = new File("D:" + File.separator + "alipayTranDo.data").getAbsoluteFile();
        System.out.println("file is create in :" + path);

        FlatFileItemWriter<AlipayTranDo> writer = new FlatFileItemWriter<AlipayTranDo>();
        writer.setResource(new FileSystemResource(path));
        writer.setLineAggregator(new AlipayTranDoLineAggregator());
        try {
            writer.afterPropertiesSet();
        } catch (Exception e) {
            // Fail bean creation rather than print the stack trace and hand back a
            // writer whose validation did not pass.
            throw new IllegalStateException("Unable to initialise flat-file writer for " + path, e);
        }
        return writer;
    }

}

OutputItemWriterFlatFileApplication

package com.dhcc.batch.batchDemo.output.flatfile;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point for the "write to flat file" demo.
 */
@SpringBootApplication
@EnableBatchProcessing
public class OutputItemWriterFlatFileApplication {

    /**
     * Starts the application.
     *
     * @param args command-line arguments, forwarded unchanged to Spring Boot
     */
    public static void main(String[] args) {
        final Class<?> bootClass = OutputItemWriterFlatFileApplication.class;
        SpringApplication.run(bootClass, args);
    }
}

運行結果:
技術分享圖片
控制臺顯示文件讀取寫入成功,我們根據文件地址,觀察寫入後的普通文件
技術分享圖片
四.將數據寫入到xml文件中
1.將數據寫入到xml文件中,我們必須用到StaxEventItemWriter;
2.我們也會用到XStreamMarshaller來將對象序列化為xml
例:我們將數據庫表alipaytrando中的數據寫入到本地磁盤中
代碼(此處我們只展示writer,用來寫入的類,其他的均與上一個例子相同):

XMLFileOutputFromDBItemWriterConfiguration

package com.dhcc.batch.batchDemo.output.xmlfile;

import java.io.File;
import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.oxm.xstream.XStreamMarshaller;

/**
 * Provides the StAX writer that stores {@link AlipayTranDo} items as XML in
 * {@code alipayTranDo.xml} on drive {@code D:}.
 */
@Configuration
public class XMLFileOutputFromDBItemWriterConfiguration {

    /**
     * @return a writer whose root tag is {@code alipaytrandos} and whose items are
     *         marshalled by XStream under the alias {@code alipayTranDo}
     * @throws Exception if {@code afterPropertiesSet()} rejects the configuration
     */
    @Bean
    public StaxEventItemWriter<AlipayTranDo> xmlFileOutputFromDBItemWriter() throws Exception {
        // Alias the item class so the element name is short rather than the class name.
        @SuppressWarnings("rawtypes")
        Map<String, Class> elementAliases = new HashMap<>();
        elementAliases.put("alipayTranDo", AlipayTranDo.class);

        XStreamMarshaller xstream = new XStreamMarshaller();
        xstream.setAliases(elementAliases);

        StaxEventItemWriter<AlipayTranDo> xmlWriter = new StaxEventItemWriter<>();
        xmlWriter.setRootTagName("alipaytrandos");
        xmlWriter.setMarshaller(xstream);

        File target = new File("D:" + File.separator + "alipayTranDo.xml").getAbsoluteFile();
        System.out.println("file is create in :" + target);
        xmlWriter.setResource(new FileSystemResource(target));

        xmlWriter.afterPropertiesSet();
        return xmlWriter;
    }

}

運行結果:
技術分享圖片
根據地址觀察寫入後的xml文件
技術分享圖片
五.將數據寫入到多文件
1.將數據寫入多個文件,我們使用CompositeItemWriter<T>或者使用ClassifierCompositeItemWriter<T>
2.例(1):我們將數據表alipaytrando中的數據分別寫入到xml文件和json文件中
此處我們只展示writer(其余代碼與上例相同):
mutipleFileOutputFromDBItemWriterConfiguration

package com.dhcc.batch.batchDemo.output.mutiple.composit;

import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.oxm.xstream.XStreamMarshaller;

/**
 * Provides a JSON writer, an XML writer and a composite writer that sends every
 * {@link AlipayTranDo} item to both of them.
 */
@Configuration
public class mutipleFileOutputFromDBItemWriterConfiguration {

    /**
     * @return a flat-file writer producing one JSON line per item in {@code alipayTranDo1.json}
     * @throws IllegalStateException if the writer rejects its configuration; the original
     *         cause is preserved
     */
    @Bean
    public FlatFileItemWriter<AlipayTranDo> jsonFileItemWriter(){
        File path = new File("D:" + File.separator + "alipayTranDo1.json").getAbsoluteFile();
        System.out.println("file is create in :" + path);

        FlatFileItemWriter<AlipayTranDo> writer = new FlatFileItemWriter<AlipayTranDo>();
        writer.setResource(new FileSystemResource(path));
        writer.setLineAggregator(new AlipayTranDoLineAggregator());
        try {
            writer.afterPropertiesSet();
        } catch (Exception e) {
            // Fail bean creation rather than print the stack trace and hand back a
            // writer whose validation did not pass.
            throw new IllegalStateException("Unable to initialise JSON writer for " + path, e);
        }
        return writer;
    }

    /**
     * @return a StAX writer with root tag {@code alipaytrandos} writing to {@code alipayTranDo1.xml}
     * @throws Exception if {@code afterPropertiesSet()} rejects the configuration
     */
    @Bean
    public StaxEventItemWriter<AlipayTranDo> xmlFileItemWriter() throws Exception{
        XStreamMarshaller marshaller = new XStreamMarshaller();
        @SuppressWarnings("rawtypes")
        Map<String, Class> aliases = new HashMap<>();
        aliases.put("alipayTranDo", AlipayTranDo.class);
        marshaller.setAliases(aliases);

        StaxEventItemWriter<AlipayTranDo> writer = new StaxEventItemWriter<>();
        writer.setRootTagName("alipaytrandos");
        writer.setMarshaller(marshaller);
        File path = new File("D:" + File.separator + "alipayTranDo1.xml").getAbsoluteFile();
        System.out.println("file is create in :" + path);
        writer.setResource(new FileSystemResource(path));
        writer.afterPropertiesSet();
        return writer;
    }

    /**
     * @return a composite writer delegating each chunk first to the XML writer, then to
     *         the JSON writer
     * @throws Exception if a delegate or the composite rejects its configuration
     */
    @Bean
    public CompositeItemWriter<AlipayTranDo> alipayTranDoFileOutputFromDBItemWriter() throws Exception{
        CompositeItemWriter<AlipayTranDo> itemWriter = new CompositeItemWriter<>();
        itemWriter.setDelegates(Arrays.asList(xmlFileItemWriter(), jsonFileItemWriter()));
        itemWriter.afterPropertiesSet();
        return itemWriter;
    }

}

運行結果:
技術分享圖片
觀察文件:
Json:
技術分享圖片
Xml:
技術分享圖片
3.例(2):我們將同一個文件進行分類寫入:
首先我們觀察數據庫表person_buf的數據結構(數據總數是10001):
技術分享圖片

技術分享圖片

我們的目標是將數據從數據庫讀出按照id的奇偶分別寫入不同類型的文件中
接下來上代碼:
Person

package com.dhcc.batch.batchDemo.output.mutiple.classifier;

import java.util.Date;

/**
 * Plain mutable bean describing a person. Every property is a reference type and may
 * therefore be {@code null}.
 */
public class Person {
    private Integer id;
    private String name;
    private String perDesc;
    private Date createTime;
    private Date updateTime;
    private String sex;
    private Float score;
    private Double price;

    /** Creates an instance with every property left {@code null}. */
    public Person() {
    }

    /** Creates a fully populated instance; the arguments are stored as given. */
    public Person(Integer id, String name, String perDesc, Date createTime, Date updateTime, String sex, Float score,
            Double price) {
        this.id = id;
        this.name = name;
        this.perDesc = perDesc;
        this.createTime = createTime;
        this.updateTime = updateTime;
        this.sex = sex;
        this.score = score;
        this.price = price;
    }

    public Integer getId() {
        return this.id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPerDesc() {
        return this.perDesc;
    }

    public void setPerDesc(String perDesc) {
        this.perDesc = perDesc;
    }

    public Date getCreateTime() {
        return this.createTime;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    public Date getUpdateTime() {
        return this.updateTime;
    }

    public void setUpdateTime(Date updateTime) {
        this.updateTime = updateTime;
    }

    public String getSex() {
        return this.sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public Float getScore() {
        return this.score;
    }

    public void setScore(Float score) {
        this.score = score;
    }

    public Double getPrice() {
        return this.price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

    /**
     * @return all eight properties as {@code name=value} pairs inside {@code Person [...]}
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Person [");
        text.append("id=").append(id);
        text.append(", name=").append(name);
        text.append(", perDesc=").append(perDesc);
        text.append(", createTime=").append(createTime);
        text.append(", updateTime=").append(updateTime);
        text.append(", sex=").append(sex);
        text.append(", score=").append(score);
        text.append(", price=").append(price);
        return text.append("]").toString();
    }

}

PersonLineAggregator

package com.dhcc.batch.batchDemo.output.mutiple.classifier;

import org.springframework.batch.item.file.transform.LineAggregator;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Renders each {@link Person} as one line of JSON using Jackson.
 */
public class PersonLineAggregator implements LineAggregator<Person> {

    // Jackson mapper used for every item.
    private final ObjectMapper mapper = new ObjectMapper();

    /**
     * @param person the item to serialise
     * @return the JSON text for the item
     * @throws RuntimeException wrapping the Jackson failure if the item cannot be serialised
     */
    @Override
    public String aggregate(Person person) {
        final String json;
        try {
            json = mapper.writeValueAsString(person);
        } catch (JsonProcessingException ex) {
            throw new RuntimeException("unable to writer...", ex);
        }
        return json;
    }

}

PersonRowMapper

package com.dhcc.batch.batchDemo.output.mutiple.classifier;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;
/**
 * Maps each database row onto a {@link Person} object.
 *
 * @author Administrator
 */
public class PersonRowMapper implements RowMapper<Person> {

    /**
     * Builds a {@link Person} from the current row.
     *
     * @param rs     result set positioned on the row to convert
     * @param rowNum index of the current row (not used)
     * @return a new, fully populated {@link Person}
     */
    @Override
    public Person mapRow(ResultSet rs, int rowNum) throws SQLException {
        final int id = rs.getInt("id");
        final String name = rs.getString("name");
        final String description = rs.getString("per_desc");
        // NOTE(review): getDate yields a java.sql.Date; if these columns store a time of
        // day, getTimestamp may be what is wanted - confirm against the table definition.
        final java.sql.Date created = rs.getDate("create_time");
        final java.sql.Date updated = rs.getDate("update_time");
        final String sex = rs.getString("sex");
        final float score = rs.getFloat("score");
        final double price = rs.getDouble("price");
        return new Person(id, name, description, created, updated, sex, score, price);
    }

}

OutputItemWriterMutipleClassFileApplication

package com.dhcc.batch.batchDemo.output.mutiple.classifier;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point for the "classified multi-file output" demo.
 */
@SpringBootApplication
@EnableBatchProcessing
public class OutputItemWriterMutipleClassFileApplication {

    /**
     * Starts the application.
     *
     * @param args command-line arguments, forwarded unchanged to Spring Boot
     */
    public static void main(String[] args) {
        final Class<?> bootClass = OutputItemWriterMutipleClassFileApplication.class;
        SpringApplication.run(bootClass, args);
    }
}

ClassifierMutipleFileOutputFromDBConfiguration

package com.dhcc.batch.batchDemo.output.mutiple.classifier;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the job {@code mutipleFileOutputFromDBJob1}: one chunk-oriented step that reads
 * {@link Person} items and hands them to the {@code alipayTranDoFileOutputFromDBItemWriter}
 * bean. The JSON and XML writers are additionally registered on the step as streams.
 */
@Configuration
public class ClassifierMutipleFileOutputFromDBConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("mutipleFileOutputFromDBItemReader")
    private ItemReader<? extends Person> mutipleFileOutputFromDBItemReader;

    @Autowired
    @Qualifier("alipayTranDoFileOutputFromDBItemWriter")
    private ItemWriter<? super Person> alipayTranDoFileOutputFromDBItemWriter;

    // The two file writers, injected as ItemStream so they can be passed to stream(...).
    @Autowired
    @Qualifier("jsonFileItemWriter")
    private ItemStream jsonFileItemWriter;

    @Autowired
    @Qualifier("xmlFileItemWriter")
    private ItemStream xmlFileItemWriter;

    /**
     * @return the job, consisting only of {@link #mutipleFileOutputFromDBStep1()}
     */
    @Bean
    public Job mutipleFileOutputFromDBJob1() {
        final Step onlyStep = mutipleFileOutputFromDBStep1();
        return jobBuilderFactory.get("mutipleFileOutputFromDBJob1").start(onlyStep).build();
    }

    /**
     * @return the reader → writer step, committing every 100 items, with both file
     *         writers registered as streams
     */
    @Bean
    public Step mutipleFileOutputFromDBStep1() {
        return stepBuilderFactory
                .get("mutipleFileOutputFromDBStep1")
                .<Person, Person>chunk(100)
                .reader(mutipleFileOutputFromDBItemReader)
                .writer(alipayTranDoFileOutputFromDBItemWriter)
                .stream(jsonFileItemWriter)
                .stream(xmlFileItemWriter)
                .build();
    }

}

mutipleFileOutputFromDBItemReaderConfiguration

package com.dhcc.batch.batchDemo.output.mutiple.classifier;

import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Provides the paging JDBC reader that loads {@link Person} rows from the
 * {@code person_buf} table, ordered by {@code id} ascending.
 */
@Configuration
public class mutipleFileOutputFromDBItemReaderConfiguration {

    @Autowired
    private DataSource dataSource;

    /**
     * @return a reader using a MySQL paging query over the eight person columns
     */
    @Bean
    public JdbcPagingItemReader<Person> mutipleFileOutputFromDBItemReader() {
        // Sort key handed to the paging query provider.
        Map<String, Order> ordering = new HashMap<String, Order>();
        ordering.put("id", Order.ASCENDING);

        // Which columns to select, from which table, and in what order.
        MySqlPagingQueryProvider pagingQuery = new MySqlPagingQueryProvider();
        pagingQuery.setSelectClause("id,name,per_desc,create_time,update_time,sex,score,price");
        pagingQuery.setFromClause("from person_buf");
        pagingQuery.setSortKeys(ordering);

        JdbcPagingItemReader<Person> pagingReader = new JdbcPagingItemReader<>();
        pagingReader.setDataSource(this.dataSource);
        pagingReader.setFetchSize(100);
        // Each row becomes a Person.
        pagingReader.setRowMapper(new PersonRowMapper());
        pagingReader.setQueryProvider(pagingQuery);
        return pagingReader;
    }
}

mutipleFileOutputFromDBItemWriterConfiguration

package com.dhcc.batch.batchDemo.output.mutiple.classifier;

import java.io.File;
import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.oxm.xstream.XStreamMarshaller;

/**
 * Provides a JSON writer, an XML writer and a classifying writer that picks one of
 * the two for each {@link Person} via {@link MyWriterClassifier}.
 */
@Configuration
public class mutipleFileOutputFromDBItemWriterConfiguration {

    /**
     * @return a flat-file writer producing one JSON line per item in {@code person.json}
     * @throws IllegalStateException if the writer rejects its configuration; the original
     *         cause is preserved
     */
    @Bean
    public FlatFileItemWriter<Person> jsonFileItemWriter(){
        File path = new File("D:" + File.separator + "person.json").getAbsoluteFile();
        System.out.println("file is create in :" + path);

        FlatFileItemWriter<Person> writer = new FlatFileItemWriter<Person>();
        writer.setResource(new FileSystemResource(path));
        writer.setLineAggregator(new PersonLineAggregator());
        try {
            writer.afterPropertiesSet();
        } catch (Exception e) {
            // Fail bean creation rather than print the stack trace and hand back a
            // writer whose validation did not pass.
            throw new IllegalStateException("Unable to initialise JSON writer for " + path, e);
        }
        return writer;
    }

    /**
     * @return a StAX writer with root tag {@code persons} writing to {@code person.xml}
     * @throws Exception if {@code afterPropertiesSet()} rejects the configuration
     */
    @Bean
    public StaxEventItemWriter<Person> xmlFileItemWriter() throws Exception{
        XStreamMarshaller marshaller = new XStreamMarshaller();
        @SuppressWarnings("rawtypes")
        Map<String, Class> aliases = new HashMap<>();
        aliases.put("person", Person.class);
        marshaller.setAliases(aliases);

        StaxEventItemWriter<Person> writer = new StaxEventItemWriter<>();
        writer.setRootTagName("persons");
        writer.setMarshaller(marshaller);
        File path = new File("D:" + File.separator + "person.xml").getAbsoluteFile();
        System.out.println("file is create in :" + path);
        writer.setResource(new FileSystemResource(path));
        writer.afterPropertiesSet();
        return writer;
    }

    /**
     * @return a writer that routes each item to the JSON or XML writer as decided by
     *         {@link MyWriterClassifier}
     * @throws Exception if the XML delegate rejects its configuration
     */
    @Bean
    public ClassifierCompositeItemWriter<Person> alipayTranDoFileOutputFromDBItemWriter() throws Exception{
        ClassifierCompositeItemWriter<Person> itemWriter = new ClassifierCompositeItemWriter<Person>();
        itemWriter.setClassifier(new MyWriterClassifier(jsonFileItemWriter(), xmlFileItemWriter()));
        return itemWriter;
    }

}

MyWriterClassifier

package com.dhcc.batch.batchDemo.output.mutiple.classifier;

import org.springframework.batch.item.ItemWriter;
import org.springframework.classify.Classifier;

/**
 * Chooses the destination writer for a {@link Person} by the parity of its id:
 * even ids go to the JSON writer, all others to the XML writer.
 */
public class MyWriterClassifier implements Classifier<Person, ItemWriter<? super Person>> {

    private static final long serialVersionUID = -2911015707834323846L;

    private ItemWriter<Person> jsonWriter;
    private ItemWriter<Person> xmlWriter;

    /**
     * @param jsonWriter writer returned for items with an even id
     * @param xmlWriter  writer returned for every other item
     */
    public MyWriterClassifier(ItemWriter<Person> jsonWriter, ItemWriter<Person> xmlWriter) {
        this.jsonWriter = jsonWriter;
        this.xmlWriter = xmlWriter;
    }

    /**
     * @param classifiable the item to route; its id is unboxed, so it must not be {@code null}
     * @return the JSON writer when the id is divisible by two, otherwise the XML writer
     */
    @Override
    public ItemWriter<? super Person> classify(Person classifiable) {
        final boolean evenId = classifiable.getId() % 2 == 0;
        return evenId ? jsonWriter : xmlWriter;
    }

}

運行結果:
技術分享圖片
觀察文件:
Person.json:(我們可以看出id為偶數的都寫在了json文件中)
技術分享圖片
Person.xml:(我們可以看出id為奇數的都寫在了xml文件中)

技術分享圖片

Spring-batch(ItemWriter)數據寫入數據庫,普通文件,xml文件,多文件分類寫入