Spring Batch中通过多线程和异步处理提高性能

自计算机使用兴起以来,公司出于不同的目的始终依赖批处理数据,要么是在应用程序之间移动数据 (ETL),要么是进行一些需要很长时间才能实时完成的并行计算。

处理大量数据的挑战始终在于如何充分利用可用的计算资源,从而优化时间和成本。

在批处理这个领域,许多解决方案都将自己作为标准,其中之一就是Spring Batch,它是 JAVA 世界中构建高性能和优化的批处理应用程序的事实上的标准。

案例准备
这里将通过一个简单的作业来演示此功能,该作业执行从数据库读取数据、对其应用一些转换,最后将其写入文件。

数据表:

create table transactions
(
  id                      integer      not null,
  transaction_date        date         not null,
  amount                  numeric      not null,
  created_at              date,
  constraint pk_transactions primary key(id)
);

为了初始化数据库,我创建了以下 postgres 脚本,以向事务表中填充 1 亿条记录。

DO $$
<<block>>
declare
counter     integer := 0;
rec         RECORD;

begin
  --
  for rec in (
    with nums as (
      SELECT
         a id
      FROM generate_series(1, 1000000) as s(a)
    ),
    dates as(
      select row_number() OVER (ORDER BY a) line, a::date as date
      from generate_series(
            '2020-01-01'::date,
            '2020-12-31'::date,
            '1 day'
          ) s(a)
    )
    select row_number() OVER (ORDER BY id) id,
      d.date,
      200*random() amount
    from nums n, dates d
    where d.line <= 100
  )
  loop
    insert into transactions values (rec.id, rec.date, rec.amount, CURRENT_TIMESTAMP);

    counter := counter + 1;

    if MOD(counter, 10000) = 0 then
      raise notice 'Commiting at : %', counter;
      commit ;
    end if;
  end loop;

  raise notice 'Value: %', counter;

END block $$;

为了模拟 1 毫秒的处理时间,我添加了对 Thread.sleep 方法的调用。在现实生活中,处理逻辑可能非常复杂,每个项目的处理时间可能不止 1 毫秒(例如:调用外部网络服务)。

 

  @Bean
    public ItemProcessor<TransactionVO, TransactionVO>.   multithreadedchProcessor() {
        return (transaction) -> {
            Thread.sleep(1);            
            return transaction;
        };
    }


在单线程处理过程中,读取、处理和写入都是在单线程中同步执行的。


多线程步骤
在本文中,我们了解了如何使用 TaskExecutor 对作业步骤进行多线程处理,从而与单线程步骤相比获得大量处理时间。

多线程在步骤级别启用,并且它在自己的线程中执行每个块。

在 Spring Batch 作业中使用多个线程非常简单。您定义一个taskExecutor并在相关步骤中引用它。这会为每个数据块创建一个新线程,同时处理它们。这可以显着提高性能。

然而,大多数 Spring Batch 读者都持有状态(进度信息)。这对于作业重新启动很有用。在多线程环境中,如果不正确同步,此类有状态读取器可能会导致数据不一致。

@Bean
public Step multithreadedManagerStep(StepBuilderFactory stepBuilderFactory) throws Exception {
    return stepBuilderFactory
            .get("Multithreaded : Read -> Process -> Write ")
            .<TransactionVO, TransactionVO>chunk(1000)
            .reader(multithreadedcReader(null))
            .processor(multithreadedchProcessor())
            .writer(multithreadedcWriter())
            .taskExecutor(taskExecutor())
            .build();
}
@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(64);
    executor.setMaxPoolSize(64);
    executor.setQueueCapacity(64);
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.setThreadNamePrefix(
"MultiThreaded-");
    return executor;
}

以下是整个多线程演示程序的源代码。

@SpringBootApplication
@EnableBatchProcessing
public class MultithreadedBatchPerformanceApplication {

    public static void main(String[] args) {
        SpringApplication.run(MultithreadedBatchPerformanceApplication.class, args);
    }

