Spring Batch中重启失败的作业并继续执行

banq

Spring Batch 的默认可重启性支持从作业失败中进行稳健恢复,确保失败的作业可以从故障点恢复,而无需重新处理已完成的项目或丢失数据。

在本文中,我们创建了一个简单的作业来演示此重启功能。我们配置了一个作业来分块处理项目,模拟Item3发生故障。重启后,该作业从Item3恢复,处理完Item5后成功完成,状态为COMPLETED。

我们还探讨了如何使用JobBuilder上的preventRestart()来覆盖此行为。这会强制作业从头开始,例如Item1,而不是从失败点恢复。


Spring Batch提供了强大的失败作业重启机制。这些机制允许作业从故障点恢复处理。此功能对于高效处理大规模数据处理任务至关重要。

Spring Batch 内置的JobRepository会持久保存作业的执行状态。这允许作业默认具有可重启性。因此,失败的作业可以从中断处精确恢复。这确保不会发生重复处理或数据丢失。

在本教程中,我们将研究如何有效地配置和重新启动失败的 Spring Batch 作业。

Maven依赖项
让我们首先将spring-boot-starter-batch、spring-boot-starter-data-jpa和h2依赖项导入到我们的pom.xml中:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
    <version>3.3.2</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
    <version>3.3.2</version>
</dependency>
      
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <version>2.2.224</version>
</dependency>

我们需要基于文件的 H2 数据库来通过在应用程序运行期间保留作业执行状态来实现作业重启能力。

定义一个简单的 Spring Batch 作业
在本节中,我们将探索一个 Spring Batch 作业配置,该配置演示了一个简单的批处理工作流程。我们将定义一个只有一个步骤的作业:处理一个 CSV 文件。

在 Spring Boot 3 中,我们应该避免使用@EnableBatchProcessing,因为它会禁用 Spring Boot 的有用的自动配置(例如创建 Spring Batch 表)。

配置
让我们创建BatchConfig类,它设置一个名为simpleJob的作业:

@Configuration
public class BatchConfig {
    @Bean
    public Job simpleJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new JobBuilder("simpleJob", jobRepository)
          .start(step1(jobRepository, transactionManager))
          .build();
    }
}

simpleJob bean 使用JobBuilder定义批处理作业。该作业包含一个步骤:step1 ,用于读取、处理和写入 CSV 文件:

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new StepBuilder("step1", jobRepository)
      .<String, String>chunk(2, transactionManager)
      .reader(flatFileItemReader())
      .processor(itemProcessor())
      .writer(itemWriter())
      .build();
}

步骤 1使用StepBuilder 定义了一个基于块的步骤。此步骤每次处理两个项目的数据,从FlatFileItemReader读取字符串,使用ItemProcessor进行转换,最后使用ItemWriter写入结果——所有操作均在PlatformTransactionManager控制的事务中进行管理, 以确保数据完整性。

JobRepository保留步骤的执行状态,包括FlatFileItemReader的位置,以便在作业失败时从最后一个未提交的块重新启动。

物品阅读器
让我们定义提供输入数据的flatFileItemReader bean:

@Bean
@StepScope
public FlatFileItemReader<String> flatFileItemReader() {
    return new FlatFileItemReaderBuilder<String>()
      .name("itemReader")
      .resource(new ClassPathResource(
"data.csv"))
      .lineMapper(new PassThroughLineMapper())
      .saveState(true)
      .build();
}

该代码定义了一个名为flatFileItemReader的FlatFileItemReader bean, 并使用@StepScope进行注释,以确保每次执行步骤时都会创建一个新实例,从而实现正确的状态管理和可重启性。它从位于类路径下的data.csv文件中读取字符串,并使用PassThroughLineMapper将每一行映射到一个字符串。

此外,它还具有saveState(true)方法来将其读取位置持久化到ExecutionContext中。这样,如果作业失败,读取器就可以从最后一行未处理的行继续执行,并利用JobRepository 进行状态持久化。

为了使作业真正可重启,ItemReader必须将其状态持久化到 Spring Batch 的执行上下文中。像FlatFileItemReader这样的读取器会自动在块之间保存关键的进度信息(例如行号或记录数)。

项目处理器
让我们声明转换输入数据的itemProcessor bean:

@Bean
public RestartItemProcessor itemProcessor() {
    return new RestartItemProcessor();
}
static class RestartItemProcessor implements ItemProcessor<String, String> {
    private boolean failOnItem3 = true;
    public void setFailOnItem3(boolean failOnItem3) {
        this.failOnItem3 = failOnItem3;
    }
    @Override
    public String process(String item) throws Exception {
        System.out.println("Processing: " + item + " (failOnItem3=" + failOnItem3 + ")");
        if (failOnItem3 && item.equals(
"Item3")) {
            throw new RuntimeException(
"Simulated failure on Item3");
        }
        return
"PROCESSED " + item;
    }
}

它通过在每个项目前加上“PROCESSED”来处理每个项目 ,并模拟 Item3 的失败。

条目写入器
现在,让我们创建输出处理后的数据的itemWriter bean:

@Bean
public ItemWriter<String> itemWriter() {
    return items -> {
      System.out.println("Writing items:");
      for (String item : items) {
          System.out.println(
"- " + item);
      }
    };
}

这会将处理过的项目打印到控制台。现在,我们的应用程序已经准备就绪。

