SpringFrameworkspring-batch首页投稿(暂停使用,暂停投稿)

spring batch的java config实践

2017-11-03  本文已影响242人  一根弦的风筝

背景

在后台服务开发中, 经常要用到多线程技术进行加速执行, 每家公司都有内部多线程的框架, 这些框架不是文档不规范, 就是只能适用特定场景.
基于这些原因, spring batch带来了更易用, 性能更好的解决方案.

基本概念

JobRepository

job仓库, 提供了JobLauncher, Job, Setp的CRUD实现

JobLauncher

job的启动器, 可以传入job所需参数

Job

一个任务概念, 可以包含多个step, 且对step的执行顺序进行编排

Step

具体步骤, 基本包含reader, writer, reader后可选processor, 或者使用tesklet

下面用一个图来说明他们之间的关系

spring-batch-reference-model.png

概念还是挺简单的, 就是框架有点复杂, 用起来坑不少

实践代码

我这里使用java config形式使用spring batch, 需要额外注意的是, 所有带有@Bean的方法名不要重复

  1. build.gradle
buildscript {
    ext {
        springBootVersion = '1.5.2.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'war'

sourceCompatibility = 1.8
targetCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    compile('org.springframework.boot:spring-boot-starter-batch')
}

这里引用了spring boot starter batch, 只是为了解决jar包依赖问题, 实际使用时没有使用spring boot.

创建任务使用的obj, TestObj.java

public class TestObj {
    private String id;
    private int index;
    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public int getIndex() {
        return index;
    }

    public void setIndex(int index) {
        this.index = index;
    }
    
}

主逻辑BatchConfiguration.java

@EnableBatchProcessing
public class BatchConfiguration {
    
    Object lock = new Object();
    
    Logger logger = Logger.getRootLogger();
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Step step1() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.setAllowCoreThreadTimeOut(true);
        taskExecutor.afterPropertiesSet();

        return stepBuilderFactory.get("step1")
            .<TestObj, TestObj> chunk(10)
            .reader(new ItemReader<TestObj>() {
                private List<TestObj> list = null;
    
                @Override
                public synchronized TestObj read() throws Exception {
                    if (list == null) {
                        list = new ArrayList<TestObj>();
                        for (int i = 0; i < 10000; i++) {
                            TestObj obj = new TestObj();
                            obj.setId(UUID.randomUUID().toString());
                            obj.setIndex(i);
                            list.add(obj);
                        }
                        System.out.println("----------------"+list.size());
                    }
                    if (!this.list.isEmpty()) {
                        TestObj t = this.list.remove(0);
                        logger.info("step1==========read data:" + t.getIndex()));
                        return t;
                    }
                    return null;
                }
            })
            .processor(new ItemProcessor<TestObj, TestObj>() {
                public TestObj process(TestObj item) {
                    logger.debug("step1==============process: " + item.getIndex());
                    return item;
                }
            })
            .writer(new ItemWriter<TestObj>() {
                @Override
                public void write(List<? extends TestObj> items) throws Exception {
                    logger.debug("step1=============write batch start: " + items.size());
                    for (TestObj item : items) {
                        logger.debug("step1=============write: " + item.getIndex());
                    }
                    logger.info("step1=============write batch end: " + items.size());
                }
            })
            .taskExecutor(taskExecutor)
            .build();
    }
    
    @Bean
    public Step step2() {
        return stepBuilderFactory.get("step2").tasklet(new Tasklet() {
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
                logger.debug("step2========================Tasklet");
                return RepeatStatus.FINISHED;
            }
        }).build();
    }

    @Bean
    public Job job1(Step step1, Step step2) throws Exception {
        return jobBuilderFactory.get("job1").incrementer(new RunIdIncrementer()).start(step1).next(step2).build();
    }

}

最后是启动类Main.java

public class Main {

    public static void main(String[] args) {
        Logger logger = Logger.getRootLogger();
        
        logger.setLevel(Level.INFO);
        AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
        ctx.register(BatchConfiguration.class);
        ctx.refresh();
        JobLauncher  jobLauncher = ctx.getBean(JobLauncher.class);
        Job job = (Job)ctx.getBean("job1");
        try {
            jobLauncher.run(job, new JobParameters());
            logger.debug("------job1 finished");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

其中需要注意以下几点

  1. @EnableBatchProcessing 会默认给出一些基础配置
  1. 此处使用了多线程进行执行任务, 其中taskExecutor.setAllowCoreThreadTimeOut(true);表示当没有任务时(默认为60s), 线程池中线程会自动销毁
  2. 自定义的ItemReader实现类中的read()方法需要加synchronized(多线程环境一定要加), 在官方文档上有提过一嘴, 如果不是使用多线程可以不加, 在官方很多默认实现中, 有一些是线程安全的, 有一些则不是, 如果非线程安全, 使用时都需要加上synchronized关键字
  3. 如果read()方法, 返回null, 则整个任务结束.
  4. chunk(10)表示当每次传入write的list的个数为10时, write执行一次, 为主要的调优方法
  5. 实际使用中, processor可以去掉

一些说明

spring batch本身有很多功能以及高级特性(比如监听, 任务流, spring batch admin), 本文中不做展开, 这里只针对最常用情况给出一个可用版本, 在我实际使用过程中, 发现大多数文章的例子基本都无法使用或者是使用xml或者不能单独执行.

很多时候还是要多看官方文档, 只是官方文档有点太平铺直叙了

参考

spring batch官方文档

上一篇下一篇

猜你喜欢

热点阅读