Spring并行批处理

  Spring Batch提供了可处理大量记录所必需的可重用功能,包括日志记录/跟踪,事务管理,作业处理统计,作业重启,跳过和资源管理等交叉问题。这里展示一个并行运行多个作业的示例,作业彼此独立并以并行方式完成执行。SpringBatch入口概念是Job,一个Job由多个step步骤组成,通过步骤的不同并行方式实现并行批处理,步骤并行模式有以下几个方式:

  • 步骤step是多线程(单个处理过程)

  • 步骤step是并行的(单个处理过程 )

  • 步骤是远程分块(多个处理过程)

  • 步骤分区数据分片(单个或多个过程)

     

多线程步骤

启动并行处理的最简单方法是在Step配置中添加一个TaskExecutor。

@Bean
public TaskExecutor taskExecutor(){
return new SimpleAsyncTaskExecutor("spring_batch");
}

@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
return this.stepBuilderFactory.get("sampleStep")
.<String, String>chunk(10)
.reader(itemReader())
.writer(itemWriter())
.taskExecutor(taskExecutor)
.build();
}

TaskExecutor 是一个标准的Spring接口,最简单的多线程TaskExecutor是 SimpleAsyncTaskExecutor,上述配置的结果是Step通过在单独的执行线程中进行读取,处理和输出每个块。请注意,这意味着处理条目是没有固定的顺序。线程池默认为4个线程.你增加此限制以确保线程池是充分利用

@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
return this.stepBuilderFactory.get("sampleStep")
.<String, String>chunk(10)
.reader(itemReader())
.writer(itemWriter())
.taskExecutor(taskExecutor)
.throttleLimit(20)
.build();
}

如果在步骤中使用了数据库连接池,这些连接池的最大连接数也可能会限制批处理的并发性,确保这些资源池中设置至少与步骤中所需的并发线程数一样大。

步骤step对于多线程使用还是有一定限制,其条目的读入、处理和输出处理器都是有状态的。如果状态没有被线程隔离,那么这些组件在多线程中不可用Step。你可以使用SynchronizedItemStreamReader确保线程安全。

并行步骤

    如果你的业务逻辑可以分成不同的职责并分配给各个步骤,那么它就可以在一个流程中并行化。并行步执行易于配置和使用。

首先使用FlowBuilder构建一个个小的flow流程,在这个流程里面指定步骤,两个流程flow是并行执行的,下面有两个并行流flow1和flow2,flow1里面有step1 step2先后顺序,flow2有step3,也就是说{step1,step2}一起和step3是并行的:

@Bean
public Job job() {
return jobBuilderFactory.get("job")
.start(splitFlow())
.next(step4())
.build() //builds FlowJobBuilder instance
.build(); //builds Job instance
}

@Bean
public Flow splitFlow() {
return new FlowBuilder<SimpleFlow>("splitFlow")
.split(taskExecutor())
.add(flow1(), flow2())
.build();
}

@Bean
public Flow flow1() {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();
}

@Bean
public Flow flow2() {
return new FlowBuilder<SimpleFlow>("flow2")
.start(step3())
.build();
}

@Bean
public TaskExecutor taskExecutor(){
return new SimpleAsyncTaskExecutor("spring_batch");
}

需要指定TaskExecutor 应该使用哪个实现来执行各个流。默认值为 SyncTaskExecutor没有用,需要异步TaskExecutor才能并行运行这些步骤。

再看一个并行案例:

@Bean
public Job parallelStepsJob() {

Flow masterFlow = new FlowBuilder<Flow>("masterFlow").start(taskletStep("step1")).build();


Flow flowJob1 = new FlowBuilder<Flow>("flow1").start(taskletStep("step2")).build();
Flow flowJob2 = new FlowBuilder<Flow>("flow2").start(taskletStep("step3")).build();
Flow flowJob3 = new FlowBuilder<Flow>("flow3").start(taskletStep("step4")).build();

Flow slaveFlow = new FlowBuilder<Flow>("splitflow")
.split(new SimpleAsyncTaskExecutor()).add(flowJob1, flowJob2, flowJob3).build();

return (jobBuilderFactory.get("parallelFlowJob")
.incrementer(new RunIdIncrementer())
.start(masterFlow)
.next(slaveFlow)
.build()).build();

}


private TaskletStep taskletStep(String step) {
return stepBuilderFactory.get(step).tasklet((contribution, chunkContext) -> {
IntStream.range(1, 100).forEach(token -> logger.info("Step:" + step + " token:" + token));
return RepeatStatus.FINISHED;
}).build();

}

这里有四个流程,主流程masterFlow是最先开始,然后并行的是flowJob1, flowJob2, flowJob3

下面我们将分析分布式的处理方式。见 spring batch批处理

 

 

猜你喜欢