避免踩坑:如何防止Apache Kafka丢失数据? Kafka中那些为追求高可用性而忽视的高可靠性配置参数 - SoftwareMill技术博客


讨论Kafka中最重要的配置,从而能防止Kafka中的数据丢失。

生产者消息确认机制
这是生产者级别的超重要配置。根据文档,消息确认acks属性是:
生产者客户端要求kafka集群中选举的领导者在确认请求完成之前已收到的确认数。

可能的值是:0,1,all及-1(其等同于all)。
这是什么意思?
如果将此值设置为“ 0”,则表示“发完消息即忘记”。生产者客户端只管闷头继续发送消息,无视kafka是否能够保存它发送的消息。当每个消息都很重要时,这绝对不是一个好主意。
将其设置为“ 1”意味着kafka集群能够保留消息时,生产者客户端才继续发送消息。听起来好多了,但可以想象一下当Kafka集群选举的领导者服务器节点崩溃了,并且尚未将消息复制到集群中其他备份节点时,这些消息也会丢失。
属性值“ all”表明它可以确保消息在所有备份节点上都持久存在,但是事情更加复杂,实际上,这意味着它是在同步的所有备份节点上写入操作。这令人困惑,并且可能是一个陷阱。稍后会更多。无论如何,“全部”(或“ -1”)是最安全的设置。

生产者重试机制
通过retries设置,可以微调在发送失败时,生产者应尝试将消息发送给kafka多少次。默认值为2147483647,其为最大整数。
听起来很简单?好吧,就像在许多其他Kafka设置中一样,这里也是一个陷阱。实际上有两个陷阱。
首先,还有另一个设置delivery.timeout.ms可以设置所有重试的超时时间。因此,如果retriesa是一个很大的数字,但超时很小,则消息传递仍然会失败。
其次,如果消息的顺序很重要,则将max.in.flight.requests.per.connection设置为1 至关重要。将其设置为大于1的值可能导致消息重新排序。更多关于这在优秀文章,其解释的细节问题

复制写入
Kafka提供复制。在某个节点失败的情况下,它可以成为“救命稻草”。有几件重要的事情要记住:

  • 复制是在分区级别完成的,
  • 每个分区都有一个领导者和一个或多个关注者,
  • 追随者从领导者那里获取数据,
  • 如果生产者acks设置为all—在同步所有副本备份节点上保留消息后,生产者将被确认。

这些事实得出重要结论:
  • 如果您在多个站点(例如,多个可用性跨机房的区域)中的kafka中设置了高复制级别,则所有关注者获取消息都需要一些时间。
  • 如果领导者在收到消息之后但在追随者获取此消息之前失败,则消息将丢失。重播此消息变成生产者客户端的责任。这就是acks设置如此重要的原因。

可以使用以下命令配置所有主题的默认复制因子,default.replication.factor请注意,默认设置为1

创建主题时,也可以为每个主题设置复制因子。如果要为特定主题设置不同的设置,则最好将其设置auto.create.topics.enable为false并使用脚本创建主题。

复制-特殊情况
复制还有2个其他设置:offsets.topic.replication.factor和transaction.state.log.replication.factor
这些是“特殊”主题的代理设置:第一个存储的消费者客户端offsets偏移量,而第二个存储的是事务明细。请记住,特殊主题的设置不能使用常规主题的默认设置。

最小化同步副本
如前所述,生产者acks属性确定Kafka集群何时应确认该消息,并且all设置更加安全。在这种情况下,“all”一词是什么意思?这意味着all in-sync replicas。

