Spring Batch是 Java 中用于批处理的强大框架,因此使其成为数据处理活动和计划作业运行的流行选择。根据业务逻辑的复杂性,作业可以依赖不同的配置值和动态参数。
在本文中,我们将探讨如何使用JobParameters以及如何从基本批处理组件访问它们。
演示设置
我们将为药房服务开发 Spring Batch。主要业务任务是找到即将过期的药品,根据销量计算新的价格,并通知消费者即将过期的药品。此外,我们将从内存中的 H2 数据库中读取数据并将所有处理详细信息写入日志以简化实现。
1.依赖关系
要开始演示应用程序,我们需要添加 Spring Batch 和 H2 依赖项:
<dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <version>2.2.224</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> <version>3.2.0</version> </dependency>
|
我们可以在 Maven Central 存储库中找到最新的H2和Spring Batch版本。
2.准备测试数据
让我们首先在schema-all.sql中定义模式:
DROP TABLE medicine IF EXISTS; CREATE TABLE medicine ( med_id VARCHAR(36) PRIMARY KEY, name VARCHAR(30), type VARCHAR(30), expiration_date TIMESTAMP, original_price DECIMAL, sale_price DECIMAL );
|
data.sql中提供了初始测试数据:
INSERT INTO medicine VALUES ('ec278dd3-87b9-4ad1-858f-dfe5bc34bdb5', 'Lidocaine', 'ANESTHETICS', DATEADD('DAY', 120, CURRENT_DATE), 10, null); INSERT INTO medicine VALUES ('9d39321d-34f3-4eb7-bb9a-a69734e0e372', 'Flucloxacillin', 'ANTIBACTERIALS', DATEADD('DAY', 40, CURRENT_DATE), 20, null); INSERT INTO medicine VALUES ('87f4ff13-de40-4c7f-95db-627f309394dd', 'Amoxicillin', 'ANTIBACTERIALS', DATEADD('DAY', 70, CURRENT_DATE), 30, null); INSERT INTO medicine VALUES ('acd99d6a-27be-4c89-babe-0edf4dca22cb', 'Prozac', 'ANTIDEPRESSANTS', DATEADD('DAY', 30, CURRENT_DATE), 40, null);
|
Spring Boot 在应用程序启动过程中运行这些文件,我们将在测试执行中使用这些测试数据。
3.医学领域类
对于我们的服务,我们需要一个简单的Medicine实体类:
@AllArgsConstructor @Data public class Medicine { private UUID id; private String name; private MedicineCategory type; private Timestamp expirationDate; private Double originalPrice; private Double salePrice; }
|
ItemReader使用expirationDate字段来计算药物是否即将过期。当药物接近失效日期时,ItemProcessor将更新salePrice字段。
4.应用程序属性
应用程序需要src/main/resources/application.properties文件中的多个属性:
spring.batch.job.enabled=false batch.medicine.cron=0 */1 * * * * batch.medicine.alert_type=LOGS batch.medicine.expiration.default.days=60 batch.medicine.start.sale.default.days=45 batch.medicine.sale=0.1
|
由于我们只配置一项作业,因此spring.batch.job.enabled应设置为false以禁用初始作业执行。默认情况下,Spring 在上下文启动后使用空参数运行作业:
[main] INFO o.s.b.a.b.JobLauncherApplicationRunner - Running default command line with: []
batch.medicine.cron属性定义计划运行的cron 表达式。根据定义的场景,我们应该每天运行该作业。然而,在我们的例子中,作业每分钟都会启动,以便能够轻松检查处理行为。
InputReader、InputProcessor和InpurWriter需要其他属性来执行业务逻辑。
作业参数
Spring Batch 包含一个JobParameters类,旨在存储特定作业运行的运行时参数。事实证明,此功能在各种情况下都很有用。例如,它允许传递特定运行期间生成的动态变量。此外,它还可以创建一个控制器,该控制器可以根据客户端提供的参数启动作业。
在我们的场景中,我们将利用此类来保存应用程序参数和动态运行时参数。
1. StepScope和JobScope
除了常规 Spring 中众所周知的 bean 作用域之外,Spring Batch 还引入了两个额外的作用域:StepScope和JobScope。有了这些范围,就可以为工作流中的每个步骤或作业创建唯一的 bean。 Spring 确保与特定步骤/作业关联的资源在其整个生命周期中被隔离和独立管理。
有了这个功能,我们可以轻松控制上下文,并在特定运行的读取、处理和写入部分之间共享所有所需的属性。为了能够注入作业参数,我们需要使用@StepScope或@JobScope注释依赖的 bean 。
2.在计划执行中填充作业参数
让我们定义MedExpirationBatchRunner类,它将通过cron 表达式启动我们的工作(在我们的例子中每 1 分钟一次)。我们应该用@EnableScheduling注释该类并定义适当的@Scheduled入口方法:
@Component @EnableScheduling public class MedExpirationBatchRunner { ... @Scheduled(cron = "${batch.medicine.cron}", zone = "GMT") public void runJob() { ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC); launchJob(now); } }
|
由于我们想要手动启动作业,因此我们应该使用JobLaucher类并在JobLauncher# run()方法中提供填充的JobParameter。在我们的示例中,我们提供了application.properties中的值以及两个特定于运行的参数(触发作业的日期和跟踪 ID):
public void launchJob(ZonedDateTime triggerZonedDateTime) { try { JobParameters jobParameters = new JobParametersBuilder() .addString(BatchConstants.TRIGGERED_DATE_TIME, triggerZonedDateTime.toString()) .addString(BatchConstants.ALERT_TYPE, alertType) .addLong(BatchConstants.DEFAULT_EXPIRATION, defaultExpiration) .addLong(BatchConstants.SALE_STARTS_DAYS, saleStartDays) .addDouble(BatchConstants.MEDICINE_SALE, medicineSale) .addString(BatchConstants.TRACE_ID, UUID.randomUUID().toString()) .toJobParameters(); jobLauncher.run(medExpirationJob, jobParameters); } catch (Exception e) { log.error("Failed to run", e); } }
|
配置参数后,我们有几种选择如何在代码中使用这些值。
3.读取Bean定义中的作业参数
使用 SpEL,我们可以从配置类中的 bean 定义访问作业参数。 Spring 将所有参数组合到常规字符串到对象映射中:
@Bean @StepScope public MedicineProcessor medicineProcessor(@Value("#{jobParameters}") Map<String, Object> jobParameters) { ... }
|
在该方法内部,我们将使用jobParameters来启动 MedicineProcessor 的适当字段。
4.直接读取Service中的作业参数
另一种选择是在ItemReader本身中使用 setter 注入。我们可以像从任何其他映射中一样通过 SpEL 表达式获取确切的参数值:
@Setter public class ExpiresSoonMedicineReader extends AbstractItemCountingItemStreamItemReader<Medicine> { @Value("#{jobParameters['DEFAULT_EXPIRATION']}") private long defaultExpiration; }
|
我们只需要确保SpEL中使用的key与参数初始化时使用的key相同即可。
5.通过Before Step读取作业参数
Spring Batch 提供了一个StepExecutionListener接口,允许我们监听步骤执行阶段:步骤开始之前和步骤完成后。我们可以利用此功能,在步骤开始之前访问属性,并执行任何自定义逻辑。最简单的方法就是使用@BeforeStep注释,它对应于StepExecutionListener中的beforeStep()方法:
@BeforeStep public void beforeStep(StepExecution stepExecution) { JobParameters parameters = stepExecution.getJobExecution() .getJobParameters(); ... log.info("Before step params: {}", parameters); }
|
作业配置
让我们把所有的部分结合起来看一下全貌。
读取器、处理器和写入器需要两个属性:BatchConstants.TRIGGERED_DATE_TIME和BatchConstants.TRACE_ID。
我们将对所有步骤 bean 定义中的公共参数使用相同的提取逻辑:
private void enrichWithJobParameters(Map<String, Object> jobParameters, ContainsJobParameters container) { if (jobParameters.get(BatchConstants.TRIGGERED_DATE_TIME) != null) { container.setTriggeredDateTime(ZonedDateTime.parse(jobParameters.get(BatchConstants.TRIGGERED_DATE_TIME) .toString())); } if (jobParameters.get(BatchConstants.TRACE_ID) != null) { container.setTraceId(jobParameters.get(BatchConstants.TRACE_ID).toString()); } }
|
总的来说,其他参数是特定于组件的,没有共同的逻辑。
1.配置ItemReader
首先,我们要配置ExpiresSoonMedicineReader并丰富常用参数:
@Bean @StepScope public ExpiresSoonMedicineReader expiresSoonMedicineReader(JdbcTemplate jdbcTemplate, @Value("#{jobParameters}") Map<String, Object> jobParameters) { ExpiresSoonMedicineReader medicineReader = new ExpiresSoonMedicineReader(jdbcTemplate); enrichWithJobParameters(jobParameters, medicineReader); return medicineReader; }
|
让我们仔细看看阅读器的具体实现。TriggeredDateTime和traceId参数在bean构建期间直接注入,defaultExpiration参数由Spring通过setter注入。为了演示,我们在doOpen()方法中使用了所有这些:
public class ExpiresSoonMedicineReader extends AbstractItemCountingItemStreamItemReader<Medicine> implements ContainsJobParameters { private ZonedDateTime triggeredDateTime; private String traceId; @Value("#{jobParameters['DEFAULT_EXPIRATION']}") private long defaultExpiration; private List<Medicine> expiringMedicineList; ... @Override protected void doOpen() { expiringMedicineList = jdbcTemplate.query(FIND_EXPIRING_SOON_MEDICINE, ps -> ps.setLong(1, defaultExpiration), (rs, row) -> getMedicine(rs)); log.info("Trace = {}. Found {} meds that expires soon", traceId, expiringMedicineList.size()); if (!expiringMedicineList.isEmpty()) { setMaxItemCount(expiringMedicineList.size()); } } @PostConstruct public void init() { setName(ClassUtils.getShortName(getClass())); } }
|
ItemReader不应标记为@Component。另外,我们需要调用setName()方法来设置所需的读者名称。
2.配置ItemProcessor和ItemWriter
ItemProcessor和ItemWriter遵循与ItemReader相同的方法。因此它们不需要任何特定配置来访问参数。 bean定义逻辑通过enrichWithJobParameters()方法初始化公共参数。其他由单个类使用且不需要在所有组件中填充的参数由 Spring 通过在相应类中进行 setter 注入来丰富。
我们应该使用@StepScope注释来标记所有依赖于属性的bean 。否则,Spring 将仅在上下文启动时创建一次 bean,并且不会注入参数值。
3.配置完整流程
我们不需要采取任何特定操作来使用参数配置作业。因此我们只需要组合所有的bean:
@Bean public Job medExpirationJob(JobRepository jobRepository, PlatformTransactionManager transactionManager, MedicineWriter medicineWriter, MedicineProcessor medicineProcessor, ExpiresSoonMedicineReader expiresSoonMedicineReader) { Step notifyAboutExpiringMedicine = new StepBuilder("notifyAboutExpiringMedicine", jobRepository).<Medicine, Medicine>chunk(10) .reader(expiresSoonMedicineReader) .processor(medicineProcessor) .writer(medicineWriter) .faultTolerant() .transactionManager(transactionManager) .build(); return new JobBuilder("medExpirationJob", jobRepository) .incrementer(new RunIdIncrementer()) .start(notifyAboutExpiringMedicine) .build(); }
|
运行应用程序
让我们运行一个完整的示例,看看应用程序如何使用所有参数。我们需要从SpringBatchExpireMeminationApplication类启动 Spring Boot 应用程序。
一旦计划的方法执行,Spring 就会记录所有参数:
INFO o.s.b.c.l.support.SimpleJobLauncher - Job: [SimpleJob: [name=medExpirationJob]] launched with the following parameters: [{'SALE_STARTS_DAYS':'{value=45, type=class java.lang.Long, identifying=true}','MEDICINE_SALE':'{value=0.1, type=class java.lang.Double, identifying=true}','TRACE_ID':'{value=e35a26a4-4d56-4dfe-bf36-c1e5f20940a5, type=class java.lang.String, identifying=true}','ALERT_TYPE':'{value=LOGS, type=class java.lang.String, identifying=true}','TRIGGERED_DATE_TIME':'{value=2023-12-06T22:36:00.011436600Z, type=class java.lang.String, identifying=true}','DEFAULT_EXPIRATION':'{value=60, type=class java.lang.Long, identifying=true}'}]
|
首先,ItemReader写入根据DEFAULT_EXPIRATION参数找到的药物信息:
INFO c.b.b.job.ExpiresSoonMedicineReader - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5. Found 2 meds that expires soon
|
其次,ItemProcessor使用SALE_STARTS_DAYS和MEDICINE_SALE参数来计算新价格:
INFO c.b.b.job.MedicineProcessor - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5, calculated new sale price 18.0 for medicine 9d39321d-34f3-4eb7-bb9a-a69734e0e372 INFO c.b.b.job.MedicineProcessor - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5, calculated new sale price 36.0 for medicine acd99d6a-27be-4c89-babe-0edf4dca22cb
|
最后,ItemWriter将更新的药物写入同一跟踪中的日志:
INFO c.b.b.job.MedicineWriter - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5. This medicine is expiring Medicine(id=9d39321d-34f3-4eb7-bb9a-a69734e0e372, name=Flucloxacillin, type=ANTIBACTERIALS, expirationDate=2024-01-16 00:00:00.0, originalPrice=20.0, salePrice=18.0) INFO c.b.b.job.MedicineWriter - Trace = e35a26a4-4d56-4dfe-bf36-c1e5f20940a5. This medicine is expiring Medicine(id=acd99d6a-27be-4c89-babe-0edf4dca22cb, name=Prozac, type=ANTIDEPRESSANTS, expirationDate=2024-01-06 00:00:00.0, originalPrice=40.0, salePrice=36.0) INFO c.b.b.job.MedicineWriter - Finishing job started at 2023-12-07T11:58:00.014430400Z
|
结论
在本文中,我们学习了如何在 Spring Batch 中使用作业参数。ItemReader、ItemProcessor和ItemWriter可以在 bean 初始化期间手动丰富参数,也可以由 Spring 通过@BeforeStep或 setter 注入来丰富。