如何在 Spring Batch 中运行多个作业

在本文中,我们探讨了使用 Spring Batch 运行多个作业的一些方法。通过理解本文中使用的基本示例,我们可以设计一个更高效、可扩展且更易于维护的批处理系统。

Spring Batch是一个强大的框架,通过提供可重用的组件和可靠的基础架构,可以轻松处理大量数据。在实际场景中,应用程序通常需要按特定执行顺序同时执行多个作业,以优化性能并有效管理依赖关系。

在本教程中,我们将探讨在 Spring Batch 中运行多个作业的各种方法。

了解 Spring Batch 作业
在 Spring Batch 上下文中,作业是一系列步骤的容器,代表整个流程。每个作业都有一个唯一标识符,可以由按顺序或基于特定条件执行的多个步骤组成。我们可以使用 XML 或 Java 配置作业,JobLauncher通常会启动它们。

在以下场景中运行多个作业是有益的:

  • 并行处理
  • 数据迁移和 ETL 流程
  • 报告生成等
高效管理多个作业对于实现最佳性能、可维护性和可扩展性至关重要。让我们探索在 Spring Batch 中实现此目标的不同方法。

配置
首先,让我们配置依赖项:

<dependency> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-batch</artifactId>
    <version>3.3.2</version> 
</dependency> 
<dependency> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-web</artifactId> 
    <version>3.3.2</version>
</dependency> 
<dependency> 
    <groupId>com.h2database</groupId> 
    <artifactId>h2</artifactId> 
    <scope>runtime</scope>
    <version>2.2.224</version> 
</dependency>

我们添加了 spring-boot-starter-web、 基本 Spring Boot 依赖项、用于批处理的spring-boot-starter-batch和用于内存数据库的h2 。

接下来,让我们启用批处理并配置数据源:

@Configuration
@EnableBatchProcessing
public class BatchConfig {
    @Bean
    public DataSource dataSource() {
        return DataSourceBuilder.create()
          .driverClassName("org.h2.Driver")
          .url(
"jdbc:h2:mem:batchdb;DB_CLOSE_DELAY=-1;")
          .username(
"sa")
          .password(
"")
          .build();
    }
    @Bean
    public DatabasePopulator databasePopulator(DataSource dataSource) {
        ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
        populator.addScript(new ClassPathResource(
"org/springframework/batch/core/schema-h2.sql"));
        populator.setContinueOnError(false);
        populator.execute(dataSource);
        return populator;
    }
}
现在,我们创建两个不同的作业作为示例。每个作业将执行一项简单任务:

@Configuration
public class JobsConfig {
    private static final Logger log = LoggerFactory.getLogger(SequentialJobsConfig.class);
    @Bean
    public Job jobOne(JobRepository jobRepository, Step stepOne) {
        return new JobBuilder(
"jobOne", jobRepository).start(stepOne)
          .build();
    }
    @Bean
    public Step stepOne(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder(
"stepOne", jobRepository).tasklet((contribution, chunkContext) -> {
              log.info(
"Hello");
              return RepeatStatus.FINISHED;
          }, transactionManager)
          .build();
    }
    @Bean
    public Job jobTwo(JobRepository jobRepository, Step stepTwo) {
        return new JobBuilder(
"jobTwo", jobRepository).start(stepTwo)
          .build();
    }
    @Bean
    public Step stepTwo(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder(
"stepTwo", jobRepository).tasklet((contribution, chunkContext) -> {
              log.info(
"World");
              return RepeatStatus.FINISHED;
          }, transactionManager)
          .build();
    }
}

@EnableBatchProcessing 注释 设置了必要的 Spring Batch 组件,例如JobLauncher、JobRepository和JobExplorer。

我们将两个单独的作业jobOne和jobTwo定义为 Spring bean。每个作业都有自己独特的配置和步骤,我们将在这些方法中定义它们。这些步骤是具有事务支持的简单任务,记录消息以确认每个步骤的执行时间。