    @Bean
    public Job multithreadedJob(JobBuilderFactory jobBuilderFactory) throws Exception {
        return jobBuilderFactory
                .get("Multithreaded JOB")
                .incrementer(new RunIdIncrementer())
                .flow(multithreadedManagerStep(null))
                .end()
                .build();
    }

    @Bean
    public Step multithreadedManagerStep(StepBuilderFactory stepBuilderFactory) throws Exception {
        return stepBuilderFactory
                .get(
"Multithreaded : Read -> Process -> Write ")
                .<TransactionVO, TransactionVO>chunk(1000)
                .reader(multithreadedcReader(null))
                .processor(multithreadedchProcessor())
                .writer(multithreadedcWriter())
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(64);
        executor.setMaxPoolSize(64);
        executor.setQueueCapacity(64);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix(
"MultiThreaded-");
        return executor;
    }

    @Bean
    public ItemProcessor<TransactionVO, TransactionVO> multithreadedchProcessor() {
        return (transaction) -> {
            Thread.sleep(1);
           
//og.info(Thread.currentThread().getName());
            return transaction;
        };
    }

    @Bean
    public ItemReader<TransactionVO> multithreadedcReader(DataSource dataSource) throws Exception {

        return new JdbcPagingItemReaderBuilder<TransactionVO>()
                .name(
"Reader")
                .dataSource(dataSource)
                .selectClause(
"SELECT * ")
                .fromClause(
"FROM transactions ")
                .whereClause(
"WHERE ID <= 1000000 ")
                .sortKeys(Collections.singletonMap(
"ID", Order.ASCENDING))
                .rowMapper(new TransactionVORowMapper())
                .build();
    }

    @Bean
    public FlatFileItemWriter<TransactionVO> multithreadedcWriter() {

        return new FlatFileItemWriterBuilder<TransactionVO>()
                .name(
"Writer")
                .append(false)
                .resource(new FileSystemResource(
"transactions.txt"))
                .lineAggregator(new DelimitedLineAggregator<TransactionVO>() {
                    {
                        setDelimiter(
";");
                        setFieldExtractor(new BeanWrapperFieldExtractor<TransactionVO>() {
                            {
                                setNames(new String[]{
"id", "date", "amount", "createdAt"});
                            }
                        });
                    }
                })
                .build();
    }
}

执行该批处理后,大约 6 分钟就能将 100 万条记录写入文件。

任务:[FlowJob: [name=Synchronous JOB]] 已完成,参数如下:{run.id=10}]和以下状态:[完成],时间为 6m12s265ms

在这种规模下,单线程步骤处理 100 万个项目所需的时间几乎是多线程步骤的 4 倍。考虑到在我们的示例中,处理时间为 1ms/条目,这是一个巨大的收益。

缺点
由于配置简单,多线程步骤比其他功能具有巨大的优势,但它们也不乏缺点,其中:

  • 作业无法重新启动。如果作业失败,它就无法从原来的位置继续执行
  • 没有处理物品的订单保证
  • 您应该考虑线程安全(示例可能不使用基于游标的 JDBC 项目读取器)

异步处理
异步处理是另一种提升 Spring Batch 性能的技术。它将通过在单独的线程中执行来扩展每个项目的处理,一旦完成,它就会返回AsyncItemWriter 将处理的Future 。

当步骤的瓶颈是处理时,这种缩放技术特别有用,例如,您有一个读取器从 csv 文件读取项目,并且对于每个项目读取,项目处理器需要访问外部 API 并执行一些复杂的操作计算,然后将结果写入目的地。

在 Spring 批处理中有两个类可以帮助实现此机制:AsyncItemProcessor 和 AsyncItemWriter,它们分别是 ItemProcess 和 ItemWriter 的装饰器。 AsyncItemWriter 只是为了方便解开不同 AsyncItemProcessor 返回的Futures 。

过使用 2 个装饰器类启用异步处理:

  • 异步项目处理器

该类将处理委托给 ItemProcessor,并允许通过设置taskExecutor 进行多线程处理。

