在使用Kafka+微服务发送聚合的领域事件时如何在错误重试时保证顺序?- datadriveninvestor


Apache Kafka已成为跨微服务异步通信的领先平台。它具有强大的功能,可让我们构建健壮的,有弹性的异步体系结构。
同时,我们需要预料到潜在的陷阱。如果无法提前识别出可能(不,将要发生)的问题,将使我们面临易于出错和数据损坏的系统。
在本文中,我们将重点介绍这样的陷阱:处理消息的失败尝试。首先,我们需要意识到消息使用可能会失败。其次,我们需要确保以不会导致更多问题的方式处理此类故障。
 
跨有界上下文传递的消息
当我们刚开始构建微服务时,我们许多人最初采用了集中式模型。每条数据都有一个驻留的单一微服务(即单一事实来源)。如果任何其他微服务需要访问该数据,将使用同步调用以检索它。
这种方法导致了许多问题,包括同步呼叫链较长,单点故障,团队自主权下降等。
最终,我们学得更好。在当今成熟的体系结构中,我们将通信分为命令处理和事件处理
命令处理通常在单个Bounded Context中执行,并且通常仍涉及同步通信。
另一方面,事件通常由一个有界上下文中的服务发出,并异步发布到Kafka,以供其他有界上下文中的服务使用。
例如,以用户有界上下文为例。我们的用户团队将构建负责启用新用户,更新现有用户帐户等的应用程序和服务。
创建或修改用户帐户后,UserAccount服务会将相应的事件发布到Kafka。其他感兴趣的有界上下文可以使用该事件,将其存储在本地,使用其他数据进行增强,等等。例如,我们的登录有界上下文可能想知道用户的当前名称,以便在登录时向他们问候。
我们将这种用例称为跨边界事件发布。
 
聚合重要性 
在执行跨边界事件发布时,我们应该发布Aggregates。聚合是独立的实体组,每个实体都被视为一个单独的原子实体。每个聚合都有一个“根”实体,以及一些提供附加数据的从属实体。
在我们的示例中,我们可以定义一个UserAccount Aggregate。它的根实体可能是用户实体(包含全局唯一ID,名字和姓氏,出生日期等)。它也可能包含代表联系信息的实体(EmailAddress,PhoneNumber等)。重要的是,聚合将采用用户实体的全局唯一ID作为其自己的ID。
当我们的UserAccount发布消息时,该消息的有效负载将是UserAccount Aggregate的某种表示形式(例如JSON或Avro)。重要的是,该服务将指定UserAccount的唯一标识符作为Kafka的分区键。这将确保对任何给定UserAccount实体的更改都将发布到Kafka的同一分区。
我们很快就会看到为什么这是如此重要。
 
发送问题怎么办?
尽管跨Kafka的跨边界事件发布显得相当优雅,但这是一个分布式系统。因此,可能会有很多错误。我们将重点关注最常见的令人烦恼的问题:使用者可能无法成功处理它所消费的消息。
默认情况下,如果使用者没有成功使用一条消息(即,该使用者不能提交当前偏移量),它将重试同一条消息。因此,难道我们不能简单地让这种默认行为接管一切,然后重试消息直到成功吗?
问题是该消息可能永远不会成功。至少,并非没有某种形式的手动干预。然后,消费者将永远不会继续处理任何后续消息,并且我们的消息处理将陷入困境。
我们通常允许同步请求失败。例如,对我们的UserAccount服务进行的“创建用户” POST可能包含错误或丢失的数据。在这种情况下,我们可以简单地返回一个错误代码(例如HTTP 400),然后要求调用方重试。
虽然不理想,但这不会对我们的数据完整性造成任何持久的问题。该POST表示一个命令,尚未发生。即使我们让它失败,我们的数据也将保持一致状态。
当我们删除消息时,情况并非如此。消息表示事件已经已经发生。任何忽略事件的使用者都将与产生事件的上游服务永久不同步。
这就是说,我们不想丢弃消息。
 
流行方案:设立Kafka的重试Topic主题
您会发现更流行的解决方案之一涉及重试主题的概念。具体细节因实现而异,但是总体概念是这样的:

  1. 使用者尝试使用主要主题中的消息。
  2. 如果未能正确使用该消息,则使用者将消息发布到第一个重试主题,然后提交消息的偏移量,以便可以继续处理下一条消息。
  3. 订阅重试主题的是重试使用者,它包含与主要使用者相同的逻辑。该使用者在消息使用尝试之间引入了短暂的延迟。如果此使用者也无法使用该消息,则会将消息发布到第二个重试主题,并提交消息的偏移量。
  4. 这继续进行,并增加了一些重试主题和使用者,每个主题都有越来越多的延迟(用作退避策略)。最终,在最终重试使用者无法处理消息之后,该消息将发布到死信队列(DLQ)中,在此该消息将由工程团队手动进行分类。

