Spring Batch中构建自定义读取器和写入器

在 Spring Batch 中,自定义读取器和写入器是您可以创建的组件,用于以符合应用程序要求的选定方式读取和写入数据。这些组件在批处理作业中用于处理记录的输入和输出。

  • 自定义读取器:Spring Batch 中的自定义读取器负责分析来自数据源的数据并将其传递给批处理进行进一步处理。要创建自定义阅读器,我们需要实现 ItemReader 接口。
  • 自定义编写器:Spring Batch 中的自定义编写器负责将处理后的数据写入目的地。要创建自定义编写器,您需要实现 ItemWriter 接口。

自定义读取器和自定义写入器示例
场景:假设您正在构建一个 Spring Batch 应用程序来处理来自 GeeksforGeeks 的文章。这些文章是从外部源以自定义 JSON 格式接收的。您的任务是阅读和处理这些文章,提取相关信息,然后将其存储在数据库中。

  • 使用自定义阅读器问题:文章的外部源使用的 JSON 格式是自定义的,与 Spring Batch 提供的标准 JSON 阅读器不匹配。
  • 使用自定义阅读器的解决方案:创建一个自定义阅读器,该阅读器可以理解自定义 JSON 格式并实现正确读取和解析文章的逻辑。在自定义阅读器中实现任何必要的数据转换或验证逻辑。
  • Custom Writer的使用问题:处理后的文章数据需要存储在数据库中,标准的JDBC writer可能无法提供所需的灵活性。
  • 使用自定义编写器的解决方案:开发一个自定义编写器,该自定义编写器获取处理后的文章数据并使用特定架构或业务规则将其写入数据库。合并任何其他步骤,例如更新相关表或在自定义编写器中执行后处理任务。

自定义 Json 文章阅读器:

// CustomJsonArticleReader.java 
import com.fasterxml.jackson.databind.ObjectMapper; 
import org.springframework.batch.item.ItemReader; 

import java.io.IOException; 
import java.util.Iterator; 
import java.util.List; 

public class CustomJsonArticleReader implements ItemReader<Article> { 

    private final Iterator<String> jsonArticlesIterator; 
    private final ObjectMapper objectMapper = new ObjectMapper(); 

    public CustomJsonArticleReader(List<String> jsonArticles) { 
        this.jsonArticlesIterator = jsonArticles.iterator(); 
    } 

    @Override
    public Article read() { 
        if (jsonArticlesIterator.hasNext()) { 
            String jsonArticle = jsonArticlesIterator.next(); 

            try
                
//自定义逻辑来解析 JSON 并将其转换为文章对象 ;
                return objectMapper.readValue(jsonArticle, Article.class); 
            } catch (IOException e) { 
                
// Handle JSON parsing errors 
                e.printStackTrace(); 
            } 
        } 
        return null
    } 

在这个给定的代码中:

  • CustomJsonArticleReader 从表示文章的 JSON 字符串列表中读取数据。
  • 它使用 Jackson 的 ObjectMapper 将 JSON 反序列化为 Article 对象。
  • 在读取方法中处理任何必要的数据转换或验证逻辑。

自定义数据库文章作者:

// CustomDatabaseArticleWriter.java 
import org.springframework.batch.item.ItemWriter; 

import java.util.List; 

public class CustomDatabaseArticleWriter implements ItemWriter<Article> { 

    private final ArticleRepository articleRepository;
// 假设您有一个 Spring Data JPA 文章存储库 ;

    public CustomDatabaseArticleWriter(ArticleRepository articleRepository) { 
        this.articleRepository = articleRepository; 
    } 

    @Override
    public void write(List<? extends Article> articles) throws Exception { 
        for (Article article : articles) { 
            
//在数据库中存储文章的自定义逻辑 ;
            articleRepository.save(article); 

            
// 如果需要,可添加业务逻辑或后处理步骤 
       
//例如,更新相关表格或触发事件 ;
        } 
    } 

在给定的代码中:

  • CustomDatabaseArticleWriter 实现 ItemWriter 接口,用于在数据库中处理和存储 Article 对象。
  • 它使用 Spring Data JPA 存储库 (ArticleRepository) 将文章保存到数据库。
  • 您可以在写入方法中包含其他业务逻辑或后处理步骤。

在 Spring Batch 作业配置中集成自定义读取器和写入器:

  • 配置依赖项:确保项目中具有必要的依赖项,包括 Spring Batch 和任何其他所需的库。另外,如果您使用数据库,请确保具有适当的数据库配置。
  • 创建自定义阅读器:实现自定义阅读器(例如 CustomJsonArticleReader),该阅读器了解 GeeksforGeeks 文章的自定义 JSON 格式并从外部源读取数据。
  • 创建自定义编写器:实现自定义编写器(例如,CustomDatabaseArticleWriter)来处理已处理文章的存储,在持久数据之前应用任何必要的业务逻辑或转换。
  • 配置 Spring Batch 作业:创建 Spring Batch 作业配置类(例如 BatchConfiguration)并为自定义读取器和写入器定义 bean。注入任何所需的依赖项,例如数据库存储库。让我们将 CustomJsonArticleReader 和 CustomDatabaseArticleWriter 集成到 Spring Batch 作业配置中。

import org.springframework.batch.core.Job; 
import org.springframework.batch.core.Step; 
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; 
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; 
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; 
import org.springframework.batch.core.launch.support.RunIdIncrementer; 
import org.springframework.batch.item.ItemReader; 
import org.springframework.batch.item.ItemWriter; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 

@Configuration
@EnableBatchProcessing
public class BatchConfiguration { 

    private final JobBuilderFactory jobBuilderFactory; 
    private final StepBuilderFactory stepBuilderFactory; 

    public BatchConfiguration(JobBuilderFactory jobBuilderFactory, 
                            StepBuilderFactory stepBuilderFactory) { 
        this.jobBuilderFactory = jobBuilderFactory; 
        this.stepBuilderFactory = stepBuilderFactory; 
    } 

    @Bean
    public ItemReader<Article> customJsonArticleReader() { 
        List<String> jsonArticles = Arrays.asList( 
                "{\"title\":\"Article 1\",\"content\":\"Content 1\"}"
                
"{\"title\":\"Article 2\",\"content\":\"Content 2\"}"
        ); 
        return new CustomJsonArticleReader(jsonArticles); 
    } 

    @Bean
    public ItemWriter<Article> customDatabaseArticleWriter(ArticleRepository articleRepository) { 
        return new CustomDatabaseArticleWriter(articleRepository); 
    } 

    @Bean
    public Step processArticlesStep(ItemReader<Article> customJsonArticleReader, ItemWriter<Article> customDatabaseArticleWriter) { 
        return stepBuilderFactory.get(
"processArticlesStep"
                .<Article, Article>chunk(10) 
                .reader(customJsonArticleReader) 
                .writer(customDatabaseArticleWriter) 
                .build(); 
    } 

    @Bean
    public Job processArticlesJob(Step processArticlesStep) { 
        return jobBuilderFactory.get(
"processArticlesJob"
                .incrementer(new RunIdIncrementer()) 
                .flow(processArticlesStep) 
                .end() 
                .build(); 
    } 

  • 定义处理步骤:在使用自定义读取器和写入器的作业中配置处理步骤。设置必要的块大小并指定任何其他处理逻辑(如果需要)。
  • 运行作业:通过调度程序、命令行界面或任何其他触发机制运行 Spring Batch 作业。监控作业执行以确保文章得到正确处理和存储。
  • 测试和调试:彻底测试集成,检查数据检索、处理和存储是否正确。调试作业执行过程中出现的任何问题。