spring

SpringBatch之基本概念讲解

2022-07-25  本文已影响0人  上善若泪

1 SpringBatch

1.1 简介

Spring BatchSpring 提供的一个数据处理框架。企业域中的许多应用程序需要批量处理才能在关键任务环境中执行业务操作。

这些业务运营包括:

Spring Batch是一个轻量级,全面的批处理框架,旨在开发对企业系统日常运营至关重要的强大批处理应用程序。

Spring Batch构建了人们期望的Spring Framework 特性(生产力,基于 POJO 的开发方法和一般易用性),同时使开发人员可以在必要时轻松访问和利用更高级的企业服务。Spring Batch 不是一个 schuedling 的框架。
Spring Batch 提供了可重用的功能,这些功能对于处理大量的数据至关重要,包括记录/跟踪,事务管理,作业处理统计,作业重启,跳过和资源管理

Spring Batch 可用于两种简单的用例(例如将文件读入数据库或运行存储过程)以及复杂的大量用例(例如在数据库之间移动大量数据,转换它等等) 上)。大批量批处理作业可以高度可扩展的方式利用该框架来处理大量信息。

1.2 Spring Batch 架构

一个典型的批处理应用程序大致如下:

其对应的示意图如下:


image.png

Spring Batch 的一个总体的架构如下:

image.png

Spring Batch 中一个 job 可以定义很多的步骤 step,在每一个 step 里面可以定义其专属的 ItemReader 用于读取数据,ItemProcesseor 用于处理数据,ItemWriter 用于写数据,而每一个定义的 job 则都在 JobRepository 里面,我们可以通过 JobLauncher 来启动某一个 job

1.3 Spring Batch 核心概念

1.3.1 什么是 Job

JobStepSpring Batch 执行批处理任务最为核心的两个概念。
其中 Job 是一个封装整个批处理过程的一个概念。JobSpring Batch 的体系当中只是一个最顶层的一个抽象概念,体现在代码当中则它只是一个最上层的接口。

其代码如下:

public interface Job {
 String getName();
 boolean isRestartable();
 void execute(JobExecution execution);
 JobParametersIncrementer getJobParametersIncrementer();
 JobParametersValidator getJobParametersValidator();
}

Job 这个接口当中定义了五个方法,它的实现类主要有两种类型的 job,一个是 simplejob,另一个是 flowjob

Spring Batch 当中,job 是最顶层的抽象,除 job 之外我们还有 JobInstance 以及 JobExecution 这两个更加底层的抽象。
一个 job 是我们运行的基本单位,它内部由 step 组成。job 本质上可以看成 step 的一个容器。
一个 job 可以按照指定的逻辑顺序组合 step,并提供了我们给所有 step设置相同属性的方法,例如一些事件监听,跳过策略。
Spring BatchSimpleJob 类的形式提供了 Job 接口的默认简单实现,它在 Job 之上创建了一些标准功能。

一个使用 java config 的例子代码如下:

@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                     .start(playerLoad())
                     .next(gameLoad())
                     .next(playerSummarization())
                     .end()
                     .build();
}

这个配置的意思是:首先给这个job 起了一个名字叫 footballJob,接着指定了这个 job 的三个 step,他们分别由方法 playerLoad,gameLoad,playerSummarization 实现。

1.3.2 什么是 JobInstance

我们在上文已经提到了 JobInstance,他是 Job 的更加底层的一个抽象,他的定义如下:

public interface JobInstance {
 public long getInstanceId();
 public String getJobName(); 
}

方法很简单,一个是返回 Jobid,另一个是返回 Job 的名字。

JobInstance 指的是job 运行当中,作业执行过程当中的概念。Instance 本就是实例的意思。比如说现在有一个批处理的 job,它的功能是在一天结束时执行行一次。我们假定这个批处理 job 的名字为 EndOfDay。在这个情况下,那么每天就会有一个逻辑意义上的 JobInstance,而我们必须记录 job 的每次运行的情况。

1.3.3 什么是 JobParameters

