SpringBoot-SpringBatch
2019-01-16 本文已影响85人
Radom7
SpringBatch简介
SpringBatch是一个轻量级的综合性批处理框架,可用于开发企业信息系统中那些至关重要的数据批量处理业务. Spring Batch基于POJO和Spring框架,相当容易上手使用,让开发者很容易地访问和利用企业级服务.Spring Batch不是调度(scheduling)框架.因为已经有很多非常好的企业级调度框架,包括商业性质的和开源的,例如Quartz, Tivoli, Control-M等.它是为了与调度程序一起协作完成任务而设计的,而不是用来取代调度框架的.
SpringBatch提供了大量的,可重用的功能,这些功能对大数据处理来说是必不可少的,包括 日志/跟踪(tracing),事务管理,任务处理(processing)统计,任务重启, 忽略(skip),和资源管理等功能。 此外还提供了许多高级服务和特性,使之能够通过优化(optimization ) 和分片技术(partitioningtechniques)来高效地执行超大型数据集的批处理任务。
SpringBatch是一个具有高可扩展性的框架,简单的批处理,或者复杂的大数据批处理作业都可以通过Spring Batch框架来实现。
SpringBoot整合SpringBatch
- pom文件的springbatch依赖,同时加如mysql的依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
- 在resource文件夹中新建一个csv文件,用来保存所需要进行批处理的数据:
- 在数据库里新建一张person表:
CREATE TABLE person(
id int PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(20),
age int,
nation VARCHAR(20),
address VARCHAR(20)
);
- application.yml文件指定数据库:
spring:
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/basedb
username: root
password: root
- 新建Person实体类,这里不说了,直接说springbatch配置类:
@Configuration
@EnableBatchProcessing
public class CsvBatchConfig {
/**
* ItemReader定义,用来读取数据
* 1,使用FlatFileItemReader读取文件
* 2,使用FlatFileItemReader的setResource方法设置csv文件的路径
* 3,对此对cvs文件的数据和领域模型类做对应映射
* @return
* @throws Exception
*/
@Bean
public ItemReader<Person> reader()throws Exception {
FlatFileItemReader<Person> reader = new FlatFileItemReader<>();
reader.setResource(new ClassPathResource("person.csv"));
reader.setLineMapper(new DefaultLineMapper<Person>(){
{
setLineTokenizer(new DelimitedLineTokenizer(){
{
setNames(new String[]{"name","age","nation","address"});
}
});
setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>(){{
setTargetType(Person.class);
}});
}
});
return reader;
}
/**
* ItemProcessor定义,用来处理数据
* @return
*/
@Bean
public ItemProcessor<Person,Person> processor(){
//使用我们自定义的ItemProcessor的实现CsvItemProcessor
CsvItemProcessor processor = new CsvItemProcessor();
//为processor指定校验器为CsvBeanValidator()
processor.setValidator(csvBeanValidator());
return processor;
}
/**
* ItemWriter定义,用来输出数据
* spring能让容器中已有的Bean以参数的形式注入,Spring Boot已经为我们定义了dataSource
* @param dataSource
* @return
*/
@Bean
public ItemWriter<Person> writer(@Qualifier("dataSource") DataSource dataSource){
JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<>();
//我们使用JDBC批处理的JdbcBatchItemWriter来写数据到数据库
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
String sql = "insert into person "+" (name,age,nation,address) "
+" values(:name,:age,:nation,:address)";
//在此设置要执行批处理的SQL语句
writer.setSql(sql);
writer.setDataSource(dataSource);
return writer;
}
/**
* JobRepository,用来注册Job的容器
* jobRepositor的定义需要dataSource和transactionManager,Spring Boot已为我们自动配置了
* 这两个类,Spring可通过方法注入已有的Bean
* @param dataSource
* @param transactionManager
* @return
* @throws Exception
*/
@Bean
public JobRepository jobRepository(@Qualifier("dataSource") DataSource dataSource, PlatformTransactionManager transactionManager)throws Exception{
JobRepositoryFactoryBean jobRepositoryFactoryBean =
new JobRepositoryFactoryBean();
jobRepositoryFactoryBean.setDataSource(dataSource);
jobRepositoryFactoryBean.setTransactionManager(transactionManager);
jobRepositoryFactoryBean.setDatabaseType(DatabaseType.MYSQL.name());
return jobRepositoryFactoryBean.getObject();
}
/**
* JobLauncher定义,用来启动Job的接口
* @param dataSource
* @param transactionManager
* @return
* @throws Exception
*/
@Bean
public SimpleJobLauncher jobLauncher(@Qualifier("dataSource") DataSource dataSource, PlatformTransactionManager transactionManager)throws Exception{
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager));
return jobLauncher;
}
/**
* Job定义,我们要实际执行的任务,包含一个或多个Step
* @param jobBuilderFactory
* @param s1
* @return
*/
@Bean
public Job importJob(JobBuilderFactory jobBuilderFactory, Step s1){
return jobBuilderFactory.get("importJob")
.incrementer(new RunIdIncrementer())
.flow(s1)//为Job指定Step
.end()
.listener(csvJobListener())//绑定监听器csvJobListener
.build();
}
/**
*step步骤,包含ItemReader,ItemProcessor和ItemWriter
* @param stepBuilderFactory
* @param reader
* @param writer
* @param processor
* @return
*/
@Bean
public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer,
ItemProcessor<Person,Person> processor){
return stepBuilderFactory
.get("step1")
.<Person,Person>chunk(65000)//批处理每次提交65000条数据
.reader(reader)//给step绑定reader
.processor(processor)//给step绑定processor
.writer(writer)//给step绑定writer
.build();
}
@Bean
public CsvJobListener csvJobListener(){
return new CsvJobListener();
}
@Bean
public Validator<Person> csvBeanValidator(){
return new CsvBeanValidator<Person>();
}
}
- 自定义一个校验器:
public class CsvItemProcessor extends ValidatingItemProcessor<Person> {
@Override
public Person process(Person item) throws ValidationException {
/**
* 需要执行super.process(item)才会调用自定义校验器
*/
super.process(item);
/**
* 对数据进行简单的处理,若民族为汉族,则数据转换为01,其余转换为02
*/
if (item.getNation().equals("汉族")) {
item.setNation("01");
} else {
item.setNation("02");
}
return item;
}
}
- 还有数据校验类:
public class CsvBeanValidator<T> implements Validator<T>,InitializingBean{
private javax.validation.Validator validator;
@Override
public void validate(T value) throws ValidationException {
/**
* 使用Validator的validate方法校验数据
*/
Set<ConstraintViolation<T>> constraintViolations =
validator.validate(value);
if (constraintViolations.size() > 0) {
StringBuilder message = new StringBuilder();
for (ConstraintViolation<T> constraintViolation : constraintViolations) {
message.append(constraintViolation.getMessage() + "\n");
}
throw new ValidationException(message.toString());
}
}
/**
* 使用JSR-303的Validator来校验我们的数据,在此进行JSR-303的Validator的初始化
* @throws Exception
*/
@Override
public void afterPropertiesSet() throws Exception {
ValidatorFactory validatorFactory =
Validation.buildDefaultValidatorFactory();
validator = validatorFactory.usingContext().getValidator();
}
}