Kafka Streams 2.5提供更高的高可用性和交互式查询 - confluent


是什么阻止您将Kafka Streams用作构建应用程序的数据层?毕竟,它具有快速的嵌入式RocksDB存储,可为您处理冗余,具有高度可伸缩性并提供正好一次精确的语义。
推荐博文:Apache Kafka可以替换数据库吗?– 2020年的更新,Kai Waehner在文中解释了如何使用Kafka Streams构建高度可扩展的微服务,本文重点介绍了Kafka Streams到现在为止缺少的一件事:高可用性保证。
分布式系统中的基本挑战之一是在一致性和可用性之间实现微妙的平衡。

一致性与可用性
每个应用程序都要求不同的一致性保证。从一个客户向另一个客户转移资金的财务应用程序必须保持高度一致。相反,如果推荐引擎是以追求高可用性为目标,则会对一致性模型较弱的内容感到可以忍受。
Kafka和Kafka Streams都提供了许多配置来调整应用程序以达到可用性和一致性的平衡。例如,在Kafka中,调整副本,同步副本和确认的数量可为您提供更大范围的可用性和一致性保证。

同样,为了进行查询,对Kafka Streams(直到2.4版)进行了调整,以实现高度一致性。系统中可以有N个副本,但是查询支持仅限于活动分区。这意味着,如果您使用REST API来提供存储在Kafka Streams中的状态,如果其中一个处理活动分区的VM变得不可用,您将有停机时间。发生故障的活动分区仅在通过新的重新平衡重新分配它们时才能够提供查询,并且在发生故障之前将其还原到以前的状态。重选和恢复的过程可能需要几秒钟到几分钟的时间。对于某些对时间敏感的应用程序,这可能是灾难性的,因为无法为驻留在发生故障的分区上的客户提供数据。在沃尔玛,系统不可用构成了巨大的挑战,

我们的挑战:沃尔玛规模的可用性
在沃尔玛,我们已经使用Kafka Streams两年来构建我们的一站式客户数据平台。该平台每天吸收数十亿个交互和事务事件,使用REST API提供派生的知识,并触发订阅特定事件的各种基于客户的机器学习模型。电子商务中的主要用例之一是欺诈检测。
欺诈检测模型是交易流程的一部分,并在每次购物车结帐时触发。因此,向模型提供数据并运行实际模型以识别欺诈的过程必须在几毫秒内完成。用于从平台获取用户历史记录的查询带有非常严格的SLA,因此我们必须优化Kafka流以实现高可用性和低延迟。我们向Kafka Streams添加了一些Kafka改进建议(KIP),从而减少了服务停机时间并帮助我们实现了单位数的服务等待时间。这些改进将在以下各节中介绍。
在云环境中,VM变得不可访问是很常见的,这会导致服务中断。最大的原因之一是云补丁。尽管修补程序对于修复错误和漏洞至关重要,但它们可能会影响系统可用性。单个VM上的补丁程序至少会在应用程序中触发两次重新平衡,一次是在离开集群时,另一次是在补丁程序后重新加入时,这两种操作均会导致API请求失败。对于Walmart,每个VM的补丁大约需要10-15分钟。虚拟机重新加入群集后,将其从群集中删除,在其他位置创建新的备用数据库并选择活动数据库的完整工作就变得徒劳了。
为避免这种情况,一种选择是将session.timeout.ms设置增加到15分钟。该选项的问题是对VM上的活动分区的REST调用失败,该故障已中断。重新平衡过程中可用性的降低导致我们挑战现状,并找到一种方法来在重新平衡过程中或VM发生故障时提高应用程序的可用性。
Kafka 2.5版本中引入了KIP-535,允许状态存储在重新平衡期间提供旧的数据读取,解决了上述挑战。它能够从备用服务器提供查询服务以及热还原活动服务器以在重新平衡期间启用查询的功能。
除了提高可用性之外,从备用服务器启用服务还提供了负载平衡的好处。如果您的应用程序收到对特定分区的异常高的调用,则备用任务可以通过为查询提供服务来减轻负载,而使用此KIP可以实现这一任务。
KIP-535公开的分区信息可用于从其特定分区中获取密钥的数据。不幸的是,存储的内部实现会在实例上所有分区的所有可用存储上进行迭代,效率低下,并且在每次查找过程中会增加可观的延迟。为了改进此流程,我们添加了另一项改进KIP-562

使用KIP-562从特定分区提供服务
允许应用程序开发人员以可以查询特定分区或选择是否查询旧状态的方式编写API。以下代码显示了使用此新KIP的代码段:

//找到活动的备用主机列表和分区
final KeyQueryMetadata keyQueryMetadata = kafkaStreams.queryMetadataForKey(TABLE_NAME, key, (topic, “key”, value, numPartitions) -> 0);

////使用以上信息将查询重定向到包含键“ key”的分区的主机

//key belongs to this partition
final int keyPartition = keyQueryMetadata.getPartition();

//获取key所属的特定分区“ keyPartition”的存储,并查看陈旧的存储
final ReadOnlyKeyValueStore<String,String> store = kafkaStreams
           .store(StoreQueryParameters.fromNameAndType(TABLE_NAME,    queryableStoreType).enableStaleStores().withPartition(keyPartition));                     
String value = store.get(“key”);


使用KAFKA-9169修复备用数据库的不必要恢复
想象一下,您的应用程序进行了重新平衡,并且在重新平衡之后,所有待机任务将使用对他们可用的第一个偏移量进行恢复,即使它们已经恢复了数据并且很热。此过程会大大增加群集中的CPU使用率,网络和磁盘I / O,并会影响不必要的情况。对于Walmart,每个备用VM的网络I / O通常小于10 MB / s,但是由于存在错误,它增加到200 MB / s以上,直到完全还原了备用数据库为止。
这是KAFKA-9169在2.5版中修复的关键错误之一。由于我们已经启用了备用数据库的服务功能,因此这是有害的,因为由于这种不必要的还原,备用数据库无法服务。如果您为变更日志主题启用了保留时间,则偏移量可能不可用,从而导致OffsetOutOfRangeExceptions,并且存储将被删除并重新创建。此错误修复程序还通过停止备用任务中的不必要还原来帮助提高Kafka Streams群集的可用性。

 总结
ksqlDB(Kafka Streams上的SQL抽象)的流行证明了基于事件流数据库的用例数量不断增长。即使经过考验,Kafka Streams和ksqlDB的未来也取决于能否通过提供高可用性和低延迟提供数据来挑战传统数据库的能力。通过使用不同的事件流引擎和数据库来构建单个应用程序,这还将减少开发人员的开销。上面讨论的所有改进旨在使我们进一步朝这一最终目标迈进。
对查询在Kafka Streams中的工作方式感兴趣吗?交互式查询是使用Kafka Streams应用程序提供数据的绝佳起点。要了解ksqlDB如何使用此功能,请查看博客文章:ksqlDB中的高可用性,容错请求查询
​​​​​​​