在本文中,我们学习了如何实现和测试CompositeItemReader,它允许我们按特定顺序处理来自多个来源的数据。通过将读取器链接在一起,我们可以按特定顺序处理来自文件、数据库或其他来源的数据。
在 Spring Batch中,CompositeItemReader是一种将多个ItemReader实例组合成单个读取器的工具。当我们需要从多个来源或按特定顺序读取数据时,这特别有用。例如,我们可能希望同时从数据库和文件中读取记录,或者按特定顺序处理来自两个不同表的数据。
CompositeItemReader简化了批处理作业中多个读取器的处理,确保高效灵活的数据处理。在本教程中,我们将介绍在 Spring Batch 中CompositeItemReader的实现,并查看示例和测试用例以验证其行为。
什么是CompositeItemReader
CompositeItemReader的工作原理是将读取过程委托给ItemReader实例列表。它按照定义顺序从每个读取器读取项目,确保按顺序处理数据。
这在以下场景中尤其有用:
- 从多个数据库或表中读取
- 合并文件和数据库中的数据
- 按特定顺序处理来自不同来源的数据
此外,CompositeItemReader是org.springframework.batch.item.support包的一部分,它是在 Spring Batch 5.2.0 中引入的。
实现CompositeItemReader
让我们来看一个示例,其中我们从两个不同的来源读取数据:一个平面文件和一个数据库。目标是将来自两个来源的产品数据合并为一个流以进行批处理。一些产品在平面文件中,而其他产品在数据库中,确保所有可用记录一起处理。
1. 创建产品类别
在设置读取器之前,我们需要一个Product类来表示正在处理的数据的结构。此类封装了有关产品的详细信息,例如其 ID、名称、库存情况和价格。我们将在从 CSV 文件和数据库读取时使用此模型,以确保数据处理的一致性。
Product类充当我们的读者和批处理作业之间的数据传输对象 (DTO):
public class Product { private Long productId; private String productName; private Integer stock; private BigDecimal price; public Product(Long productId, String productName, Integer stock, BigDecimal price) { this.productId = productId; this.productName = productName; this.stock = stock; this.price = price; } // Getters and Setters }
|
Product类表示批处理作业将处理的每条记录。现在我们的数据模型已准备就绪,我们将为CSV 文件和数据库创建单独的ItemReader组件。
2.产品数据的平面文件读取器
第一个读取器使用FlatFileItemReader从 CSV 文件获取数据。我们将其配置为读取分隔文件 ( products.csv ) 并将其字段映射到Product类:
@Bean public FlatFileItemReader<Product> fileReader() { return new FlatFileItemReaderBuilder<Product>() .name("fileReader") .resource(new ClassPathResource("products.csv")) .delimited() .names("productId", "productName", "stock", "price") .linesToSkip(1) .targetType(Product.class) .build(); }
|
这里,delimited()方法确保数据字段使用分隔符(默认情况下为逗号)分隔。names ()方法定义与Product类的属性匹配的列名,而targetType(Product.class)方法将字段映射到类属性。
3.产品数据数据库读取器
接下来,我们定义一个JdbcCursorItemReader来从名为products的数据库表中检索产品数据。此读取器执行 SQL 查询来获取产品详细信息并将其映射到我们的Product类。
下面是数据库读取器的实现:
@Bean public JdbcCursorItemReader<Product> dbReader(DataSource dataSource) { return new JdbcCursorItemReaderBuilder<Product>() .name("dbReader") .dataSource(dataSource()) .sql("SELECT productid, productname, stock, price FROM products") .rowMapper((rs, rowNum) -> new Product( rs.getLong("productid"), rs.getString("productname"), rs.getInt("stock"), rs.getBigDecimal("price"))) .build(); }
|
JdbcCursorItemReader使用游标从数据库中一次一行地读取产品记录,从而提高批处理效率。rowMapper()函数将结果集中的每一列映射到Product类中的相应字段。
使用CompositeItemReader组合阅读器
现在我们的 CSV 和数据库读取器都已配置为读取产品数据,我们可以使用CompositeItemReader将它们集成:
@Bean public CompositeItemReader<Product> compositeReader() { return new CompositeItemReader<>(Arrays.asList(fileReader(), dbReader())); }
|
通过配置我们的CompositeItemReader,我们可以顺序处理来自多个来源的数据。
最初,FlatFileItemReader从 CSV 文件中读取产品记录,将每行解析为Product对象。处理完文件中的所有行后,JdbcCursorItemReader接管并开始直接从数据库获取产品数据。
配置批处理作业
一旦我们为 CSV 文件和数据库定义了读取器,下一步就是配置批处理作业本身。在 Spring Batch 中,一个作业由多个步骤组成,其中每个步骤处理处理管道的特定部分:
@Bean public Job productJob(JobRepository jobRepository, Step step) { return new JobBuilder("productJob", jobRepository) .start(step) .build(); } @Bean public Step step(ItemReader compositeReader, ItemWriter productWriter) { return new StepBuilder("productStep", jobRepository) .<Product, Product>chunk(10, transactionManager) .reader(compositeReader) .writer(productWriter) .build(); }
|
在这种情况下,我们的工作包含一个步骤,即读取产品数据、以10 个块的形式处理它,并将其写入所需的输出。
productJob bean 负责定义批处理作业。它从配置为处理产品数据处理的productStep开始执行。
通过此设置,我们的批处理作业首先使用CompositeItemReader从两个来源读取产品数据,以 10 个数据块为单位进行处理,然后使用productWriter()写入转换或过滤后的数据。这确保了批处理管道顺畅而高效。
运行批处理作业
现在我们已经配置了读取器和作业,下一步是运行批处理作业并观察CompositeItemReader的行为。我们将在 Spring Boot 应用程序中运行该作业,以查看它如何处理来自 CSV 文件和数据库的数据。
为了以编程方式触发批处理作业,我们需要使用JobLauncher。这使我们能够启动作业并监视其进度:
@Bean public CommandLineRunner runJob() { return args -> { try { jobLauncher.run(productJob, new JobParametersBuilder() .addLong("time", System.currentTimeMillis()) .toJobParameters()); } catch (Exception e) { // handle exception } }; }
|
在此示例中,我们创建一个CommandLineRunner bean 来在应用程序启动时运行该作业。这将使用JobLauncher调用productJob。我们还添加了带有时间戳的唯一JobParameters,以确保该作业每次都以唯一方式运行。
测试复合项目阅读器
为了确保CompositeItemReader按预期工作,我们将测试CompositeItemReader的功能,以确保它能从 CSV 和数据库源正确读取产品。
准备测试数据
我们首先准备一个包含产品数据的 CSV 文件,作为CompositeItemReader的输入:
productId,productName,stock,price 101,Apple,50,1.99
|
然后,我们还向产品表中插入一条记录:
@BeforeEach public void setUp() { jdbcTemplate.update("INSERT INTO products (productid, productname, stock, price) VALUES (?, ?, ?, ?)", 102, "Banana", 30, 1.49); }
|
测试阅读顺序
现在,我们将测试CompositeItemReader,以验证它是否按正确的顺序处理产品,从 CSV 和数据库源读取。在此测试中,我们从 CSV 文件读取产品,然后从数据库读取产品,并断言这些值符合我们的预期:
@Test public void givenTwoReaders_whenRead_thenProcessProductsInOrder() throws Exception { StepExecution stepExecution = new StepExecution( "testStep", new JobExecution(1L, new JobParameters()), 1L); ExecutionContext executionContext = stepExecution.getExecutionContext(); compositeReader.open(executionContext); Product product1 = compositeReader.read(); assertNotNull(product1); assertEquals(101, product1.getProductId()); assertEquals("Apple", product1.getProductName()); Product product2 = compositeReader.read(); assertNotNull(product2); assertEquals(102, product2.getProductId()); assertEquals("Banana", product2.getProductName()); }
|
使用一个读取器返回空结果进行测试
在本节中,我们将测试当使用多个读取器并且其中一个读取器返回null 时CompositeItemReader的行为。 这一点很重要,以确保CompositeItemReader跳过任何不返回任何数据的读取器并继续下一个读取器,直到找到有效数据:
@Test public void givenMultipleReader_whenOneReaderReturnNull_thenProcessDataFromNextReader() throws Exception { ItemStreamReader<Product> emptyReader = mock(ItemStreamReader.class); when(emptyReader.read()).thenReturn(null); ItemStreamReader<Product> validReader = mock(ItemStreamReader.class); when(validReader.read()).thenReturn(new Product(103L, "Cherry", 20, BigDecimal.valueOf(2.99)), null); CompositeItemReader<Product> compositeReader = new CompositeItemReader<>( Arrays.asList(emptyReader, validReader)); Product product = compositeReader.read(); assertNotNull(product); assertEquals(103, product.getProductId()); assertEquals("Cherry", product.getProductName()); }
|