Spring Batch复合条目阅读器教程

在本文中,我们学习了如何实现和测试CompositeItemReader,它允许我们按特定顺序处理来自多个来源的数据。通过将读取器链接在一起,我们可以按特定顺序处理来自文件、数据库或其他来源的数据。

Spring Batch中,CompositeItemReader是一种将多个ItemReader实例组合成单个读取器的工具。当我们需要从多个来源或按特定顺序读取数据时,这特别有用。例如,我们可能希望同时从数据库和文件中读取记录,或者按特定顺序处理来自两个不同表的数据。

CompositeItemReader简化了批处理作业中多个读取器的处理,确保高效灵活的数据处理。在本教程中,我们将介绍在 Spring Batch 中CompositeItemReader的实现,并查看示例和测试用例以验证其行为。

什么是CompositeItemReader
CompositeItemReader的工作原理是将读取过程委托给ItemReader实例列表。它按照定义顺序从每个读取器读取项目,确保按顺序处理数据。

这在以下场景中尤其有用:

  • 从多个数据库或表中读取
  • 合并文件和数据库中的数据
  • 按特定顺序处理来自不同来源的数据
此外,CompositeItemReader是org.springframework.batch.item.support包的一部分,它是在 Spring Batch 5.2.0 中引入的。

实现CompositeItemReader
让我们来看一个示例,其中我们从两个不同的来源读取数据:一个平面文件和一个数据库。目标是将来自两个来源的产品数据合并为一个流以进行批处理。一些产品在平面文件中,而其他产品在数据库中,确保所有可用记录一起处理。

1. 创建产品类别
在设置读取器之前,我们需要一个Product类来表示正在处理的数据的结构。此类封装了有关产品的详细信息,例如其 ID、名称、库存情况和价格。我们将在从 CSV 文件和数据库读取时使用此模型,以确保数据处理的一致性。

Product类充当我们的读者和批处理作业之间的数据传输对象 (DTO):

public class Product {
    private Long productId;
    private String productName;
    private Integer stock;
    private BigDecimal price;
    public Product(Long productId, String productName, Integer stock, BigDecimal price) {
        this.productId = productId;
        this.productName = productName;
        this.stock = stock;
        this.price = price;
    }
    // Getters and Setters
}

Product类表示批处理作业将处理的每条记录。现在我们的数据模型已准备就绪,我们将为CSV 文件和数据库创建单独的ItemReader组件。

2.产品数据的平面文件读取器
第一个读取器使用FlatFileItemReader从 CSV 文件获取数据。我们将其配置为读取分隔文件 ( products.csv ) 并将其字段映射到Product类:

@Bean
public FlatFileItemReader<Product> fileReader() {
  return new FlatFileItemReaderBuilder<Product>()
    .name("fileReader")
    .resource(new ClassPathResource(
"products.csv"))
    .delimited()
    .names(
"productId", "productName", "stock", "price")
    .linesToSkip(1)
    .targetType(Product.class)
    .build();
}

这里,delimited()方法确保数据字段使用分隔符(默认情况下为逗号)分隔。names ()方法定义与Product类的属性匹配的列名,而targetType(Product.class)方法将字段映射到类属性。

3.产品数据数据库读取器
接下来,我们定义一个JdbcCursorItemReader来从名为products的数据库表中检索产品数据。此读取器执行 SQL 查询来获取产品详细信息并将其映射到我们的Product类。

下面是数据库读取器的实现:

@Bean
public JdbcCursorItemReader<Product> dbReader(DataSource dataSource) {
  return new JdbcCursorItemReaderBuilder<Product>()
    .name("dbReader")
    .dataSource(dataSource())
    .sql(
"SELECT productid, productname, stock, price FROM products")
    .rowMapper((rs, rowNum) -> new Product(
      rs.getLong(
"productid"),
      rs.getString(
"productname"),
      rs.getInt(
"stock"),
      rs.getBigDecimal(
"price")))
    .build();
}

JdbcCursorItemReader使用游标从数据库中一次一行地读取产品记录,从而提高批处理效率。rowMapper()函数将结果集中的每一列映射到Product类中的相应字段。

使用CompositeItemReader组合阅读器
现在我们的 CSV 和数据库读取器都已配置为读取产品数据,我们可以使用CompositeItemReader将它们集成:

@Bean
public CompositeItemReader<Product> compositeReader() {
    return new CompositeItemReader<>(Arrays.asList(fileReader(), dbReader()));
}

通过配置我们的CompositeItemReader,我们可以顺序处理来自多个来源的数据。

最初,FlatFileItemReader从 CSV 文件中读取产品记录,将每行解析为Product对象。处理完文件中的所有行后,JdbcCursorItemReader接管并开始直接从数据库获取产品数据。

