Spring Batch Partitioning

  In Spring Batch, partitioning means splitting the data into slices and processing each slice separately. Suppose a single thread needs 10 minutes to process 100 records; if we split those 100 records into ten slices and process each one independently:

Thread 1 - Process from 1 to 10
Thread 2 - Process from 11 to 20
Thread 3 - Process from 21 to 30
......
Thread 9 - Process from 81 to 90
Thread 10 - Process from 91 to 100

the whole run might take only about 1 minute.

Partitioning is built around one master handler coordinating several slave handlers:

The slave handlers can be remote services or threads executing locally. The messages the master sends to the slaves do not need to be durable or use JMS-style guaranteed delivery, because the Spring Batch JobRepository metadata ensures that each slave executes once, and only once, per job execution.

Local Partitioning

   Our example reads rows from the user database table into User objects and saves them into 10 CSV files.

Let's look at the CSV output first. After the data is written out from the table, a file looks roughly like this:

1 dd password 30
2 dd password 24
3 dd password 22
4 dd password 28
5 dd password 34
6 dd password 20

The columns are id, username, password and age.

The User model:


import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;

import lombok.Data;

@Data
@Entity
public class User {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    int id;
    String username;
    String password;
    int age;
}

Lombok is used here to save writing getters and setters, and the JPA annotations are there mainly so that the user table can be generated automatically:


CREATE TABLE `user` (
  `id` int(11) NOT NULL,
  `username` varchar(45) NOT NULL default '',
  `password` varchar(45) NOT NULL default '',
  `age` int(11),
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

insert into `user` (`id`, `age`, `password`, `username`) values('1','30','password','1dd');
insert into `user` (`id`, `age`, `password`, `username`) values('2','24','password','2dd');
insert into `user` (`id`, `age`, `password`, `username`) values('3','22','password','3dd');
insert into `user` (`id`, `age`, `password`, `username`) values('4','28','password','4dd');
insert into `user` (`id`, `age`, `password`, `username`) values('5','34','password','5dd');
insert into `user` (`id`, `age`, `password`, `username`) values('6','20','password','6dd');

Save this SQL as schema.sql alongside application.properties; with the configuration below, the tables are created automatically at startup. application.properties:

spring.batch.initialize-schema=always

spring.datasource.url=jdbc:mysql://localhost:3306/mytest
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.jdbc.Driver

spring.jpa.generate-ddl=true

The last line auto-generates the user table from the entity; the first line creates Spring Batch's own metadata tables.

With the groundwork in place, here is the core partitioning configuration class:

import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import lombok.extern.slf4j.Slf4j;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.PagingQueryProvider;
import org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.jdbc.core.BeanPropertyRowMapper;

@Slf4j
@Configuration
@EnableBatchProcessing
public class PartitionerJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private DataSource dataSource;

    @Bean
    public Job partitionJob() {
        return jobBuilderFactory.get("partitionJob").incrementer(new RunIdIncrementer())
                .start(masterStep()).next(step2()).build();
    }

    // step2 runs after the partitioned master step; DummyTasklet is the aggregation hook.
    @Bean
    public Step step2() {
        return stepBuilderFactory.get("step2").tasklet(dummyTask()).build();
    }

    @Bean
    public DummyTasklet dummyTask() {
        return new DummyTasklet();
    }

    @Bean
    public Step masterStep() {
        return stepBuilderFactory.get("masterStep").partitioner(slave().getName(), rangePartitioner())
                .partitionHandler(masterSlaveHandler()).build();
    }

    @Bean
    public PartitionHandler masterSlaveHandler() {
        TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
        handler.setGridSize(10);
        handler.setTaskExecutor(taskExecutor());
        handler.setStep(slave());
        try {
            handler.afterPropertiesSet();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return handler;
    }

    // The null arguments below are placeholders: the real values are injected per
    // partition at runtime, because reader, processor and writer are @StepScope beans.
    @Bean(name = "slave")
    public Step slave() {
        log.info("...........called slave .........");

        return stepBuilderFactory.get("slave").<User, User>chunk(100)
                .reader(slaveReader(null, null, null))
                .processor(slaveProcessor(null)).writer(slaveWriter(null, null)).build();
    }

    @Bean
    public RangePartitioner rangePartitioner() {
        return new RangePartitioner();
    }

    @Bean
    public SimpleAsyncTaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor();
    }

    @Bean
    @StepScope
    public UserProcessor slaveProcessor(@Value("#{stepExecutionContext[name]}") String name) {
        log.info("********called slave processor **********");
        UserProcessor userProcessor = new UserProcessor();
        userProcessor.setThreadName(name);
        return userProcessor;
    }

    @Bean
    @StepScope
    public JdbcPagingItemReader<User> slaveReader(
            @Value("#{stepExecutionContext[fromId]}") final String fromId,
            @Value("#{stepExecutionContext[toId]}") final String toId,
            @Value("#{stepExecutionContext[name]}") final String name) {
        log.info("slaveReader start " + fromId + " " + toId);
        JdbcPagingItemReader<User> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(dataSource);
        reader.setQueryProvider(queryProvider());
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("fromId", fromId);
        parameterValues.put("toId", toId);
        log.info("Parameter Value " + name + " " + parameterValues);
        reader.setParameterValues(parameterValues);
        reader.setPageSize(1000);
        reader.setRowMapper(new BeanPropertyRowMapper<User>() {{
            setMappedClass(User.class);
        }});
        log.info("slaveReader end " + fromId + " " + toId);
        return reader;
    }

    @Bean
    public PagingQueryProvider queryProvider() {
        log.info("queryProvider start ");
        SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
        provider.setDataSource(dataSource);
        provider.setSelectClause("select id, username, password, age");
        provider.setFromClause("from user");
        provider.setWhereClause("where id >= :fromId and id <= :toId");
        provider.setSortKey("id");
        log.info("queryProvider end ");
        try {
            return provider.getObject();
        } catch (Exception e) {
            log.info("queryProvider exception ");
            e.printStackTrace();
        }

        return null;
    }

    // Each partition writes its records to its own CSV file, named after its id range.
    @Bean
    @StepScope
    public FlatFileItemWriter<User> slaveWriter(
            @Value("#{stepExecutionContext[fromId]}") final String fromId,
            @Value("#{stepExecutionContext[toId]}") final String toId) {
        FlatFileItemWriter<User> writer = new FlatFileItemWriter<>();
        writer.setResource(new FileSystemResource(
                "csv/outputs/users.processed" + fromId + "-" + toId + ".csv"));
        //writer.setAppendAllowed(false);
        writer.setLineAggregator(new DelimitedLineAggregator<User>() {{
            setDelimiter(",");
            setFieldExtractor(new BeanWrapperFieldExtractor<User>() {{
                setNames(new String[]{"id", "username", "password", "age"});
            }});
        }});
        return writer;
    }
}

In PartitionerJob:

  1. The grid size set on TaskExecutorPartitionHandler is the actual number of threads.
  2. For slaveReader, the #{stepExecutionContext[fromId]}, #{stepExecutionContext[toId]} and #{stepExecutionContext[name]} values are injected from the ExecutionContext that rangePartitioner populates; this is why those beans are @StepScope and why slave() passes null placeholders.
  3. For the writer, each thread writes its records to a different CSV file, named users.processed[fromId]-[toId].csv.
  4. DummyTasklet is an aggregation step that runs after all the slaves finish; here it does nothing (see the sketch after this list).
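Two classes referenced above are not listed in the article. DummyTasklet only needs to satisfy the Tasklet contract; a minimal no-op sketch could be:

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

public class DummyTasklet implements Tasklet {

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
        // Aggregation hook that runs in step2 after all partitions complete;
        // intentionally a no-op in this sketch.
        return RepeatStatus.FINISHED;
    }
}

UserProcessor is likewise not shown; a pass-through sketch consistent with how slaveProcessor() configures it might be:

import org.springframework.batch.item.ItemProcessor;

public class UserProcessor implements ItemProcessor<User, User> {

    private String threadName;

    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }

    @Override
    public User process(User user) {
        // Pass the record through unchanged; the injected partition name
        // is only used here to show which thread handled the record.
        System.out.println(threadName + " processing user " + user.getId());
        return user;
    }
}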

 

The master step looks like this:

@Bean
public Step masterStep() {
    return stepBuilderFactory.get("masterStep").partitioner(slave().getName(), rangePartitioner())
            .partitionHandler(masterSlaveHandler()).build();
}

@Bean
public RangePartitioner rangePartitioner() {
    return new RangePartitioner();
}

The rangePartitioner bean is our core partitioning class; it defines the concrete slice of work for each partitioned slave handler:


import java.util.HashMap;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

@Slf4j
public class RangePartitioner implements Partitioner {

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        log.info("partition called gridsize= " + gridSize);

        Map<String, ExecutionContext> result = new HashMap<>();

        int range = 10;
        int fromId = 1;
        int toId = range;

        for (int i = 1; i <= gridSize; i++) {
            ExecutionContext value = new ExecutionContext();

            System.out.println("\nStarting : Thread" + i);
            System.out.println("fromId : " + fromId);
            System.out.println("toId : " + toId);

            value.putInt("fromId", fromId);
            value.putInt("toId", toId);

            // give each thread a name: Thread1, Thread2, ...
            value.putString("name", "Thread" + i);

            result.put("partition" + i, value);

            fromId = toId + 1;
            toId += range;
        }
        return result;
    }
}

When it runs, this class prints the range assigned to each thread:


Starting : Thread1
fromId : 1
toId : 10

Starting : Thread2
fromId : 11
toId : 20

Starting : Thread3
fromId : 21
toId : 30

Starting : Thread4
fromId : 31
toId : 40

Starting : Thread5
fromId : 41
toId : 50

Starting : Thread6
fromId : 51
toId : 60

Starting : Thread7
fromId : 61
toId : 70

Starting : Thread8
fromId : 71
toId : 80

Starting : Thread9
fromId : 81
toId : 90

Starting : Thread10
fromId : 91
toId : 100

Now back to the other half of the master step, the masterSlaveHandler:

@Bean
public PartitionHandler masterSlaveHandler() {
    TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
    handler.setGridSize(10);
    handler.setTaskExecutor(taskExecutor());
    handler.setStep(slave());
    try {
        handler.afterPropertiesSet();
    } catch (Exception e) {
        e.printStackTrace();
    }
    return handler;
}

Here the grid size is set to 10 slices, an asynchronous task executor is configured, and the slave step is plugged into the handler.
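One thing to note: SimpleAsyncTaskExecutor creates a brand-new thread for every partition and never reuses them. If you want bounded, reusable threads, a ThreadPoolTaskExecutor could be swapped in; this is an alternative sketch, not what the original code uses:

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Bean
public ThreadPoolTaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(10); // one thread per partition, matching the grid size
    executor.setMaxPoolSize(10);
    executor.initialize();
    return executor;
}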

The full source for the above is available on GitHub.

Remote Partitioning

    Remote partitioning = local partitioning + remote chunking. Remote chunking was covered in a previous tutorial; in effect this is remote chunking plus a JMS messaging system, coordinating the computation across a distributed system. The full code can be found in spring batch remote partition.
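Note that the example below needs a JMS broker to connect the master and the workers. A minimal sketch of the connection settings, assuming ActiveMQ with Spring Boot auto-configuration (the URL and credentials are illustrative, not from the article):

spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin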

Here is the main step code:

@Bean
public Job remotePartitioningJob() {
    return this.jobBuilderFactory.get("remotePartitioningJob")
            .start(masterStep())
            .build();
}

/*
 * Configure the master step
 */
@Bean
public Step masterStep() {
    return this.masterStepBuilderFactory.get("masterStep")
            .partitioner("workerStep", new BasicPartitioner())
            .gridSize(GRID_SIZE)
            .outputChannel(requests())
            .build();
}

BasicPartitioner plays the same role here as rangePartitioner did in the local example. Its body is not shown in this article; a minimal sketch consistent with the worker tasklet below, which reads #{stepExecutionContext['partition']}, might be (an assumption, not the article's code):
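import java.util.Map;

import org.springframework.batch.core.partition.support.SimplePartitioner;
import org.springframework.batch.item.ExecutionContext;

public class BasicPartitioner extends SimplePartitioner {

    private static final String PARTITION_KEY = "partition";

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // Reuse SimplePartitioner's empty contexts, then label each one so the
        // worker's tasklet can read #{stepExecutionContext['partition']}.
        Map<String, ExecutionContext> partitions = super.partition(gridSize);
        int i = 0;
        for (ExecutionContext context : partitions.values()) {
            context.put(PARTITION_KEY, PARTITION_KEY + (i++));
        }
        return partitions;
    }
}

From here on the flow differs from local partitioning: once the grid size is set, the master publishes the partition requests to the JMS output channel to reach the workers. The worker step code: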

/*
 * Configure the worker step
 */
@Bean
public Step workerStep() {
    return this.workerStepBuilderFactory.get("workerStep")
            .inputChannel(requests())
            .tasklet(tasklet(null))
            .build();
}

@Bean
@StepScope
public Tasklet tasklet(@Value("#{stepExecutionContext['partition']}") String partition) {
    return (contribution, chunkContext) -> {
        System.out.println("processing " + partition);
        return RepeatStatus.FINISHED;
    };
}

Once a worker receives a message from JMS on its inputChannel, it runs the tasklet.
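The requests() channel referenced by both steps is a plain Spring Integration channel bridged to JMS. A minimal sketch of the master-side outbound wiring, assuming ActiveMQ and a queue named "requests" (both assumptions, not shown in the article):

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.dsl.Jms;

@Bean
public DirectChannel requests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
    // Push partition requests from the requests() channel onto the "requests" JMS queue.
    return IntegrationFlows.from(requests())
            .handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
            .get();
}

On the worker side, a mirror-image message-driven adapter would read the "requests" queue into the worker's own requests() channel.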

So far the master only sends messages to the workers; the workers never reply. If you want the workers' results aggregated back at the master, similar to the reduce phase of Map/Reduce, the master step becomes:

@Bean
public Step masterStep() {
    return this.masterStepBuilderFactory.get("masterStep")
            .partitioner("workerStep", new BasicPartitioner())
            .gridSize(GRID_SIZE)
            .outputChannel(requests())
            .inputChannel(replies())
            .build();
}

Notice that after outputChannel there is now an inputChannel reading messages from the replies queue. Here is how a worker sends its result back:

@Bean
public Step workerStep() {
    return this.workerStepBuilderFactory.get("workerStep")
            .inputChannel(requests())
            .outputChannel(replies())
            .tasklet(tasklet(null))
            .build();
}

So the worker, after consuming from inputChannel, sends its result out through outputChannel to the replies queue.
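Back on the master, the replies are consumed from JMS into the replies() channel through an inbound flow; a sketch under the same ActiveMQ assumption:

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.dsl.Jms;

@Bean
public DirectChannel replies() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
    // Pull the workers' StepExecution results off the "replies" JMS queue
    // into the replies() channel that masterStep listens on.
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
            .channel(replies())
            .get();
}

The master aggregates these worker results to decide the final status of masterStep.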
