快速入门

简介

from:https://www.cnblogs.com/h--d/p/15591075.html

官方文档:https://docs.spring.io/spring-batch/docs/current/reference/html/index.html

轻量级、完善高效的批处理框架

提供可重用组件,日志追踪、事务、任务作业统计、任务重启、跳过、重复资源管理

提供高级功能和特性:分区、远程功能,处理对于大数据量、高性能批处理任务

需要和调度框架(Quartz等)合作完成批处理任务,它只关注任务相关的问题,如:事务、并发、监控、执行

主要功能

  • 事务管理
  • 基于块的处理
  • 声明式的输入输出
  • 启动、停止、再启动
  • 重试、跳过

一个jobLauncher集合job和jobParameter,运行一个job可以定时运行多次,每次产生jobInstance(根据jobParameter区分),运行时产生JobExcution记录成功失败,Job包含多个Step,Step每次实际运行都产生StepExecution,step中大体操作为Item Reader、Processor、Item Write,读写可以操作数据库或文件

包含角色

  • JobLauncher(任务启动器),用来启动任务,程序的入口
  • Job(具体的任务)
  • Step(一个任务中包含多个step步骤)
  • JobRepository(数据存放),数据库接口,任务执行时记录任务状态

入门程序

依赖

springboot、springBatch、mysql

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.test</groupId>
    <artifactId>test-springboot-batch</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>

        <!-- mysql -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.12</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

数据库配置

spring:
  datasource:
    username: root
    password: 123456
    url: jdbc:mysql://127.0.0.1:3306/test_springbatch?allowPublicKeyRetrieval=true&useSSL=true
    driver-class-name: com.mysql.cj.jdbc.Driver
    # 初始化数据库,文件在依赖jar包中
    schema: classpath:org/springframework/batch/core/schema-mysql.sql
    initialization-mode: always

启动类

@SpringBootApplication
public class Application &#123;

    public static void main(String[] args) &#123;
        SpringApplication.run(Application.class);
    &#125;
&#125;

Job/Step

简单

