假设有一个您需要定期运行的流程,例如一天结束 (EOD)。假设这个流程中需要处理的数据量在不断增加。
最初,你可以做一个非常简单的 Spring 调度(或者 Quartz 或者你有什么),它只执行一个方法,一次加载所有数据,处理所有数据并将结果写回数据库。
如果读取的行数(例如从数据库中)是 10,000 行,它可能工作得很好,但如果突然有 10 000 000 行怎么办?执行可能会失败,因为内存不足错误
或者需要很长的时间才能完成。
远程分区
获取初始数据集时,例如,如果我们从数据库中读取事务(或任何域对象),我们只获取事务 ID。
将它们划分为分区(不是块;块在 Spring Batch 世界中具有不同的含义)并将分区发送给可以处理它们并执行实际业务逻辑的工作人员。
常规分区和远程分区的主要区别在于工作者的位置。
- 在常规分区的情况下,作为工作者的进程是与正在进行数据分区的进程在同一JVM中的本地线程。
- 但在远程分区的情况下,工作者不是在同一个JVM中运行,而是完全不同的JVM。当有一些工作需要处理时,会通过消息传递系统通知各个工作者。
局限性
Kafka是基于主题运行的。主题可以有分区。你可以拥有的消费者数量(对于同一个消费者组)取决于你对主题的分区数量。这意味着,你的分区批处理作业的并发系数与主题分区的数量直接相关。
一个主题所使用的分区数量应在创建该主题时设置。后来,我们可以改变现有主题的分区数量,但是你必须注意到某些副作用。
这意味着Kafka不可能根据数据量来动态地扩展工作者的数量。我所说的动态是指,有时你需要10个工人,但假设在圣诞节期间数据量大增,你就需要50个。这就需要一些自定义的脚本了。
毕竟,我认为一个好的经验法则--在Kafka的情况下--是过度扩大主题分区的数量。比方说,如果你在非高峰期需要10个消费者,而在高峰期需要20个,我认为你可以选择两倍/三倍的数量,以确保你有增长的空间,而不会有太多的头痛。因此,我认为60是一个很好的分区数字,最多可以支持60个同时进行的消费者。当然,这取决于你的数据量的增长速度,但你应该明白这个道理。
技术栈
- Spring Batch
- Spring Integration
- Spring for Apache Kafka
- MySQL
- Liquibase
Manager
我们将从管理器和它的配置开始。让我们有一个ManagerConfiguration类。我们将需要几个配置的依赖项和两个注释。
@Configuration @Profile("manager") public class ManagerConfiguration { @Autowired private JobBuilderFactory jobBuilderFactory;
@Autowired private RemotePartitioningManagerStepBuilderFactory stepBuilderFactory;
@Autowired private KafkaTemplate kafkaTemplate; }
|
@Profile注解是至关重要的,因为我们只想让这个配置在我们试图运行管理器时启动,我们将用Spring的profile来控制它。JobBuilderFactory将被用来创建我们的分区作业。RemotePartitioningManagerStepBuilderFactory将用于为我们的工作创建步骤,使用这个类而不是普通的StepBuilderFactory非常重要。另外,请注意,有一个非常类似的StepBuilderFactory,叫做RemotePartitioningWorkerStepBuilderFactory,它是用来给工人而不是经理使用的。我们很快就会到那里。
KafkaTemplate是自动为我们配置的,我们将需要它来配置管理器和Kafka之间的通道。
现在,让我们上添加一个通道,我们将把它作为从应用程序到Kafka的输出通道。
@Configuration @Profile("manager") public class ManagerConfiguration { // previous content is omitted for simplicity
@Bean public DirectChannel outboundRequests() { return new DirectChannel(); } }
|
DirectChannel只是Spring Integration中对消息通道的一个抽象。接下来,让我们创建一个Partitioner,对我们的数据集进行分区。这将是一个新的类,我把它叫做ExamplePartitioner。
public class ExamplePartitioner implements Partitioner { public static final String PARTITION_PREFIX = "partition";
@Override public Map<String, ExecutionContext> partition(int gridSize) { int partitionCount = 50; Map<String, ExecutionContext> partitions = new HashMap<>(); for (int i = 0; i < partitionCount; i++) { ExecutionContext executionContext = new ExecutionContext(); executionContext.put("data", new ArrayList<Integer>()); partitions.put(PARTITION_PREFIX + i, executionContext); } for (int i = 0; i < 1000; i++) { String key = PARTITION_PREFIX + (i % partitionCount); ExecutionContext executionContext = partitions.get(key); List<Integer> data = (List<Integer>) executionContext.get("data"); data.add(i + 1); } return partitions; } }
|
这个分区器的实现没有做任何有趣的事情。它创建了50个分区,对于每个分区,它把一些数字放入一个列表中,在关键数据下可以访问。
这意味着,每个分区在列表中会有20个数字。
这就是你可以想象获得交易或任何你想处理的ID的地方,稍后下线,工作者将从数据库中加载相应的行。
很好,让我们创建工作步骤job steps并从分区器中创建一个bean。
@Configuration @Profile("manager") public class ManagerConfiguration { // previous content is omitted for simplicity
@Bean public ExamplePartitioner partitioner() { return new ExamplePartitioner(); }
@Bean public Step partitionerStep() { return stepBuilderFactory.get("partitionerStep") .partitioner(Constants.WORKER_STEP_NAME, partitioner()) .outputChannel(outboundRequests()) .build(); } }
|
没有什么特别的,我们创建了调用分区器的步骤,以及我们想把分区发送到的输出通道。
另外,这里有一个对常量类的引用,让我给你看看它的内容。
public class Constants { public static final String TOPIC_NAME = "work"; public static final String WORKER_STEP_NAME = "simpleStep"; public static final int TOPIC_PARTITION_COUNT = 3; }
|
这就是全部。我们将调用Kafka主题工作,它将有3个主题分区,我们想在分区数据集上调用的工作步骤被称为simpleStep。
很好,现在我们来创建分区器工作。
@Configuration @Profile("manager") public class ManagerConfiguration { // previous content is omitted for simplicity
@Bean(name = "partitionerJob") public Job partitionerJob() { return jobBuilderFactory.get("partitioningJob") .start(partitionerStep()) .incrementer(new RunIdIncrementer()) .build(); } }
|
同样,没有什么特别的,只是引用了我们之前创建的分区器步骤,并在作业中添加了RunIdIncrementer,这样我们就可以轻松地重新运行作业。
很好。现在,我想说的是最复杂的东西,如何将通道接入Kafka,并确保主题分区被正确利用。
我们也会用Spring Integration来做这个:
@Configuration @Profile("manager") public class ManagerConfiguration { // previous content is omitted for simplicity
@Bean public IntegrationFlow outboundFlow() { KafkaProducerMessageHandler messageHandler = new KafkaProducerMessageHandler(kafkaTemplate); messageHandler.setTopicExpression(new LiteralExpression(Constants.TOPIC_NAME)); return IntegrationFlows .from(outboundRequests()) .log() .handle(messageHandler) .get(); } }
|
首先,我们需要一个KafkaProducerMessageHandler,它将接收到的消息并将其发布到Kafka主题中。
该主题由setTopicExpression方法调用来标记,最后,我们只需将所有东西作为一个集成流来连接。
然而,这还不会利用主题分区,消息将被发布到同一个分区。
让我们通过setPartitionIdExpression方法为其添加一个自定义表达式。
@Configuration @Profile("manager") public class ManagerConfiguration { // previous content is omitted for simplicity
@Bean public IntegrationFlow outboundFlow() { KafkaProducerMessageHandler messageHandler = new KafkaProducerMessageHandler(kafkaTemplate); messageHandler.setTopicExpression(new LiteralExpression(Constants.TOPIC_NAME)); Function<Message<?>, Long> partitionIdFn = (m) -> { StepExecutionRequest executionRequest = (StepExecutionRequest) m.getPayload(); return executionRequest.getStepExecutionId() % Constants.TOPIC_PARTITION_COUNT; }; messageHandler.setPartitionIdExpression(new FunctionExpression<>(partitionIdFn)); return IntegrationFlows .from(outboundRequests()) .log() .handle(messageHandler) .get(); } }
|
我们提供一个FunctionExpression,它将动态地解开消息,获得stepExecutionId属性并与modulo运算符相结合。
分区计数的当前值是3。这意味着分区ID表达式将从[0, 1, 2]范围内返回一个值,这将表示目标主题分区。
这算是在分区之间提供了一种平均分配,但不是100%。
如果你需要一个复杂的分区ID决定器,你肯定可以调整实现。
另外,你也可以同样使用setMessageKeyExpression方法来提供一个类似的FunctionExpression来计算消息key,而不是直接告诉Kafka要使用哪个分区。
还有一点需要注意的是,我在集成流程中加入了log(),所以发送出去的消息会被记录下来;只是为了调试的目的。
这就是管理器的配置。
Worker
工作者的配置将是类似的。让我们创建一个WorkerConfiguration类。
@Configuration @Profile("worker") public class WorkerConfiguration { @Autowired private JobBuilderFactory jobBuilderFactory;
@Autowired private RemotePartitioningWorkerStepBuilderFactory stepBuilderFactory;
@Autowired private DataSource dataSource;
@Bean public IntegrationFlow inboundFlow(ConsumerFactory<String, String> cf) { return IntegrationFlows .from(Kafka.messageDrivenChannelAdapter(cf, Constants.TOPIC_NAME)) .channel(inboundRequests()) .get(); }
@Bean public QueueChannel inboundRequests() { return new QueueChannel(); } }
|
几个依赖关系,一个用于入站消息的消息通道,并将其与Spring Integration连接起来。
让我们来创建一个工作步骤。
@Configuration @Profile("worker") public class WorkerConfiguration { // previous content is omitted for simplicity
@Bean public Step simpleStep() { return stepBuilderFactory.get(Constants.WORKER_STEP_NAME) .inputChannel(inboundRequests()) .<Integer, Customer>chunk(100) .reader(itemReader(null)) .processor(itemProcessor()) .writer(itemWriter()) .build(); } }
|
这将创建步骤定义,将其与入站消息通道相连,并引用 ItemReader、ItemProcessor 和 ItemWriter 实例。这些看起来如下。
@Configuration @Profile("worker") public class WorkerConfiguration { // previous content is omitted for simplicity @Bean @StepScope public ItemReader<Integer> itemReader(@Value("#{stepExecutionContext['data']}") List<Integer> data) { List<Integer> remainingData = new ArrayList<>(data); return new ItemReader<>() { @Override public Integer read() { if (remainingData.size() > 0) { return remainingData.remove(0); }
return null; } }; } }
|
ItemReader是一个Bean,它将在Spring Batch执行上下文中的数据键下接收分区数据作为一个参数。请注意,必须在Bean定义上使用@StepScope,以便为该步骤启用后期绑定。实现很简单。我们将把收到的ID存储在一个本地列表中,在每个ItemReader调用期间,我们将从列表中删除一个项目,直到没有剩余。
@Configuration @Profile("worker") public class WorkerConfiguration { // previous content is omitted for simplicity @Bean public ItemWriter<Customer> itemWriter() { return new JdbcBatchItemWriterBuilder<Customer>() .beanMapped() .dataSource(dataSource) .sql("INSERT INTO customers (id) VALUES (:id)") .build(); }
@Bean public ItemProcessor<Integer, Customer> itemProcessor() { return new ItemProcessor<>() { @Override public Customer process(Integer item) { return new Customer(item); } }; } }
|
ItemProcessor和ItemWriter则更简单。ItemProcessor只是将ID转换为Customer对象,模拟对DTO的某种处理,ItemWriter只是将Customers写入数据库。
客户类是一个简单的POJO,没有什么特别的。
public class Customer { private int id;
public Customer(int id) { this.id = id; }
public int getId() { return id; } }
|
最后的配置步骤
接下来我们需要做的是用所需的分区数量创建Kafka主题,所以让我们创建一个新的KafkaConfiguration类。
@Configuration public class KafkaConfiguration { @Bean public NewTopic topic() { return TopicBuilder.name(Constants.TOPIC_NAME) .partitions(Constants.TOPIC_PARTITION_COUNT) .build(); } }
|
如果分区计数还不存在,这将自动创建一个主题。
接下来,我们需要创建数据库结构来存储我们的客户,并允许Spring管理其状态。让我们在 src/main/resources/db/changelog 文件夹下创建一个 db.changelog-master.xml 文件,内容如下。
<?xml version="1.0" encoding="UTF-8"?> <databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.1.xsd"> <changeSet id="0001-initial" author="Arnold Galovics"> <createTable tableName="customers"> <column name="id" type="number"> </column> </createTable> <sqlFile path="classpath:/org/springframework/batch/core/schema-mysql.sql" relativeToChangelogFile="false"/> </changeSet> </databaseChangeLog>
|
createTable很简单,SQL文件的导入是由Spring Batch的核心模块提供的东西。
让我们在application.properties中添加一些配置。
spring.datasource.url=jdbc:mysql://localhost:3306/db_example?createDatabaseIfNotExist=true spring.datasource.username=root spring.datasource.password=mysql spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.liquibase.change-log=classpath:db/changelog/db.changelog-master.xml
|
用Liquibase配置DataSource。然后是Kafka生产者的配置。
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer spring.kafka.producer.group-id=producer-g
|
这里最重要的是使用JsonSerializer,这样Spring Batch要发送的消息就会被编码成JSON。
同样地,消费者:
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.LongDeserializer spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer spring.kafka.consumer.group-id=consumer-g
|
还有一件事:
spring.kafka.consumer.properties.spring.json.trusted.packages=*
运行
创建用于启动应用程序的信息库。我将创建一个docker-compose.yml。
version: "3" services: zookeeper: image: 'bitnami/zookeeper:latest' ports: - '2181:2181' environment: - ALLOW_ANONYMOUS_LOGIN=yes kafka: image: 'bitnami/kafka:latest' ports: - '9092:9092' environment: - KAFKA_BROKER_ID=1 - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 - ALLOW_PLAINTEXT_LISTENER=yes - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT - KAFKA_CFG_LISTENERS=CLIENT://:9093,EXTERNAL://:9092 - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9093,EXTERNAL://localhost:9092 - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT depends_on: - zookeeper kafka-ui: image: provectuslabs/kafka-ui:latest ports: - '8080:8080' environment: - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9093 depends_on: - kafka mysql: image: mysql ports: - '3306:3306' command: --default-authentication-plugin=mysql_native_password restart: always environment: MYSQL_ROOT_PASSWORD: mysql
|
我就不多说了。它启动了一个Kafka代理,一个位于8080端口的Kafka UI实例,如果你想查看主题的状态,还有一个MySQL服务器。
docker-compose up启动一切。
完整代码可在GitHub 上获得。