Spring Batch --> Processor
2020-04-28 本文已影响0人
刘小刀tina
/**
 * @program: demo-spring-batch
 * @description Batch configuration: rows read from the database are run through a chain of
 *              processors and then handed to the {@code dbFileWriter} bean.
 *              This class shares its simple name with Spring Batch's {@code ItemProcessor}
 *              interface, so the interface is always referenced by its fully-qualified name here.
 * @author: tina.liu
 * @create: 2020-04-28 09:49
 **/
@Configuration
@EnableBatchProcessing
public class ItemProcessor {

    /** Data source queried by the paging reader. */
    @Autowired
    private DataSource dataSource;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /** First delegate of the composite processor (bean {@code nameUpperProcessor}). */
    @Autowired
    @Qualifier(value = "nameUpperProcessor")
    private org.springframework.batch.item.ItemProcessor<User, User> nameUpperProcessor;

    /** Second delegate of the composite processor (bean {@code idFilterProcessor}). */
    @Autowired
    @Qualifier(value = "idFilterProcessor")
    private org.springframework.batch.item.ItemProcessor<User, User> idFilterProcessor;

    /** Writer used by the step (bean {@code dbFileWriter}). */
    @Autowired
    @Qualifier(value = "dbFileWriter")
    private ItemWriter<? super User> dbFileWriter;

    /**
     * Creates the job, which consists of the single step {@link #itemProcessorStep2()}.
     *
     * @return the job registered as bean {@code itemProcessorJob2}
     */
    @Bean(value = "itemProcessorJob2")
    public Job itemProcessorJob2() {
        return jobBuilderFactory.get("itemProcessorJob2")
                .start(itemProcessorStep2())
                .build();
    }

    /**
     * Creates the chunk-oriented step: read from the database, process, write.
     * The chunk size is 2.
     *
     * @return the step registered as bean {@code itemProcessorStep2}
     */
    @Bean(value = "itemProcessorStep2")
    public Step itemProcessorStep2() {
        return stepBuilderFactory.get("itemProcessorStep2")
                .<User, User>chunk(2)
                .reader(fromdbReader())
                .processor(process())
                .writer(dbFileWriter)
                .build();
    }

    /**
     * Combines the individual processors into one composite processor.
     * Delegates are registered in this order: {@code nameUpperProcessor}, then
     * {@code idFilterProcessor}.
     *
     * @return the composite processor applied to every item read from the database
     */
    @Bean
    public CompositeItemProcessor<User, User> process() {
        CompositeItemProcessor<User, User> processor = new CompositeItemProcessor<>();
        List<org.springframework.batch.item.ItemProcessor<User, User>> delegates = new ArrayList<>();
        delegates.add(nameUpperProcessor);
        delegates.add(idFilterProcessor);
        processor.setDelegates(delegates);
        return processor;
    }

    /**
     * Defines the reader that pages through {@code t_user} ordered by {@code id} ascending
     * and maps each row to a {@link User}.
     *
     * @return the reader registered as bean {@code fromdbReader}
     */
    @Bean(value = "fromdbReader")
    public JdbcPagingItemReader<User> fromdbReader() {
        JdbcPagingItemReader<User> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(dataSource);
        // NOTE(review): the original comment described this as "rows read at a time", but
        // setFetchSize looks like the JDBC fetch-size hint; the page size (setPageSize) is
        // left at its default here -- confirm which one was intended.
        reader.setFetchSize(2);
        // Convert each row into a User; columns are read by position (id, name, age),
        // matching the select clause below.
        reader.setRowMapper(new RowMapper<User>() {
            @Override
            public User mapRow(ResultSet rs, int i) throws SQLException {
                User user = new User();
                user.setId(rs.getString(1));
                user.setName(rs.getString(2));
                user.setAge(rs.getString(3));
                return user;
            }
        });

        // SQL pieces used to build the paging query.
        MySqlPagingQueryProvider provider = new MySqlPagingQueryProvider();
        provider.setSelectClause("id,name,age");
        provider.setFromClause("from t_user");

        // Sort key: id ascending. The diamond replaces the original raw LinkedHashMap.
        Map<String, Order> sort = new LinkedHashMap<>(1);
        sort.put("id", Order.ASCENDING);
        provider.setSortKeys(sort);

        reader.setQueryProvider(provider);
        return reader;
    }
} // end of class ItemProcessor
// Plain data holder for one row of the user table; accessors are generated by Lombok's @Data.
@Data
class User {