@Configuration
// 启用批处理功能
@EnableBatchProcessing
public class JobConfiguration &#123;

    // 注入创建任务的对象
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    // 注入创建步骤的对象
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job helloworldJob() &#123;
        return jobBuilderFactory.get("helloworldJob")
                .start(step1())
                .build();
    &#125;

    @Bean
    public Step step1() &#123;
        return stepBuilderFactory.get("step1")
                .tasklet(new Tasklet() &#123;
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception &#123;
                        System.out.println(Thread.currentThread().getName() + "------" + "hello world");
                        // 返回执行完成状态
                        return RepeatStatus.FINISHED;
                    &#125;
                &#125;).build();
    &#125;
&#125;

运行

运行启动类后,自动执行了任务

参考示例

稍微完整

@EnableBatchProcessing
@Configuration
public class JdbcBatchItemWriterJobConf &#123;

    @Bean
    public String runJob(JobLauncher jobLauncher, Job job) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException &#123;
        jobLauncher.run(
            job, 
            new JobParametersBuilder()
                .addDate("date", new Date()).toJobParameters()
        );
        return "";
    &#125;

    @Bean
    public Job job(JobBuilderFactory jobBuilderFactory, Step step) &#123;
        return jobBuilderFactory.get("jdbc-batch-item-writer-step")
                .start(step)
                .build();
    &#125;

    @Bean
    public Step step(StepBuilderFactory stepBuilderFactory, JdbcBatchItemWriter<People> writer) &#123;
        AtomicLong atomicLong = new AtomicLong();
        return stepBuilderFactory.get("jdbc-batch-item-writer-step")
                .<People, People>chunk(3)
                .reader(() -> atomicLong.get() > 20 ? null : new People(null, "name : " + atomicLong.getAndIncrement()))
                .processor((Function<People, People>) item -> &#123;
                    System.out.println("process : " + item);
                    return item;
                &#125;)
                .writer(writer)
                .build();
    &#125;

    @Bean
    public JdbcBatchItemWriter<People> writer(DataSource dataSource) &#123;
        return new JdbcBatchItemWriterBuilder<People>()
                .dataSource(dataSource)
                .sql("insert into people(id,name) values(null,?)")
                .itemPreparedStatementSetter(((item, ps) -> ps.setString(1, item.getName())))
                .assertUpdates(false)
                .build();
    &#125;

&#125;

核心API

Job

Job是一个封装整个批处理过程的一个概念,代码当中它只是一个最上层的接口

public interface Job &#123;
 String getName();
 boolean isRestartable();
 void execute(JobExecution execution);
 JobParametersIncrementer getJobParametersIncrementer();
 JobParametersValidator getJobParametersValidator();
&#125;

实现类:simplejob,flowjob

job是运行的基本单位,由step组成,可以按照逻辑顺序组合step,给所有step设置相同属性的方法,如事件监听,跳过策略

@Bean
public Job footballJob() &#123;
    return this.jobBuilderFactory.get("footballJob")
                     .start(playerLoad())
                     .next(gameLoad())
                     .next(playerSummarization())
                     .end()
                     .build();
&#125;

JobInstance

public interface JobInstance &#123;
 public long getInstanceId();
 public String getJobName(); 
&#125;

job在运行时,作业执行过程当中的概念

JobParameters

job每次运行都有一个jobInstance,通过jobParameters标识,包含一组用于批处理作业的参数(如运行时间)

JobInstance = JobParameters + Job

JobExecution

job的一次执行有可能成功也可能失败,只有成功完成时,jobInstance才完成

相同的JobParameter参数相同,第一次失败,第二次还使用时,JobInstance还是只有一个,但是会新建一个JobExecution

public interface JobExecution &#123;

 public long getExecutionId();

 public String getJobName(); 

 /** public enum BatchStatus &#123;STARTING, STARTED, STOPPING, STOPPED, FAILED, COMPLETED, ABANDONED &#125; */
 public BatchStatus getBatchStatus();

 public Date getStartTime();

 public Date getEndTime();

 public String getExitStatus();

 public Date getCreateTime();

 public Date getLastUpdatedTime();

 public Properties getJobParameters();

&#125;

spring batch会将他们持久到数据库当中

在使用Spring batch的过程当中spring batch会自动创建一些表用于存储一些job相关的信息

用于存储JobExecution的表为batch_job_execution

step

Step包含定义和控制实际批处理所需的所有信息,由开发指定

如过一个step的功能是将文件中的数据加载到数据库,基于SpringBatch几乎不需要写代码

StepExecution

stepExecution表示执行一次step

每次实际运行一个step才会创建一个新的StepExecution

每个StepExecution都包含对其相应步骤的引用以及JobExecution和事务相关的数据,例如提交和回滚计数以及开始和结束时间。

每个步骤执行都包含一个ExecutionContext,其中包含开发人员需要在批处理运行中保留的任何数据,例如重新启动需要的统计信息和状态信息

ExecutionContext

ExecutionContext即每一个StepExecution 的执行环境。它包含一系列的键值对

获取

ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();

JobRepository

持久化的类,给Job、Step、JobLaucher实现提供CRUD

首次启动Job时,将从repository中获取JobExecution

并且在执行批处理的过程中,StepExecution和JobExecution将被存储到repository当中

@EnableBatchProcessing注解可以为JobRepository提供自动配置

JobLauncher

用于启动指定了JobParameter的Job

public interface JobLauncher &#123;

    /**
     * 实现的功能是根据传入的job以及jobparamaters
     * 从JobRepository获取一个JobExecution并执行Job
     */
    public JobExecution run(Job job, JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException,
                   JobInstanceAlreadyCompleteException, JobParametersInvalidException;
&#125;

Item Reader

itemReader是一个读数据的抽象,为Step提供数据输入

如果itemReader已经读完所有数据,会返回null告诉后续操作数据已经读完

实现类:JDBCPagingItemReader、JDBCCursorItemReader等

可以读数据库、文件、数据流等

Reader 目的
FlatFIleItemReader 从平面文件中读取数据。
StaxEventItemReader 从 XML 文件中读取数据。
StoredProcedureItemReader 从数据库的存储过程中读取数据。
JDBCPagingItemReader 从关系数据库数据库中读取数据。
MongoItemReader 从 MongoDB 读取数据。
Neo4jItemReader 从 Neo4j ItemReader 读取数据。

JdbcPagingItemReader

@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) &#123;
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("status", "NEW");

        return new JdbcPagingItemReaderBuilder<CustomerCredit>()
                                           .name("creditReader")
                                           .dataSource(dataSource)
                                           .queryProvider(queryProvider)
                                           .parameterValues(parameterValues)
                                           .rowMapper(customerCreditMapper())
                                           .pageSize(1000)
                                           .build();
&#125;

/**
 * JdbcPagingItemReader必须指定一个PagingQueryProvider,负责提供SQL查询语句来按分页返回数据。
 */ 
@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() &#123;
        SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();

        provider.setSelectClause("select id, name, credit");
        provider.setFromClause("from customer");
        provider.setWhereClause("where status=:status");
        provider.setSortKey("id");

        return provider;
&#125;

JdbcCursorItemReader

private JdbcCursorItemReader<Map<String, Object>> buildItemReader(final DataSource dataSource, String tableName,
            String tenant) &#123;
    JdbcCursorItemReader<Map<String, Object>> itemReader = new JdbcCursorItemReader<>();
    itemReader.setDataSource(dataSource);
    itemReader.setSql("sql here");
    itemReader.setRowMapper(new RowMapper());
    return itemReader;
&#125;

Item Writer

写数据的抽象,为每个Step提供数据写功能

一次可以写一条,也可以写一个chunk

Writer 目的
FlatFIleItemWriter 将数据写入平面文件。
StaxEventItemWriter 将数据写入 XML 文件。
StoredProcedureItemWriter 将数据写入数据库的存储过程。
JDBCPagingItemWriter 将数据写入关系数据库。
MongoItemWriter 将数据写入 MongoDB。
Neo4jItemWriter 将数据写入 Neo4j。
@EnableBatchProcessing
@Configuration
public class JdbcBatchItemWriterJobConf &#123;

    @Bean
    public String runJob(JobLauncher jobLauncher, Job job) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException &#123;
        jobLauncher.run(job, new JobParametersBuilder()
                .addDate("date", new Date()).toJobParameters());
        return "";
    &#125;

    @Bean
    public Job job(JobBuilderFactory jobBuilderFactory, Step step) &#123;
        return jobBuilderFactory.get("jdbc-batch-item-writer-step")
                .start(step)
                .build();
    &#125;

    @Bean
    public Step step(StepBuilderFactory stepBuilderFactory, JdbcBatchItemWriter<People> writer) &#123;
        AtomicLong atomicLong = new AtomicLong();
        return stepBuilderFactory.get("jdbc-batch-item-writer-step")
                .<People, People>chunk(3)
                .reader(() -> atomicLong.get() > 20 ? null : new People(null, "name : " + atomicLong.getAndIncrement()))
                .processor((Function<People, People>) item -> &#123;
                    System.out.println("process : " + item);
                    return item;
                &#125;)
                .writer(writer)
                .build();
    &#125;

    @Bean
    public JdbcBatchItemWriter<People> writer(DataSource dataSource) &#123;
        return new JdbcBatchItemWriterBuilder<People>()
                .dataSource(dataSource)
                .sql("insert into people(id,name) values(null,?)")
                .itemPreparedStatementSetter(((item, ps) -> ps.setString(1, item.getName())))
                .assertUpdates(false)
                .build();
    &#125;

&#125;

Item Processor

在ItemReader读一条数据后,itemWriter未写入这条记录前

借助ItemProcessor处理业务逻辑,对数据进行操作,如果一条数据不应该被写入,可以返回null

Chunk处理流程

一次batch的任务可能多次数据读写,一条条处理并提交数据库效率不高

使用chunk,设定一个chunk size,spring batch一条条处理数据,但不提交数据库,只有数据量达到chunk size才一起commit

chunk size被设为了10,当ItemReader读的数据数量达到10的时候,这一批次的数据就一起被传到itemWriter,同时transaction被提交

skip策略和失败处理

skipLimit(),skip(),noSkip()

skipLimit设定step可以跳过的异常数量,默认为0,如果超过其,step就会失败

skip可以指定跳过的的异常,有些异常是可以忽略的

noSkip,上述例子中,跳过所有不是FileNotFoundException的Exception,当出现FileNotFoundException时,step会直接fail

操作指南

  • 批处理体系结构通常会影响体系结构

  • 尽可能简化并避免在单批应用程序中构建复杂的逻辑结构

  • 保持数据的处理和存储在物理上靠得很近(换句话说,将数据保存在处理过程中)。

  • 最大限度地减少系统资源的使用

  • 查看应用程序I / O(分析SQL语句)以确保避免不必要的物理I / O,避免以下

    • 当数据可以被读取一次并缓存或保存在工作存储中时,读取每个事务的数据。【?】
    • 重新读取先前在同一事务中读取数据的事务的数据。
    • 导致不必要的表或索引扫描。
    • 未在SQL语句的WHERE子句中指定键值。
  • 在批处理运行中不要做两次一样的事情。例如,如果需要数据汇总以用于报告目的,则应该在最初处理数据时递增存储的总计

  • 在批处理应用程序开始时分配足够的内存,以避免在此过程中进行耗时的重新分配。

  • 总是假设数据完整性最差。插入适当的检查和记录验证以维护数据完整性。

  • 尽可能实施校验和以进行内部验证。例如,对于一个文件里的数据应该有一个数据条数纪录,告诉文件中的记录总数以及关键字段的汇总。

  • 在具有真实数据量的类似生产环境中尽早计划和执行压力测试

  • 在大批量系统中,数据备份可能具有挑战性,特别是如果系统以24-7在线的情况运行。文件备份很重要,如果系统依赖于文件,则还应定期进行测试文件备份。

默认不启动

使用spring batch的job时,项目在启动时就会默认去跑我们定义好的批处理job

spring.batch.job.enabled=false

读时内存不够

红字的信息为:Resource exhaustion event:the JVM was unable to allocate memory from the heap.

造成这个错误的原因是: 这个项目里的batch job的reader是一次性拿回了数据库里的所有数据,并没有进行分页,当这个数据量太大时,就会导致内存不够用。解决的办法有两个:

  • 调整reader读数据逻辑,按分页读取,但实现上会麻烦一些,且运行效率会下降
  • 增大service内存