使用KEDA和Kafka在 Kubernetes 上自动扩展 - Piotr


在本文中,您将学习如何使用KEDA自动扩展使用来自 Kafka 主题的消息的应用程序。KEDA缩写是Kubernetes Event Driven简写。
为了解释它背后的想法,我将创建两个简单的服务。第一个是向 Kafka 主题发送事件,第二个是接收它们。我们将在 Kubernetes 上运行这两个应用程序。为了简化练习,我们可以使用 Spring Cloud Stream,它提供了与 Kafka 的智能集成。
 
架构
在我们开始之前,让我们花点时间来了解我们今天的场景。我们有一个单一的Kafka主题,被我们两个应用程序用来交换事件。这个主题由10个分区组成。还有一个单一的生产者实例,定期发送事件。我们将缩减和增加消费者服务的pod数量。消费者服务的所有实例都被分配到同一个Kafka消费者组。这意味着该组中只有一个实例可以接收特定的事件。

每个消费者实例只有一个接收线程。因此,我们可以很容易地模拟一个事件的处理时间。我们将让主线程休眠1秒。另一方面,生产者将以一个可变的速度发送事件。同时,它将在所有可用的分区中分割消息。这样的行为可能会导致分区上的消费者滞后,因为Spring Cloud Stream只有在处理完一个消息后才会提交偏移。在我们的案例中,滞后的数值取决于生产者的速度和运行中的消费者实例的数量。为了澄清这一点,让我们看看下面的图表。

我们的目标非常简单。我们需要根据生产者服务产生的流量来调整消费者实例的数量。偏移滞后的值不能超过期望的阈值。如果我们增加生产者一方的流量率,KEDA应该增加消费者实例的数量。因此,如果我们降低生产者的流量率,它应该减少消费者实例的数量。下面是我们的场景图。

 
 
在Spring Cloud Stream中使用Kafka
为了将Spring Cloud Stream用于Kafka,我们只需在Maven pom.xml中加入一个依赖项。

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

之后,我们可以使用一个标准的Spring Cloud Stream模型。然而,在后台,它通过一个特殊的绑定器实现与Kafka集成。

我们的两个应用程序都非常简单。生产者只是生成和发送事件(默认为JSON格式)。我们在代码中需要做的唯一事情是声明Supplier Bean。在后台,有一个单线程,每秒钟生成和发送CallmeEvent。每一次它只增加消息中的id字段。

@SpringBootApplication
public class ProducerApp {

   private static int id = 0;

   public static void main(String[] args) {
      SpringApplication.run(ProducerApp.class, args);
   }

   @Bean
   public Supplier<CallmeEvent> eventSupplier() {
      return () -> new CallmeEvent(++id, "Hello" + id, "PING");
   }

}


我们可以用下面的属性改变Supplier 每次发送之间的默认固定延迟。比方说,我们想每隔100毫秒发送一个事件。
spring.cloud.stream.poller.fixedDelay = 100

我们还应该提供基本的配置,如Kafka地址、主题名称(如果与Supplier函数的名称不同)、分区的数量和分区的密钥。Spring Cloud Stream会在应用启动时自动创建主题、话题。

spring.cloud.stream.bindings.eventSupplier-out-0.destination = test-topic
spring.cloud.stream.bindings.eventSupplier-out-0.producer.partitionKeyExpression = payload.id
spring.cloud.stream.bindings.eventSupplier-out-0.producer.partitionCount = 10
spring.kafka.bootstrap-servers = one-node-cluster.redpanda:9092

现在,消费者的应用。它也不是很复杂。正如我之前提到的,为了模拟处理时间,我们将在接收方法里面休眠主线程。

public class ConsumerApp {

   private static final Logger LOG = LoggerFactory.getLogger(ConsumerAApp.class);

   public static void main(String[] args) {
      SpringApplication.run(ConsumerApp.class, args);
   }

   @Bean
   public Consumer<Message<CallmeEvent>> eventConsumer() {
      return event -> {
         LOG.info("Received: {}", event.getPayload());
         try {
            Thread.sleep(1000);
         } catch (InterruptedException e) { }
      };
   }

}

最后,在消费者方面的配置。设置消费者组并启用分区是很重要的。

spring.cloud.stream.bindings.eventConsumer-in-0.destination = test-topic
spring.cloud.stream.bindings.eventConsumer-in-0.group = a
spring.cloud.stream.bindings.eventConsumer-in-0.consumer.partitioned = true
spring.kafka.bootstrap-servers = one-node-cluster.redpanda:9092

现在,我们应该把这两个应用程序部署在Kubernetes上。
但在这之前,我们先在Kubernetes上安装Kafka和KEDA。
更多点击标题见原文