spring-batch

Spring-Batch-01-Simple

2017-02-04  本文已影响152人  LoWang

Spring Batch是一个轻量级的,完全面向Spring的批处理框架,可以应用于企业级大量的数据处理系统。Spring Batch以POJO和大家熟知的Spring框架为基础,使开发者更容易地访问和利用企业级服务。Spring Batch可以提供大量的,可重复的数据处理功能,包括日志记录/跟踪,事务管理,作业处理统计,作业重新启动,跳过和资源管理等重要功能。

业务方案:
1、批处理定期提交。
2、并行批处理:并行处理工作。
3、企业消息驱动处理
4、大规模的并行处理
5、手动或是有计划的重启
6、局部处理:跳过记录(如:在回滚时跳过出错的记录)
技术目标:
1、利用Spring编程模型:使程序员专注于业务处理,让Spring框架管理流程。
2、明确分离批处理的执行环境和应用。
3、提供核心的,共通的接口。
4、提供开箱即用(out of the box)的简单的默认的核心执行接口。
5、提供Spring框架中配置、自定义、和扩展服务。
6、所有存在的核心服务可以很容易地被替换和扩展,不影响基础层。
7、提供一个简单的部署模式,利用Maven构建独立的Jar文件。

简单入门例子:

build.gradle

// Build-script classpath: declares where the Spring Boot Gradle plugin is
// resolved from and which version of it is used.
buildscript {
    ext {
      // Single place to bump the Spring Boot Gradle plugin version.
      springBootGradlePluginVersion = '1.5.1.RELEASE'
    }
    repositories {
        // Resolve over HTTPS: plain-HTTP repositories allow artifacts to be
        // tampered with in transit and are rejected by recent Gradle versions.
        maven { url 'https://maven.aliyun.com/nexus/content/groups/public'}
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootGradlePluginVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'

// Name and version of the jar produced by the build.
jar {
    baseName = 'spring-batch'
    version =  '0.1.0'
}

repositories {
    // Resolve over HTTPS: plain-HTTP repositories allow artifacts to be
    // tampered with in transit and are rejected by recent Gradle versions.
    maven { url 'https://maven.aliyun.com/nexus/content/groups/public'}
    mavenCentral()
}

sourceCompatibility = 1.8
targetCompatibility = 1.8

// No versions are given here; presumably they are managed by the Spring Boot
// plugin applied above -- confirm if the plugin is ever removed.
dependencies {
    compile("org.springframework.boot:spring-boot-starter-batch")
    compile("org.hsqldb:hsqldb")
    testCompile("junit:junit")
}

sample-data.csv

Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe
Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe
---,---
Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe
Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
---,---
John,Doe
Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe
Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe

BatchApplication.java

package xin.lowang.springbatch;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Entry point of the sample: starts a Spring Boot application using this
 * class as its configuration source.
 */
@SpringBootApplication
public class BatchApplication {

    /**
     * Launches the Spring application.
     *
     * @param args command-line arguments, forwarded unchanged to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(BatchApplication.class);
        application.run(args);
    }
}

BatchJobConfig.java


package xin.lowang.springbatch.config;

import java.util.Collections;
import java.util.List;

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

import xin.lowang.springbatch.domain.Person;
import xin.lowang.springbatch.processor.PersonItemProcessor;

/**
 * Spring Batch configuration for the sample job.
 *
 * <p>Declares a CSV reader, an upper-casing processor, three writers
 * (JDBC, console, logger) and wires two steps into the job
 * {@code importUserJob}: {@code printStep} followed by {@code logStep}.
 */
@Configuration
@EnableBatchProcessing
public class BatchJobConfig {
    @Autowired
    public JobBuilderFactory jobBuilderFactory;
    @Autowired
    public StepBuilderFactory stepBuilderFactory;
    @Autowired
    public DataSource dataSource;

    /**
     * Reader for the classpath resource {@code sample-data.csv}.
     *
     * <p>Each line is tokenized into the columns {@code firstName} and
     * {@code lastName}, which are then bound onto a new {@link Person}.
     *
     * @return the configured flat-file reader
     */
    @Bean
    public FlatFileItemReader<Person> reader() {
        // Plain setters instead of double-brace initialization, which would
        // create a needless anonymous subclass for every collaborator.
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[] { "firstName", "lastName" });

        BeanWrapperFieldSetMapper<Person> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(Person.class);

        DefaultLineMapper<Person> lineMapper = new DefaultLineMapper<>();
        lineMapper.setFieldSetMapper(fieldSetMapper);
        lineMapper.setLineTokenizer(tokenizer);

        FlatFileItemReader<Person> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("sample-data.csv"));
        reader.setLineMapper(lineMapper);
        return reader;
    }

    /**
     * @return the processor applied to every {@link Person} in {@code printStep}
     */
    @Bean
    public PersonItemProcessor processor() {
        return new PersonItemProcessor();
    }

    /**
     * Writer that inserts each {@link Person} into the {@code people} table,
     * binding {@code :firstName} and {@code :lastName} from the bean properties.
     *
     * <p>This bean is declared but not used by any step in this class.
     * NOTE(review): the {@code people} table is assumed to be created
     * elsewhere (e.g. a schema script) -- confirm before wiring it into a step.
     *
     * @return the configured JDBC batch writer
     */
    @Bean
    public JdbcBatchItemWriter<Person> dbWriter() {
        JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
        writer.setSql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)");
        writer.setDataSource(dataSource);
        return writer;
    }

    /**
     * @return a writer that prints {@code "<firstName> is ok"} to standard
     *         output for every item
     */
    @Bean
    public PrintItemWriter<Person> printWriter() {
        return new PrintItemWriter<Person>() {
            @Override
            public void write(List<? extends Person> items) throws Exception {
                items.forEach(person -> System.out.println(person.getFirstName() + " is ok"));
            }
        };
    }

    /**
     * @return a writer that logs every item at INFO level
     */
    @Bean
    public LogItemWriter<Person> logWriter() {
        return new LogItemWriter<Person>() {
            @Override
            public void write(List<? extends Person> items) throws Exception {
                items.forEach(person -> log.info("person {} is ok", person));
            }
        };
    }

    // Step and job definitions

    /**
     * Step that reads the CSV file, runs each item through the processor and
     * prints it, using a chunk size of 2.
     *
     * @return the {@code printStep} step
     */
    @Bean
    public Step printStep() {
        return stepBuilderFactory.get("printStep")
                .<Person, Person>chunk(2)
                .reader(reader())
                .processor(processor())
                .writer(printWriter())
                .build();
    }

    /**
     * Step that logs a single hard-coded {@link Person}, using a chunk size of 2.
     *
     * @return the {@code logStep} step
     */
    @Bean
    public Step logStep() {
        // Person's constructor takes (lastName, firstName): the surname goes first.
        Person samplePerson = new Person("Wang", "chenghao");
        return stepBuilderFactory
                .get("logStep")
                .<Person, Person>chunk(2)
                .reader(new ListItemReader<>(Collections.singletonList(samplePerson)))
                .writer(logWriter())
                .build();
    }

    /**
     * Job {@code importUserJob}: runs {@code printStep}, then {@code logStep}.
     * A {@link RunIdIncrementer} is registered as the job parameters incrementer.
     *
     * @return the assembled job
     */
    @Bean
    public Job importUserJob() {
        return jobBuilderFactory
                .get("importUserJob")
                .incrementer(new RunIdIncrementer())
                .flow(printStep())
                .next(logStep())
                .end()
                .build();
    }

    /**
     * No-op {@link ItemWriter} base type; {@link #printWriter()} overrides
     * {@link #write(List)} in an anonymous subclass.
     *
     * @param <T> item type
     */
    public class PrintItemWriter<T> implements ItemWriter<T> {
        @Override
        public void write(List<? extends T> items) throws Exception {
            // Intentionally empty: subclasses supply the behavior.
        }
    }

    /**
     * No-op {@link ItemWriter} base type that exposes a logger to subclasses;
     * {@link #logWriter()} overrides {@link #write(List)} in an anonymous subclass.
     *
     * @param <T> item type
     */
    public class LogItemWriter<T> implements ItemWriter<T> {
        protected final Logger log = LoggerFactory.getLogger(LogItemWriter.class);

        @Override
        public void write(List<? extends T> items) throws Exception {
            // Intentionally empty: subclasses supply the behavior.
        }
    }
}

