20-12-12
banq
在本文中,我们将学习如何使用Spring Boot创建一个简单的Spring Batch作业。首先,我们从定义一些基本配置开始。然后,我们将看到如何添加文件读取器和数据库写入器。最后,如何应用一些自定义处理并检查我们的作业是否成功执行。
可以在GitHub上获得本文的完整源代码 。
首先,让我们将spring-boot-starter-batch添加到我们的 pom.xml中:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> <version>2.4.0.RELEASE</version> </dependency> |
我们还将添加org.hsqldb依赖关系,该依赖关系也可以从Maven Central 获得:
<dependency> <groupId>org.hsqldb</groupId> <artifactId>hsqldb</artifactId> <version>2.5.1</version> <scope>runtime</scope> </dependency> |
我们将构建一个工作,该工作将从CSV文件导入咖啡清单,使用自定义处理器对其进行转换,然后将最终结果存储在内存数据库中。
让我们从定义应用程序入口点开始:
@SpringBootApplication public class SpringBootBatchProcessingApplication { public static void main(String[] args) { SpringApplication.run(SpringBootBatchProcessingApplication.class, args); } } |
如我们所见,这是一个标准的Spring Boot应用程序。由于我们希望在可能的情况下使用默认配置值,因此我们将使用一组非常简单的应用程序配置属性。
我们将在src / main / resources / application.properties文件中定义以下属性:
file.input=coffee-list.csv |
此属性包含我们输入的咖啡清单的位置。每行都包含我们咖啡的品牌,来源和一些特征:
Blue Mountain,Jamaica,Fruity Lavazza,Colombia,Strong Folgers,America,Smokey |
我们将看到,这是一个平面CSV文件,这意味着Spring可以在不进行任何特殊自定义的情况下对其进行处理。
接下来,我们将添加一个SQL脚本schema-all.sql来创建我们的咖啡桌来存储数据:
DROP TABLE coffee IF EXISTS; CREATE TABLE coffee ( coffee_id BIGINT IDENTITY NOT NULL PRIMARY KEY, brand VARCHAR(20), origin VARCHAR(20), characteristics VARCHAR(30) ); |
通常,Spring Boot将在启动过程中自动运行此脚本。
随后,我们将需要一个简单的域类来保存我们的咖啡项目:
public class Coffee { private String brand; private String origin; private String characteristics; public Coffee(String brand, String origin, String characteristics) { this.brand = brand; this.origin = origin; this.characteristics = characteristics; } // getters and setters } |
如前所述,我们的Coffee对象包含三个属性。
现在,到关键部分,我们的工作配置。我们将逐步进行,建立我们的配置并解释其中的每个部分:
@Configuration @EnableBatchProcessing public class BatchConfiguration { @Autowired public JobBuilderFactory jobBuilderFactory; @Autowired public StepBuilderFactory stepBuilderFactory; @Value("${file.input}") private String fileInput; // ... } |
首先,我们从标准的Spring @Configuration类开始。接下来,我们在类中添加 @EnableBatchProcessing批注。值得注意的是,这使我们能够使用许多有用的bean来支持工作,并将节省大量的日常工作。
此外,使用此注释还使我们可以访问两个有用的工厂,稍后将在构建作业配置和作业步骤时使用它们。
对于初始配置的最后一部分,我们包括对先前声明的file.input属性的引用。
现在,我们可以继续在配置中定义一个阅读器bean:
@Bean public FlatFileItemReader reader() { return new FlatFileItemReaderBuilder().name("coffeeItemReader") .resource(new ClassPathResource(fileInput)) .delimited() .names(new String[] { "brand", "origin", "characteristics" }) .fieldSetMapper(new BeanWrapperFieldSetMapper() {{ setTargetType(Coffee.class); }}) .build(); } |
简而言之,上面定义的阅读器bean将查找名为coffee-list.csv的文件,并将每个订单项解析为Coffee对象。
同样,我们定义一个writer bean:
@Bean public JdbcBatchItemWriter writer(DataSource dataSource) { return new JdbcBatchItemWriterBuilder() .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()) .sql("INSERT INTO coffee (brand, origin, characteristics) VALUES (:brand, :origin, :characteristics)") .dataSource(dataSource) .build(); } |
这次,我们包含了由咖啡对象的Java Bean属性驱动的将单个咖啡项目插入到数据库中所需的SQL语句。方便地,通过@EnableBatchProcessing批注自动创建dataSource。
最后,我们需要添加实际的作业步骤和配置:
@Bean public Job importUserJob(JobCompletionNotificationListener listener, Step step1) { return jobBuilderFactory.get("importUserJob") .incrementer(new RunIdIncrementer()) .listener(listener) .flow(step1) .end() .build(); } @Bean public Step step1(JdbcBatchItemWriter writer) { return stepBuilderFactory.get("step1") .<Coffee, Coffee> chunk(10) .reader(reader()) .processor(processor()) .writer(writer) .build(); } @Bean public CoffeeItemProcessor processor() { return new CoffeeItemProcessor(); } |
如我们所见,我们的工作相对简单,由step1方法中定义的一个步骤组成。
让我们看一下此步骤正在执行的操作:
- 首先,我们配置步骤,以便使用chunk(10)声明一次最多写入十条记录
- 然后,我们使用读取器bean读取咖啡数据,该读取器bean是使用reader方法设置的
- 接下来,我们将每个咖啡项目传递给自定义处理器,在其中应用一些自定义业务逻辑
- 最后,我们使用之前看到的编写器将每个咖啡项目写入数据库
另一方面,我们的importUserJob包含我们的作业定义,其中包含使用内置RunIdIncrementer类的ID 。我们还设置了一个JobCompletionNotificationListener,用来在作业完成时得到通知。
为了完成我们的作业配置,我们列出了每个步骤(尽管此作业只有一个步骤)。现在,我们已经完成了完美的配置!
让我们详细了解一下我们先前在作业配置中定义的自定义处理器:
public class CoffeeItemProcessor implements ItemProcessor<Coffee, Coffee> { private static final Logger LOGGER = LoggerFactory.getLogger(CoffeeItemProcessor.class); @Override public Coffee process(final Coffee coffee) throws Exception { String brand = coffee.getBrand().toUpperCase(); String origin = coffee.getOrigin().toUpperCase(); String chracteristics = coffee.getCharacteristics().toUpperCase(); Coffee transformedCoffee = new Coffee(brand, origin, chracteristics); LOGGER.info("Converting ( {} ) into ( {} )", coffee, transformedCoffee); return transformedCoffee; } } |
特别感兴趣的是,ItemProcessor接口为我们提供了一种在作业执行期间应用某些特定业务逻辑的机制。
为了简单起见,我们定义了CoffeeItemProcessor,它接受一个输入Coffee对象,并将每个属性转换为uppercase。
此外,我们还将编写一个JobCompletionNotificationListener, 以在工作完成时提供一些反馈:
@Override public void afterJob(JobExecution jobExecution) { if (jobExecution.getStatus() == BatchStatus.COMPLETED) { LOGGER.info("!!! JOB FINISHED! Time to verify the results"); String query = "SELECT brand, origin, characteristics FROM coffee"; jdbcTemplate.query(query, (rs, row) -> new Coffee(rs.getString(1), rs.getString(2), rs.getString(3))) .forEach(coffee -> LOGGER.info("Found < {} > in the database.", coffee)); } } |
在上面的示例中,我们重写afterJob方法并检查作业是否成功完成。此外,我们运行一个简单查询以检查每个咖啡项目是否已成功存储在数据库中。
现在我们已经准备就绪,可以开始工作了,这是有趣的部分。让我们继续工作吧:
... 17:41:16.336 [main] INFO c.b.b.JobCompletionNotificationListener - !!! JOB FINISHED! Time to verify the results 17:41:16.336 [main] INFO c.b.b.JobCompletionNotificationListener - Found < Coffee [brand=BLUE MOUNTAIN, origin=JAMAICA, characteristics=FRUITY] > in the database. 17:41:16.337 [main] INFO c.b.b.JobCompletionNotificationListener - Found < Coffee [brand=LAVAZZA, origin=COLOMBIA, characteristics=STRONG] > in the database. 17:41:16.337 [main] INFO c.b.b.JobCompletionNotificationListener - Found < Coffee [brand=FOLGERS, origin=AMERICA, characteristics=SMOKEY] > in the database. ... |
如我们所见,我们的工作成功运行,每项咖啡都按预期存储在数据库中。