从概念上讲,重试主题模式定义了失败的消息将被分流到的多个主题。如果“主主题”的使用者使用了无法处理的消息,它将将该消息发布到“重试主题1”并提交当前偏移量,从而将自身释放给下一条消息。重试主题的使用者将是主要使用者的副本,但如果它无法处理消息,它将发布到新的重试主题。最终,如果最后一个重试使用者无法处理该消息,它将把消息发布到死信队列(Dead Letter Queue,DLQ)。
起初,这种方法似乎很合理。实际上,它对于许多用例都可以正常工作。问题在于它不能满足通用的解决方案。实际上,存在一些用例,例如我们的跨边界事件发布,这种方法实际上将是危险的。
 
流行方案的问题是什么?
第一个问题是,它没有考虑到事件使用失败的两个原因之一:可恢复错误和不可恢复错误。
可恢复的错误是那些错误,如果我们重试了多次,这些错误最终将得以解决。一个简单的示例是将数据保存到数据库的使用者。如果数据库暂时不可用,那么当下一条消息通过时,使用者将失败。一旦数据库再次变得可用,则使用者将能够再次处理该消息。
另一种看待它的方式是:可恢复错误是那些根本原因在消息和使用者外部的错误。解决该错误后,我们的消费者将继续前进,好像没有发生任何错误。(这是造成混乱的常见原因。“可恢复”一词并不意味着应用程序本身(在我们的情况下为消费者)可以进行恢复。相反,某些外部资源(在此示例中为数据库)会失败并最终恢复。)
关于可恢复错误的事情是,它们将困扰主题中的几乎每条消息。回想一下,主题中的所有消息都应遵循相同的架构,并代表相同类型的数据。同样,我们的消费者将针对该主题的每个事件执行相同的操作。因此,如果消息A由于数据库中断而失败,那么消息B,消息C等也将失败。
不可恢复的错误是无论我们重试多少次都将失败的错误。例如,消息中缺少字段可能会导致NullPointerException。否则包含特殊字符的字段可能会使消息无法解析。
与可恢复错误不同,不可恢复错误通常会影响单个孤立消息。例如,如果仅消息A包含不可解析的特殊字符,则消息B将成功,消息C等也将成功。
与可恢复错误不同,解决不可恢复错误也意味着必须修复消息消费使用者本身(永远不要“修复”消息本身-它们是不可变的记录!)例如,我们可能会修复消息消费使用者,以便正确处理空值,然后重新部署它。
那么,这与重试主题解决方案有什么关系?
对于初学者来说,它对于可恢复的错误不是特别有用。请记住,在解决外部问题之前,可恢复的错误将影响每条消息,而不仅仅是当前的一条消息。因此,可以肯定的是,将失败的消息分流到重试主题将为下一条消息扫清障碍。但是,该信息也将失败,下一个以及下一个也将失败。我们最好还是让消费者自己重试,直到问题解决。可恢复的错误由消费消费使用者自己重试。
不可恢复的错误呢?通过重试队列可以在这些情况下提供帮助。如果一条麻烦的消息阻止了所有后续消息的使用,那么毫无疑问地分流该消息肯定会阻止我们的用户使用(当然,有多个重试主题将变得多余)。
但是,虽然重试队列可以帮助解除受不可恢复错误困扰的消息的使用者,但它也可能带来更多隐患。
 
