使用Spring Boot和Open Telemetry监控Kafka


在本文中,您将了解如何使用 Spring Boot 和 Open Telemetry 为 Kafka 生产者和消费者配置跟踪。我们将使用 Micrometer 库来发送轨迹,并使用 Jaeger 来存储和可视化它们。

Spring Kafka 内置了与 MicrometerKafkaTemplate和侦听器容器的集成。您还将了解如何配置 Spring Kafka 可观察性以将自定义标签添加到跟踪中。

 GitHub 存储库:kafka 目录

依赖关系
让我们看一下所需的 Maven 依赖项列表。我们的两个示例 Spring Boot 应用程序都是相同的。当然,我们需要添加Spring Boot启动器和Spring Kafka来发送或接收消息。为了自动生成与每条消息相关的跟踪,我们包括了 Spring Boot Actuator 和 Micrometer Tracing Open Telemetry 桥。最后,我们需要包含opentelemetry-exporter-otlp将跟踪导出到应用程序外部的库。

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
  </dependency>
  <dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-tracing-bridge-otel</artifactId>
  </dependency>
  <dependency>
    <groupId>io.opentelemetry</groupId>
    <artifactId>opentelemetry-exporter-otlp</artifactId>
  </dependency>
</dependencies>

Spring Boot Kafka 生产者追踪
我们的应用程序不会做任何复杂的事情。他们只是发送和接收消息。这是代表两个应用程序之间交换的消息的类。

让我们从producer应用程序开始。它每秒生成并发送一条消息。@Service下面是负责生成消息的 bean的实现。KafkaTemplate它为此注入并使用bean。

@Service
public class SenderService {

   private static final Logger LOG = LoggerFactory
      .getLogger(SenderService.class);

   AtomicLong id = new AtomicLong();
   @Autowired
   KafkaTemplate<Long, Info> template;

   @Value("${POD:kafka-producer}")
   private String pod;
   @Value(
"${NAMESPACE:empty}")
   private String namespace;
   @Value(
"${CLUSTER:localhost}")
   private String cluster;
   @Value(
"${TOPIC:info}")
   private String topic;

   @Scheduled(fixedRate = 1000)
   public void send() {
      Info info = new Info(id.incrementAndGet(), 
                           pod, 
                           namespace, 
                           cluster, 
                           
"HELLO");
      CompletableFuture<SendResult<Long, Info>> result = template
         .send(topic, info.getId(), info);
      result.whenComplete((sr, ex) ->
                LOG.info(
"Sent({}): {}", sr.getProducerRecord().key(), 
                         sr.getProducerRecord().value()));
    }

}

Spring Boot 提供了一个自动配置的KafkaTemplate. 但是,要使用 Spring Boot 启用 Kafka 跟踪,我们需要自定义该实例。
为了启用跟踪,我们需要调用setObservationEnabled方法。
默认情况下,Micrometer 模块会生成一些通用标签。我们希望至少添加目标主题名称和 Kafka 消息密钥。
因此,我们要创建 KafkaTemplateObservationConvention 接口的自定义实现。它使用 KafkaRecordSenderContext 从 ProducerRecord 对象中获取主题名称和消息键。

@SpringBootApplication
@EnableScheduling
public class KafkaProducer {

   private static final Logger LOG = LoggerFactory
      .getLogger(KafkaProducer.class);

   public static void main(String[] args) {
      SpringApplication.run(KafkaProducer.class, args);
   }

   @Bean
   public NewTopic infoTopic() {
      return TopicBuilder.name("info")
             .partitions(1)
             .replicas(1)
             .build();
   }

   @Bean
   public KafkaTemplate<Long, Info> kafkaTemplate(ProducerFactory<Long, Info> producerFactory) {
      KafkaTemplate<Long, Info> t = new KafkaTemplate<>(producerFactory);
      t.setObservationEnabled(true);
      t.setObservationConvention(new KafkaTemplateObservationConvention() {
         @Override
         public KeyValues getLowCardinalityKeyValues(KafkaRecordSenderContext context) {
            return KeyValues.of(
"topic", context.getDestination(),
                   
"id", String.valueOf(context.getRecord().key()));
         }
      });
      return t;
   }

}