重新启动失败的 Spring Batch 作业
Spring Batch 作业默认设计为可重启,允许它们从故障点无缝恢复。因此,无需额外配置即可启用此功能。

但是,为了使其有效工作,作业状态必须持久保存在JobRepository中。此外,JobRepository必须由数据库支持,以确保可靠地存储和检索作业的执行状态。

以下小节介绍如何模拟作业失败并重新启动它。

模拟作业失败
为了模拟作业失败,ItemProcessor配置为在处理Item3时 抛出RuntimeException 。当Item3发生故障时,JobRepository 会存储此状态,并将该作业标记为FAILED。

使用mvn spring-boot:run运行应用程序会产生以下内容:

Starting new job execution...
Processing: Item1
Processing: Item2
Writing items:
- PROCESSED Item1
- PROCESSED Item2
Processing: Item3
[Exception: Simulated failure on Item3]
Job started with status: FAILED

此输出确认Item1和Item2已被处理和写入,但Item3上的故障导致作业停止,并且状态将持续到随后的重新启动。

重新启动作业
要重新启动失败的作业,我们使用CommandLineRunner通过JobExplorer和固定的JobParameters检测失败的作业实例:

@Bean
CommandLineRunner run(JobLauncher jobLauncher, Job job, JobExplorer jobExplorer,
    JobOperator jobOperator, BatchConfig.RestartItemProcessor itemProcessor) {
    return args -> {
      JobParameters jobParameters = new JobParametersBuilder()
        .addString("jobId", "test-job-" + System.currentTimeMillis())
        .toJobParameters();
      List<JobInstance> instances = jobExplorer.getJobInstances(
"simpleJob", 0, 1);
      if (!instances.isEmpty()) {
          JobInstance lastInstance = instances.get(0);
          List<JobExecution> executions = jobExplorer.getJobExecutions(lastInstance);
          if (!executions.isEmpty()) {
              JobExecution lastExecution = executions.get(0);
              if (lastExecution.getStatus() == BatchStatus.FAILED) {
                  itemProcessor.setFailOnItem3(false);
                  JobExecution restartedExecution = jobLauncher.run(job, jobParameters);
                 
// final Long restartId = jobOperator.restart(lastExecution.getId());
                 
// final JobExecution restartedExecution = jobExplorer.getJobExecution(restartedExecution);
                 
// ...
              }
          }
      }
    };
}

该代码通过JobExplorer检查作业存储库中是否存在simpleJob实例。

它检测到状态为FAILED的失败执行。它会使用JobLauncher.run()或JobOperator.restart()来恢复该特定作业。该作业将从其上次持久化的状态恢复。这确保了先前处理过的项目不会被重新处理。

为了忽略重启期间Item3的失败,我们在启动重启的作业之前设置了itemProcessor.setFailOnItem3(false) ,从而允许RestartItemProcessor处理Item3而不会引发异常。

现在,我们再次运行该应用程序。

让我们检查一下输出:

Restarting failed job execution with ID: [execution_id]
Processing: Item3
Processing: Item4
Writing items:
- PROCESSED Item3
- PROCESSED Item4
Processing: Item5
Writing items:
- PROCESSED Item5
Restarted job status: COMPLETED

Spring Batch 作业在Item3上失败了。它成功地从故障点重新启动。然后,该作业以块的形式处理了Item3到Item5,写入了结果,并以COMPLETED状态完成。

测试作业重启
让我们添加一个单元测试来验证 Spring Batch 的作业重启功能,而不是使用CommandLineRunner:

@Test
public void givenItems_whenFailed_thenRestartFromFailure() throws Exception {
    // Given
    createTestFile(
"Item1\nItem2\nItem3\nItem4");
    JobParameters jobParameters = new JobParametersBuilder()
      .addLong(
"time", System.currentTimeMillis())
      .toJobParameters();
   
// When
    JobExecution firstExecution = jobLauncherTestUtils.launchJob(jobParameters);
    assertEquals(BatchStatus.FAILED, firstExecution.getStatus());
    Long executionId = firstExecution.getId();
    itemProcessor.setFailOnItem3(false);
   
// Then
    JobExecution restartedExecution = jobLauncherTestUtils.launchJob(jobParameters);
    assertEquals(BatchStatus.COMPLETED, restartedExecution.getStatus());
    assertEquals(
      firstExecution.getJobInstance().getInstanceId(),
      restartedExecution.getJobInstance().getInstanceId()
    );
}

该测试方法首先执行一个作业,该作业在处理Item3时故意失败(断言预期的FAILED状态)。然后,它修改处理器行为,使其不再在该项目上失败。最后,它使用相同的参数重新启动该作业,以确认它能够从失败点成功完成。

该测试验证了三个关键方面。首先,它确认初始故障是否按预期发生,从而测试故障模拟。其次,它确保重新启动的作业从故障点继续处理,从而验证重新启动逻辑。最后,它检查两次执行是否属于同一个作业实例,从而验证实例跟踪。

防止作业重启
我们可以使用preventRestart()方法阻止重新启动作业:

@Bean
public Job simpleJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new JobBuilder("simpleJob", jobRepository)
      .start(step1(jobRepository, transactionManager))
      .preventRestart()
      .build();
}

在JobBuilder中添加.preventRestart()方法,可以将作业配置为始终从头开始(例如,Item1),而不是在重新启动时从失败点(例如,Item3)继续执行。这将覆盖 Spring Batch 的默认行为,即为了便于重启,将作业状态持久保存在JobRepository中。