重试队列可解决不可恢复错误,但是忽略了消息事件的顺序
让我们简要回顾一下跨边界事件发布的一些重要方面。在边界上下文中处理命令后,我们将相应的事件发布到Kafka主题。重要的是,我们将聚合的ID指定为分区键。
为什么这很重要?它确保对任何给定聚合的更改始终发布到同一分区。
好吧,那为什么如此重要?当事件发布到同一分区时,可以保证事件按照发生的顺序进行处理。如果连续对同一聚合进行更改,并且所产生的事件发布到不同的分区,则可能发生竞争状况,其中消费者在第一个更改之前消耗了第二个更改。这导致数据不一致。
让我们举个简单的例子。我们的用户有界上下文提供了一个允许用户更改其名称的应用程序。用户将其名字从Zoey更改为Zoë ……然后立即将其更改为Zoiee。如果我们不在乎顺序,则下游使用者(例如,登录有界上下文)可能会先处理对Zoiee的更改,然后不久用Zoë覆盖它。
现在,登录数据已与我们的用户数据不同步。此外,此后Zoiee每当她登录到我们的网站时会看到“欢迎光临,Zoë!” 。
这是重试主题的真正问题。它们使我们的消费者容易受到混乱事件的处理。如果使用者在处理Zoë更改时受到临时数据库中断的影响,它将把该消息分流到重试主题,稍后再尝试。如果数据库中断是由时整流Zoiee变化出现,那么该消息将被成功处理第一,后来被改写Zoë的变化。
为了清楚起见。实际上,乱序处理事件可能导致各种数据损坏问题。更糟糕的是,这些问题起初很少被注意到。取而代之的是,它们通常会导致数据损坏,这种损坏在一段时间内仍未引起注意,但随着时间的流逝而增长。通常,当我们意识到发生了什么事时,大量数据就会受到影响。
 
重试主题队列什么时候可行?
需要明确的是,这并不是普遍的错误模式。当然,有一些适合的用例。特别地,当消费者的工作是收集不可修改的记录时,此模式适用。这样的示例可能包括:
  • 处理网站事件流以生成报告的消费者
  • 将事务交易添加到分类账的消费者(只要不需要按特定顺序跟踪这些交易)
  • ETL正在从另一个数据源获取数据的使用者

那些和其他类似的消费者可能会从重试主题模式中受益,而没有数据损坏的风险。
即使有这种用例,我们仍应谨慎行事。构建这样的解决方案既复杂又耗时。因此,作为一个组织,我们不想为每个新消费者编写一个新的解决方案。取而代之的是,我们要创建一个解决方案-库,容器等-可以在各种服务之间重用。
其中还存在其他危险。我们可能会为相关消费者建立一个重试主题的解决方案。不幸的是,不久之后,该解决方案便进入了跨边界事件发布用户。拥有这些消费者的团队可能没有意识到风险。正如我们前面所讨论的,在发生重大数据损坏之前,他们可能不会看到任何问题。
因此,在实施重试主题解决方案之前,我们应100%确定以下任一情况:
  • 我们的业务是这样,我们永远不会有消费者来更新现有数据,或者
  • 我们拥有严格的控制措施,以确保我们的重试主题解决方案不会在此类消费者中实施

 
关键:跨边界发布事件情况下如何解决?
鉴于重试主题队列模式对于跨边界事件发布使用者不是可接受的解决方案,那么什么是可接受的解决方案?
  • 消除错误类型

对于初学者来说,如果我们在可恢复错误和不可恢复错误之间进行歧义消除,我们的生活将会更加轻松。如果我们的使用者开始遇到可恢复的错误,那么重试主题就变得多余了,我们可以避免使用它们。
因此,我们的第一步应该是确定我们遇到的错误类型:

void processMessage(KafkaMessage km) {
  try {
    Message m = km.getMessage();
    transformAndSave(m);
  } catch (Throwable t) {
    if (isRecoverable(t)) {
      // ...
    } else {
     
// ...
    }
  }
}

在上面的伪Java代码示例中,isRecoverable()当确定t是否表示可恢复错误时将采用白名单方法。换句话说,它检查t以确定它是否与任何已知的可恢复错误(例如,SQL异常或ReST客户端超时)相匹配,如果匹配,则返回true。否则,它返回false。这将有助于防止我们的用户无休止地阻止不可恢复的错误。
请注意,我们总是可以从一小列已知的可恢复错误开始。随着经验的增加,我们可以随着时间的推移完善列表。
  • 在消息使用者内部重试可恢复的错误

如果错误可恢复,该怎么办?正如我们刚才所讨论的,将消息存储到一个单独的主题中毫无意义。我们只会为下一条消息失败扫清道路。相反,消费者可以简单地重试,直到条件恢复。
当然,可恢复的错误意味着外部资源有问题。我们将资源与请求一起使用将无济于事。因此,我们想对重试应用退避策略。我们的伪Java代码现在可能看起来像这样:

void processMessage(KafkaMessage km) {
  try {
    Message m = km.getMessage();
    transformAndSave(m);
  } catch (Throwable t) {
    if (isRecoverable(t)) {
      doWithRetry(m, Backoff.EXPONENTIAL, this::transformAndSave);
    } else {
      // ...
    }
  }
}

  • 确保在不可恢复的错误期间聚合保持顺序

