1. 程式人生 > >Spring Boot 2.0.3整合Spring Batch

Spring Boot 2.0.3整合Spring Batch

Batch用來做大資料處理,是一項不錯的選擇,由於公司的整體架構是Spring Boot,因此自己研究了一下兩者之間的關係。

1.在官網http://start.spring.io/,選擇MYSQL,BATCH,WEB

2.自定義MyBatchConfig類,添加註解@Configuration--配置註解,@EnableBatchProcessing--batch註解,相關程式碼如下:

package com.kmm.config;

import com.kmm.bean.Person;
import com.kmm.listener.MyJobListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.*;
import org.springframework.batch.support.DatabaseType;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;
import java.util.List;

/**
 * Spring Batch configuration for a minimal demo job: one chunk-oriented step
 * that reads, processes and writes {@link Person} items, plus an explicitly
 * configured MySQL-backed {@link JobRepository} and a {@link SimpleJobLauncher}.
 */
@Configuration
@EnableBatchProcessing
public class MyBatchConfig {

    /** Number of sample items the demo reader emits per step execution before signalling end of input. */
    private static final int SAMPLE_ITEM_COUNT = 10;

    /**
     * Demo reader that emits {@link #SAMPLE_ITEM_COUNT} blank {@link Person} items and then
     * returns {@code null}.
     *
     * <p>Returning {@code null} is how an {@link ItemReader} tells the step that the input is
     * exhausted; a reader that always returns an item keeps the step running forever. The reader
     * also implements {@link ItemStream} so the counter is re-armed in {@code open()} each time
     * the step starts, which lets the same singleton bean serve repeated job launches.
     *
     * <p>NOTE(review): the counter is shared state on a singleton bean. If the job can run
     * concurrently, consider declaring the reader step-scoped instead.
     *
     * @return a bounded sample reader
     * @throws Exception never thrown here; kept for signature compatibility
     */
    @Bean
    public ItemReader<Person> reader() throws Exception {
        return new ItemStreamReader<Person>() {
            /** Items still to be emitted in the current step execution. */
            private int remaining;

            @Override
            public void open(ExecutionContext executionContext) throws ItemStreamException {
                // Re-arm the reader at the start of every step execution.
                remaining = SAMPLE_ITEM_COUNT;
            }

            @Override
            public void update(ExecutionContext executionContext) throws ItemStreamException {
                // No restart state is persisted for this demo reader.
            }

            @Override
            public void close() throws ItemStreamException {
                remaining = 0;
            }

            @Override
            public Person read() throws Exception {
                if (remaining <= 0) {
                    // End of input: lets the chunk loop (and therefore the step) finish.
                    return null;
                }
                remaining--;
                System.out.println("reader");
                return new Person();
            }
        };
    }

    /**
     * Demo processor that logs and passes each item through unchanged, so whatever the reader
     * produced actually reaches the writer.
     *
     * @return a pass-through processor
     */
    @Bean
    public ItemProcessor<Person, Person> processor() {
        return person -> {
            System.out.println("processor");
            return person;
        };
    }

    /**
     * Demo writer that only logs; it is invoked once per chunk with the chunk's items.
     *
     * @return a logging-only writer
     */
    @Bean
    public ItemWriter<Person> writer() {
        return items -> System.out.println("writer");
    }

    /**
     * Builds a {@link JobRepository} on top of the given data source, declared as MySQL.
     *
     * @param dataSource         data source used by the repository factory
     * @param transactionManager transaction manager used by the repository factory
     * @return the job repository produced by the factory bean
     * @throws Exception if the factory bean fails to build the repository
     */
    @Bean
    public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(dataSource);
        jobRepositoryFactoryBean.setTransactionManager(transactionManager);
        jobRepositoryFactoryBean.setDatabaseType(DatabaseType.MYSQL.name());

