SpringBatch之基本概念讲解
1 SpringBatch
1.1 简介
Spring Batch
是 Spring
提供的一个数据处理框架。企业域中的许多应用程序需要批量处理才能在关键任务环境中执行业务操作。
这些业务运营包括:
- 无需用户交互即可最有效地处理大量信息的自动化,复杂处理。这些操作通常包括基于时间的事件(例如月末计算,通知或通信)。
- 在非常大的数据集中重复处理复杂业务规则的定期应用(例如,保险利益确定或费率调整)。
- 集成从内部和外部系统接收的信息,这些信息通常需要以事务方式格式化,验证和处理到记录系统中。批处理用于每天为企业处理数十亿的交易。
Spring Batch
是一个轻量级,全面的批处理框架,旨在开发对企业系统日常运营至关重要的强大批处理应用程序。
Spring Batch
构建了人们期望的Spring Framework
特性(生产力,基于 POJO
的开发方法和一般易用性),同时使开发人员可以在必要时轻松访问和利用更高级的企业服务。Spring Batch
不是一个 schuedling
的框架。
Spring Batch
提供了可重用的功能,这些功能对于处理大量的数据至关重要,包括记录/跟踪,事务管理,作业处理统计,作业重启,跳过和资源管理
Spring Batch
可用于两种简单的用例(例如将文件读入数据库或运行存储过程)以及复杂的大量用例(例如在数据库之间移动大量数据,转换它等等) 上)。大批量批处理作业可以高度可扩展的方式利用该框架来处理大量信息。
1.2 Spring Batch 架构
一个典型的批处理应用程序大致如下:
- 从数据库,文件或队列中读取大量记录。
- 以某种方式处理数据。
- 以修改之后的形式写回数据。
其对应的示意图如下:
image.png
Spring Batch
的一个总体的架构如下:
在 Spring Batch
中一个 job
可以定义很多的步骤 step
,在每一个 step
里面可以定义其专属的 ItemReader
用于读取数据,ItemProcesseor
用于处理数据,ItemWriter
用于写数据,而每一个定义的 job
则都在 JobRepository
里面,我们可以通过 JobLauncher
来启动某一个 job
1.3 Spring Batch 核心概念
1.3.1 什么是 Job
Job
和 Step
是 Spring Batch
执行批处理任务最为核心的两个概念。
其中 Job
是一个封装整个批处理过程的一个概念。Job
在 Spring Batch
的体系当中只是一个最顶层的一个抽象概念,体现在代码当中则它只是一个最上层的接口。
其代码如下:
public interface Job {
String getName();
boolean isRestartable();
void execute(JobExecution execution);
JobParametersIncrementer getJobParametersIncrementer();
JobParametersValidator getJobParametersValidator();
}
在 Job
这个接口当中定义了五个方法,它的实现类主要有两种类型的 job
,一个是 simplejob
,另一个是 flowjob
。
在 Spring Batch
当中,job
是最顶层的抽象,除 job
之外我们还有 JobInstance
以及 JobExecution
这两个更加底层的抽象。
一个 job
是我们运行的基本单位,它内部由 step
组成。job
本质上可以看成 step
的一个容器。
一个 job
可以按照指定的逻辑顺序组合 step
,并提供了我们给所有 step
设置相同属性的方法,例如一些事件监听,跳过策略。
Spring Batch
以 SimpleJob
类的形式提供了 Job
接口的默认简单实现,它在 Job
之上创建了一些标准功能。
一个使用 java config
的例子代码如下:
@Bean
public Job footballJob() {
return this.jobBuilderFactory.get("footballJob")
.start(playerLoad())
.next(gameLoad())
.next(playerSummarization())
.end()
.build();
}
这个配置的意思是:首先给这个job
起了一个名字叫 footballJob
,接着指定了这个 job 的三个 step,他们分别由方法 playerLoad,gameLoad,playerSummarization 实现。
1.3.2 什么是 JobInstance
我们在上文已经提到了 JobInstance
,他是 Job
的更加底层的一个抽象,他的定义如下:
public interface JobInstance {
public long getInstanceId();
public String getJobName();
}
方法很简单,一个是返回 Job
的id
,另一个是返回 Job
的名字。
JobInstance
指的是job
运行当中,作业执行过程当中的概念。Instance
本就是实例的意思。比如说现在有一个批处理的 job
,它的功能是在一天结束时执行行一次。我们假定这个批处理 job
的名字为 EndOfDay
。在这个情况下,那么每天就会有一个逻辑意义上的 JobInstance
,而我们必须记录 job
的每次运行的情况。
1.3.3 什么是 JobParameters
在上文当中我们提到了,同一个 job
每天运行一次的话,那么每天都有一个 jobIntsance
,但他们的 job
定义都是一样的,那么我们怎么来区别一个 job
的不同 jobinstance
了。
不妨先做个猜想,虽然 jobinstance
的 job
定义一样,但是他们有的东西就不一样,例如运行时间。
Spring Batch
中提供的用来标识一个jobinstance
的东西是:JobParameters
。
JobParameters
对象包含一组用于启动批处理作业的参数,它可以在运行期间用于识别或甚至用作参考数据。我们假设的运行时间,就可以作为一个 JobParameters
。
例如,我们前面的 EndOfDay
的 job 现在已经有了两个实例,一个产生于 1 月 1 日,另一个产生于 1 月 2 日,那么我们就可以定义两个 JobParameter
对象:一个的参数是 01-01
,, 另一个的参数是 01-02
。
因此,识别一个 JobInstance
的方法可以定义为:
因此,我么可以通过 Jobparameter 来操作正确的 JobInstance。
1.3.4 什么是 JobExecution
JobExecution
指的是单次尝试运行一个我们定义好的 Job
的代码层面的概念。job
的一次执行可能以失败也可能成功。只有当执行成功完成时,给定的与执行相对应的 JobInstance
才也被视为完成。
还是以前面描述的EndOfDay
的 job
作为示例,假设第一次运行 01-01-2022的 JobInstance
结果是失败。那么此时如果使用与第一次运行相同的 Jobparameter
参数(即 01-01-2022)作业参数再次运行,那么就会创建一个对应于之前 jobInstance
的一个新的 JobExecution
实例,JobInstance
仍然只有一个。
JobExecution
的接口定义如下:
public interface JobExecution {
/**
* Get unique id for this JobExecution.
* @return execution id
*/
public long getExecutionId();
/**
* Get job name.
* @return value of 'id' attribute from <job>
*/
public String getJobName();
/**
* Get batch status of this execution.
* @return batch status value.
*/
public BatchStatus getBatchStatus();
/**
* Get time execution entered STARTED status.
* @return date (time)
*/
public Date getStartTime();
/**
* Get time execution entered end status: COMPLETED, STOPPED, FAILED
* @return date (time)
*/
public Date getEndTime();
/**
* Get execution exit status.
* @return exit status.
*/
public String getExitStatus();
/**
* Get time execution was created.
* @return date (time)
*/
public Date getCreateTime();
/**
* Get time execution was last updated updated.
* @return date (time)
*/
public Date getLastUpdatedTime();
/**
* Get job parameters for this execution.
* @return job parameters
*/
public Properties getJobParameters();
}
每一个方法的注释已经解释的很清楚,这里不再多做解释。只提一下 BatchStatus
,JobExecution
当中提供了一个方法 getBatchStatus
用于获取一个 job
某一次特地执行的一个状态。
BatchStatus
是一个代表 job
状态的枚举类,其定义如下:
public enum BatchStatus {
STARTING, STARTED, STOPPING,
STOPPED, FAILED, COMPLETED, ABANDONED
}
这些属性对于一个job
的执行来说是非常关键的信息,并且 Spring Batch
会将他们持久到数据库当中。
在使用 Spring Batch
的过程当中 Spring Batch
会自动创建一些表用于存储一些job
相关的信息,用于存储 JobExecution
的表为 batch_job_execution
。
1.3.5 什么是 Step
每一个 Step
对象都封装了批处理作业的一个独立的阶段。事实上,每一个 Job
本质上都是由一个或多个步骤组成。每一个 step
包含定义和控制实际批处理所需的所有信息。
任何特定的内容都由编写 Job
的开发人员自行决定。一个 step
可以非常简单也可以非常复杂。
例如,一个 step
的功能是将文件中的数据加载到数据库中,那么基于现在 Spring Batch
的支持则几乎不需要写代码。更复杂的 step
可能具有复杂的业务逻辑,这些逻辑作为处理的一部分。
1.3.6 什么是 StepExecution
与 Job 一样,Step
具有与 JobExecution
类似的 StepExecution
,如下图所示:
StepExecution
表示一次执行 Step
,每次运行一个 Step
时都会创建一个新的 StepExecution
,类似于 JobExecution
。
但是,某个步骤可能由于其之前的步骤失败而无法执行。且仅当 Step
实际启动时才会创建 StepExecution
。
一次 step
执行的实例由 StepExecution
类的对象表示。每个 StepExecution
都包含对其相应步骤的引用以及 JobExecution
和事务相关的数据,例如提交和回滚计数以及开始和结束时间。
此外,每个步骤执行都包含一个 ExecutionContext
,其中包含开发人员需要在批处理运行中保留的任何数据,例如重新启动所需的统计信息或状态信息。
1.3.7 什么是 ExecutionContext
ExecutionContext
即每一个 StepExecution
的执行环境。它包含一系列的键值对。
我们可以用如下代码获取 ExecutionContext:
ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
1.3.8 什么是 JobRepository
JobRepository
是一个用于将上述 job
,step
等概念进行持久化的一个类。它同时给 Job
和 Step
以及下文会提到的 JobLauncher
实现提供 CRUD
操作。
首次启动 Job
时,将从 repository
中获取 JobExecution
,并且在执行批处理的过程中,StepExecution
和 JobExecution
将被存储到 repository
当中。
@EnableBatchProcessing
注解可以为 JobRepository
提供自动配置。
1.3.9 什么是 JobLauncher
JobLauncher
这个接口的功能非常简单,它是用于启动指定了 JobParameters
的 Job
,为什么这里要强调指定了 JobParameter
,原因其实我们在前面已经提到了,jobparameter
和 job
一起才能组成一次 job
的执行。
下面是代码实例:
public interface JobLauncher {
public JobExecution run(Job job, JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, JobRestartException,
JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}
上面 run 方法实现的功能是根据传入的 job
以及 jobparamaters
从 JobRepository
获取一个 JobExecution
并执行 Job
。
1.3.10 什么是 Item Reader
ItemReader
是一个读数据的抽象,它的功能是为每一个 Step
提供数据输入。当 ItemReader
以及读完所有数据时,它会返回 null
来告诉后续操作数据已经读完。
Spring Batch
为 ItemReader
提供了非常多的有用的实现类,比如 JdbcPagingItemReader
,JdbcCursorItemReader
等等。
ItemReader
支持的读入的数据源也是非常丰富的,包括各种类型的数据库,文件,数据流,等等。几乎涵盖了我们的所有场景。
下面是一个 JdbcPagingItemReader
的例子代码:
@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("status", "NEW");
return new JdbcPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.dataSource(dataSource)
.queryProvider(queryProvider)
.parameterValues(parameterValues)
.rowMapper(customerCreditMapper())
.pageSize(1000)
.build();
}
@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
provider.setSelectClause("select id, name, credit");
provider.setFromClause("from customer");
provider.setWhereClause("where status=:status");
provider.setSortKey("id");
return provider;
}
JdbcPagingItemReader
必须指定一个 PagingQueryProvider
,负责提供 SQL
查询语句来按分页返回数据。
下面是一个 JdbcCursorItemReader
的例子代码:
private JdbcCursorItemReader<Map<String, Object>> buildItemReader(final DataSource dataSource, String tableName,
String tenant) {
JdbcCursorItemReader<Map<String, Object>> itemReader = new JdbcCursorItemReader<>();
itemReader.setDataSource(dataSource);
itemReader.setSql("sql here");
itemReader.setRowMapper(new RowMapper());
return itemReader;
}
1.3.11 什么是 Item Writer
既然 ItemReader
是读数据的一个抽象,那么 ItemWriter
自然就是一个写数据的抽象,它是为每一个 step
提供数据写出的功能。
写的单位是可以配置的,我们可以一次写一条数据,也可以一次写一个 chunk
的数据。ItemWriter
对于读入的数据是不能做任何操作的。
Spring Batch
为 ItemWriter
也提供了非常多的有用的实现类,当然我们也可以去实现自己的 writer 功能。
1.3.12 什么是 Item Processor
ItemProcessor
对项目的业务逻辑处理的一个抽象,当 ItemReader
读取到一条记录之后,ItemWriter
还未写入这条记录之前,我们可以借助ItemProcessor
提供一个处理业务逻辑的功能,并对数据进行相应操作。
如果我们在 ItemProcessor
发现一条数据不应该被写入,可以通过返回 null
来表示。
ItemProcessor
和 ItemReader
以及 ItemWriter
可以非常好的结合在一起工作,他们之间的数据传输也非常方便。我们直接使用即可。
1.4 chunk 处理流程
1.4.1 chunk 简介
Spring Batch
提供了让我们按照 chunk
处理数据的能力,一个 chunk
的示意图如下:
它的意思就和图示的一样,由于我们一次batch
的任务可能会有很多的数据读写操作,因此一条一条的处理并向数据库提交的话效率不会很高。
因此 Spring Batch
提供了 chunk
这个概念,我们可以设定一个 chunk size
,spring batch
将一条一条处理数据,但不提交到数据库,只有当处理的数据数量达到 chunk size
设定的值得时候,才一起去commit
。
java 的实例定义代码如下:
@Bean
public Job sampleJob(){
return this.jobBuilderFactory.get("sampleJob")
.start(step1())
.end()
.build();
}
@Bean
public Job step1(){
return this.stepBuilderFactory.get("step1")
.<String,String>chunk(10)
.reader(itemReader())
.writer(itemWriter())
.build();
}
在上面这个 step
里面,chunk size
被设为了 10
,当 ItemReader
读的数据数量达到 10
的时候,这一批次的数据就一起被传到 itemWriter
,同时 transaction
被提交
1.4.2 skip 策略和失败处理
一个 batch
的 job
的 step
,可能会处理非常大数量的数据,难免会遇到出错的情况,出错的情况虽出现的概率较小,但是我们不得不考虑这些情况,因为我们做数据迁移最重要的是要保证数据的最终一致性。
Spring Batch
当然也考虑到了这种情况,并且为我们提供了相关的技术支持,请看如下 bean 的配置:
@Bean
public Job step1(){
return this.stepBuilderFactory.get("step1")
.<String,String>chunk(10)
.reader(flatFileItemReader())
.writer(itemWriter())
.faultTolerant()
.skipLimit()
.skip(Exception.class)
.noSkip(FileNotFoundException.class)
.build();
}
我们需要留意这三个方法,分别是 skipLimit()
,skip()
,noSkip()
。
-
skipLimit
方法的意思是我们可以设定一个我们允许的这个step
可以跳过的异常数量,假如我们设定为 10,则当这个step
运行时,只要出现的异常数目不超过10
,整个step
都不会fail
。
注意,若不设定skipLimit
,则其默认值是0
-
skip
方法我们可以指定我们可以跳过的异常,因为有些异常的出现,我们是可以忽略的。 -
noSkip
方法的意思则是指出现这个异常我们不想跳过,也就是从skip
的所有exception
当中排除这个exception
。
从上面的例子来说,也就是跳过所有除 FileNotFoundException
的 exception
。
那么对于这个 step
来说,FileNotFoundException
就是一个 fatal
的 exception,抛出这个 exception 的时候 step 就会直接 fail。
1.5 批处理操作指南
本部分是一些使用 Spring Batch
时的值得注意的点。
1.5.1 批处理原则
在构建批处理解决方案时,应考虑以下关键原则和注意事项:
- 批处理体系结构通常会影响体系结构
- 尽可能简化并避免在单批应用程序中构建复杂的逻辑结构
- 保持数据的处理和存储在物理上靠得很近(换句话说,将数据保存在处理过程中)。
- 最大限度地减少系统资源的使用,尤其是 I/O。在
internal memory
中执行尽可能多的操作。 - 查看应用程序 I/O(分析 SQL 语句)以确保避免不必要的物理 I/O。特别是,需要寻找以下四个常见缺陷:当数据可以被读取一次并缓存或保存在工作存储中时,读取每个事务的数据;重新读取先前在同一事务中读取数据的事务的数据;导致不必要的表或索引扫描;未在 SQL 语句的 WHERE 子句中指定键值。
- 在批处理运行中不要做两次一样的事情。例如,如果需要数据汇总以用于报告目的,则应该(如果可能)在最初处理数据时递增存储的总计,因此报告应用程序不必重新处理相同的数据。
- 在批处理应用程序开始时分配足够的内存,以避免在此过程中进行耗时的重新分配。
- 总是假设数据完整性最差。插入适当的检查和记录验证以维护数据完整性。
- 尽可能实施校验和以进行内部验证。例如,对于一个文件里的数据应该有一个数据条数纪录,告诉文件中的记录总数以及关键字段的汇总。
- 在具有真实数据量的类似生产环境中尽早计划和执行压力测试。
- 在大批量系统中,数据备份可能具有挑战性,特别是如果系统以 24-7 在线的情况运行。数据库备份通常在在线设计中得到很好的处理,但文件备份应该被视为同样重要。如果系统依赖于文件,则文件备份过程不仅应该到位并记录在案,还应定期进行测试。
1.5.2 如何默认不启动 job
在使用 java config
使用 Spring Batch
的 job
时,如果不做任何配置,项目在启动时就会默认去跑我们定义好的批处理job
。那么如何让项目在启动时不自动去跑 job
呢
Spring Batch
的job
会在项目启动时自动 run
,如果我们不想让他在启动时 run
的话,可以在 application.properties
中添加如下属性:
spring.batch.job.enabled=false
1.5.3 在读数据时内存不够
在使用 Spring Batch
做数据迁移时,发现在 job
启动后,执行到一定时间点时就卡在一个地方不动了,且 log
也不再打印,等待一段时间之后,得到如下错误:
红字的信息为:
Resource exhaustion event:the JVM was unable to allocate memory from the heap.
翻译过来的意思就是项目发出了一个资源耗尽的事件,告诉我们 java 虚拟机无法再为堆分配内存。
造成这个错误的原因是:这个项目里的 batch job
的 reader
是一次性拿回了数据库里的所有数据,并没有进行分页,当这个数据量太大时,就会导致内存不够用。
解决的办法有两个:
- 调整 reader 读数据逻辑,按分页读取,但实现上会麻烦一些,且运行效率会下降
- 增大 service 内存