DLancerC

Spring Batch(一)

面对现代企业应用当中的复杂业务以及海量的数据,除了页面复杂的人机交互处理,还有一类是不需要人工干预,只需要定期读入大批量数据,按对应的规则处理。这种处理被称为 “批处理 ”。
特点:

  • 数据量巨大
  • 不需要的人工干预,根据系统的配置自动处理
  • 按时间配置处理。一天或者一个月执行一次

批处理对应三个环节:

  • 读取数据,数据可能来自文件、数据库或消息队列等
  • 数据处理,如金融系统的对账的计费处理
  • 写数据,将输出结果写入文件、数据库或消息队列等

Spring Batch 不仅提供了统一的读写接口、丰富的任务处理方式、灵活的事务管理以及并发处理,同时还支持日志、监控、任务重启与跳过等特性。Spring Batch 是一款批处理应用框架,不是调度框架。它只关注任务处理的相关问题,如事务、并发、监控、执行等,并不会提供相应的任务调度功能。如果需要批处理任务定期执行,可以结合 Quartz 等调度框架试下。

通过 Spring Batch 将 CSV 文件里的数据写入到数据库里,Spring Batch 会读取数据将数据映射为一个对象中,并在连接数据将生成的数据插入到数据中。

Maven 依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<dependencies>
<!--批处理核心代码-->
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>3.0.7.RELEASE</version>
</dependency>
<!--批处理基础访问框架-->
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>3.0.7.RELEASE</version>
</dependency>
</dependencies>

Spring Batch 处理周期

输入文件(CSV)→ 读取工具 (Reader)

Spring Batch 里的org.springframework.batch.item 中定义了许多内置的 Reader,如数据库(.database.*), 消息队列(.amqp.*),文件(.file.*),大数据(.data.*)等,读取 CSV 文件使用 org.springframework.batch.item.file.FlatFileItemReader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<bean id="playerReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
<!-- 需要处理的资源路径,下面是通过 jobParameters 指定文件,可以不同 Job 处理不同文件 -->
<property name="resource" value="file:#{jobParameters['inputFile']}" />
<!-- 跳过行数 -->
<property name="linesToSkip" value="1" />
<!-- 定义一行数据映射的对象-->
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<!-- lineTokenizer 对象,根据某种分割符分割-->
<property name="lineTokenizer">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
<!--分割符,默认是 ","-->
<property name="delimiter" value=","/>
<!-- lineTokenizer 会把行数据转换对应格式的 FieldSet的对象-->
<property name="names" value="id,name,team,goal" />
</bean>
</property>
<!-- fieldSetMapper 会将lineTokenizer.tokenize(line) 拆分的数据 -->
<property name="fieldSetMapper">
<!--映射到对象方法,需要自己定义-->
<bean class="com.dlc.springBatchDemo.reader.PlayerFieldSetMapper" />
</property>
</bean>
</property>
</bean>

定义需要映射的对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.dlc.springBatchDemo.model;
public class Player {
private int id;
private String name;
private String team;
private int goal;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getTeam() {
return team;
}
public void setTeam(String team) {
this.team = team;
}
public int getGoal() {
return goal;
}
public void setGoal(int goal) {
this.goal = goal;
}
}

映射处理对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.dlc.springBatchDemo.reader;
import com.dlc.springBatchDemo.model.Player;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;
/** 根据 CSV 文件中的字段集合构建 Player 对象 */
public class PlayerFieldSetMapper implements FieldSetMapper<Player> {
public Player mapFieldSet(FieldSet fieldSet) throws BindException {
Player player = new Player();
player.setId( fieldSet.readInt( "id" ) );
player.setName( fieldSet.readString( "name" ) );
player.setTeam( fieldSet.readString( "team" ) );
player.setGoal( fieldSet.readInt( "goal" ) );
return player;
}
}

或则直接使用org.springframework.batch.item.file.mappin.BeanWrapperFieldSetMapper , 需要 CSV 转换列和对应 Bean 对象的字段名对应。

