本文将教您如何使用 Spring Boot 和 Spring for Kafka 为 Kafka 消费者配置并发。Spring for Kafka 的并发与Kafka 分区和消费者组密切相关。消费者组中的每个消费者都可以从多个分区接收消息。组内的消费者使用单个线程,而消费者组使用多个线程来消费消息。虽然每个消费者都是单线程的,但记录的处理可以利用多个线程。我们将分析如何用Spring Boot和Spring for Kafka来实现。
GitHub 存储库。no-transactions-service目录
先决条件
我们将在今天的练习中使用三种不同的工具。当然,我们将使用最新版本的 Spring Boot 3 和 Java 19 创建 Spring Boot 消费者应用程序。为了在本地运行 Kafka,我们将使用 Redpanda——一个与 Kafka API 兼容的平台。您可以使用他们的 CLI 工具轻松启动和管理 Redpanda – rpk. 如果您想在笔记本电脑上安装,请按照此处rpk提供的安装说明进行操作。
最后,我们需要一个负载测试工具。我正在使用k6工具及其扩展来与 Kafka 集成。当然,这只是一个提议,你可以使用任何你喜欢的其他解决方案。k6我能够快速生成大量消息并将其发送到 Kafka 。为了使用 k6,您需要将其安装在笔记本电脑上。这是安装说明。之后,您需要安装xk6-kafka扩展。在以下文档中,您有完整的扩展列表k6。
介绍
出于本练习的目的,我们将创建一个连接到 Kafka 并从单个主题接收消息的简单 Spring Boot 应用程序。从业务逻辑的角度来看,它处理内存数据库中帐户和存储之间的事务。这是我们需要包含在 Maven 中的依赖项列表pom.xml:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <scope>runtime</scope> </dependency>
|
让我们看看配置:
spring: application.name: no-transactions-service kafka: bootstrap-servers: ${KAFKA_URL} consumer: key-deserializer: org.apache.kafka.common.serialization.LongDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer properties: "[spring.json.value.default.type]": "pl.piomin.services.common.model.Order" "[spring.json.trusted.packages]": "pl.piomin.services.common.model" "[spring.json.use.type.headers]": false producer: key-serializer: org.apache.kafka.common.serialization.LongSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
|
- 我们的应用程序使用KAFKA_URL环境变量中设置的地址连接到Kafka代理。
- 它希望得到JSON格式的消息。因此,我们需要设置JsonDeserializer作为解串器的值。
- 传入的消息被序列化为pl.piomin.services.common.model.Order对象。
- 为了使其工作,我们需要设置spring.json.value.default.type和spring.json.trusted.packages属性。
- k6工具不会设置一个包含JSON目标类型信息的头,所以我们需要用spring.json.use.type.headers属性在Spring for Kafka上禁用该功能。
这里是代表传入信息的类。
public class Order {
private Long id; private Long sourceAccountId; private Long targetAccountId; private int amount; private String status;
// GETTERS AND SETTERS... }
|
我们需要做的最后一件事是为Kafka启用Spring,并生成一些测试账户来进行交易事务。
@SpringBootApplication @EnableKafka public class NoTransactionsService {
public static void main(String[] args) { SpringApplication.run(NoTransactionsService.class, args); }
private static final Logger LOG = LoggerFactory .getLogger(NoTransactionsService.class);
Random r = new Random();
@Autowired AccountRepository repository;
@PostConstruct public void init() { for (int i = 0; i < 1000; i++) { repository.save(new Account(r.nextInt(1000, 10000))); } }
}
|
使用Redpanda运行Kafka
一旦我们成功安装了rpk CLI,我们就可以通过执行以下命令轻松地运行一个单节点的Kafka代理:
一旦我们创建了一个broker,我们就可以创建一个主题。
我们将在测试中使用带有交易名称的主题。在第一步中,我们用一个分区做测试。
$ rpk topic create transactions -p 1
|
为Kafka准备负载测试
我们的负载测试将生成和发送JSON格式的随机值的订单。k6工具允许我们用JavaScript编写测试。我们需要使用k6 Kafka扩展库。Kafka代理的地址是从KAFKA_URL环境变量中获取的。我们每次生成一个新的消息时,都会递增订单的id字段。
import { Writer, SchemaRegistry, SCHEMA_TYPE_JSON, } from "k6/x/kafka"; import { randomIntBetween } from 'https://jslib.k6.io/k6-utils/1.2.0/index.js';
const writer = new Writer({ brokers: [`${__ENV.KAFKA_URL}`], topic: "transactions", });
const schemaRegistry = new SchemaRegistry();
export function setup() { return { index: 1 }; }
export default function (data) { writer.produce({ messages: [ { value: schemaRegistry.serialize({ data: { id: data.index++, sourceAccountId: randomIntBetween(1, 1000), targetAccountId: randomIntBetween(1, 1000), amount: randomIntBetween(10, 50), status: "NEW" }, schemaType: SCHEMA_TYPE_JSON, }), }, ], }); }
export function teardown(data) { writer.close(); }
|
在运行测试之前,我们需要设置KAFKA_URL环境变量。然后我们可以使用k6运行命令来生成和发送大量的信息。
$ k6 run load-test.js -u 1 -d 30s
|
场景一:单分区主题监听器
让我们从默认值开始。我们的主题只有一个分区。我们正在创建@KafkaListener,只用主题和消费者组的名字。一旦监听器收到一个传入的消息,它就会调用AccountService Bean来处理这个订单。
@Inject AccountService service; @KafkaListener( id = "transactions", topics = "transactions", groupId = "a") public void listen(Order order) { LOG.info("Received: {}", order); service.process(order); }
|
我们的Spring Boot Kafka应用已经准备好了并发性。我们将在交易期间用PESSIMISTIC_WRITE模式锁定账户实体。
public interface AccountRepository extends CrudRepository<Account, Long> {
@Lock(LockModeType.PESSIMISTIC_WRITE) Optional<Account> findById(Long id); }
|
这里是我们的AccountService Bean的实现,用于处理传入的订单。
@Service public class AccountService {
private static final Logger LOG = LoggerFactory .getLogger(AccountService.class); private final Random RAND = new Random();
KafkaTemplate<Long, Order> kafkaTemplate; AccountRepository repository;
public AccountService(KafkaTemplate<Long, Order> kafkaTemplate, AccountRepository repository) { this.kafkaTemplate = kafkaTemplate; this.repository = repository; }
@Transactional public void process(Order order) { Account accountSource = repository .findById(order.getSourceAccountId()) .orElseThrow(); // (1)
Account accountTarget = repository .findById(order.getTargetAccountId()) .orElseThrow(); // (2)
if (accountSource.getBalance() >= order.getAmount()) { // (3) accountSource.setBalance(accountSource.getBalance() - order.getAmount()); repository.save(accountSource); accountTarget.setBalance(accountTarget.getBalance() + order.getAmount()); repository.save(accountTarget); order.setStatus("PROCESSED"); } else { order.setStatus("FAILED"); }
try { Thread.sleep(RAND.nextLong(1, 20)); // (4) } catch (InterruptedException e) { throw new RuntimeException(e); }
LOG.info("Processed: order->{}", new OrderDTO(order, accountSource, accountTarget));
// (5) CompletableFuture<SendResult<Long, Order>> result = kafkaTemplate .send("orders", order.getId(), order); result.whenComplete((sr, ex) -> LOG.debug("Sent(key={},partition={}): {}", sr.getProducerRecord().partition(), sr.getProducerRecord().key(), sr.getProducerRecord().value())); }
}
|
process(...)方法是@Transactional。在第一步,我们找到源(1)和目标(2)账户实体。然后,如果源账户(3)上有足够的资金,我们就在账户之间进行转账。我还模拟了一个延迟,只是为了测试目的(4)。最后,我们可以使用KafkaTemplate Bean(5)异步发送一个响应到另一个主题。
我们只有一个运行的应用程序实例和一个负责处理消息的线程。让我们为我们的消费者组验证分区上的当前Lag。
消息的处理速度非常慢。
场景二:多分区主题监听器
现在,transactions主题由 10 个分区组成。我们不会更改应用程序代码和配置中的任何内容。我们将删除之前创建的主题并使用以下命令创建一个包含 10 个分区的新主题:
$ rpk topic delete transactions $ rpk topic create transaction -p 10
|
我们再次使用以下 Maven 命令启动应用程序:
让我们分析应用程序日志:虽然我们有 10 个分区,但仍然有一个线程在监听它们。
场景 3:多个分区的消费者并发
现在,我们要在Kafka监听器层面上启用并发性。为了实现它,我们需要在@KafkaListener注解里面设置并发concurrency 字段。这个参数仍然与Kafka分区有关。所以,设置高于分区数量的值是没有意义的。在我们的案例中,有10个分区--与之前的方案相同。
@KafkaListener( id = "transactions", topics = "transactions", groupId = "a", concurrency = "10") public void listen(Order order) { LOG.info("Received: {}", order); service.process(order); }
|
之后,我们可以启动Spring Boot应用。让我们看看会发生什么。正如你所看到的,我们有10个并发连接--每个都绑定在一个线程上。
但是,如果我们运行负载测试,分区上的延迟仍然很大。
理论上,我们可以通过增加实例数量来提高整体性能。但是,这种方法不会改变任何东西。为什么?让我们运行另一个并查看日志。现在,只有 5 个线程仍然绑定到分区。其他五个线程处于空闲状态。系统的整体性能没有改变。
场景四:多线程处理
最后一个场景。我们将使用 Java 创建一个线程池ExecutorService。我们仍然可以使用具有 Kafka 消费者并发功能的自定义线程池,如下所示(通过参数concurrency)。每次侦听器收到新消息时,它都会在单独的线程中处理它们。
@Service public class NoTransactionsListener {
private static final Logger LOG = LoggerFactory .getLogger(NoTransactionsListener.class);
AccountService service; ExecutorService executorService = Executors.newFixedThreadPool(30);
public NoTransactionsListener(AccountService service) { this.service = service; }
@KafkaListener( id = "transactions", topics = "transactions", groupId = "a", concurrency = "3") public void listen(Order order) { LOG.info("Received: {}", order); executorService.submit(() -> service.process(order)); }
}
|
在那种情况下,应该澄清一件事。使用应用程序级别的自定义线程池,我们将丢失单个分区内的消息排序。之前的模型保证了顺序,因为我们每个分区都有一个线程。对于我们的 Spring Boot 应用来说,这并不重要,因为我们只是在独立处理消息。
让我们启动应用程序。有 3 个并发线程从分区接收消息。
有 30 个线程用于处理消息,3 个线程用于监听分区。一旦在消费者线程中接收到消息,它就由工作线程处理。
现在,我们可以再次运行负载测试。它生成并向我们的 Kafka 代理发送了约 85k 条消息(每秒约 2700 条)。
滞后Lag值看起来真的很好,尽管应用程序不能无延迟地处理所有请求。
这是因为Spring Kafka中提交偏移量的默认ack模式是BATCH。如果我们把它改为RECORD模式,即在监听器处理完记录后返回时提交偏移量,我们应该会得到一个更精确的滞后值。
为了覆盖该选项,我们需要定义ConcurrentKafkaListenerContainerFactory Bean,如下所示。
@Bean ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(ConsumerFactory<Integer, String> consumerFactory) { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); factory.setConcurrency(3); factory.getContainerProperties() .setAckMode(ContainerProperties.AckMode.RECORD); return factory; }
|
详细点击标题