自计算机使用兴起以来,公司出于不同的目的始终依赖批处理数据,要么是在应用程序之间移动数据 (ETL),要么是进行一些需要很长时间才能实时完成的并行计算。
处理大量数据的挑战始终在于如何充分利用可用的计算资源,从而优化时间和成本。
在批处理这个领域,许多解决方案都将自己作为标准,其中之一就是Spring Batch,它是 JAVA 世界中构建高性能和优化的批处理应用程序的事实上的标准。
案例准备
这里将通过一个简单的作业来演示此功能,该作业执行从数据库读取数据、对其应用一些转换,最后将其写入文件。
数据表:
create table transactions ( id integer not null, transaction_date date not null, amount numeric not null, created_at date, constraint pk_transactions primary key(id) );
|
为了初始化数据库,我创建了以下 postgres 脚本,以向事务表中填充 1 亿条记录。
DO $$ <<block>> declare counter integer := 0; rec RECORD;
begin -- for rec in ( with nums as ( SELECT a id FROM generate_series(1, 1000000) as s(a) ), dates as( select row_number() OVER (ORDER BY a) line, a::date as date from generate_series( '2020-01-01'::date, '2020-12-31'::date, '1 day' ) s(a) ) select row_number() OVER (ORDER BY id) id, d.date, 200*random() amount from nums n, dates d where d.line <= 100 ) loop insert into transactions values (rec.id, rec.date, rec.amount, CURRENT_TIMESTAMP);
counter := counter + 1;
if MOD(counter, 10000) = 0 then raise notice 'Commiting at : %', counter; commit ; end if; end loop;
raise notice 'Value: %', counter;
END block $$;
|
为了模拟 1 毫秒的处理时间,我添加了对 Thread.sleep 方法的调用。在现实生活中,处理逻辑可能非常复杂,每个项目的处理时间可能不止 1 毫秒(例如:调用外部网络服务)。
@Bean public ItemProcessor<TransactionVO, TransactionVO>. multithreadedchProcessor() { return (transaction) -> { Thread.sleep(1); return transaction; }; }
|
在单线程处理过程中,读取、处理和写入都是在单线程中同步执行的。
多线程步骤
在本文中,我们了解了如何使用 TaskExecutor 对作业步骤进行多线程处理,从而与单线程步骤相比获得大量处理时间。
多线程在步骤级别启用,并且它在自己的线程中执行每个块。
在 Spring Batch 作业中使用多个线程非常简单。您定义一个taskExecutor并在相关步骤中引用它。这会为每个数据块创建一个新线程,同时处理它们。这可以显着提高性能。
然而,大多数 Spring Batch 读者都持有状态(进度信息)。这对于作业重新启动很有用。在多线程环境中,如果不正确同步,此类有状态读取器可能会导致数据不一致。
@Bean public Step multithreadedManagerStep(StepBuilderFactory stepBuilderFactory) throws Exception { return stepBuilderFactory .get("Multithreaded : Read -> Process -> Write ") .<TransactionVO, TransactionVO>chunk(1000) .reader(multithreadedcReader(null)) .processor(multithreadedchProcessor()) .writer(multithreadedcWriter()) .taskExecutor(taskExecutor()) .build(); } @Bean public TaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(64); executor.setMaxPoolSize(64); executor.setQueueCapacity(64); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.setThreadNamePrefix("MultiThreaded-"); return executor; }
|
以下是整个多线程演示程序的源代码。
@SpringBootApplication @EnableBatchProcessing public class MultithreadedBatchPerformanceApplication {
public static void main(String[] args) { SpringApplication.run(MultithreadedBatchPerformanceApplication.class, args); }
@Bean public Job multithreadedJob(JobBuilderFactory jobBuilderFactory) throws Exception { return jobBuilderFactory .get("Multithreaded JOB") .incrementer(new RunIdIncrementer()) .flow(multithreadedManagerStep(null)) .end() .build(); }
@Bean public Step multithreadedManagerStep(StepBuilderFactory stepBuilderFactory) throws Exception { return stepBuilderFactory .get("Multithreaded : Read -> Process -> Write ") .<TransactionVO, TransactionVO>chunk(1000) .reader(multithreadedcReader(null)) .processor(multithreadedchProcessor()) .writer(multithreadedcWriter()) .taskExecutor(taskExecutor()) .build(); }
@Bean public TaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(64); executor.setMaxPoolSize(64); executor.setQueueCapacity(64); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.setThreadNamePrefix("MultiThreaded-"); return executor; }
@Bean public ItemProcessor<TransactionVO, TransactionVO> multithreadedchProcessor() { return (transaction) -> { Thread.sleep(1); //og.info(Thread.currentThread().getName()); return transaction; }; }
@Bean public ItemReader<TransactionVO> multithreadedcReader(DataSource dataSource) throws Exception {
return new JdbcPagingItemReaderBuilder<TransactionVO>() .name("Reader") .dataSource(dataSource) .selectClause("SELECT * ") .fromClause("FROM transactions ") .whereClause("WHERE ID <= 1000000 ") .sortKeys(Collections.singletonMap("ID", Order.ASCENDING)) .rowMapper(new TransactionVORowMapper()) .build(); }
@Bean public FlatFileItemWriter<TransactionVO> multithreadedcWriter() {
return new FlatFileItemWriterBuilder<TransactionVO>() .name("Writer") .append(false) .resource(new FileSystemResource("transactions.txt")) .lineAggregator(new DelimitedLineAggregator<TransactionVO>() { { setDelimiter(";"); setFieldExtractor(new BeanWrapperFieldExtractor<TransactionVO>() { { setNames(new String[]{"id", "date", "amount", "createdAt"}); } }); } }) .build(); } }
|
执行该批处理后,大约 6 分钟就能将 100 万条记录写入文件。
任务:[FlowJob: [name=Synchronous JOB]] 已完成,参数如下:{run.id=10}]和以下状态:[完成],时间为 6m12s265ms
在这种规模下,单线程步骤处理 100 万个项目所需的时间几乎是多线程步骤的 4 倍。考虑到在我们的示例中,处理时间为 1ms/条目,这是一个巨大的收益。
缺点
由于配置简单,多线程步骤比其他功能具有巨大的优势,但它们也不乏缺点,其中:
- 作业无法重新启动。如果作业失败,它就无法从原来的位置继续执行
- 没有处理物品的订单保证
- 您应该考虑线程安全(示例可能不使用基于游标的 JDBC 项目读取器)
异步处理
异步处理是另一种提升 Spring Batch 性能的技术。它将通过在单独的线程中执行来扩展每个项目的处理,一旦完成,它就会返回AsyncItemWriter 将处理的Future 。
当步骤的瓶颈是处理时,这种缩放技术特别有用,例如,您有一个读取器从 csv 文件读取项目,并且对于每个项目读取,项目处理器需要访问外部 API 并执行一些复杂的操作计算,然后将结果写入目的地。
在 Spring 批处理中有两个类可以帮助实现此机制:AsyncItemProcessor 和 AsyncItemWriter,它们分别是 ItemProcess 和 ItemWriter 的装饰器。 AsyncItemWriter 只是为了方便解开不同 AsyncItemProcessor 返回的Futures 。
过使用 2 个装饰器类启用异步处理:
该类将处理委托给 ItemProcessor,并允许通过设置taskExecutor 进行多线程处理。@Bean public AsyncItemProcessor<TransactionVO, TransactionVO> asyncProcessor() { AsyncItemProcessor<TransactionVO, TransactionVO> asyncItemProcessor = new AsyncItemProcessor<>(); asyncItemProcessor.setDelegate(itemProcessor()); asyncItemProcessor.setTaskExecutor(taskExecutor());
return asyncItemProcessor; } @Bean public ItemProcessor<TransactionVO, TransactionVO> itemProcessor() { return (transaction) -> { Thread.sleep(1); return transaction; }; }
|
由项目处理器处理的项目被包装到Future中并传递给编写器以解开包装并写入其目的地。@Bean public AsyncItemWriter<TransactionVO> asyncWriter() { AsyncItemWriter<TransactionVO> asyncItemWriter = new AsyncItemWriter<>(); asyncItemWriter.setDelegate(itemWriter()); return asyncItemWriter; }
|
以下是整个异步处理演示程序的源代码。
@SpringBootApplication @EnableBatchProcessing public class AsyncProcessingBatchPerformanceApplication {
public static void main(String[] args) { SpringApplication.run(AsyncProcessingBatchPerformanceApplication.class, args); }
@Bean public Job asyncJob(JobBuilderFactory jobBuilderFactory) { return jobBuilderFactory .get("Asynchronous Processing JOB") .incrementer(new RunIdIncrementer()) .flow(asyncManagerStep(null)) .end() .build(); }
@Bean public Step asyncManagerStep(StepBuilderFactory stepBuilderFactory) { return stepBuilderFactory .get("Asynchronous Processing : Read -> Process -> Write ") .<TransactionVO, Future<TransactionVO>>chunk(1000) .reader(asyncReader(null)) .processor(asyncProcessor()) .writer(asyncWriter()) .taskExecutor(taskExecutor()) .build(); }
@Bean public AsyncItemProcessor<TransactionVO, TransactionVO> asyncProcessor() { AsyncItemProcessor<TransactionVO, TransactionVO> asyncItemProcessor = new AsyncItemProcessor<>(); asyncItemProcessor.setDelegate(itemProcessor()); asyncItemProcessor.setTaskExecutor(taskExecutor());
return asyncItemProcessor; }
@Bean public AsyncItemWriter<TransactionVO> asyncWriter() { AsyncItemWriter<TransactionVO> asyncItemWriter = new AsyncItemWriter<>(); asyncItemWriter.setDelegate(itemWriter()); return asyncItemWriter; }
@Bean public TaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(64); executor.setMaxPoolSize(64); executor.setQueueCapacity(64); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.setThreadNamePrefix("MultiThreaded-"); return executor; }
@Bean public ItemProcessor<TransactionVO, TransactionVO> itemProcessor() { return (transaction) -> { Thread.sleep(1); return transaction; }; }
@Bean public ItemReader<TransactionVO> asyncReader(DataSource dataSource) {
return new JdbcPagingItemReaderBuilder<TransactionVO>() .name("Reader") .dataSource(dataSource) .selectClause("SELECT * ") .fromClause("FROM transactions ") .whereClause("WHERE ID <= 1000000 ") .sortKeys(Collections.singletonMap("ID", Order.ASCENDING)) .rowMapper(new TransactionVORowMapper()) .build(); }
@Bean public FlatFileItemWriter<TransactionVO> itemWriter() {
return new FlatFileItemWriterBuilder<TransactionVO>() .name("Writer") .append(false) .resource(new FileSystemResource("transactions.txt")) .lineAggregator(new DelimitedLineAggregator<TransactionVO>() { { setDelimiter(";"); setFieldExtractor(new BeanWrapperFieldExtractor<TransactionVO>() { { setNames(new String[]{"id", "date", "amount", "createdAt"}); } }); } }) .build(); } }
|
执行该批处理后,大约 2 分钟就能将 100 万条记录写入文件。任务:[FlowJob: [name=Asynchronous Processing JOB]] 已完成,参数如下:{run.id=2}]和以下状态:[完成],时间为 2m6s76ms
缺点
本文的源代码可以在GitHub 存储库中找到。