让我们确认一下作业的定义:

@Autowired
private Job jobOne;
@Autowired
private Job jobTwo;
@Test
void givenJobsDefinitions_whenJobsLoaded_thenJobNamesShouldMatch() {
    assertNotNull(jobOne, "jobOne should be defined");
    assertEquals(
"jobOne", jobOne.getName());
    assertNotNull(jobTwo,
"jobTwo should be defined");
    assertEquals(
"jobTwo", jobTwo.getName());
}

顺序作业执行
如果我们的作业需要依次运行,尤其是当它们相互依赖输出时,顺序执行是最佳选择。让我们通过示例了解其工作原理。

@Component
public class SequentialJobsConfig {
    @Autowired
    private Job jobOne;
    @Autowired
    private Job jobTwo;
    @Autowired
    private JobLauncher jobLauncher;
    public void runJobsSequentially() {
        JobParameters jobParameters = new JobParametersBuilder().addString("ID", "Sequential 1")
          .toJobParameters();
        JobParameters jobParameters2 = new JobParametersBuilder().addString(
"ID", "Sequential 2")
          .toJobParameters();
       
// Run jobs one after another
        try {
            jobLauncher.run(jobOne, jobParameters);
            jobLauncher.run(jobTwo, jobParameters2);
        } catch (Exception e) {
           
// handle exception
            e.printStackTrace();
        }
    }
}

我们定义了一个名为SequentialJobsConfig的组件,并将我们之前创建的两个作业添加到该类中。之后,使用JobLauncher运行作业。我们构建了jobParameters,以确保每个作业实例都是唯一的,方法是使用 addString() 方法添加ID 。这种方法允许我们控制执行流程并在继续执行下一个作业之前检查每个作业的结果。

让我们检查一下作业是否成功运行:

@Autowired
private SequentialJobsConfig sequentialJobsConfig;
@Test
void givenSequentialJobs_whenExecuted_thenRunJobsInOrder() {
    assertDoesNotThrow(() -> sequentialJobsConfig.runJobsSequentially(), "Sequential job execution should execute");
}

并行作业执行
在某些情况下,我们的作业彼此不依赖,并行运行它们可以缩短执行时间。我们可以利用 Spring 的TaskExecutor接口来实现这一点:

@Component
public class ParallelJobService {
    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    private Job jobOne;
    @Autowired
    private Job jobTwo;
    public void runJobsInParallel() {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.execute(() -> {
            try {
                jobLauncher.run(jobOne, new JobParametersBuilder().addString("ID", "Parallel 1")
                  .toJobParameters());
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        taskExecutor.execute(() -> {
            try {
                jobLauncher.run(jobTwo, new JobParametersBuilder().addString(
"ID", "Parallel 2")
                  .toJobParameters());
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        taskExecutor.close();
    }
}

在此配置中,我们使用 Spring 的SimpleAsyncTaskExecutor通过JobLauncher启动作业。

然而,在使用并行方式时,我们需要考虑线程安全、资源争用、事务管理等因素,以确保稳定高效的执行。

使用作业调度
有时,我们不只是想运行多个作业,而是想在特定时间或间隔运行这些作业。这就是作业调度发挥作用的地方。这可以使用 Spring 的调度支持或外部调度程序轻松实现。

使用 Spring 的@Scheduling
@Scheduled注释允许以给定的时间间隔重复执行方法(作业)。此方法需要使用@EnableScheduling注释启用调度。

让我们创建一个带有所需注释的ScheduledJobs类来配置我们的作业:

@Configuration
@EnableScheduling
public class ScheduledJobs {
    private static final Logger log = LoggerFactory.getLogger(SequentialJobsConfig.class);
    @Autowired
    private Job jobOne;
    @Autowired
    private Job jobTwo;
    @Autowired
    private JobLauncher jobLauncher;
    @Scheduled(cron = "0 */1 * * * *")  // Run every minute
    public void runJob1() throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
          .addString(
"jobID", String.valueOf(System.currentTimeMillis()))
          .toJobParameters();
        log.info(
"Executing sheduled job 1");
        jobLauncher.run(jobOne, jobParameters);
    }
    @Scheduled(fixedRate = 1000 * 60 * 3)  
// Run every 3 minutes
    public void runJob2() throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
          .addString(
"jobID", String.valueOf(System.currentTimeMillis()))
          .toJobParameters();
        log.info(
"Executing sheduled job 2");
        jobLauncher.run(jobTwo, jobParameters);
    }
}

在此示例中,我们使用了上一节中创建的作业类。我们将jobOne配置为每分钟运行一次,而将jobTwo配置为每 3 分钟运行一次。@Scheduled注释允许使用固定速率或 cron 表达式定义从简单到复杂的调度模式。

使用 Quartz 调度器
Quartz调度程序是一个功能强大的库,用于在 Java 应用程序中调度任务。与@Scheduling一样,Quartz 允许以特定的时间间隔运行多个作业。为了能够使用 Quartz,我们需要添加spring-boot-starter-quartz 依赖项:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-quartz</artifactId>
    <version>3.3.2</version>
</dependency>

接下来,让我们创建两个作业,QuartzJobOne和QuartzJobTwo:

@Component
public class QuartzJobOne implements Job {
    private static final Logger log = LoggerFactory.getLogger(QuartzJobOne.class);
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        try {
            log.info("Job One is executing from quartz");
        } catch (Exception e) {
            log.error(
"Error executing Job One: {}", e.getMessage(), e);
            throw new JobExecutionException(e);
        }
    }
}
@Component
public class QuartzJobTwo implements Job {
    private static final Logger log = LoggerFactory.getLogger(QuartzJobOne.class);
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        try {
            log.info(
"Job Two is executing from quartz");
        } catch (Exception e) {
            log.error(
"Error executing Job Two: {}", e.getMessage(), e);
            throw new JobExecutionException(e);
        }
    }
}

现在,让我们为每个作业定义两个 bean、 JobDetail和一个Trigger :

@Configuration
public class QuartzConfig {
    @Autowired
    private Job quartzJobOne;
    @Autowired
    private Job quartzJobTwo;
    @Bean
    public JobDetail job1Detail() {
        return JobBuilder.newJob().ofType(quartzJobOne.getClass())
          .withIdentity("quartzJobOne", "group1")
          .storeDurably()
          .build();
    }
    @Bean
    public JobDetail job2Detail() {
        return JobBuilder.newJob().ofType(quartzJobTwo.getClass())
          .withIdentity(
"quartzJobTwo", "group1")
          .storeDurably()
          .build();
    }
    @Bean
    public Trigger job1Trigger(JobDetail job1Detail) {
        return TriggerBuilder.newTrigger()
          .forJob(job1Detail)
          .withIdentity(
"quartzJobOneTrigger", "group1")
          .withSchedule(CronScheduleBuilder.cronSchedule(
"0/10 * * * * ?"))
          .build();
    }
    @Bean
    public Trigger job2Trigger(JobDetail job2Detail) {
        return TriggerBuilder.newTrigger()
          .forJob(job2Detail)
          .withIdentity(
"quartzJobTwoTrigger", "group1")
          .withSchedule(CronScheduleBuilder.cronSchedule(
"0/15 * * * * ?"))
          .build();
    }
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean() {
        SchedulerFactoryBean schedulerFactory = new SchedulerFactoryBean();
        schedulerFactory.setJobDetails(job1Detail(), job2Detail());
        schedulerFactory.setTriggers(job1Trigger(job1Detail()), job2Trigger(job2Detail()));
        return schedulerFactory;
    }
}

我们使用JobBuilder为我们的 Quartz 作业创建了一个JobDetail,分别指定了作业类及其标识。其次,我们创建了一个触发器,并使用 cron 表达式定义了作业的运行时间,该表达式分别安排作业每 10 秒和 15 秒运行一次。

我们在schedulerFactoryBean bean中自动启动我们的作业。运行quartz作业的方法有很多种,包括使用参数运行作业、使用日历调度以及暂停和恢复作业。

Quartz 灵活性高,支持复杂的调度场景。但是,它需要额外的设置,并且比使用 @Scheduling 更复杂。

动态作业执行
我们已经介绍了几种使用 Spring Batch 运行多个作业的方法,这些方法要求我们预先静态配置和定义作业。但是,有些情况下,我们希望根据某些运行时条件按需创建作业。在使用 Spring Batch 时,我们可以像往常一样使用面向块或基于任务集的方法来实现这一点。在这个例子中,我们将使用基于块的方法。

在面向块的方法中,每个作业的数据都从ItemReader读取,然后由ItemProcessor处理。读取和处理后的块随后传递给ItemWriter。

让我们创建一个DynamicJobService类并定义负责运行作业的方法:

@Service
public class DynamicJobService {
    private final JobRepository jobRepository;
    private final JobLauncher jobLauncher;
    private final PlatformTransactionManager transactionManager;
    public DynamicJobService(JobRepository jobRepository, JobLauncher jobLauncher, PlatformTransactionManager transactionManager) {
        this.jobRepository = jobRepository;
        this.jobLauncher = jobLauncher;
        this.transactionManager = transactionManager;
    }
    public void createAndRunJob(Map<String, List<String>> jobsData) throws Exception {
        List<Job> jobs = new ArrayList<>();
        // Create chunk-oriented jobs
        for (Map.Entry<String, List<String>> entry : jobsData.entrySet()) {
            if (entry.getValue() instanceof List) {
                jobs.add(createJob(entry.getKey(), entry.getValue()));
            }
        }
       
// Run all jobs
        for (Job job : jobs) {
            JobParameters jobParameters = new JobParametersBuilder().addString(
"jobID", String.valueOf(System.currentTimeMillis()))
              .toJobParameters();
            jobLauncher.run(job, jobParameters);
        }
    }
    private Job createJob(String jobName, List<String> data) {
        return new JobBuilder(jobName, jobRepository).start(createStep(data))
          .build();
    }
    private Step createStep(List<String> data) {
        return new StepBuilder(
"step", jobRepository).<String, String> chunk(10, transactionManager)
          .reader(new ListItemReader<>(data))
          .processor(item -> item.toUpperCase())
          .writer(items -> items.forEach(System.out::println))
          .build();
    }
}

在上面的示例中,我们创建了一个名为createAndRunJob的方法,该方法根据jobsData生成作业并启动它们。以下是执行过程中发生的情况:

reader ()方法从输入列表中一次读取一个项目。每个项目都传递给 process (),process() 将项目的首字母转换为大写。处理后的项目随后被收集到一个块中,块大小定义为 10。一旦块已填满或没有更多数据,块中的所有项目都将传递给 writer ()。writer 随后将块中的所有项目打印到控制台,并重复此过程,直到所有项目都处理完毕。

让我们看看该服务的实际运行:

@Autowired
private DynamicJobService dynamicJobService;
@Test
void givenJobData_whenJobsCreated_thenJobsRunSeccessfully() throws Exception {
    Map<String, List<String>> jobsData = new HashMap<>();
    jobsData.put("chunkJob1", Arrays.asList("data1", "data2", "data3"));
    jobsData.put(
"chunkJob2", Arrays.asList("data4", "data5", "data6"));
    assertDoesNotThrow(() -> dynamicJobService.createAndRunJob(jobsData),
"Dynamic job creation and execution should run successfully");
}

我们创建了两个作业并将其传递给服务的createAndRunJob方法,每个作业都有一个作业标识及其数据。

在实际示例中,我们可能会运行更复杂的处理逻辑。如果内置实现不能满足我们的特定要求,最好分别创建  ItemReader、ItemProcessor和ItemWriter的自定义实现。