application context 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
<beans>
<!-- Spring 注释 -->
<context:annotation-config />
<!-- 扫包,导入包下 component -->
<context:component-scan base-package="com.com.dlc.springBatchDemo" />
<!-- 配置 MySQL Data source -->
<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/bacth"/>
<property name="username" value="dlc"/>
<property name="password" value="dlc"/>
</bean>
<!-- Spring事务管理器, 用于管理MySQL事务 -->
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource" />
</bean>
<!-- Spring 提供的与JDBC connections交互的模板设计模式实现,实际中一般使用Hibernate 等代替-->
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
<property name="dataSource" ref="dataSource" />
</bean>
<!--
Job Repository: 保持着所有Job执行的相关元数据
这里使用的MapJobRepositoryFactoryBean用于测试和快速原型,不具备 Spring Batch 多线程和分割等,MapJobRepositoryFactoryBean是线程安全的
-->
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
<property name="transactionManager" ref="transactionManager" />
</bean>
<!-- Job Launcher: java程序来通过JobLauncher启动 Job
-->
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
</bean>
<!-- Reader bean:设置为scope 设置为 Step,当对应bean 使用 Spring Batch 的 JobParameters启动时是不存在,使用 step scope 使Spring Batch在创建ean 时能加载配置 JobParameters 参数 -->
<bean id="productReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
...
</bean>
</beans>

处理程序 (Processor)

处理程序不是 Spring Batch 处理里必须

对 CSV 文件里的数据进行计数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.dlc.springBatchDemo.processor;
import com.dlc.springBatchDemo.model.Player;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
/**
* Processor that finds existing players and updates a player goal
*/
public class PlayerGoalProcessor implements ItemProcessor<Player,Player> {
private static final String GET_PLAYER = "select * from PLAYER where id = ?";
@Autowired
private JdbcTemplate jdbcTemplate;
public Player process(Player player) throws Exception {
// Retrieve the player from the database
List<Player> playerList = jdbcTemplate.query(GET_PLAYER, new Object[] {player.getId()}, new RowMapper<Player>() {
public Player mapRow( ResultSet resultSet, int rowNum ) throws SQLException {
Player p = new Player();
p.setId( resultSet.getInt( 1 ) );
p.setName( resultSet.getString( 2 ) );
p.setTeam( resultSet.getString( 3 ) );
p.setGoal( resultSet.getInt( 4 ) );
return p;
}
});
if( playerList.size() > 0 ) {
Player existingPlayer = playerList.get( 0 );
player.setGoal( existingPlayer.getGoal() + player.getGoal()) );
}
return player;
}
}

PlayerGoalProcessor 只是对数据进行进球数据进行数据统计,此 processor 没有进行数据过滤,但如果 process() 方法返回的结果是 null, 则Spring Batch 将会忽略这个 item, 不将其发送给 Writer

配置

  • applicationContext.xml 引入 Bean

    1
    <bean id="playerGoalProcessor" class="com.dlc.springBatchDemo.processor.PlayerGoalProcessor" />

写工具 (Writer)→数据库 (DataBase)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.dlc.springBatchDemo.writer;
import com.dlc.springBatchDemo.model.Player;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
/**
* Writes players to a database
*/
public class PlayerItemWriter implements ItemWriter<Player>
{
private static final String GET_PLAYER = "select * from PLAYER where id = ?";
private static final String INSERT_PLAYER = "insert into PLAYER (id,name,team,goal) values (?,?,?,?)";
private static final String UPDATE_PLAYER = "update PLAYER set name = ?, team = ?,goal = ? where id = ?";
@Autowired
private JdbcTemplate jdbcTemplate;
public void write(List<? extends Player> players) throws Exception
{
for( Player player : players ){
List<Player> playerList = jdbcTemplate.query(GET_PLAYER, new Object[] {player.getId()}, new RowMapper<Player>() {
public Player mapRow( ResultSet resultSet, int rowNum ) throws SQLException {
Player p = new Player();
p.setId( resultSet.getInt( 1 ) );
p.setName( resultSet.getString( 2 ) );
p.setTeam( resultSet.getString( 3 ) );
p.setGoal( resultSet.getInt( 4 ) );
return p;
}
});
if( playerList.size() > 0 ){
jdbcTemplate.update( UPDATE_PLAYER, player.getName(), player.getTeam(), player.getGoal(), player.getId() );
}else{
jdbcTemplate.update( INSERT_PLAYER, player.getId(), player.getName(), player.getTeam(), player.getGoal() );
}
}
}
}

通过实现ItemWriter继承并实现了其唯一的方法: write() 。write() 方法接受一个泛型 Player 的 list . Spring Batch 使用“chunking”策略实现其 writers , 读取时是一次执行一个item, 而写入时是将一组数据一块写。 可以通过 commit-interval 配置控制每次一起写入的item的数量。

  • applicationContext.xml 引入 Bean

    1
    <bean id="playerWriter" class="com.dlc.springBatchDemo.writer.ProductItemWriter" />

配置 Job

import-csv-job.xml