Person.java


package xin.lowang.springbatch.domain;

/**
 * Mutable bean holding a person's first and last name.
 *
 * <p>Provides a no-argument constructor plus getters and setters for both
 * properties, and a two-argument constructor taking the last name first.
 */
public class Person {
    private String lastName;
    private String firstName;

    /** Creates a person with both names unset ({@code null}). */
    public Person() {
    }

    /**
     * Creates a person with the given names.
     *
     * @param lastName  the family name (note: this is the FIRST argument)
     * @param firstName the given name
     */
    public Person(String lastName, String firstName) {
        this.firstName = firstName;
        this.lastName = lastName;
    }

    /** @return the given name, possibly {@code null} */
    public String getFirstName() {
        return firstName;
    }

    /** @param firstName the given name to store */
    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    /** @return the family name, possibly {@code null} */
    public String getLastName() {
        return lastName;
    }

    /** @param lastName the family name to store */
    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    /**
     * @return a string of the form {@code "firstName: X, lastName: Y"};
     *         unset names are rendered as {@code null}
     */
    @Override
    public String toString() {
        return String.format("firstName: %s, lastName: %s", firstName, lastName);
    }
}

PersonItemProcessor.java


package xin.lowang.springbatch.processor;

import java.util.Locale;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;

import xin.lowang.springbatch.domain.Person;

/**
 * {@link ItemProcessor} that produces a new {@link Person} whose first and
 * last names are the upper-cased names of the input item.
 */
public class PersonItemProcessor implements ItemProcessor<Person, Person> {
    private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);

    /**
     * Upper-cases both names of {@code item}.
     *
     * @param item the person to transform; its names must not be {@code null}
     * @return a new {@link Person} with upper-cased names; the input is not modified
     * @throws Exception declared by the {@link ItemProcessor} contract
     */
    @Override
    public Person process(final Person item) throws Exception {
        // Locale.ROOT keeps the result independent of the JVM default locale
        // (e.g. under a Turkish locale "i" would otherwise become a dotted capital I).
        final String upperFirstName = item.getFirstName().toUpperCase(Locale.ROOT);
        final String upperLastName = item.getLastName().toUpperCase(Locale.ROOT);
        // Person's constructor takes (lastName, firstName).
        final Person transformed = new Person(upperLastName, upperFirstName);
        log.info("Transform {} to {}", item, transformed);
        return transformed;
    }
}

上一篇 下一篇

猜你喜欢

热点阅读