在上文当中我们提到了,同一个 job 每天运行一次的话,那么每天都有一个 jobIntsance,但他们的 job 定义都是一样的,那么我们怎么来区别一个 job 的不同 jobinstance 了。
不妨先做个猜想,虽然 jobinstancejob 定义一样,但是他们有的东西就不一样,例如运行时间。

Spring Batch 中提供的用来标识一个jobinstance 的东西是:JobParameters

JobParameters 对象包含一组用于启动批处理作业的参数,它可以在运行期间用于识别或甚至用作参考数据。我们假设的运行时间,就可以作为一个 JobParameters

例如,我们前面的 EndOfDay 的 job 现在已经有了两个实例,一个产生于 1 月 1 日,另一个产生于 1 月 2 日,那么我们就可以定义两个 JobParameter 对象:一个的参数是 01-01,, 另一个的参数是 01-02
因此,识别一个 JobInstance 的方法可以定义为:

image.png

因此,我么可以通过 Jobparameter 来操作正确的 JobInstance。

1.3.4 什么是 JobExecution

JobExecution 指的是单次尝试运行一个我们定义好的 Job 的代码层面的概念。job 的一次执行可能以失败也可能成功。只有当执行成功完成时,给定的与执行相对应的 JobInstance 才也被视为完成。

还是以前面描述的EndOfDayjob 作为示例,假设第一次运行 01-01-2022的 JobInstance 结果是失败。那么此时如果使用与第一次运行相同的 Jobparameter 参数(即 01-01-2022)作业参数再次运行,那么就会创建一个对应于之前 jobInstance 的一个新的 JobExecution 实例,JobInstance 仍然只有一个。

JobExecution 的接口定义如下:

public interface JobExecution {
 /**
  * Get unique id for this JobExecution.
  * @return execution id
  */
 public long getExecutionId();
 /**
  * Get job name.
  * @return value of 'id' attribute from <job>
  */
 public String getJobName(); 
 /**
  * Get batch status of this execution.
  * @return batch status value.
  */
 public BatchStatus getBatchStatus();
 /**
  * Get time execution entered STARTED status. 
  * @return date (time)
  */
 public Date getStartTime();
 /**
  * Get time execution entered end status: COMPLETED, STOPPED, FAILED 
  * @return date (time)
  */
 public Date getEndTime();
 /**
  * Get execution exit status.
  * @return exit status.
  */
 public String getExitStatus();
 /**
  * Get time execution was created.
  * @return date (time)
  */
 public Date getCreateTime();
 /**
  * Get time execution was last updated updated.
  * @return date (time)
  */
 public Date getLastUpdatedTime();
 /**
  * Get job parameters for this execution.
  * @return job parameters  
  */
 public Properties getJobParameters();

}

每一个方法的注释已经解释的很清楚,这里不再多做解释。只提一下 BatchStatusJobExecution 当中提供了一个方法 getBatchStatus 用于获取一个 job 某一次特地执行的一个状态。
BatchStatus 是一个代表 job 状态的枚举类,其定义如下:

public enum BatchStatus {
STARTING, STARTED, STOPPING, 
   STOPPED, FAILED, COMPLETED, ABANDONED 
}

这些属性对于一个job 的执行来说是非常关键的信息,并且 Spring Batch 会将他们持久到数据库当中。
在使用 Spring Batch 的过程当中 Spring Batch 会自动创建一些表用于存储一些job 相关的信息,用于存储 JobExecution 的表为 batch_job_execution

1.3.5 什么是 Step

每一个 Step 对象都封装了批处理作业的一个独立的阶段。事实上,每一个 Job 本质上都是由一个或多个步骤组成。每一个 step 包含定义和控制实际批处理所需的所有信息。
任何特定的内容都由编写 Job 的开发人员自行决定。一个 step 可以非常简单也可以非常复杂。

例如,一个 step 的功能是将文件中的数据加载到数据库中,那么基于现在 Spring Batch 的支持则几乎不需要写代码。更复杂的 step 可能具有复杂的业务逻辑,这些逻辑作为处理的一部分。

1.3.6 什么是 StepExecution

与 Job 一样,Step 具有与 JobExecution 类似的 StepExecution,如下图所示:

image.png

