在当今的事件驱动架构中,有效管理数据流至关重要。Apache Kafka是一个流行的选择,但尽管有Spring Kafka等辅助框架,但将其集成到我们的应用程序中仍面临挑战。一项主要挑战是实施适当的动态侦听器管理,这提供了灵活性和控制力,这对于适应应用程序不断变化的工作负载和维护至关重要。
在本教程中,我们将学习如何在 Spring Boot 应用程序中动态启动和停止 Kafka 监听器。
场景
动态监听管理在适应性要求较高的场景中表现出色。例如,在峰值负载期间,我们可以动态启动额外的侦听器以提高吞吐量并减少处理时间。相反,在维护或低流量期间,我们可以停止侦听器以节省资源。这种灵活性也有利于在功能标志后面部署新功能,从而允许无缝即时调整,而不影响整个系统。
让我们考虑一个场景,其中电子商务平台引入了新的推荐引擎,旨在通过根据浏览历史记录和购买模式推荐产品来增强用户体验。为了在全面启动之前验证此功能的有效性,我们决定将其部署在功能标志后面。
激活此功能标志将启动 Kafka 侦听器。当最终用户与平台交互时,由 Kafka 监听器支持的推荐引擎会处理传入的用户活动数据流,以生成个性化的产品推荐。
当我们停用功能标志时,我们会停止 Kafka 侦听器,并且平台默认使用其现有的推荐引擎。无论新引擎处于测试阶段如何,这都确保了无缝的用户体验。
当该功能处于活动状态时,我们会主动收集数据、监控性能指标并对推荐引擎进行调整。我们在多次迭代中重复此功能测试,直到达到预期结果。
通过这个迭代过程,动态监听器管理被证明是一个有价值的工具。它允许无缝引入新功能
首先,我们将spring-kafka依赖导入到我们的项目中:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>3.1.2</version> </dependency>
|
配置Kafka消费者
生产者是将事件发布(写入)到 Kafka 主题的应用程序。
在本教程中,我们将使用单元测试来模拟生产者将事件发送到 Kafka 主题。订阅主题并处理事件流的消费者由我们应用程序中的侦听器代表。该侦听器配置为处理来自 Kafka 的传入消息。
让我们通过KafkaConsumerConfig类配置我们的 Kafka 消费者,其中包括 Kafka 代理的地址、消费者组 ID 以及键和值的反序列化器:
@Bean public DefaultKafkaConsumerFactory<String, UserEvent> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.baeldung.spring.kafka.start.stop.consumer"); return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(UserEvent.class)); }
|
配置Kafka监听器
在Spring Kafka中, 使用@KafkaListener注释一个方法会创建一个监听器 ,该监听器消费来自指定主题的消息。为了定义它,我们声明一个UserEventListener类:
@KafkaListener(id = Constants.LISTENER_ID, topics = Constants.MULTI_PARTITION_TOPIC, groupId = "test-group", containerFactory = "kafkaListenerContainerFactory", autoStartup = "false") public void processUserEvent(UserEvent userEvent) { logger.info("Received UserEvent: " + userEvent.getUserEventId()); userEventStore.addUserEvent(userEvent); }
|
上面的监听器等待来自主题multi_partition_topic 的消息并使用processUserEvent()方法 处理它们。我们将groupId指定为test-group,确保消费者成为更广泛组的一部分,从而促进跨多个实例的分布式处理。
我们使用id属性为每个侦听器分配一个唯一标识符。在此示例中,分配的侦听器 ID 为Listener-id-1。
autoStartup属性使我们能够控制在应用程序初始化时是否启动侦听器。在我们的示例中,我们将其设置为false,这意味着侦听器不会在应用程序启动时自动启动。此配置为我们提供了手动启动侦听器的灵活性。
此手动启动可以由各种事件触发,例如新用户注册、应用程序内的特定条件(例如达到特定数据量阈值)或管理操作(例如通过管理界面手动启动侦听器)。例如,如果在线零售应用程序在限时抢购期间检测到流量激增,它可以自动启动额外的侦听器来处理增加的负载,从而优化性能。
UserEventStore充当侦听器接收到的事件的临时存储:
@Component public class UserEventStore { private final List<UserEvent> userEvents = new ArrayList<>(); public void addUserEvent(UserEvent userEvent) { userEvents.add(userEvent); } public List<UserEvent> getUserEvents() { return userEvents; } public void clearUserEvents() { this.userEvents.clear(); } }
|
动态控制监听器
让我们创建一个使用KafkaListenerEndpointRegistry动态启动和停止 Kafka 监听器的KafkaListenerControlService:
@Service public class KafkaListenerControlService { @Autowired private KafkaListenerEndpointRegistry registry; public void startListener(String listenerId) { MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId); if (listenerContainer != null && !listenerContainer.isRunning()) { listenerContainer.start(); } } public void stopListener(String listenerId) { MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId); if (listenerContainer != null && listenerContainer.isRunning()) { listenerContainer.stop(); } } }
|
KafkaListenerControlService可以 根据分配的 ID精确管理各个侦听器实例。startListener()和stopListener()方法都使用listenerId作为参数,允许我们根据需要启动和停止主题的消息消费。
KafkaListenerEndpointRegistry充当 Spring 应用程序上下文中定义的所有 Kafka 侦听器端点的中央存储库。它监视这些侦听器容器,从而允许对其状态进行编程控制,无论是启动、停止还是暂停。对于需要实时调整其消息处理活动而无需重新启动整个应用程序的应用程序来说,此功能至关重要。
验证动态监听器控件
接下来,让我们重点测试 Spring Boot 应用程序中 Kafka 侦听器的动态启动和停止功能。首先,让我们启动监听器:
kafkaListenerControlService.startListener(Constants.LISTENER_ID);
然后,我们通过发送并处理测试事件来验证侦听器是否已激活:
UserEvent startUserEventTest = new UserEvent(UUID.randomUUID().toString());
producer.send(new ProducerRecord<>(Constants.MULTI_PARTITION_TOPIC, startUserEventTest));
await().untilAsserted(() -> assertEquals(1, this.userEventStore.getUserEvents().size()));
this.userEventStore.clearUserEvents();
现在侦听器已处于活动状态,我们将发送一批 10 条消息进行处理。发送四条消息后,我们将停止侦听器,然后将剩余的消息发送到 Kafka 主题:
for (long count = 1; count <= 10; count++) { UserEvent userEvent = new UserEvent(UUID.randomUUID().toString()); Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Constants.MULTI_PARTITION_TOPIC, userEvent)); RecordMetadata metadata = future.get(); if (count == 4) { await().untilAsserted(() -> assertEquals(4, this.userEventStore.getUserEvents().size())); this.kafkaListenerControlService.stopListener(Constants.LISTENER_ID); this.userEventStore.clearUserEvents(); } logger.info("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition()); }
|
在启动侦听器之前,我们将验证事件存储中没有消息:
assertEquals(0, this.userEventStore.getUserEvents().size()); kafkaListenerControlService.startListener(Constants.LISTENER_ID); await().untilAsserted(() -> assertEquals(6, this.userEventStore.getUserEvents().size())); kafkaListenerControlService.stopListener(Constants.LISTENER_ID);
|
一旦侦听器再次启动,它就会处理侦听器停止后我们发送到 Kafka 主题的剩余 6 条消息。该测试展示了 Spring Boot 应用程序动态管理 Kafka 监听器的能力。