// Spring-Batch-03-Partition
// 2017-02-04 — originally published by LoWang (blog post; 311 readers at capture time)
// BatchJobConfig.java
package xin.lowang.springbatch.config;
import java.util.Collections;
import java.util.List;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.partition.support.MultiResourcePartitioner;
import org.springframework.batch.core.partition.support.SimplePartitioner;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import xin.lowang.springbatch.domain.Person;
import xin.lowang.springbatch.processor.PersonItemProcessor;
@Configuration
@EnableBatchProcessing
public class BatchJobConfig {

    // Builders provided by Spring Batch's @EnableBatchProcessing infrastructure.
    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    // Target database for the JDBC writer; auto-configured elsewhere.
    @Autowired
    public DataSource dataSource;

    /**
     * Reads {@link Person} records from classpath:sample-data.csv.
     * Lines are delimited (default comma) with fields firstName,lastName,
     * mapped onto Person bean properties by name.
     */
    @Bean
    public FlatFileItemReader<Person> reader() {
        FlatFileItemReader<Person> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("sample-data.csv"));
        // Explicit wiring instead of double-brace initialization: the original
        // created two extra anonymous classes per bean, each pinning an outer ref.
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[] { "firstName", "lastName" });
        BeanWrapperFieldSetMapper<Person> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(Person.class);
        DefaultLineMapper<Person> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSetMapper);
        reader.setLineMapper(lineMapper);
        return reader;
    }

    @Bean
    public PersonItemProcessor processor() {
        return new PersonItemProcessor();
    }

    /** Persists processed Person items into the people table via named parameters. */
    @Bean
    public JdbcBatchItemWriter<Person> dbWriter() {
        JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        writer.setSql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)");
        writer.setDataSource(dataSource);
        return writer;
    }

    /** Debug writer: prints each person's first name to stdout. */
    @Bean
    public PrintItemWriter<Person> printWriter() {
        return new PrintItemWriter<Person>() {
            @Override
            public void write(List<? extends Person> items) throws Exception {
                for (Person person : items) {
                    System.out.println(person.getFirstName() + " is ok");
                }
            }
        };
    }

    /** Debug writer: logs each person through the SLF4J logger of the base class. */
    @Bean
    public LogItemWriter<Person> logWriter() {
        return new LogItemWriter<Person>() {
            @Override
            public void write(List<? extends Person> items) throws Exception {
                for (Person person : items) {
                    log.info("person {} is ok", person);
                }
            }
        };
    }

    // ---- Step and job definitions ----

    /** Chunk-oriented step: CSV reader -> processor -> stdout writer, 2 items per chunk. */
    @Bean
    public Step printStep() {
        return stepBuilderFactory.get("printStep")
                .<Person, Person>chunk(2)
                .reader(reader())
                .processor(processor())
                .writer(printWriter())
                .build();
    }

    /**
     * Master step that partitions printStep and executes the partitions
     * concurrently (grid size 2, at most 4 concurrent threads).
     * SimplePartitioner produces empty execution contexts, so every partition
     * runs the same printStep over the same input — this is a demo of the
     * partitioning plumbing, not of data splitting.
     */
    @Bean
    public Step partitionPrintStep() {
        SimplePartitioner partitioner = new SimplePartitioner();
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setConcurrencyLimit(4);
        TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler();
        partitionHandler.setGridSize(2);
        partitionHandler.setTaskExecutor(taskExecutor);
        partitionHandler.setStep(printStep());
        return stepBuilderFactory.get("partitionPrintStep")
                .partitioner("slaveStep", partitioner)
                .partitionHandler(partitionHandler)
                .build();
    }

    /** Step that writes a single hard-coded Person through the logging writer. */
    @Bean
    public Step logStep() {
        return stepBuilderFactory.get("logStep")
                .<Person, Person>chunk(2)
                .reader(new ListItemReader<>(Collections.singletonList(new Person("chenghao", "Wang"))))
                .writer(logWriter())
                .build();
    }

    /**
     * Runs printStep and logStep in parallel via a split flow.
     * Deliberately NOT registered as a bean (the @Bean annotation was disabled
     * in the original); kept for reference.
     */
    public Job importUserJob() {
        Flow flow = new FlowBuilder<SimpleFlow>("asyncFlow").start(logStep()).build();
        return jobBuilderFactory.get("importUserJob")
                .incrementer(new RunIdIncrementer())
                .flow(printStep())
                .split(new SimpleAsyncTaskExecutor())
                .add(flow)
                .build()   // builds the FlowJobBuilder
                .build();  // builds the Job
    }

    /** Job that runs the partitioned print step. */
    @Bean
    public Job partitionImportUserJob() {
        // FIX: the job was registered under the name "importUserJob", which
        // would collide with importUserJob() above if both were ever enabled;
        // use a name matching this bean.
        return jobBuilderFactory.get("partitionImportUserJob")
                .incrementer(new RunIdIncrementer())
                .start(partitionPrintStep())
                .build();
    }

    /**
     * No-op writer base class; beans supply behavior via anonymous subclasses.
     * Declared static: it never touches the enclosing config instance.
     */
    public static class PrintItemWriter<T> implements ItemWriter<T> {
        @Override
        public void write(List<? extends T> items) throws Exception {
        }
    }

    /**
     * No-op writer base class exposing an SLF4J logger to subclasses.
     * Declared static: it never touches the enclosing config instance.
     */
    public static class LogItemWriter<T> implements ItemWriter<T> {
        protected final Logger log = LoggerFactory.getLogger(LogItemWriter.class);

        @Override
        public void write(List<? extends T> items) throws Exception {
        }
    }
}