监控和检查Kafka健康运行的几个方法 - Ivelina


微服务通常使用发送或回复心跳/健康检查的模型作为向报告、调度或编排服务提供状态信息的一种方式。这些不仅在应用程序的正常生命周期中很重要,而且在新版本推出期间也很重要。
我们将看看如何使用不同的库和抽象来实现健康检查,但首先,让我们就在检查应用程序的健康时要考虑的内容达成一致。

健康对 Kafka 应用程序意味着什么?
对于 Kafka 应用程序,健康检查应该在两种情况下故意失败:

  • 与 Kafka 的通信存在问题——这可能是暂时的网络故障、异常较长的处理时间导致心跳失败、消费者被踢出或无法提交。这些是导致重新启动有意义的错误类型(当实例被认为不健康时,编排系统会执行此操作),因为它们肯定会在重新启动时解决
  • 该应用程序存在一个不容忽视的关键问题,需要在进一步处理之前解决。我的意思是,如果应用程序正在处理流数据并且它不能跳过或错过一条记录。在这种情况下,如果应用程序或它与之交互和/或依赖的另一个应用程序中存在错误,即导致所有未处理的记录无法处理,则应用程序不应提交任何偏移量并继续前进。唯一的解决方案是停止,一个简单的方法是报告不健康,重新启动,希望有人能快速修复错误。

由于这些记录本身的问题(无论是它们的格式、结构还是其他任何原因)而导致处理单个记录失败,不应在运行状况检查中考虑。应该通过将这些记录发送到 DLQ(死信队列)、注销问题并发出警报来处理这些小问题。如何处理 DLQ 中的记录超出了本文的范围。

简单的Kafka消费者
KafkaConsumer并没有真正提供开箱即用的“状态”,因此应用程序需要在整个生命周期中跟踪它——从订阅/分配主题和分区,到轮询、可能的暂停、恢复和关闭。可以在这些阶段中的每一个以及它们之间的转换中引发异常。这种情况的好坏之处在于,作为开发人员,您可以完全控制这些关键时刻,因此您可以正确处理它——捕获任何与 Kafka 相关的异常,记录它,警告它并更改缓存状态,使用作为健康指标。
这里要注意的关键是KafkaConsumer它将在一个线程中运行,并且检查健康状况的请求将在另一个线程中处理,因此请确保正确处理缓存状态。

enum State {
   CONSUMING,
   PAUSED,
   ...
   ERROR;
   fun isHealthy(): Boolean {
     return this != ERROR
   }
}

然后在您的健康检查端点中,您只需要检查state.isHealthy(). 使用 Spring,它看起来像这样:

class HealthCheckIndicator: HealthCheckIndicator {
 override fun health(): Health{
    return if(state.isHealthy()) { 
       Health.up() 
    } else {
       Health.down()
    }.build()
 }


Kafka Streams
在内部,KafkaStreams使用普通的KafkaProducer和KafkaConsumer,但这个抽象增加了几个整洁的功能(好吧,不仅是几个,而是在此背景下有用的几个)--它提供了一个方法来获取当前状态,更重要的是,调用它是线程安全的。

KafkaStreams.State有七个可能的值:

  • CREATED--在生命周期的开始阶段
  • RUNNING--准备消费或正在消费
  • REBALANCING--消费者组正在进行再平衡
  • PENDING_SHUTDOWN - 从以上任何一种过渡到NOT_RUNNING的状态
  • NOT_RUNNING - 由于调用close()而在正常的生命周期中停止。
  • PENDING_ERROR--过渡到ERROR的状态
  • ERROR--流不能自行恢复

最后两个可以被报告为 "停机 "或 "不健康"。

Spring Kafka Streams
Spring的实现甚至进一步脱离了低级别的概念,并提供了一种更高级的跟踪状态的方式。一旦你成功创建了你的流,你就可以注册一个StateListeneron。该接口实际上是一个消费者,在每次转换时接收流的新旧状态。

如果你的应用程序需要回复另一个轮询状态的服务,那么你就不能直接使用监听器,你仍然需要缓存健康状况并使用该变量来回复健康检查请求。

// 创建 "流"
stream.setStateListener{ newState, oldState -> 
   if (newState == ERROR || newState == PENDING_ERROR)
     healthy = false
}

然而,如果你应该在预定的时间间隔内推送状态,那么你可以直接使用监听器的实现。

正如你可能已经猜到的,状态的枚举就是上一节提到的KafkaStreams.State。

Kafka-Connect
Kafka在数据驱动的公司中发挥着巨大的作用,但仅有它是不够的。通常,有各种内部和外部的 "非Kafka "流需要与Kafka集成,这时Kafka-Connect出现了。它提供了一个相当丰富的生态系统,包括来自S3、Snowflake、Mongo等数据汇的即用型连接器,甚至包括来自SQL DB的追踪CDC(变更数据捕获)。

每个连接器都有一套从源头复制数据的任务,而且这些任务是平行运行的,可能会发生一个或多个失败。

目前还没有提供开箱即用的健康检查端点,但有一种方法可以扩展你的部署并自己添加它。要做到这一点,你需要。

创建一个小的Java项目并添加org.apache.kafka:connect-api:XXX依赖项。
扩展ConnectRestExtension并实现register方法。

@Override
public void register(ConnectRestExtensionContext restPluginContext){
  restPluginContext
    .configurable()
    .register(new HealthcheckController(new HealthcheckService(restPluginContext.clusterState())));
}

在你的控制器中定义一个端点:

@Path("/health")
@Produces(MediaType.APPLICATION_JSON)
public class HealthcheckController {
 private final HealthcheckService healthcheckService;

 public HealthcheckController(HealthcheckService healthcheckService)   {
     this.healthcheckService = healthcheckService;
 }

 @GET
 public Response healthcheck() {
    List<ConnectorState> states = healthcheckService.getStates();
    if (states.stream().anyMatch(state -> !state.isHealthy())) {
       return Response.status(Status.INTERNAL_SERVER_ERROR).build();
    }
    return Response.status(Status.OK).build();
 }

}

实现服务中实际健康检查的逻辑:
public class HealthcheckService {
  private final ConnectClusterState clusterState;

  public HealthcheckService(ConnectClusterState clusterState) {
    this.clusterState = clusterState;
  }

  public List<ConnectorState> getStates() {
    return clusterState.connectors()
              .stream()
              .map(this::getConnectorState)
              .collect(toList());
  }

  private ConnectorStatus getConnectorState(String connectorName) {
    ConnectorHealth connectorHealth = clusterState.connectorHealth(connectorName);
    if (connectorHealth.connectorState().state() != "RUNNING") {
       return ConnectorState.UNHEALTHY);
    }

    if(connectorHealth.tasksState()
         .entrySet()
         .stream()
         .anyMatch(this::isTaskNotRunning)) {
       return ConnectorState.UNHEALTHY;  
// decision time
    }

    return ConnectorState.HEALTHY;
  }

  private boolean isTaskNotRunning(Entry<Integer, TaskState> entry){
      return entry.getValue().state() !=
"RUNNING";
  }
}

ConnectorState :

enum ConnectorState {
    HEALTHY,
    UNHEALTHY;

    public boolean isHealthy() {
        return this == HEALTHY;
    }
}

  • 构建并将jar放到插件文件夹中(针对部署)。
  • 将此添加到你的配置中以激活该扩展:

rest.extension.classes=my.kafkaconnect.extension.HealthcheckRestExtension

这是最极端、最简单、也是最不理想的实现,即使只有一个任务失败,你也会把kafka-connect报告为不健康的。当你开始开发和实验一个连接器时,这是一个很好的起点。然而,一旦你有多个连接器,每个都有多个任务,一个更好的处理方式是首先尝试恢复单个任务和连接器。