另一方面,如果我们的用户遇到一条带有不可恢复错误的消息,那么我们将希望快速隐藏该消息并释放后续的消息。稍后,我们将解决问题(通常通过修复并重新部署使用者),然后通过使用者重新运行该消息。
因此,我们将设置一个主题,将这些失败的消息发布到该主题。由于我们不会立即重试消息,因此我们将其称为stash主题。
在决定如何使用隐藏主题之前,让我们重新讨论顺序问题。我们将重用我们先前的“用户/登录”示例。尝试处理Zoë名称中的ë字符时,登录有界上下文中的消息消费使用者可能会遇到错误。
使用者将其识别为不可恢复的错误,将消息放在一边,然后继续处理后续消息。不久之后,消费者将获得Zoiee消息并成功处理它。
当天晚些时候,我们的团队会修复使用者,以便其可以正确处理特殊字符并进行重新部署。然后,我们将Zoë消息重新发布给使用者,我们的使用者现在可以正确处理该消息。
因此,当用户有界上下文将用户视为Zoiee时,登录有界上下文将她称为Zoë。
那么,我们如何在保持正确顺序的同时仍允许我们的消费者成功处理好消息呢?这时候将聚合作为消息传递如此重要的原因。因为我们不需要保持所有消息顺序,所以只需要将消息与单个Aggregate关联即可。
  • 聚合ID存储列表

我们的消费者将跟踪已储存的任何消息的聚合ID(我们将其称为“储存列表”)。在尝试处理任何消息之前,我们的消息消费使用者将查读存储清单(stash list)。如果存储列表中已包含该消息的“聚合ID”,则也将立即将该消息发布到另外一个为存储列表设立的主题topic。我们这样做是因为:这意味着我们跳过了对本来没有错误的消息的处理,也将确保与同一聚合相关联的消息保持有序。
例如,假设我们的Zoë / Zoiee用户的ID为123。消费Zoë消息时,我们的消息消费使用者将执行以下操作:
  • 检查ID 123的存储清单。
  • 在存储列表中未找到123后,尝试处理该消息,但失败,并显示不可恢复的错误。
  • 将123添加到存储列表中。
  • 将消息发布到专门的kafka的存储主题topic,然后提交消息的偏移量。

当Zoiee消息很快到达时,消费者将立即将其发布到与存储列表相关的kafka存储主题topic,因为其聚合ID现在存在于存储列表中。但是,任何其他消息将由消费者正常处理。
  • 重试失败的消息

当然,我们最终将需要重试存储主题中的消息。由于我们的消费使用者无法在当前状态下处理消息,因此需要人工干预。因此,适当的警报是我们解决方案的一部分,一旦将消息发布到存储主题,就会立即触发。这将通知我们的团队,他们将需要调查为什么无法处理该消息。
我们的错误日志将告诉我们,我们的消费者无法处理特殊字符。与消费者解决问题后,我们会将固定版本部署到生产环境中。
现在,我们需要将存储主题队列的消息路由到固定使用者:
  • 我们将部署使用者的固定版本,将其配置为从存储主题而非主要主题队列中读取消息进行消费。
  • 在消费之前,将清除消费者的存储清单。
  • 消费者将在存储主题中处理消息。
  • 一旦处理完所有隐藏消息,它将从主要主题恢复正常使用。

我们需要考虑的最后一件事是,在使用存储主题时,我们的消费者可能会遇到错误。在这种情况下,其错误处理行为应如我们已经描述的那样:
  • 如果错误是可恢复的,则将重试并重试;
  • 如果错误是不可恢复的,它将存储该消息,提交其偏移量,并继续下一条消息。

因此,我们可以为消费者创建两个重试主题,消费者可以在两个主题之间进行切换。或者,我们的使用者可以将失败的消息重新发布到相同的单个存储主题。这将要求它跟踪偏移量(或其他数据)以指示何时停止处理并返回到主要主题。
 
总结
如果重试的正确处理看起来很复杂,那是因为-特别是当一切顺利时,与Kafka相对优雅的风格相比。我们构建的任何合适的解决方案(无论是重试主题,隐藏主题还是其他解决方案)都将比我们想要的复杂。
不幸的是,如果我们希望在微服务之间建立弹性的异步通信流,那么我们就不能忽略它。但是,如果我们牢记以下几点,便可以构建正确的解决方案:
  • 了解Kafka通过主题,分区和分区键提供的功能。
  • 区分可恢复错误和不可恢复错误。
  • 现在和将来都要清楚我们组织的用例。我们只是在移动独立记录吗?……在这种情况下,我们可能不关心顺序。还是我们正在传播表示数据更改的事件?……在这种情况下,排序至关重要。