StepExecution 表示一次执行 Step,每次运行一个 Step 时都会创建一个新的 StepExecution,类似于 JobExecution
但是,某个步骤可能由于其之前的步骤失败而无法执行。且仅当 Step 实际启动时才会创建 StepExecution

一次 step 执行的实例由 StepExecution 类的对象表示。每个 StepExecution 都包含对其相应步骤的引用以及 JobExecution 和事务相关的数据,例如提交和回滚计数以及开始和结束时间。
此外,每个步骤执行都包含一个 ExecutionContext,其中包含开发人员需要在批处理运行中保留的任何数据,例如重新启动所需的统计信息或状态信息。

1.3.7 什么是 ExecutionContext

ExecutionContext 即每一个 StepExecution 的执行环境。它包含一系列的键值对。

我们可以用如下代码获取 ExecutionContext:

ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();

1.3.8 什么是 JobRepository

JobRepository 是一个用于将上述 jobstep 等概念进行持久化的一个类。它同时给 JobStep 以及下文会提到的 JobLauncher 实现提供 CRUD 操作。
首次启动 Job 时,将从 repository 中获取 JobExecution,并且在执行批处理的过程中,StepExecutionJobExecution 将被存储到 repository 当中。

@EnableBatchProcessing 注解可以为 JobRepository 提供自动配置。

1.3.9 什么是 JobLauncher

JobLauncher 这个接口的功能非常简单,它是用于启动指定了 JobParametersJob,为什么这里要强调指定了 JobParameter,原因其实我们在前面已经提到了,jobparameterjob 一起才能组成一次 job的执行。

下面是代码实例:

public interface JobLauncher {

public JobExecution run(Job job, JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException,
                   JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}

上面 run 方法实现的功能是根据传入的 job 以及 jobparamatersJobRepository 获取一个 JobExecution 并执行 Job

1.3.10 什么是 Item Reader

ItemReader 是一个读数据的抽象,它的功能是为每一个 Step 提供数据输入。当 ItemReader 以及读完所有数据时,它会返回 null 来告诉后续操作数据已经读完。

Spring BatchItemReader 提供了非常多的有用的实现类,比如 JdbcPagingItemReaderJdbcCursorItemReader 等等。
ItemReader 支持的读入的数据源也是非常丰富的,包括各种类型的数据库,文件,数据流,等等。几乎涵盖了我们的所有场景。

下面是一个 JdbcPagingItemReader 的例子代码:

@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("status", "NEW");

        return new JdbcPagingItemReaderBuilder<CustomerCredit>()
               .name("creditReader")
               .dataSource(dataSource)
               .queryProvider(queryProvider)
               .parameterValues(parameterValues)
               .rowMapper(customerCreditMapper())
               .pageSize(1000)
               .build();
}

@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
        SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();

        provider.setSelectClause("select id, name, credit");
        provider.setFromClause("from customer");
        provider.setWhereClause("where status=:status");
        provider.setSortKey("id");

        return provider;
}

JdbcPagingItemReader 必须指定一个 PagingQueryProvider,负责提供 SQL 查询语句来按分页返回数据。

下面是一个 JdbcCursorItemReader 的例子代码:

 private JdbcCursorItemReader<Map<String, Object>> buildItemReader(final DataSource dataSource, String tableName,
            String tenant) {

        JdbcCursorItemReader<Map<String, Object>> itemReader = new JdbcCursorItemReader<>();
        itemReader.setDataSource(dataSource);
        itemReader.setSql("sql here");
        itemReader.setRowMapper(new RowMapper());
        return itemReader;
    }

1.3.11 什么是 Item Writer

既然 ItemReader 是读数据的一个抽象,那么 ItemWriter 自然就是一个写数据的抽象,它是为每一个 step 提供数据写出的功能。

写的单位是可以配置的,我们可以一次写一条数据,也可以一次写一个 chunk 的数据。ItemWriter对于读入的数据是不能做任何操作的。

Spring BatchItemWriter 也提供了非常多的有用的实现类,当然我们也可以去实现自己的 writer 功能。

1.3.12 什么是 Item Processor