class ConnectorStatus {
   final String name;
   final ConnectorState state;
   final Set<Integer> unhealthyTasks;
   public ConnectorStatus(String name, ConnectorState state, Set<Integer> unhealthyTasks) {
      this(name, state, Collections.emptySet());
   }
  
   public ConnectorStatus(String name, ConnectorState state, Set<Integer> unhealthyTasks) {
      this.name = name;
      this.state = state;
      this.unhealthyTasks = unhealthyTasks;
   }
}

然后,扩展本身或其他服务(不仅是通常会使用的编排器或调度程序)可以使用健康检查来触发重启和警报。如果连接器或任务不正常,您可以在扩展程序中分离一个线程以命中重新启动的端点并在发出警报之前重试 N 次。
只有在所有连接器都关闭并且发生了可怕的事情时,才会在这种情况下响应“不健康”。如果所有 N 次自我修复尝试都失败并且您收到警报,您可能需要以某种方式进行干预。

Spring Gotcha
如果您使用 Spring HealthIndicator,请确保您报告此健康状况的任何内容都可以“读取”响应,因为您可以在其中添加世界上的所有详细信息并报告任何状态,响应代码将为 200。如果编排或其他系统只依赖于您回复 OK 表示健康,5XX 表示不健康,然后确保添加此配置:

management:
  endpoint:
    health:
       status:
         http-mapping:
           UP:  200
           DOWN: 503 // or whatever fits


为什么要跟踪Consumer Lag
消费者滞后实际上是消费者组最后提交的偏移量与最后可用于读取的偏移量之间的差异。如果数据的生产速度远远超过它的消费速度,或者消费者在处理传入消息时遇到问题,则消费者组将滞后。
这可以用作性能指标。如果您已就数据从源到达目的地的速度达成了 SLO(服务水平目标)并且没有得到满足,那么快速查看消费者滞后警报或仪表板将告诉您管道中的哪个应用程序是有罪的参与者。
另一个潜在的用例是,如果您有应用程序无法及时处理的数据突然涌入。这可能与季节性有关,或者其中一项业务是业务的季节性,或者它受到一次性事件的影响。在这些情况下,如果没有计划,您会注意到滞后图中的峰值,您可能需要手动更改配置并扩展受影响组中的消费者。
此外,延迟可能是处理中存在错误的症状,只有在应用程序承受更大压力时才能发现该错误。这将非常危险,因为应用程序正在积极处理,但速度很慢,但取决于错误的类型,它可能无法正确处理。
此外,该应用程序甚至可以“死”但错误地报告正常,这就是我谈论健康检查的原因。这不是最令人兴奋的任务,但没有人希望生产中的僵尸应用程序。

以编程方式跟踪消费者滞后
如果您有 Kafka 的内部部署并且需要开发自己的指标报告服务,那么您可以通过AdminClientAPI 以编程方式完成。
这是 Kotlin 中最压缩的示例,不关心异常处理和返回格式,只是为了演示这个想法:

// create 'adminClient' and 'consumer'
fun getLags(): Map<String, List<Pair<TopicPartition, Long>>> {   
    adminClient.listConsumerGroups().valid().get()
      .map { it.groupId() }
      .associateWith { groupId ->
         adminClient.listConsumerGroupOffsets(groupId)
               .partitionToOffsetAndMetadata().get()
               .map{ (topicPartition, offsetMetadata) ->
                  topicPartition to getEndOffset(topicPartition) - offsetMetadata.offset()
               }
       }
}
fun getEndOffset(topicPartition: TopicPartition): Long {
    consumer.assign(listOf(topicPartition))
    consumer.seekToEnd(listOf(topicPartition))
    return consumer.position(topicPartition)
}

跟踪消费者滞后 - CLI
这更像是一种局部的、间歇性使用的解决方案,但值得一提的是:

kafka-consumer-groups \ 
  --bootstrap-server localhost:9092 \ 
  --group my_group \ 
  --describe


这将返回以下格式的信息:

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID

还有更多与数据管道的整体健康状况相关的指标,但这两个指标应该是您设置的前两个指标。