1
2
3
4
5
6
7
8
9
10
11
<beans>
<!-- Import our beans -->
<import resource="classpath:/applicationContext.xml" />
<job id="simpleFileImportJob" xmlns="http://www.springframework.org/schema/batch">
<step id="importFileStep">
<tasklet>
<chunk reader="playerReader" processor="playerGoalProcessor" writer="playerWriter" commit-interval="5" />
</tasklet>
</step>
</job>
</beans>

一个 job 可以包含 0 到 多个 step; 一个 step 可以包含 0 到 多个 tasklet; 一个 tasklet 可以包含 0 到多个 chunk.

一个 job 可以包含 0到多个 step; 一个 step 可以有 0/1tasklet; 一个 tasklet 可以有 0/1chunk

多个 processors

通过设计批处理的适当的粒度来创建多个 item processor,然后按顺序在同一个 chunk之 中执行。比如:需要processor 来正确地管理 item 数量,需要跳过某个八强被灌3个球力夺银靴的球员。

1
2
3
4
5
6
7
8
9
10
<bean id="playerGoalProcessor" class="com.dlc.springBatchDemo.processor.PlayerGoalProcessor" />
<bean id="playerFilterProcessor" class="com.dlc.springBatchDemo.processor.playerFilterProcessor" />
<bean id="playerCompositeProcessor" class="org.springframework.batch.item.support.CompositeItemProcessor">
<property name="delegates">
<list>
<ref bean="playerFilterProcessor" />
<ref bean="playerGoalProcessor" />
</list>
</property>
</bean>

Tasklets(微线程)

Spring Batch 会分块读取 CSV 文件,依次读取每一个 item,在经过 processor 处理,处理完成之后会将结果收集并分组为 chunks , 然后把这些记录发送给 writer ,在这里是插入到数据库中。而有时需要一个步骤的 tasklet 不进行读写操作,只进行如:下载文件,压缩解密规定文件,Spring Batch 支持 自定义定义一个tasklet。

ArchivePlayerDataTasklet.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.dlc.springBatchDemo.tasklet;
import org.apache.commons.io.FileUtils;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import java.io.File;
/**
* Collect player goal data
* 实现Tasklet 接口的 execute() 方法
*/
public class ArchivePlayerDataTasklet implements Tasklet
{
private File inputFile;
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception
{
File archiveDir = new File( inputFile.getParent() + File.separator+"archive");
FileUtils.forceMkdir( archiveDir );
System.out.println(inputFile);
FileUtils.copyFileToDirectory( inputFile , archiveDir );
return RepeatStatus.FINISHED;
}
public File getInputFile() {
return inputFile;
}
public void setInputFile(File inputFile) {
this.inputFile = inputFile;
}
}

applicationContext.xml

1
2
3
<bean id="archiveFileTasklet" class="com.dlc.springBatchDemo.tasklet.ArchivePlayerDataTasklet" scope="step">
<property name="inputFile" value="#{jobParameters['inputFile']}" />
</bean>

import-csv-job.xml

1
2
3
4
5
6
7
8
9
<beans>
<!-- Import our beans -->
<import resource="classpath:/applicationContext.xml" />
<job id="simpleFileImportJob" xmlns="http://www.springframework.org/schema/batch">
<step id="archive">
<tasklet id="archiveFileTasklet"></tasklet>
</step>
</job>
</beans>