@Bean
public AsyncItemProcessor<TransactionVO, TransactionVO> asyncProcessor() {
    AsyncItemProcessor<TransactionVO, TransactionVO> asyncItemProcessor = new AsyncItemProcessor<>();
    asyncItemProcessor.setDelegate(itemProcessor());
    asyncItemProcessor.setTaskExecutor(taskExecutor());

    return asyncItemProcessor;
}
@Bean
public ItemProcessor<TransactionVO, TransactionVO> itemProcessor() {
    return (transaction) -> {
        Thread.sleep(1);
        return transaction;
    };
}

  • 异步项目编写器

由项目处理器处理的项目被包装到Future中并传递给编写器以解开包装并写入其目的地。

@Bean
public AsyncItemWriter<TransactionVO> asyncWriter() {
    AsyncItemWriter<TransactionVO> asyncItemWriter = new AsyncItemWriter<>();
    asyncItemWriter.setDelegate(itemWriter());
    return asyncItemWriter;
}


以下是整个异步处理演示程序的源代码。

@SpringBootApplication
@EnableBatchProcessing
public class AsyncProcessingBatchPerformanceApplication {

    public static void main(String[] args) {
        SpringApplication.run(AsyncProcessingBatchPerformanceApplication.class, args);
    }

    @Bean
    public Job asyncJob(JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory
                .get("Asynchronous Processing JOB")
                .incrementer(new RunIdIncrementer())
                .flow(asyncManagerStep(null))
                .end()
                .build();
    }

    @Bean
    public Step asyncManagerStep(StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory
                .get(
"Asynchronous Processing : Read -> Process -> Write ")
                .<TransactionVO, Future<TransactionVO>>chunk(1000)
                .reader(asyncReader(null))
                .processor(asyncProcessor())
                .writer(asyncWriter())
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    public AsyncItemProcessor<TransactionVO, TransactionVO> asyncProcessor() {
        AsyncItemProcessor<TransactionVO, TransactionVO> asyncItemProcessor = new AsyncItemProcessor<>();
        asyncItemProcessor.setDelegate(itemProcessor());
        asyncItemProcessor.setTaskExecutor(taskExecutor());

        return asyncItemProcessor;
    }

    @Bean
    public AsyncItemWriter<TransactionVO> asyncWriter() {
        AsyncItemWriter<TransactionVO> asyncItemWriter = new AsyncItemWriter<>();
        asyncItemWriter.setDelegate(itemWriter());
        return asyncItemWriter;
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(64);
        executor.setMaxPoolSize(64);
        executor.setQueueCapacity(64);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix(
"MultiThreaded-");
        return executor;
    }

    @Bean
    public ItemProcessor<TransactionVO, TransactionVO> itemProcessor() {
        return (transaction) -> {
            Thread.sleep(1);
            return transaction;
        };
    }

    @Bean
    public ItemReader<TransactionVO> asyncReader(DataSource dataSource) {

        return new JdbcPagingItemReaderBuilder<TransactionVO>()
                .name(
"Reader")
                .dataSource(dataSource)
                .selectClause(
"SELECT * ")
                .fromClause(
"FROM transactions ")
                .whereClause(
"WHERE ID <= 1000000 ")
                .sortKeys(Collections.singletonMap(
"ID", Order.ASCENDING))
                .rowMapper(new TransactionVORowMapper())
                .build();
    }

    @Bean
    public FlatFileItemWriter<TransactionVO> itemWriter() {

        return new FlatFileItemWriterBuilder<TransactionVO>()
                .name(
"Writer")
                .append(false)
                .resource(new FileSystemResource(
"transactions.txt"))
                .lineAggregator(new DelimitedLineAggregator<TransactionVO>() {
                    {
                        setDelimiter(
";");
                        setFieldExtractor(new BeanWrapperFieldExtractor<TransactionVO>() {
                            {
                                setNames(new String[]{
"id", "date", "amount", "createdAt"});
                            }
                        });
                    }
                })
                .build();
    }
}

执行该批处理后,大约 2 分钟就能将 100 万条记录写入文件。

任务:[FlowJob: [name=Asynchronous Processing JOB]] 已完成,参数如下:{run.id=2}]和以下状态:[完成],时间为 2m6s76ms


缺点

  • 没有处理条目的顺序保证

本文的源代码可以在GitHub 存储库中找到。