    /** User id, kept as a string. */
    private String id;

    /** User name. */
    private String name;

    /** User age, kept as a string. */
    private String age;
}
// Processor that converts the user's name to upper case.
@Component(value = "nameUpperProcessor")
class NameUpperProcessor implements org.springframework.batch.item.ItemProcessor<User, User> {

    /**
     * Builds a new {@link User} with the same id and age as {@code item} and the name
     * converted to upper case. The incoming item is not modified.
     *
     * @param item the user to transform
     * @return a new user carrying the upper-cased name; the name stays {@code null} when the
     *         input name is {@code null}
     * @throws Exception declared by the processor interface; not thrown by this implementation
     *         for a non-null {@code item}
     */
    @Override
    public User process(User item) throws Exception {
        User copy = new User();
        copy.setId(item.getId());

        String name = item.getName();
        // A missing name is passed through instead of failing with a NullPointerException.
        // Locale.ROOT keeps the result independent of the JVM's default locale.
        copy.setName(name == null ? null : name.toUpperCase(java.util.Locale.ROOT));

        copy.setAge(item.getAge());
        return copy;
    }
}
// Processor that filters users by id: only even ids are kept.
@Component(value = "idFilterProcessor")
class IdFilterProcessor implements org.springframework.batch.item.ItemProcessor<User, User> {

    /**
     * Keeps users whose numeric id is even and drops the rest.
     *
     * @param item the user to inspect; its id must be a decimal integer string
     * @return {@code item} itself when the id is even, otherwise {@code null}
     *         (Spring Batch treats a {@code null} result as "filter this item out")
     * @throws NumberFormatException if the id is {@code null} or not a valid integer
     * @throws Exception declared by the processor interface
     */
    @Override
    public User process(User item) throws Exception {
        // Parsed as long so ids beyond the int range are still accepted; the parity
        // result is the same as before for every id that previously parsed.
        long id = Long.parseLong(item.getId());
        return (id % 2 == 0) ? item : null;
    }
}
// Writer configuration: outputs the users read from the database to a fixed file.
@Configuration
class FileItemWriterConfig {

    /**
     * Builds the {@code dbFileWriter} bean, a flat-file writer that emits each {@link User}
     * as one line of JSON.
     *
     * @return the configured writer
     * @throws Exception if {@code afterPropertiesSet()} rejects the writer configuration
     */
    @Bean(value = "dbFileWriter")
    public FlatFileItemWriter<User> dbFileWriter() throws Exception {
        FlatFileItemWriter<User> writer = new FlatFileItemWriter<>();

        // NOTE(review): absolute, machine-specific path -- consider externalising it.
        String path = "/Users/lvxiaokai/Desktop/wangke-project2020/demo-spring-batch/src/main/resources/user.txt";
        writer.setResource(new FileSystemResource(path));

        // Convert each User into its JSON string representation.
        writer.setLineAggregator(new LineAggregator<User>() {
            private final ObjectMapper mapper = new ObjectMapper();

            @Override
            public String aggregate(User item) {
                try {
                    return mapper.writeValueAsString(item);
                } catch (JsonProcessingException e) {
                    // Fail loudly instead of printing the stack trace and returning null,
                    // which handed a null line to the writer and hid the error.
                    throw new IllegalStateException(
                            "Failed to serialize user with id " + item.getId() + " to JSON", e);
                }
            }
        });

        writer.afterPropertiesSet();
        // Printed when the bean is created, i.e. before any item has actually been written.
        System.out.println("将数据库读取的数据写入指定的文档success");
        return writer;
    }
}