面对现代企业应用当中的复杂业务以及海量的数据,除了页面复杂的人机交互处理,还有一类是不需要人工干预,只需要定期读入大批量数据,按对应的规则处理。这种处理被称为 “批处理 ”。
特点:
- 数据量巨大
- 不需要的人工干预,根据系统的配置自动处理
- 按时间配置处理。一天或者一个月执行一次
批处理对应三个环节:
- 读取数据,数据可能来自文件、数据库或消息队列等
- 数据处理,如金融系统的对账的计费处理
- 写数据,将输出结果写入文件、数据库或消息队列等
Spring Batch 不仅提供了统一的读写接口、丰富的任务处理方式、灵活的事务管理以及并发处理,同时还支持日志、监控、任务重启与跳过等特性。Spring Batch 是一款批处理应用框架,不是调度框架。它只关注任务处理的相关问题,如事务、并发、监控、执行等,并不会提供相应的任务调度功能。如果需要批处理任务定期执行,可以结合 Quartz 等调度框架试下。
通过 Spring Batch 将 CSV 文件里的数据写入到数据库里,Spring Batch 会读取数据将数据映射为一个对象中,并在连接数据将生成的数据插入到数据中。
Maven 依赖
|
|
Spring Batch 处理周期
输入文件(CSV)→ 读取工具 (Reader)
Spring Batch 里的org.springframework.batch.item 中定义了许多内置的 Reader,如数据库(.database.*), 消息队列(.amqp.*),文件(.file.*),大数据(.data.*)等,读取 CSV 文件使用 org.springframework.batch.item.file.FlatFileItemReader
|
|
定义需要映射的对象
|
|
映射处理对象
|
|
或则直接使用org.springframework.batch.item.file.mappin.BeanWrapperFieldSetMapper , 需要 CSV 转换列和对应 Bean 对象的字段名对应。
application context 配置
|
|
处理程序 (Processor)
处理程序不是 Spring Batch 处理里必须
对 CSV 文件里的数据进行计数
|
|
PlayerGoalProcessor 只是对数据进行进球数据进行数据统计,此 processor 没有进行数据过滤,但如果 process() 方法返回的结果是 null, 则Spring Batch 将会忽略这个 item, 不将其发送给 Writer
配置
applicationContext.xml 引入 Bean
1<bean id="playerGoalProcessor" class="com.dlc.springBatchDemo.processor.PlayerGoalProcessor" />
写工具 (Writer)→数据库 (DataBase)
|
|
通过实现ItemWriter继承并实现了其唯一的方法: write() 。write() 方法接受一个泛型 Player 的 list . Spring Batch 使用“chunking”策略实现其 writers , 读取时是一次执行一个item, 而写入时是将一组数据一块写。 可以通过 commit-interval 配置控制每次一起写入的item的数量。
applicationContext.xml 引入 Bean
1<bean id="playerWriter" class="com.dlc.springBatchDemo.writer.ProductItemWriter" />
配置 Job
import-csv-job.xml
|
|
一个 job 可以包含 0 到 多个 step; 一个 step 可以包含 0 到 多个 tasklet; 一个 tasklet 可以包含 0 到多个 chunk.
一个 job 可以包含 0到多个 step; 一个 step 可以有 0/1 个tasklet; 一个 tasklet 可以有 0/1 个 chunk。
多个 processors
通过设计批处理的适当的粒度来创建多个 item processor,然后按顺序在同一个 chunk之 中执行。比如:需要processor 来正确地管理 item 数量,需要跳过某个八强被灌3个球力夺银靴的球员。
|
|
Tasklets(微线程)
Spring Batch 会分块读取 CSV 文件,依次读取每一个 item,在经过 processor 处理,处理完成之后会将结果收集并分组为 chunks , 然后把这些记录发送给 writer ,在这里是插入到数据库中。而有时需要一个步骤的 tasklet 不进行读写操作,只进行如:下载文件,压缩解密规定文件,Spring Batch 支持 自定义定义一个tasklet。
ArchivePlayerDataTasklet.java
|
|
applicationContext.xml
|
|
import-csv-job.xml
|
|
Resiliency(弹性)
Skipping Items(跳过某项)
跳过某些记录, 比如 reader 读取的无效记录,或者处理/写入过程中出现异常的对象。 要跳过记录有两种方式:
- 在
chunk元素上定义skip-limit属性, 告诉Spring 最多允许跳过多少个 items,超过则 job 失败(如果无效记录很少那可以接受,但如果无效记录太多,那可能输入数据就有问题了)。 - 定义一个
skippable-exception-classes列表, 用来判断当前记录是否可以跳过, 可以指定include元素来决定发生哪些异常时会跳过当前记录, 还可以指定exclude元素来决定哪些异常不会触发 skip( 比如你想跳过某个异常层次父类, 但排除一或多个子类异常时)。
示例如下:
1234567891011<job id="simpleFileImportJob" xmlns="http://www.springframework.org/schema/batch"><step id="importFileStep"><tasklet><chunk reader="productReader" processor="productProcessor" writer="productWriter" commit-interval="5" skip-limit="10"><skippable-exception-classes><include class="org.springframework.batch.item.file.FlatFileParseException" /></skippable-exception-classes></chunk></tasklet></step></job>这里在处理某条记录时如果抛出
FlatFileParseException异常, 则这条记录将被跳过。 如果超过10次 skip, 那么直接让 job 失败。- 在
重试(Retrying Items)
有时发生的异常是可以重试的, 如由于读入数据库锁导致的失败。 重试(Retry)的实现和跳过(Skip)非常相似:
- 在
chunk元素上定义retry-limit属性, 告诉Spring 每个 item 最多允许重试多少次, 超过则认为该记录处理失败。 如果只用重试, 不指定跳过,则如果某条记录重试处理失败, 则 job将被标记为失败。 - 定义一个
retryable-exception-classes列表, 用来判断当前记录是否可以重试; 可以指定include元素来决定哪些异常发生时当前记录可以重试, 还可以指定exclude元素来决定哪些异常不对当前记录重试执行.。
例如:
1234567891011<job id="simpleFileImportJob" xmlns="http://www.springframework.org/schema/batch"><step id="importFileStep"><tasklet><chunk reader="productReader" processor="productProcessor" writer="productWriter" commit-interval="5" retry-limit="5"><retryable-exception-classes><include class="org.springframework.dao.OptimisticLockingFailureException" /></retryable-exception-classes></chunk></tasklet></step></job>还可以将重试和可跳过的异常通过 skippable exception 与 retry exception 对应起来。 因此, 如果某个异常触发了5次重试, 5次重试之后还没搞定, 恰好该异常也在 skippable 列表中, 则这条记录将被跳过。 如果 exception 不在 skippable 列表则会让整个 job 失败。
- 在
重启 job
对于执行失败的 job作业, 我们可以重新启动,并让他们从上次断开的地方继续执行。 要做到这一点, 只需要使用和上次一模一样的参数来启动 job, 则 Spring Batch 会自动从数据库中找到这个实例然后继续执行。
配置数据库
由于配置里使用的是 MapJobRepositoryFactoryBean ,Spring Batch 默认使用的内存数据,只需要创建 Player 对象的表
|
|
如果Spring Batch 需要使用 Mysql
将<bean id="jobRepository"> 改为
|
|
同时需要手动导入 Spring Batch 的表
spring-batch-core.jar/org/springframework/batch/core/schema-mysql.sql
如果只是测试也可以在 applicationContext.xml 里添加
|
|
测试批量任务
在测试资源目录resources\添加需要导入的文件
sample.csv
|
|
pom 里添加依赖
|
|
添加单元测试类
AppTest.java
|
|
学习参考自 Reading and writing CVS files with Spring Batch and MySQL