ItemProcessor 对项目的业务逻辑处理的一个抽象,当 ItemReader 读取到一条记录之后,ItemWriter 还未写入这条记录之前,我们可以借助ItemProcessor提供一个处理业务逻辑的功能,并对数据进行相应操作。

如果我们在 ItemProcessor 发现一条数据不应该被写入,可以通过返回 null 来表示。
ItemProcessorItemReader 以及 ItemWriter 可以非常好的结合在一起工作,他们之间的数据传输也非常方便。我们直接使用即可。

1.4 chunk 处理流程

1.4.1 chunk 简介

Spring Batch 提供了让我们按照 chunk 处理数据的能力,一个 chunk 的示意图如下:

image.png

它的意思就和图示的一样,由于我们一次batch的任务可能会有很多的数据读写操作,因此一条一条的处理并向数据库提交的话效率不会很高。
因此 Spring Batch 提供了 chunk 这个概念,我们可以设定一个 chunk sizespring batch 将一条一条处理数据,但不提交到数据库,只有当处理的数据数量达到 chunk size 设定的值得时候,才一起去commit

java 的实例定义代码如下:

@Bean
public Job sampleJob(){
    return this.jobBuilderFactory.get("sampleJob")
                .start(step1())
                .end()
                .build();
}
@Bean
public Job step1(){
    return this.stepBuilderFactory.get("step1")
                .<String,String>chunk(10)
                .reader(itemReader())
                .writer(itemWriter())
                .build();
}

在上面这个 step 里面,chunk size 被设为了 10,当 ItemReader 读的数据数量达到 10 的时候,这一批次的数据就一起被传到 itemWriter,同时 transaction 被提交

1.4.2 skip 策略和失败处理

一个 batchjobstep,可能会处理非常大数量的数据,难免会遇到出错的情况,出错的情况虽出现的概率较小,但是我们不得不考虑这些情况,因为我们做数据迁移最重要的是要保证数据的最终一致性。

Spring Batch 当然也考虑到了这种情况,并且为我们提供了相关的技术支持,请看如下 bean 的配置:

@Bean
public Job step1(){
    return this.stepBuilderFactory.get("step1")
                .<String,String>chunk(10)
                .reader(flatFileItemReader())
                .writer(itemWriter())
                .faultTolerant()
                .skipLimit()
                .skip(Exception.class)
                .noSkip(FileNotFoundException.class)
                .build();
}

我们需要留意这三个方法,分别是 skipLimit()skip()noSkip()

从上面的例子来说,也就是跳过所有除 FileNotFoundExceptionexception

那么对于这个 step 来说,FileNotFoundException 就是一个 fatal 的 exception,抛出这个 exception 的时候 step 就会直接 fail。

1.5 批处理操作指南

本部分是一些使用 Spring Batch 时的值得注意的点。

1.5.1 批处理原则

在构建批处理解决方案时,应考虑以下关键原则和注意事项:

1.5.2 如何默认不启动 job

在使用 java config 使用 Spring Batchjob 时,如果不做任何配置,项目在启动时就会默认去跑我们定义好的批处理job。那么如何让项目在启动时不自动去跑 job

Spring Batchjob 会在项目启动时自动 run,如果我们不想让他在启动时 run 的话,可以在 application.properties 中添加如下属性:
spring.batch.job.enabled=false

1.5.3 在读数据时内存不够

在使用 Spring Batch 做数据迁移时,发现在 job 启动后,执行到一定时间点时就卡在一个地方不动了,且 log 也不再打印,等待一段时间之后,得到如下错误:

image.png

红字的信息为:

Resource exhaustion event:the JVM was unable to allocate memory from the heap.
翻译过来的意思就是项目发出了一个资源耗尽的事件,告诉我们 java 虚拟机无法再为堆分配内存。

造成这个错误的原因是:这个项目里的 batch jobreader 是一次性拿回了数据库里的所有数据,并没有进行分页,当这个数据量太大时,就会导致内存不够用。

解决的办法有两个:

转载于:https://mp.weixin.qq.com/s/BoBmmTe-EKwA7du7EAgwDQ

上一篇下一篇

猜你喜欢

热点阅读