Resiliency(弹性)

  • Skipping Items(跳过某项)

    跳过某些记录, 比如 reader 读取的无效记录,或者处理/写入过程中出现异常的对象。 要跳过记录有两种方式:

    • chunk 元素上定义 skip-limit 属性, 告诉Spring 最多允许跳过多少个 items,超过则 job 失败(如果无效记录很少那可以接受,但如果无效记录太多,那可能输入数据就有问题了)。
    • 定义一个 skippable-exception-classes 列表, 用来判断当前记录是否可以跳过, 可以指定 include 元素来决定发生哪些异常时会跳过当前记录, 还可以指定 exclude 元素来决定哪些异常不会触发 skip( 比如你想跳过某个异常层次父类, 但排除一或多个子类异常时)。

    示例如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    <job id="simpleFileImportJob" xmlns="http://www.springframework.org/schema/batch">
    <step id="importFileStep">
    <tasklet>
    <chunk reader="productReader" processor="productProcessor" writer="productWriter" commit-interval="5" skip-limit="10">
    <skippable-exception-classes>
    <include class="org.springframework.batch.item.file.FlatFileParseException" />
    </skippable-exception-classes>
    </chunk>
    </tasklet>
    </step>
    </job>

    这里在处理某条记录时如果抛出 FlatFileParseException 异常, 则这条记录将被跳过。 如果超过10次 skip, 那么直接让 job 失败。

  • 重试(Retrying Items)

    有时发生的异常是可以重试的, 如由于读入数据库锁导致的失败。 重试(Retry)的实现和跳过(Skip)非常相似:

    • chunk 元素上定义 retry-limit 属性, 告诉Spring 每个 item 最多允许重试多少次, 超过则认为该记录处理失败。 如果只用重试, 不指定跳过,则如果某条记录重试处理失败, 则 job将被标记为失败。
    • 定义一个 retryable-exception-classes 列表, 用来判断当前记录是否可以重试; 可以指定 include 元素来决定哪些异常发生时当前记录可以重试, 还可以指定 exclude 元素来决定哪些异常不对当前记录重试执行.。

    例如:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    <job id="simpleFileImportJob" xmlns="http://www.springframework.org/schema/batch">
    <step id="importFileStep">
    <tasklet>
    <chunk reader="productReader" processor="productProcessor" writer="productWriter" commit-interval="5" retry-limit="5">
    <retryable-exception-classes>
    <include class="org.springframework.dao.OptimisticLockingFailureException" />
    </retryable-exception-classes>
    </chunk>
    </tasklet>
    </step>
    </job>

    还可以将重试和可跳过的异常通过 skippable exception 与 retry exception 对应起来。 因此, 如果某个异常触发了5次重试, 5次重试之后还没搞定, 恰好该异常也在 skippable 列表中, 则这条记录将被跳过。 如果 exception 不在 skippable 列表则会让整个 job 失败。

  • 重启 job

    对于执行失败的 job作业, 我们可以重新启动,并让他们从上次断开的地方继续执行。 要做到这一点, 只需要使用和上次一模一样的参数来启动 job, 则 Spring Batch 会自动从数据库中找到这个实例然后继续执行。

配置数据库

由于配置里使用的是 MapJobRepositoryFactoryBean ,Spring Batch 默认使用的内存数据,只需要创建 Player 对象的表

1
2
3
4
5
6
7
CREATE TABLE PLAYER (
ID INT NOT NULL,
NAME VARCHAR(128) NOT NULL,
TEAM VARCHAR(128),
GOAL INT,
PRIMARY KEY(ID)
);

如果Spring Batch 需要使用 Mysql

<bean id="jobRepository"> 改为

1
<batch:job-repository id="jobRepository" data-source="dataSource" />

同时需要手动导入 Spring Batch 的表

spring-batch-core.jar/org/springframework/batch/core/schema-mysql.sql

如果只是测试也可以在 applicationContext.xml 里添加

1
2
3
4
<jdbc:initialize-database data-source="dataSource">
<jdbc:script location="org/springframework/batch/core/schema-drop-mysql.sql" />
<jdbc:script location="org/springframework/batch/core/schema-mysql.sql" />
</jdbc:initialize-database>

测试批量任务

在测试资源目录resources\添加需要导入的文件

sample.csv

1
2
3
4
5
6
7
8
id,name,team,goal
1,Cristiano Ronaldo,Real Madrid, 12
2,Lionel Messi,Barcelona, 11
3,Lewandowski,Bayern München, 8
4,Cavani,Paris Saint-Germain, 8
5,Aubameyang,Dortmund, 7
6,Mbappé,Monaco, 6
7,Griezmann,Barcelona, 6

pom 里添加依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<project>
...
<dependencies>
...
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<version>${spring.batch.version}</version>
<scope>test</scope>
</dependency>
...
</dependencies>
...
</project>

添加单元测试类

AppTest.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.dlc.springBatchDemo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.*;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.batch.test.StepScopeTestExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestExecutionListeners;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.support.DependencyInjectionTestExecutionListener;
import java.util.Date;
import static org.junit.Assert.assertEquals;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({ "classpath:/applicationContext.xml","classpath:/jobs/csv-import-job.xml","classpath:test-contxt.xml"})
public class BatchSpringContextTest {
@Autowired
private Job csvImportJob;
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Test
public void launchJobWithJobLauncherTest() throws Exception {
JobParameters jobParameters = new JobParametersBuilder().addString("inputFile","sample.csv").toJobParameters();
jobLauncherTestUtils.setJob(csvImportJob);
JobExecution jobExecution = jobLauncherTestUtils.launchJob(jobParameters);
assertEquals(BatchStatus.COMPLETED, jobExecution.getStatus());
}
}

Spring Batch Demo

学习参考自 Reading and writing CVS files with Spring Batch and MySQL