在本文中,您将学习如何使用KEDA自动扩展使用来自 Kafka 主题的消息的应用程序。KEDA缩写是Kubernetes Event Driven简写。
为了解释它背后的想法,我将创建两个简单的服务。第一个是向 Kafka 主题发送事件,第二个是接收它们。我们将在 Kubernetes 上运行这两个应用程序。为了简化练习,我们可以使用 Spring Cloud Stream,它提供了与 Kafka 的智能集成。
架构
在我们开始之前,让我们花点时间来了解我们今天的场景。我们有一个单一的Kafka主题,被我们两个应用程序用来交换事件。这个主题由10个分区组成。还有一个单一的生产者实例,定期发送事件。我们将缩减和增加消费者服务的pod数量。消费者服务的所有实例都被分配到同一个Kafka消费者组。这意味着该组中只有一个实例可以接收特定的事件。
每个消费者实例只有一个接收线程。因此,我们可以很容易地模拟一个事件的处理时间。我们将让主线程休眠1秒。另一方面,生产者将以一个可变的速度发送事件。同时,它将在所有可用的分区中分割消息。这样的行为可能会导致分区上的消费者滞后,因为Spring Cloud Stream只有在处理完一个消息后才会提交偏移。在我们的案例中,滞后的数值取决于生产者的速度和运行中的消费者实例的数量。为了澄清这一点,让我们看看下面的图表。

我们的目标非常简单。我们需要根据生产者服务产生的流量来调整消费者实例的数量。偏移滞后的值不能超过期望的阈值。如果我们增加生产者一方的流量率,KEDA应该增加消费者实例的数量。因此,如果我们降低生产者的流量率,它应该减少消费者实例的数量。下面是我们的场景图。

在Spring Cloud Stream中使用Kafka
为了将Spring Cloud Stream用于Kafka,我们只需在Maven pom.xml中加入一个依赖项。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
|
之后,我们可以使用一个标准的Spring Cloud Stream模型。然而,在后台,它通过一个特殊的绑定器实现与Kafka集成。
我们的两个应用程序都非常简单。生产者只是生成和发送事件(默认为JSON格式)。我们在代码中需要做的唯一事情是声明Supplier Bean。在后台,有一个单线程,每秒钟生成和发送CallmeEvent。每一次它只增加消息中的id字段。
@SpringBootApplication
public class ProducerApp {
private static int id = 0;
public static void main(String[] args) {
SpringApplication.run(ProducerApp.class, args);
}
@Bean
public Supplier<CallmeEvent> eventSupplier() {
return () -> new CallmeEvent(++id, "Hello" + id, "PING");
}
}
|
我们可以用下面的属性改变Supplier 每次发送之间的默认固定延迟。比方说,我们想每隔100毫秒发送一个事件。
spring.cloud.stream.poller.fixedDelay = 100
我们还应该提供基本的配置,如Kafka地址、主题名称(如果与Supplier函数的名称不同)、分区的数量和分区的密钥。Spring Cloud Stream会在应用启动时自动创建主题、话题。
spring.cloud.stream.bindings.eventSupplier-out-0.destination = test-topic
spring.cloud.stream.bindings.eventSupplier-out-0.producer.partitionKeyExpression = payload.id
spring.cloud.stream.bindings.eventSupplier-out-0.producer.partitionCount = 10
spring.kafka.bootstrap-servers = one-node-cluster.redpanda:9092
|
现在,消费者的应用。它也不是很复杂。正如我之前提到的,为了模拟处理时间,我们将在接收方法里面休眠主线程。
public class ConsumerApp {
private static final Logger LOG = LoggerFactory.getLogger(ConsumerAApp.class);
public static void main(String[] args) {
SpringApplication.run(ConsumerApp.class, args);
}
@Bean
public Consumer<Message<CallmeEvent>> eventConsumer() {
return event -> {
LOG.info("Received: {}", event.getPayload());
try {
Thread.sleep(1000);
} catch (InterruptedException e) { }
};
}
}
|
最后,在消费者方面的配置。设置消费者组并启用分区是很重要的。
spring.cloud.stream.bindings.eventConsumer-in-0.destination = test-topic
spring.cloud.stream.bindings.eventConsumer-in-0.group = a
spring.cloud.stream.bindings.eventConsumer-in-0.consumer.partitioned = true
spring.kafka.bootstrap-servers = one-node-cluster.redpanda:9092
|
现在,我们应该把这两个应用程序部署在Kubernetes上。
但在这之前,我们先在Kubernetes上安装Kafka和KEDA。
更多点击标题见原文