Spring批处理远程分块

  Spring Batch的远程分块其实是一种主从分布式处理模式,一个主处理机和一个或多个从处理机,这样提高了批处理的计算能力,主从之间是通过消息中间件JMS进行通信。

在远程分块中,Step处理分为主从多个处理过程,主从之间通过一些中间件相互通信。下图显示了该模式:

master主组件是一个单个处理,slave从属组件则是多个远程处理过程。如果主处理不会造成瓶颈,这种模式效果最好,因为一般情况下输出处理必须比读取处理更昂贵(在实践中通常是这种情况)。

主master组件知道如何使用消息的通用版本发送条目item块到消息中间件,slave从属服务器是消息中间件的标准监听器(例如如果使用JMS,则是MesssageListener实现),然后调用ItemWriter或ItemProcessor 来处理条目块。

使用此模式的一个优点是读取器、处理器和输出器等组件是现成的(与用于本地执行步骤的组件相同)。这些项目是动态划分的,工作通过中间件共享,因此,如果监听器都是繁忙消费者,那么将自动加入负载平衡。

Spring Batch集成

   Spring批处理通常是和消息系统等中间件集成的,向批处理流程添加消息传递可实现操作的自动化,也可实现方式和策略的分离,比如消息触发要执行的作业,然后以各种方式发送消息。或者,当作业完成或失败时,会触发发送的相应成功或失败的消息等等,消息的使用者可能与批处理应用本身无关。

  消息传递也可以嵌入到批处理的作业执行中,远程分区和远程分块提供了将工作负载分配给多个处理器工作。

本项目我们使用spring-batch-integration实现远程分块。

这里我们使用ActiveMQ作为JMS的实现,主处理和从处理之间通过两个队列通讯,requests队列时主节点通过chunkMessageChannelItemWriter发往从节点的,而从节点通过ChunkProcessorChunkHandler接受到消息后,再将结果发往主节点的消息通过replies。

主节点的Job代码如下:


@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public TaskletStep masterStep() {
return this.stepBuilderFactory.get("masterStep")
.<Integer, Integer>chunk(3)
.reader(itemReader())
.writer(itemWriter())
.build();
}

@Bean
public Job remoteChunkingJob() {
return this.jobBuilderFactory.get("remoteChunkingJob")
.start(masterStep())
.build();
} @Bean
public ItemReader<Integer> itemReader() {
return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6));
}

@Bean
public ItemWriter<Integer> itemWriter() {
MessagingTemplate messagingTemplate = new MessagingTemplate();
messagingTemplate.setDefaultChannel(requests());
ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter = new ChunkMessageChannelItemWriter<>();
chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
chunkMessageChannelItemWriter.setReplyChannel(replies());
return chunkMessageChannelItemWriter;
}

与JMS连接的代码如下:

@Value("${broker.url}")
private String brokerUrl;


@Bean
public ActiveMQConnectionFactory jmsConnectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(this.brokerUrl);
connectionFactory.setTrustAllPackages(true);
return connectionFactory;
}

/*
* Configure outbound flow (requests going to workers)
*/

@Bean
public DirectChannel requests() {
return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory jmsConnectionFactory) {
return IntegrationFlows
.from(requests())
.handle(Jms.outboundAdapter(jmsConnectionFactory).destination("requests"))
.get();
}

/*
* Configure inbound flow (replies coming from workers)
*/

@Bean
public QueueChannel replies() {
return new QueueChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory jmsConnectionFactory) {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(jmsConnectionFactory).destination("replies"))
.channel(replies())
.get();
}

Application.properties:

broker.url=tcp://localhost:61616

spring.batch.initialize-schema=always

spring.datasource.url=jdbc:mysql://localhost:3306/mytest
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.jdbc.Driver

从节点的Job如下:

/*
* Configure worker components
*/

@Bean
public ItemProcessor<Integer, Integer> itemProcessor() {
return item -> {
System.out.println("processing item " + item);
return item;
};
}

@Bean
public ItemWriter<Integer> itemWriter() {
return items -> {
for (Integer item : items) {
System.out.println("writing item " + item + System.currentTimeMillis());
}
};
}

@Bean
@ServiceActivator(inputChannel = "requests", outputChannel = "replies")
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {
ChunkProcessor<Integer> chunkProcessor = new SimpleChunkProcessor<>(itemProcessor(), itemWriter());
ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler = new ChunkProcessorChunkHandler<>();
chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
return chunkProcessorChunkHandler;
}

从节点连接JMS代码类似于主节点,自动实现了JMS的侦听器作用。从节点主要是在chunkProcessorChunkHandler这里做了处理和输出回到主节点。

本节源码:Github

Spring batch