        return jobRepositoryFactoryBean.getObject();
    }

    /**
     * Builds the {@link SimpleJobLauncher} used to start jobs, wired to the job repository
     * defined by {@link #jobRepository(DataSource, PlatformTransactionManager)}.
     *
     * @param dataSource         forwarded to the job repository bean method
     * @param transactionManager forwarded to the job repository bean method
     * @return the job launcher
     * @throws Exception if the job repository cannot be obtained
     */
    @Bean
    public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(this.jobRepository(dataSource, transactionManager));

        return jobLauncher;
    }

    /**
     * Defines the job {@code myJob1}: a single-step flow with a run-id incrementer and a job listener.
     *
     * @param jobs factory for job builders
     * @param step the step the job executes
     * @return the configured job
     */
    @Bean
    public Job myJob(JobBuilderFactory jobs, Step step) {
        return jobs.get("myJob1")
                .incrementer(new RunIdIncrementer())
                .flow(step) // assign the step to the job
                .end()
                .listener(myJobListener()) // attach the job listener
                .build();
    }

    /**
     * Defines the chunk-oriented step {@code MyStep} from the reader, processor and writer beans.
     *
     * @param stepBuilderFactory factory for step builders
     * @param reader             item source
     * @param writer             item sink
     * @param processor          per-item transformation
     * @return the configured step
     */
    @Bean
    public Step step(StepBuilderFactory stepBuilderFactory,
                           ItemReader<Person> reader, ItemWriter<Person> writer, ItemProcessor<Person, Person> processor) {
        return stepBuilderFactory.get("MyStep")
                .<Person, Person>chunk(5000) // commit interval: up to 5000 items per chunk
                .reader(reader) // bind the reader to the step
                .processor(processor) // bind the processor to the step
                .writer(writer) // bind the writer to the step
                .build();
    }

    /**
     * Job listener attached to {@link #myJob(JobBuilderFactory, Step)}.
     *
     * @return a new listener instance
     */
    @Bean
    public MyJobListener myJobListener() {
        return new MyJobListener();
    }
}

Batch的流程為,reader,processor,writer;

processor可使用自定義,內容如下:

import com.kmm.bean.Person;
import org.springframework.batch.item.ItemProcessor;


/**
 * Custom {@link ItemProcessor} that forwards every {@link Person} unchanged.
 * Serves as a template for a standalone processor class.
 */
public class MyProcessor implements ItemProcessor<Person, Person> {

    /**
     * Returns the incoming item as-is.
     *
     * @param person the item handed over by the reader
     * @return the same {@code person} instance
     * @throws Exception declared by the interface; not thrown here
     */
    @Override
    public Person process(final Person person) throws Exception {
        final Person passedThrough = person;
        return passedThrough;
    }
}

reader和writer同理

3.使用Controller來測試該job,內容如下:

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST controller that launches the injected batch {@link Job} on demand.
 */
@RestController
public class DemoController {

    @Autowired
    SimpleJobLauncher jobLauncher;

    @Autowired
    Job importJob;

    /**
     * Parameters of the most recent launch. Kept only for backward compatibility;
     * {@link #imp()} no longer reads it, because this controller is a shared instance
     * and concurrent requests could overwrite it between assignment and launch.
     */
    public JobParameters jobParameters;

    /**
     * Launches the job with a {@code time} parameter set to the current timestamp.
     * The changing parameter value is what allows the same job to be launched repeatedly.
     *
     * @throws Exception if the launcher rejects or fails to start the job
     */
    @RequestMapping("/test")
    public void imp() throws Exception {
        // Build into a local so the launch never observes another request's parameters.
        JobParameters launchParameters = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
        jobParameters = launchParameters;
        jobLauncher.run(importJob, launchParameters);
    }
}

4.配置mysql資料來源(yaml檔案型別):

# MySQL data source settings.
spring:
  datasource:
    # JDBC driver class for MySQL
    driver-class-name: com.mysql.jdbc.Driver
    # Local MySQL server on port 3306, database "batch", SSL disabled
    url: jdbc:mysql://localhost:3306/batch?useSSL=false
    # Demo credentials
    username: root
    password: root

5.配置batch(yaml檔案型別)

# Spring Batch settings.
spring:
  batch:
    # Initialize the database on startup (the Boot 2 form of this setting differs from Boot 1)
    initialize-schema: always
    job:
      # Do not run the job when the application starts
      enabled: false

spring.batch.initialize-schema: always   初始化資料庫(boot2和boot1,初始化資料庫有區別)

spring.batch.job.enabled:false   專案啟動時不執行job

6.初始化的資料庫表:

7.執行結果:

 

8.如果考慮定時執行該job,可加@Scheduled(cron = "0 0/5 * * * ?"),每五分鐘執行一次(需在啟動類或配置類上加@EnableScheduling,定時任務才會生效),程式碼如下

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class DemoController {

    @Autowired
    SimpleJobLauncher jobLauncher;

    @Autowired
    Job importJob;

    public JobParameters jobParameters;

    @Scheduled(cron = "0 0/5 * * * ?")
    @RequestMapping("/test")
    public void imp() throws  Exception{
        jobParameters = new JobParametersBuilder()
                .addLong("time",System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(importJob,jobParameters);
    }

剛開始研究,如果有什麼問題,歡迎大家共同討論