配置批处理作业
一旦我们为 CSV 文件和数据库定义了读取器,下一步就是配置批处理作业本身。在 Spring Batch 中,一个作业由多个步骤组成,其中每个步骤处理处理管道的特定部分:

@Bean
public Job productJob(JobRepository jobRepository, Step step) {
  return new JobBuilder("productJob", jobRepository)
    .start(step)
    .build();
}
@Bean
public Step step(ItemReader compositeReader, ItemWriter productWriter) {
  return new StepBuilder(
"productStep", jobRepository)
    .<Product, Product>chunk(10, transactionManager)
    .reader(compositeReader)
    .writer(productWriter)
    .build();
}

在这种情况下,我们的工作包含一个步骤,即读取产品数据、以10 个块的形式处理它,并将其写入所需的输出。

productJob bean 负责定义批处理作业。它从配置为处理产品数据处理的productStep开始执行。

通过此设置,我们的批处理作业首先使用CompositeItemReader从两个来源读取产品数据,以 10 个数据块为单位进行处理,然后使用productWriter()写入转换或过滤后的数据。这确保了批处理管道顺畅而高效。

运行批处理作业
现在我们已经配置了读取器和作业,下一步是运行批处理作业并观察CompositeItemReader的行为。我们将在 Spring Boot 应用程序中运行该作业,以查看它如何处理来自 CSV 文件和数据库的数据。

为了以编程方式触发批处理作业,我们需要使用JobLauncher。这使我们能够启动作业并监视其进度:

@Bean
public CommandLineRunner runJob() {
    return args -> {
        try {
            jobLauncher.run(productJob, new JobParametersBuilder()
              .addLong("time", System.currentTimeMillis())
              .toJobParameters());
        } catch (Exception e) {
           
// handle exception
        }
    };
}

在此示例中,我们创建一个CommandLineRunner bean 来在应用程序启动时运行该作业。这将使用JobLauncher调用productJob。我们还添加了带有时间戳的唯一JobParameters,以确保该作业每次都以唯一方式运行。

测试复合项目阅读器
为了确保CompositeItemReader按预期工作,我们将测试CompositeItemReader的功能,以确保它能从 CSV 和数据库源正确读取产品。

准备测试数据
我们首先准备一个包含产品数据的 CSV 文件,作为CompositeItemReader的输入:

productId,productName,stock,price
101,Apple,50,1.99

然后,我们还向产品表中插入一条记录:

@BeforeEach
public void setUp() {
    jdbcTemplate.update("INSERT INTO products (productid, productname, stock, price) VALUES (?, ?, ?, ?)",
      102,
"Banana", 30, 1.49);
}

测试阅读顺序
现在,我们将测试CompositeItemReader,以验证它是否按正确的顺序处理产品,从 CSV 和数据库源读取。在此测试中,我们从 CSV 文件读取产品,然后从数据库读取产品,并断言这些值符合我们的预期:

@Test
public void givenTwoReaders_whenRead_thenProcessProductsInOrder() throws Exception {
    StepExecution stepExecution = new StepExecution(
      "testStep",
      new JobExecution(1L, new JobParameters()),
      1L);
    ExecutionContext executionContext = stepExecution.getExecutionContext();
    compositeReader.open(executionContext);
    Product product1 = compositeReader.read();
    assertNotNull(product1);
    assertEquals(101, product1.getProductId());
    assertEquals(
"Apple", product1.getProductName());
    Product product2 = compositeReader.read();
    assertNotNull(product2);
    assertEquals(102, product2.getProductId());
    assertEquals(
"Banana", product2.getProductName());
}

使用一个读取器返回空结果进行测试
在本节中,我们将测试当使用多个读取器并且其中一个读取器返回null 时CompositeItemReader的行为。 这一点很重要,以确保CompositeItemReader跳过任何不返回任何数据的读取器并继续下一个读取器,直到找到有效数据:

@Test
public void givenMultipleReader_whenOneReaderReturnNull_thenProcessDataFromNextReader() throws Exception {
    ItemStreamReader<Product> emptyReader = mock(ItemStreamReader.class);
    when(emptyReader.read()).thenReturn(null);
    ItemStreamReader<Product> validReader = mock(ItemStreamReader.class);
    when(validReader.read()).thenReturn(new Product(103L, "Cherry", 20, BigDecimal.valueOf(2.99)), null);
    CompositeItemReader<Product> compositeReader = new CompositeItemReader<>(
      Arrays.asList(emptyReader, validReader));
    Product product = compositeReader.read();
    assertNotNull(product);
    assertEquals(103, product.getProductId());
    assertEquals(
"Cherry", product.getProductName());
}