spring batch--> processor

2020-04-28  本文已影响0人  刘小刀tina
/**
 * @program: demo-spring-batch
 * @description 从数据库读到的数据 处理之后输出到文件里,用到processor处理
 * @author: tina.liu
 * @create: 2020-04-28 09:49
 **/
@Configuration
@EnableBatchProcessing
public class ItemProcessor {

    @Autowired
    private DataSource dataSource ; //数据源

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory ;

    @Autowired
    @Qualifier(value = "nameUpperProcessor")
    private org.springframework.batch.item.ItemProcessor<User,User>  nameUpperProcessor;

    @Autowired
    @Qualifier(value = "idFilterProcessor")
    private org.springframework.batch.item.ItemProcessor<User,User>  idFilterProcessor;


    @Autowired
    @Qualifier(value = "dbFileWriter")
    private ItemWriter<? super User> dbFileWriter;

    /**
     * 创建Job
     * @return
     */
    @Bean(value = "itemProcessorJob2")
    public Job itemProcessorJob2(){
        return jobBuilderFactory.get("itemProcessorJob2")
                .start(itemProcessorStep2())
                .build();
    }

    /**
     * 创建step
     * @return
     */
    @Bean(value = "itemProcessorStep2")
    public Step itemProcessorStep2() {
        return stepBuilderFactory.get("itemProcessorStep2")
                .<User,User>chunk(2)
                .reader(fromdbReader())
                .processor(process())
                .writer(dbFileWriter)
                .build();
    }



    ///有多种处理方式,对从数据库读取的数据进行一些处理
    @Bean
    public CompositeItemProcessor<User,User> process(){
        CompositeItemProcessor<User,User> processor = new CompositeItemProcessor<>();
        List<org.springframework.batch.item.ItemProcessor<User, User>> delegates = new ArrayList<>();
        delegates.add(nameUpperProcessor);
        delegates.add(idFilterProcessor);
        processor.setDelegates(delegates);
        return processor;
    }


    /**
     * 定义一个reader,从数据库读取数据
     * @return
     */
    @Bean(value = "fromdbReader")
    public JdbcPagingItemReader<User> fromdbReader() {
        JdbcPagingItemReader<User> reader = new JdbcPagingItemReader<>();
        //注入数据源
        reader.setDataSource(dataSource);
        //设置一次读取几条数据
        reader.setFetchSize(2);
        //将读取的记录转成user对象
        reader.setRowMapper(new RowMapper<User>() {
            @Override
            public User mapRow(ResultSet rs, int i) throws SQLException {
                User user = new User();
                user.setId(rs.getString(1));
                user.setName(rs.getString(2));
                user.setAge(rs.getString(3));
                return user;
            }
        });
        //如何指定sql语句
        MySqlPagingQueryProvider provider = new MySqlPagingQueryProvider();
        provider.setSelectClause("id,name,age");
        provider.setFromClause("from t_user");
        //如何根据哪个字段排序
        Map<String, Order> sort = new LinkedHashMap(1);
        sort.put("id", Order.ASCENDING); // 根据ID升序
        provider.setSortKeys(sort);
        reader.setQueryProvider(provider);
        return reader;
    }


}//类的打括号

//创建一个类
@Data
class User{
    private String id;
    private String name;
    private String age;
}

//创建一个处理类(将name字母转成大写)
@Component(value = "nameUpperProcessor")
class NameUpperProcessor implements org.springframework.batch.item.ItemProcessor<User,User>{
    @Override
    public User process(User item) throws Exception {
        User u= new User();
        u.setId(item.getId());
        u.setName(item.getName().toUpperCase());
        u.setAge(item.getAge());
        return u;
    }
}


//创建一个处理类 (对ID进行过滤)
@Component(value = "idFilterProcessor")
class IdFilterProcessor implements org.springframework.batch.item.ItemProcessor<User,User>{

    @Override
    public User process(User item) throws Exception {
        int id = Integer.parseInt(item.getId());
        if(id%2 ==0){
            return item;
         }
        return null;
    }

}


//输出的类,将从数据库读取的信息 输出到指定文件
@Configuration
class FileItemWriterConfig{

    @Bean(value = "dbFileWriter")
    public FlatFileItemWriter<User> dbFileWriter () throws Exception {

        //将对象User转成字符串输出到文件
        FlatFileItemWriter<User> writer = new FlatFileItemWriter<User>();
        String path="/Users/lvxiaokai/Desktop/wangke-project2020/demo-spring-batch/src/main/resources/user.txt";
        writer.setResource(new FileSystemResource(path));

        //把user对象转换成字符串
        writer.setLineAggregator(new LineAggregator<User>() {
          ObjectMapper mapper =  new ObjectMapper();
            @Override
            public String aggregate(User item) {
                String str = null;
                try {
                    str =  mapper.writeValueAsString(item);
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
                return str;
            }
        });
        writer.afterPropertiesSet();
        System.out.println("将数据库读取的数据写入指定的文档success");
        return writer;
    }

}
上一篇下一篇

猜你喜欢

热点阅读