我们还需要设置 Jaeger 实例的地址,并决定导出跨度的百分比。下面是包含所需属性的 application.yml 文件:

producer/application.yml

spring:
  application.name: kafka-producer
  kafka:
    bootstrap-servers: ${KAFKA_URL:localhost}:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.LongSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

management:
  tracing:
    enabled: true
    sampling:
      probability: 1.0
  otlp:
    tracing:
      endpoint: http://jaeger:4318/v1/traces

针对消费者的 Spring Boot Kafka 跟踪
让我们切换到消费者应用程序。它只是接收并打印来自 Kafka 主题的消息。下面是监听器 @Service 的实现。除了整个消息内容外,它还会打印消息键和主题分区编号。

@Service
public class ListenerService {

   private static final Logger LOG = LoggerFactory
      .getLogger(ListenerService.class);

   @KafkaListener(id = "info", topics = "${app.in.topic}")
   public void onMessage(@Payload Info info,
                         @Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Long key,
                         @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
      LOG.info(
"Received(key={}, partition={}): {}", key, partition, info);
   }

}

为了在消费者端生成和导出跟踪,我们需要覆盖 ConcurrentKafkaListenerContainerFactory Bean。对于容器监听器工厂,我们应该获取 ContainerProperties 实例,然后调用 setObservationEnabled 方法。与之前一样,我们可以创建 KafkaTemplateObservationConvention 接口的自定义实现,以包含附加标记(可选)。

@SpringBootApplication
@EnableKafka
public class KafkaConsumer {

   private static final Logger LOG = LoggerFactory
      .getLogger(KafkaConsumer.class);

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumer.class, args);
    }

    @Value("${app.in.topic}")
    private String topic;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> listenerFactory(ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setObservationEnabled(true);
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }

    @Bean
    public NewTopic infoTopic() {
        return TopicBuilder.name(topic)
                .partitions(10)
                .replicas(3)
                .build();
    }

}

配置:
consumer/application.yml
spring:
  application.name: kafka-consumer
  kafka:
    bootstrap-servers: ${KAFKA_URL:localhost}:9092
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "*"

app.in.topic: ${TOPIC:info}

management:
  tracing:
    enabled: true
    sampling:
      probability: 1.0
  otlp:
    tracing:
      endpoint: http:
//jaeger:4318/v1/traces


在 Docker 上试用
完成实施后,我们就可以尝试我们的解决方案了。我们将以 Docker 容器的形式运行 Kafka 和 Jaeger。首先,让我们为生产者和消费者应用构建项目和容器镜像。Spring Boot 为此提供了内置工具。因此,我们只需执行以下命令即可:

$ mvn clean package spring-boot:build-image

然后,我们就可以定义包含容器列表的 docker-compose.yml 文件了。使用基于环境变量的样式可以动态覆盖 Spring Boot 的属性。因此,我们可以轻松更改容器的 Kafka 和 Jaeger 地址。下面是我们的 docker-compose.yml:

docker-compose.yml

version: "3.8"
services:
  broker:
    image: moeenz/docker-kafka-kraft:latest
    restart: always
    ports:
      -
"9092:9092"
    environment:
      - KRAFT_CONTAINER_HOST_NAME=broker
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      -
"16686:16686"
      -
"4317:4317"
      -
"4318:4318"
  producer:
    image: library/producer:1.0-SNAPSHOT
    links:
      - broker
      - jaeger
    environment:
      MANAGEMENT_OTLP_TRACING_ENDPOINT: http:
//jaeger:4318/v1/traces
      SPRING_KAFKA_BOOTSTRAP_SERVERS: broker:9092
  consumer:
    image: library/consumer:1.0-SNAPSHOT
    links:
      - broker
      - jaeger
    environment:
      MANAGEMENT_OTLP_TRACING_ENDPOINT: http:
//jaeger:4318/v1/traces
      SPRING_KAFKA_BOOTSTRAP_SERVERS: broker:9092


让我们用以下命令运行所有已定义的容器:

$ docker compose up

在 Kubernetes 上运行
我们的示例应用程序已准备好在 Kubernetes 上部署。您可以使用 Skaffold CLI 轻松完成部署。在此之前,我们需要在 Kubernetes 上安装 Kafka 和 Jaeger。我就不详细介绍 Kafka 的安装了。关于如何使用 Strimzi 操作符在 Kubernetes 上运行 Kafka,你可以在我的文章中找到详细介绍。之后,我们可以继续安装 Jaeger。第一步,我们需要添加以下 Helm 仓库:

$ helm repo add jaegertracing https://jaegertracing.github.io/helm-charts

默认情况下,Jaeger Helm 图表不公开 OTLP 端点。为了启用它们,我们需要覆盖一些默认设置。下面是我们的 YAML 清单值:

jaeger-values.yaml

collector:
  service:
    otlp:
      grpc:
        name: otlp-grpc
        port: 4317
      http:
        name: otlp-http
        port: 4318

让我们使用 jaeger-values.yaml 中的参数在 jaeger 命名空间中安装 Jaeger:

$ helm install jaeger jaegertracing/jaeger -n jaeger \
    --create-namespace \
    -f jaeger-values.yaml

安装 Jaeger 之后,我们就可以验证 Kubernetes 服务列表了。我们将使用 jaeger-collector 服务为应用程序发送跟踪,并使用 jaeger-query 服务访问用户界面仪表板。

$ kubectl get svc -n jaeger
NAME               TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                                           AGE
jaeger-agent       ClusterIP   10.96.147.104   <none>        5775/UDP,6831/UDP,6832/UDP,5778/TCP,14271/TCP     14m
jaeger-cassandra   ClusterIP   None            <none>        7000/TCP,7001/TCP,7199/TCP,9042/TCP,9160/TCP      14m
jaeger-collector   ClusterIP   10.96.111.236   <none>        14250/TCP,14268/TCP,4317/TCP,4318/TCP,14269/TCP   14m
jaeger-query       ClusterIP   10.96.88.64     <none>        80/TCP,16685/TCP,16687/TCP                        14m


最后,我们可以运行连接到 Kafka 和 Jaeger 的示例 Spring Boot 应用程序。下面是生产者应用程序的部署对象。它通过定义 KAFKA_URL 和 MANAGEMENT_OTLP_TRACING_ENDPOINT 环境变量来覆盖默认的 Kafka 和 Jaeger 地址。

apiVersion: apps/v1
kind: Deployment
metadata:
  name: producer
spec:
  selector:
    matchLabels:
      app: producer
  template:
    metadata:
      labels:
        app: producer
    spec:
      containers:
      - name: producer
        image: piomin/producer
        resources:
          requests:
            memory: 200Mi
            cpu: 100m
        ports:
        - containerPort: 8080
        env:
          - name: MANAGEMENT_OTLP_TRACING_ENDPOINT
            value: http://jaeger-collector.jaeger:4318/v1/traces
          - name: KAFKA_URL
            value: my-cluster-kafka-bootstrap
          - name: CLUSTER
            value: c1
          - name: TOPIC
            value: test-1
          - name: POD
            valueFrom:
              fieldRef:
                fieldPath: metadata.name
          - name: NAMESPACE
            valueFrom:
              fieldRef:
                fieldPath: metadata.namespace


消费者应用程序的类似部署对象,见github源码

假设你在 Git 仓库的 kafka 目录中,只需运行以下命令即可部署两个应用程序。顺便说一下,我将为消费者应用程序创建两个部署(consumer-1 和 consumer-2),以便 Jaeger 可视化。

$ skaffold run -n strimzi --tail

运行应用程序后,就可以访问 Jaeger 面板并验证跟踪列表。为了访问仪表板,我们可以启用 jaeger-query 服务的端口转发。

$ kubectl port-forward svc/jaeger-query 80:80