在本文中,您将了解如何使用 Spring Boot 和 Open Telemetry 为 Kafka 生产者和消费者配置跟踪。我们将使用 Micrometer 库来发送轨迹,并使用 Jaeger 来存储和可视化它们。
Spring Kafka 内置了与 MicrometerKafkaTemplate和侦听器容器的集成。您还将了解如何配置 Spring Kafka 可观察性以将自定义标签添加到跟踪中。
GitHub 存储库:kafka 目录
依赖关系
让我们看一下所需的 Maven 依赖项列表。我们的两个示例 Spring Boot 应用程序都是相同的。当然,我们需要添加Spring Boot启动器和Spring Kafka来发送或接收消息。为了自动生成与每条消息相关的跟踪,我们包括了 Spring Boot Actuator 和 Micrometer Tracing Open Telemetry 桥。最后,我们需要包含opentelemetry-exporter-otlp将跟踪导出到应用程序外部的库。
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> <dependency> <groupId>io.micrometer</groupId> <artifactId>micrometer-tracing-bridge-otel</artifactId> </dependency> <dependency> <groupId>io.opentelemetry</groupId> <artifactId>opentelemetry-exporter-otlp</artifactId> </dependency> </dependencies>
|
Spring Boot Kafka 生产者追踪
我们的应用程序不会做任何复杂的事情。他们只是发送和接收消息。这是代表两个应用程序之间交换的消息的类。
让我们从producer应用程序开始。它每秒生成并发送一条消息。@Service下面是负责生成消息的 bean的实现。KafkaTemplate它为此注入并使用bean。
@Service public class SenderService {
private static final Logger LOG = LoggerFactory .getLogger(SenderService.class);
AtomicLong id = new AtomicLong(); @Autowired KafkaTemplate<Long, Info> template;
@Value("${POD:kafka-producer}") private String pod; @Value("${NAMESPACE:empty}") private String namespace; @Value("${CLUSTER:localhost}") private String cluster; @Value("${TOPIC:info}") private String topic;
@Scheduled(fixedRate = 1000) public void send() { Info info = new Info(id.incrementAndGet(), pod, namespace, cluster, "HELLO"); CompletableFuture<SendResult<Long, Info>> result = template .send(topic, info.getId(), info); result.whenComplete((sr, ex) -> LOG.info("Sent({}): {}", sr.getProducerRecord().key(), sr.getProducerRecord().value())); }
}
|
Spring Boot 提供了一个自动配置的KafkaTemplate. 但是,要使用 Spring Boot 启用 Kafka 跟踪,我们需要自定义该实例。
为了启用跟踪,我们需要调用setObservationEnabled方法。
默认情况下,Micrometer 模块会生成一些通用标签。我们希望至少添加目标主题名称和 Kafka 消息密钥。
因此,我们要创建 KafkaTemplateObservationConvention 接口的自定义实现。它使用 KafkaRecordSenderContext 从 ProducerRecord 对象中获取主题名称和消息键。
@SpringBootApplication @EnableScheduling public class KafkaProducer {
private static final Logger LOG = LoggerFactory .getLogger(KafkaProducer.class);
public static void main(String[] args) { SpringApplication.run(KafkaProducer.class, args); }
@Bean public NewTopic infoTopic() { return TopicBuilder.name("info") .partitions(1) .replicas(1) .build(); }
@Bean public KafkaTemplate<Long, Info> kafkaTemplate(ProducerFactory<Long, Info> producerFactory) { KafkaTemplate<Long, Info> t = new KafkaTemplate<>(producerFactory); t.setObservationEnabled(true); t.setObservationConvention(new KafkaTemplateObservationConvention() { @Override public KeyValues getLowCardinalityKeyValues(KafkaRecordSenderContext context) { return KeyValues.of("topic", context.getDestination(), "id", String.valueOf(context.getRecord().key())); } }); return t; }
}
|
我们还需要设置 Jaeger 实例的地址,并决定导出跨度的百分比。下面是包含所需属性的 application.yml 文件:
producer/application.yml
spring: application.name: kafka-producer kafka: bootstrap-servers: ${KAFKA_URL:localhost}:9092 producer: key-serializer: org.apache.kafka.common.serialization.LongSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
management: tracing: enabled: true sampling: probability: 1.0 otlp: tracing: endpoint: http://jaeger:4318/v1/traces
|
针对消费者的 Spring Boot Kafka 跟踪
让我们切换到消费者应用程序。它只是接收并打印来自 Kafka 主题的消息。下面是监听器 @Service 的实现。除了整个消息内容外,它还会打印消息键和主题分区编号。
@Service public class ListenerService {
private static final Logger LOG = LoggerFactory .getLogger(ListenerService.class);
@KafkaListener(id = "info", topics = "${app.in.topic}") public void onMessage(@Payload Info info, @Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Long key, @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) { LOG.info("Received(key={}, partition={}): {}", key, partition, info); }
}
|
为了在消费者端生成和导出跟踪,我们需要覆盖 ConcurrentKafkaListenerContainerFactory Bean。对于容器监听器工厂,我们应该获取 ContainerProperties 实例,然后调用 setObservationEnabled 方法。与之前一样,我们可以创建 KafkaTemplateObservationConvention 接口的自定义实现,以包含附加标记(可选)。
@SpringBootApplication @EnableKafka public class KafkaConsumer {
private static final Logger LOG = LoggerFactory .getLogger(KafkaConsumer.class);
public static void main(String[] args) { SpringApplication.run(KafkaConsumer.class, args); }
@Value("${app.in.topic}") private String topic;
@Bean public ConcurrentKafkaListenerContainerFactory<String, String> listenerFactory(ConsumerFactory<String, String> consumerFactory) { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.getContainerProperties().setObservationEnabled(true); factory.setConsumerFactory(consumerFactory); return factory; }
@Bean public NewTopic infoTopic() { return TopicBuilder.name(topic) .partitions(10) .replicas(3) .build(); }
}
|
配置:
consumer/application.yml
spring: application.name: kafka-consumer kafka: bootstrap-servers: ${KAFKA_URL:localhost}:9092 consumer: key-deserializer: org.apache.kafka.common.serialization.LongDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer properties: spring.json.trusted.packages: "*"
app.in.topic: ${TOPIC:info}
management: tracing: enabled: true sampling: probability: 1.0 otlp: tracing: endpoint: http://jaeger:4318/v1/traces
|
在 Docker 上试用
完成实施后,我们就可以尝试我们的解决方案了。我们将以 Docker 容器的形式运行 Kafka 和 Jaeger。首先,让我们为生产者和消费者应用构建项目和容器镜像。Spring Boot 为此提供了内置工具。因此,我们只需执行以下命令即可:
$ mvn clean package spring-boot:build-image
然后,我们就可以定义包含容器列表的 docker-compose.yml 文件了。使用基于环境变量的样式可以动态覆盖 Spring Boot 的属性。因此,我们可以轻松更改容器的 Kafka 和 Jaeger 地址。下面是我们的 docker-compose.yml:
docker-compose.yml
version: "3.8" services: broker: image: moeenz/docker-kafka-kraft:latest restart: always ports: - "9092:9092" environment: - KRAFT_CONTAINER_HOST_NAME=broker jaeger: image: jaegertracing/all-in-one:latest ports: - "16686:16686" - "4317:4317" - "4318:4318" producer: image: library/producer:1.0-SNAPSHOT links: - broker - jaeger environment: MANAGEMENT_OTLP_TRACING_ENDPOINT: http://jaeger:4318/v1/traces SPRING_KAFKA_BOOTSTRAP_SERVERS: broker:9092 consumer: image: library/consumer:1.0-SNAPSHOT links: - broker - jaeger environment: MANAGEMENT_OTLP_TRACING_ENDPOINT: http://jaeger:4318/v1/traces SPRING_KAFKA_BOOTSTRAP_SERVERS: broker:9092
|
让我们用以下命令运行所有已定义的容器:
$ docker compose up
在 Kubernetes 上运行
我们的示例应用程序已准备好在 Kubernetes 上部署。您可以使用 Skaffold CLI 轻松完成部署。在此之前,我们需要在 Kubernetes 上安装 Kafka 和 Jaeger。我就不详细介绍 Kafka 的安装了。关于如何使用 Strimzi 操作符在 Kubernetes 上运行 Kafka,你可以在我的文章中找到详细介绍。之后,我们可以继续安装 Jaeger。第一步,我们需要添加以下 Helm 仓库:
$ helm repo add jaegertracing https://jaegertracing.github.io/helm-charts
默认情况下,Jaeger Helm 图表不公开 OTLP 端点。为了启用它们,我们需要覆盖一些默认设置。下面是我们的 YAML 清单值:
jaeger-values.yaml
collector: service: otlp: grpc: name: otlp-grpc port: 4317 http: name: otlp-http port: 4318
|
让我们使用 jaeger-values.yaml 中的参数在 jaeger 命名空间中安装 Jaeger:
$ helm install jaeger jaegertracing/jaeger -n jaeger \ --create-namespace \ -f jaeger-values.yaml
|
安装 Jaeger 之后,我们就可以验证 Kubernetes 服务列表了。我们将使用 jaeger-collector 服务为应用程序发送跟踪,并使用 jaeger-query 服务访问用户界面仪表板。
$ kubectl get svc -n jaeger NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE jaeger-agent ClusterIP 10.96.147.104 <none> 5775/UDP,6831/UDP,6832/UDP,5778/TCP,14271/TCP 14m jaeger-cassandra ClusterIP None <none> 7000/TCP,7001/TCP,7199/TCP,9042/TCP,9160/TCP 14m jaeger-collector ClusterIP 10.96.111.236 <none> 14250/TCP,14268/TCP,4317/TCP,4318/TCP,14269/TCP 14m jaeger-query ClusterIP 10.96.88.64 <none> 80/TCP,16685/TCP,16687/TCP 14m
|
最后,我们可以运行连接到 Kafka 和 Jaeger 的示例 Spring Boot 应用程序。下面是生产者应用程序的部署对象。它通过定义 KAFKA_URL 和 MANAGEMENT_OTLP_TRACING_ENDPOINT 环境变量来覆盖默认的 Kafka 和 Jaeger 地址。
apiVersion: apps/v1 kind: Deployment metadata: name: producer spec: selector: matchLabels: app: producer template: metadata: labels: app: producer spec: containers: - name: producer image: piomin/producer resources: requests: memory: 200Mi cpu: 100m ports: - containerPort: 8080 env: - name: MANAGEMENT_OTLP_TRACING_ENDPOINT value: http://jaeger-collector.jaeger:4318/v1/traces - name: KAFKA_URL value: my-cluster-kafka-bootstrap - name: CLUSTER value: c1 - name: TOPIC value: test-1 - name: POD valueFrom: fieldRef: fieldPath: metadata.name - name: NAMESPACE valueFrom: fieldRef: fieldPath: metadata.namespace
|
消费者应用程序的类似部署对象,见github源码
假设你在 Git 仓库的 kafka 目录中,只需运行以下命令即可部署两个应用程序。顺便说一下,我将为消费者应用程序创建两个部署(consumer-1 和 consumer-2),以便 Jaeger 可视化。
$ skaffold run -n strimzi --tail
运行应用程序后,就可以访问 Jaeger 面板并验证跟踪列表。为了访问仪表板,我们可以启用 jaeger-query 服务的端口转发。
$ kubectl port-forward svc/jaeger-query 80:80