SpringBoot使用Testcontainers+Avro消息测试Kafka消费者


您有一个 Spring Boot 微服务来管理用户数据。该微服务侦听来自 Kafka 的传入事件(例如用户创建、更新或删除),将它们转换为您自己的业务对象,将它们写入 PostgreSQL 数据库并通过 REST 接口将它们提供给您的前端。整个基础设施提供Avro消息和 Confluent模式注册表。
现在您要测试您的 Kafka 消费者是否读取了事件,将它们转换为您的数据库实体并保存它们。
多种解决方案:使用MockSchemaRegistryClient,或编写自己的自定义的Avro去/串行化器,或设置一个Testcontainers生态系统与Kafka,一个 Zookeeper 和一个 Confluent Schema Registry,或者在 spring-kafka-test 依赖中使用Spring提供的 EmbeddedKafka 。
所有这些解决方案都有其有效的优点和缺点。
在这篇博文中,我将提出一个解决方案,它使用最少的测试容器集来提供控制、速度和效率之间的最佳折衷。我上面列出的其他解决方案各有利弊,我将在本文后面介绍。
 
设置
让我引导您完成较小的设置,模拟上述情况。我们有一个 Spring Boot 应用程序、一个 PostgreSQL 数据库和我们的 Kafka 消费者。应用程序需要为添加或修改的用户监听 Kafka 消息,并且必须相应地更新数据库项。您可以在此处找到完整的代码存储库。我添加了 UserEvent 类仅用于编译目的,通常这会通过 Avro 插件最终出现在您生成的类中。
 
让我们解决第一个问题:如何在我们的测试环境中与 Schema Registry 交互?我们需要一个模拟Mock。隐藏在 Confluent 的模式注册表包中,在AbstractKafkaAvroSerDeConfig类中,您可以找到模式注册表 url 的注释:
可用于注册或查找模式的模式注册表实例的以逗号分隔的 URL 列表。如果您希望连接到模拟模式注册表以进行测试,您可以使用“mock://”伪协议指定范围。例如,“mock://my-scope-name”对应于“MockSchemaRegistry.getClientForScope(“my-scope-name”)”。
所以这意味着我们可以用一个虚构的模式注册表 URL 配置 Kafka 生产者和消费者,只需要以“mock://”开头,你就会自动开始使用MockSchemaRegistryClient。这样您就不需要显式启动MockSchemaRegistryClient并相应地配置所有内容。
这也消除了对 Confluent Schema Registry Container 的需求,还有运行带有嵌入式 Zookeeper的Kafka Testcontainer,我们不再需要额外的 Zookeeper 容器,我们只需要一个 Testcontainer 来进行消息传递。
这样我最终只有两个测试容器:Kafka 和数据库。
设置和配置您的容器:

 protected static final PostgreSQLContainer<?> postgreSQLContainer =
      new PostgreSQLContainer<>("postgres:10.9")
          .withPassword(
"postgres")
          .withUsername(
"postgres")
          .withExposedPorts(5432)
          .withReuse(true);

  protected static final KafkaContainer kafkaContainer =
      new KafkaContainer(DockerImageName.parse(
"confluentinc/cp-kafka:5.5.3"))
          .withEmbeddedZookeeper()
          .withEnv(
"KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9093 ,BROKER://0.0.0.0:9092")
          .withEnv(
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
          .withEnv(
"KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")
          .withEnv(
"KAFKA_BROKER_ID", "1")
          .withEnv(
"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
          .withEnv(
"KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1")
          .withEnv(
"KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
          .withEnv(
"KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
          .withEnv(
"KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "")
          .withEnv(
"KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0")
          .withNetwork(network);

  static {
    Startables.deepStart(Stream.of(postgreSQLContainer, kafkaContainer)).join();
  }

并使用他们的测试配置创建 Kafka 生产者和消费者:

 public static KafkaProducer<Object, Object> createProducer() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        io.confluent.kafka.serializers.KafkaAvroSerializer.class);
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
"mock://testUrl");
    props.put(ProducerConfig.CLIENT_ID_CONFIG,
"kafkatest");
    return new KafkaProducer<>(props);
  }

 public static KafkaConsumer<String, Object> createEventConsumer() {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
    props.put(
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://testUrl");
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
"true");
    props.put(ConsumerConfig.GROUP_ID_CONFIG,
"kafkatest");
    return new KafkaConsumer<>(props);
  }

这里我们只添加了模拟前缀模式注册表 url,没有特殊的序列化器或反序列化器。不要忘记在 Kafka 消费者配置中将 specific.avro.reader 的属性设置为“true”,以确保消费者不会因类转换异常而失败。
现在,在您的测试中,您创建您希望 Kafka 生产者发送的事件,让您的 Kafka 消费者订阅该主题,并从KafkaTestUtils 中获取 ConsumerRecord以供您自己的侦听器处理。然后你测试你的听众所做的任何事情的结果,就这样。
 
利弊?
当然,运行时间是一个问题在我的机器上运行平均需要 5 秒。我使用一个 Kafka 和一个 Postgres 容器的版本平均需要 15 秒才能运行。
现在想象一下使用 4 个测试容器(Schema Registry、Kafka 和 Zookeeper,以及数据库容器)可以运行多长时间。并且有更多的代码,需要测试更复杂的业务逻辑。
测试容器在完全可用之前需要启动和联网。我的设置的一个更快的替代方案是将所有基础设施移动到内存中,使用 EmbeddedKafka 作为代理并用 H2 数据库替换 PostgreSQL。您可以在“embeddedKafka”分支中找到EmbeddedKafka的工作版本(我的命名技巧非常有想象力)。
但是使用 EmbeddedKafka 需要注意依赖版本。Confluent Schema Registry Client 带有自己的 Zookeeper 版本,根据 Schema Registry Client 的版本,您最终可能会得到与 kafka-test 依赖项预期的版本不同的 Zookeeper 版本,这将导致 ClassNotFound 异常运行测试时。有关更多详细信息,请查看 Confluent 的版本矩阵和Spring的Kafka 客户端列表。
因此,由于版本限制,我坚持使用容器设置。对于这个演示应用程序,它根本不重要,但为了避免意外行为,我建议尽可能接近生产版本。当然,使用 H2 也可能并不总是一种选择,这取决于您使用生产数据库的功能(例如约束、json 数据类型、某些连接语句)。所以这是关于控制和测试运行时的妥协。
另外,我是否已经提到“视情况而定”?
你可以在github上找到代码,主分支是两个容器的演示,embeddedKafka分支是容器免费版本。