min.insync.replicas 意味:
当生产者将acks设置为“ all”(或“ -1”)时,min.insync.replicas指定必须确认写入才能使成功写入的最小备份副本数量。如果无法满足此最小值,则生产者将引发异常(NotEnoughReplicas或NotEnoughReplicasAfterAppend)。
考虑场景:我们设置acks=all和min.insync.replicas=1(这是默认设置!)。网络不稳定,只有领导者处于同步状态(例如,其他kafka节点失去了与Zookeeper的连接)。生产者将写入消息,并根据min.insync.replicas配置确认了。因为网络不稳定,领导者节点无法与其他备份节点正常通讯,这条消息只保存在领导者节点上,这意味着该消息将永远不会复制到其他备份节点并丢失。
这种情况似乎是不现实的,但这是一个真实的例子。

min.insync.replicas最小安全设置为2

默认值是1,会很危险,很容易忘记对其进行更改。

在min.insync.replicas上配置的代理将成为所有新的主题(你可以每个主题进行配置)的默认。
同样,事务主题不使用此设置,它有自己的:transaction.state.log.min.isr。

不干净的领导人选举
设置unclean.leader.election.enable为false,可以防止kafka集群节点(如果不在ISR列表中)却成为领导者。

为什么?考虑一个场景:

  1. 您有3个kafka集群节点,节点broker_1是一个领导者。
  2. 节点broker_3 由于某种原因脱机。
  3. 节点broker_1从ISR列表中删除它。
  4. 生产者继续其工作,并写一些消息。
  5. 在broker_1和broker_2同时下线。
  6. 在broker_3复苏和再次在线,它成为了一个领导者。
  7. 在broker_2复苏和开始跟随broker_3。

这是什么意思?broker_3离线时,broker_1存储和确认的消息都会丢失。

消费者自动提交
当消费者客户端收到一些消息时,必须“告诉” Hello, I already got this message, please don't give it to me again if I ask for new messages!!,方法是提交偏移量。可以手动或自动完成。
如果将enable.auto.commit设置为true的偏移量,则会在后台定期提交。因此,您无法控制何时发送偏移,即使在处理消息之前也可以发送偏移。容易想象如果消费者失败了会发生什么,它不会获得已经比确认的偏移量更“旧”的消息,并且这些消息会丢失。
(实际上,这有点复杂-偏移量存储的不是针对特定的消费者实例,而是针对消费者组,但是现在让我们简化一下)。
如果您手动提交偏移量而使用者在处理消息之后且提交偏移量之前失败,会发生什么情况?是的,该消息被处理两次,至多一次传递。如果唯一性很重要,则必须在处理过程中对消息进行重复数据删除,但这是另一个故事的主题。(banq注:收件箱模式)

消息未同步到磁盘
当所有kafka经纪人节点都确认消息后会发生什么?这是否意味着它已经存储在磁盘上了?其实没有。
这只意味着它保存在经纪人的内存中。这是个问题吗?是的,当所有经纪人同时失败时,这可能是一个问题。所有经纪人都在同一个可用区中,这是最糟糕的做法。
通常,是操作系统决定将消息刷新到磁盘的时间,但是可以通过在代理级别设置log.flush.interval.messages 或 log.flush.interval.ms,或主题级别上设置flush.messages.flush.ms。
例如,设置flush.messages=1将导致将每条消息写入磁盘。您可以想象它会对性能产生重大影响,因此在进行操作之前请三思。
情况甚至更糟:如果将数据同步到磁盘,则可以将其存储在磁盘缓存中。但是几乎不可能同时破坏所有代理中的所有磁盘的情况。另一方面,如果群集设计不良,并且所有代理都位于同一可用性区域中,则电源故障可能会导致数据丢失。

总结
Kafka是一个分布式系统,如果配置正确,可以超级持久。一些避免踩坑要点:
生产者设置

  • acks=all
  • retries=2147483647
  • delivery.timeout.ms=2147483647

经纪人设定
  • unclean.leader.election.enable=false
  • default.replication.factor=3
  • min.insync.replicas=2
  • offsets.topic.replication.factor=3
  • transaction.state.log.replication.factor=3
  • transaction.state.log.min.isr=2

消费者设置
  • enable.auto.commit=false