通过Kafka分布式事务实现微服务数据交换与发件箱模式


作为其业务逻辑的一部分,微服务通常不仅需要更新自己的本地数据存储,而且还需要向其他服务通知发生的数据更改。发件箱模式描述了一种让服务以安全和一致的方式执行这两项任务的方法; 它为源服务提供即时“读取您自己的写入”语义,同时提供跨服务边界的可靠,最终一致的数据交换。
如果你已经构建了几个微服务,你可能会同意最困难的部分是数据:微服务不是孤立存在的,而且往往需要在彼此之间传播数据和数据变化。
例如,考虑一个管理采购订单的微服务:当下新订单时,可能必须将有关该订单的信息转发给货运服务(因此它可以组装一个或多个订单的货件)和客户服务(因此它可以根据新订单更新客户的总贷方余额等事项。
有不同的方法让订单服务了解其他两个关于新采购订单的方法; 例如,它可以调用这些服务提供的一些RESTgrpc或其他(同步)API。但是,这可能会产生一些不希望的耦合:发送服务必须知道要调用哪些其他服务以及在哪里找到它们。它也必须准备好暂时无法使用这些服务。通过提供请求路由,重试,断路器等功能,Istio等服务网络可以在这里提供帮助。
任何同步方法的一般问题是,如果没有它调用的其他服务,一个服务就无法真正运行。虽然缓冲和重试可能有助于仅需要通知某些事件的其他服务,但如果服务实际上需要查询其他服务以获取信息,则情况并非如此。例如,当下订单时,订单服务可能需要从库存服务获得所购买商品库存的次数。
这种同步方法的另一个缺点是它缺乏可重玩性,即新消费者在事件发送后到达并且仍然能够从头开始消费整个事件流的可能性。
这两个问题都可以通过使用异步数据交换方法来解决:即让订单,库存和其他服务通过持久的消息日志(如Apache Kafka)传播事件。通过订阅这些事件流,将通知每个服务有关其他服务的数据更改。它可以对这些事件做出反应,并且如果需要,可以使用针对其自身需求定制的表示在其自己的数据存储中创建该数据的本地表示。例如,这种视图可以被非规范化以有效地支持特定的访问模式,或者它可以仅包含与消费服务相关的原始数据的子集。
持久日志还支持可重复播放性,即可以根据需要添加新的消费者,从而实现您最初可能没有想到的用例,而无需触及源服务。例如,考虑一个数据仓库,该数据仓库应保留有关所有订单的信息,或基于Elasticsearch的采购订单上的一些全文搜索功能。一旦采购订单事件出现在Kafka主题中(Kafka主题的保留策略设置可用于确保事件保留在主题中,只要它对于给定的用例和业务要求是必需的),新的消费者可以订阅,处理主题从一开始就实现了微服务数据库,搜索索引,数据仓库等所有数据的视图。

处理主题增长
根据数据量(记录的数量和大小,变化的频率),将事件长时间或甚至无限期地保留在主题中可能是也可能不可行。通常,在给定时间点之后,与给定数据项(例如,特定购买订单)有关的一些或甚至所有事件可能有资格从商业角度删除。请参阅下面的“从Kafka主题中删除事件”框,了解有关从Kafka主题中删除事件的更多想法,以便将其大小保持在范围内


双重写的问题
微服务为了提供它们的业务功能,微服务通常具有它们自己的本地数据存储。例如,订单服务可以使用关系数据库来持久保存关于采购订单的信息。放置新订单时,这可能会导致服务数据库INSERT中的表PurchaseOrder中的操作。同时,服务可能希望向Apache Kafka发送有关新订单的事件,以便将该信息传播给其他感兴趣的服务。
但是,简单地发出这两个请求可能会导致潜在的不一致。原因是我们不能拥有一个跨越服务数据库和Apache Kafka的共享事务,因为后者不支持在分布式(XA)事务中加入。因此,在不幸的情况下,我们最终可能会在本地数据库中保留新的采购订单,但没有将相应的消息发送给Kafka(例如由于某些网络问题)。或者,反过来说,我们可能已将消息发送给Kafka但未能在本地数据库中保留采购订单。两种情况都是不可取的 这可能导致无法为看似成功下单的订单创建货件。或者货物被创建,
那么如何避免这种情况呢?答案是只修改两个资源(数据库或Apache的卡夫卡)中一个,然后以最终一致的方式驱动第二个的更新。
让我们首先考虑只写入Apache Kafka的情况。
当收到新的采购订单时,订单服务不会INSERT同步进入其数据库; 相反,它只会向Kafka主题发送描述新订单的事件。因此,一次只能修改一个资源,如果出现问题,我们会立即发现它并向订单服务的调用方报告请求失败。
同时,服务本身将订阅该Kafka主题。这样,当新消息到达主题时它将被通知,并且它可以在其数据库中保留新的采购订单。
但是,这里有一个微妙的挑战,那就是缺乏“读你自己的写入数据”语义:例如,我们假设订单服务还有一个API,用于搜索给定客户的所有采购订单。在放置新订单后立即调用该API时,由于处理来自Kafka主题的消息的异步性质,可能会发生采购订单尚未保留在服务的数据库中,因此该查询不会返回该订单。这可能导致非常混乱的用户体验,因为用户例如可能错过他们的购物历史中新放置的订单。
有办法处理这种情况,例如,服务可以将新放置的采购订单保留在内存中并基于此数据响应后续查询。尽管在实现更复杂的查询或考虑订单服务可能还包括群集设置中的多个节点时,就需要在群集内传播该数据。
现在,我们看看另外一个方式,只是同步写入数据库并基于此驱动向Apache Kafka导出消息的情况会怎样?这是发件箱模式的用武之地。

发件箱模式
这种方法的想法是在服务的数据库中有一个“发件箱”表。当接收到下订单的请求时,不仅INSERT进入PurchaseOrder表中,而且,作为同一事务的一部分,还将表示要发送的事件的记录插入该发件箱表中。
该记录描述了服务中发生的事件,例如它可能是一个JSON结构,表示已经放置了新的采购订单,包括订单本身的数据,订单行以及上下文信息(如使用情况)案例标识符。通过通过发件箱表中的记录显式地发出事件,可以确保以适合外部消费者的方式构造事件。这也有助于确保事件使用者在例如更改内部域模型或PurchaseOrder表时不会中断。
异步进程监视该表以查找新条目。如果有,它会将事件作为消息传播到Apache Kafka。这为我们提供了非常好的特性平衡:通过同步写入PurchaseOrder表,源服务受益于“读取您自己的写入”语义。一旦提交了第一个交易,后续的采购订单查询将返回新的持久订单。与此同时,我们通过Apache Kafka获得可靠,异步,最终一致的数据传播到其他服务。
现在,发件箱模式实际上并不是一个新想法。它已经使用了相当长的一段时间。实际上,即使使用实际上可以参与分布式事务的JMS样式的消息代理,也可以避免任何耦合以及远程资源(如消息代理)的停机时间的潜在影响。您还可以在Chris Richardson优秀的microservices.io网站上找到该模式的描述。
然而,该模式得到的关注远远少于它应得的,并且在微服务环境中尤其有用。正如我们所看到的,可以使用变更数据捕获和Debezium以非常优雅和有效的方式实现发件箱模式。在下面,让我们探讨如何。

基于变更数据捕获的实现
基于日志的变更数据捕获(CDC)非常适合捕获发件箱表中的新条目并将其流式传输到Apache Kafka。与任何基于轮询的方法相反,事件捕获在近实时中以非常低的开销发生。Debezium附带了几个数据库的CDC连接器,如MySQL,Postgres和SQL Server。以下示例将使用Postberes的Debezium连接器
您可以在GitHub上找到该示例的完整源代码。有关构建和运行示例代码的详细信息,请参阅README.md。该示例以两个微服务,订单服务发货服务为中心。两者都是用Java实现的,使用CDI作为组件模型,使用JPA / Hibernate访问各自的数据库。订单服务在WildFly上运行,并公开一个简单的REST API,用于下订单和取消特定订单行。它使用Postgres数据库作为其本地数据存储。装运服务基于Thorntail; 通过Apache Kafka,它接收订单服务导出的事件,并在自己的MySQL数据库中创建相应的货件条目。Debezium对订单服务的Postgres数据库的事务日志(“预写日志”,WAL)进行了定制,以便捕获发件箱表中的任何新事件并将它们传播到Apache Kafka。
解决方案的整体架构如下图所示:

请注意,该模式与这些特定的实现选择无关。使用Spring Boot(例如利用Spring Data 对域事件支持),普通JDBC或除Java之外的其他编程语言等替代技术同样可以实现。
现在让我们仔细看看解决方案的一些相关组件。

发件箱表
该outbox表位于订单服务的数据库中,具有以下结构:

Column     |          Type   | Modifiers
--------------+------------------------+-----------
id        | uuid             | not null
aggregatetype | character varying(255) | not null
aggregateid  | character varying(255) | not null
type       | character varying(255) | not null
payload     | jsonb            | not null


它的列是这些:

  • id:每条消息的唯一ID; 消费者可以使用它来检测任何重复事件,例如在故障后重新启动以读取消息时。在创建新事件时生成。
  • aggregatetype:与给定事件相关的聚合根的类型; 理念是,依赖于领域驱动设计的相同概念,导出事件应该引用聚合(“可以被视为单个单元的域对象集群”),其中聚合根提供唯一的入口点用于访问聚合中的任何实体。例如,这可以是“采购订单”或“客户”。
    此值将用于将事件路由到Kafka中的相应主题,因此会有与采购订单相关的所有事件的主题,所有与客户相关的事件的一个主题等。请注意,还包含与子实体相关的事件一个这样的聚合应该使用相同的类型。因此,例如,表示取消单个订单行(它是采购订单汇总的一部分)的事件也应该使用其聚合根的类型“订单”,以确保此事件也将进入“订单”Kafka主题。
  • aggregateid:受给定事件影响的聚合根的id; 例如,这可以是采购订单的ID或客户ID; 与聚合类型类似,与聚合中包含的子实体相关的事件应使用包含聚合根的id,例如订单行取消事件的采购订单ID。此ID将在以后用作Kafka消息的密钥。这样,与一个聚合根或其任何包含的子实体相关的所有事件都将进入该Kafka主题的同一分区,这将确保该主题的使用者将消耗与该主题中的同一聚合相关的所有事件。生产时的确切顺序。
  • type:事件类型,例如“订单已创建”或“订单行已取消”。允许消费者触发合适的事件处理程序。
  • payload:具有实际事件内容的JSON结构,例如包含采购订单,有关购买者的信息,包含的订单行,其价格等。

将事件发送到发件箱
为了“发送”事件到发件箱,订单服务中的代码通常只能INSERT进入发件箱表。但是,最好采用稍微抽象的API,如果需要,可以在以后更轻松地调整发件箱的实现细节。CDI活动非常方便。它们可以在应用程序代码中引发,并由发件箱事件发送者同步处理,它将INSERT在发件箱表中执行所需操作。
所有发件箱事件类型都应实现以下合同,类似于之前显示的发件箱表的结构:

public interface ExportedEvent {

    String getAggregateId();
    String getAggregateType();
    JsonNode getPayload();
    String getType();
}

为了产生这样的事件,应用程序代码使用注入的Event实例,例如在OrderService类中:

@ApplicationScoped
public class OrderService {

    @PersistenceContext
    private EntityManager entityManager;

    @Inject
    private Event<ExportedEvent> event;

    @Transactional
    public PurchaseOrder addOrder(PurchaseOrder order) {
        order = entityManager.merge(order);

        event.fire(OrderCreatedEvent.of(order));
        event.fire(InvoiceCreatedEvent.of(order));

        return order;
    }

    @Transactional
    public PurchaseOrder updateOrderLine(long orderId, long orderLineId,
            OrderLineStatus newStatus) {
        // ...
    }
}

在该addOrder()方法中,JPA实体管理器用于在数据库中保留传入的订单,并且注入event用于触发相应的OrderCreatedEvent和InvoiceCreatedEvent。同样,请记住,尽管存在“事件”的概念,但这两件事情发生在同一个事务中。即在此交易中,将在数据库中插入三条记录:一张在带有采购订单的表中,另一张在发件箱表中。
实际的事件实现是直截了当的; 例如,这是OrderCreatedEvent类:

public class OrderCreatedEvent implements ExportedEvent {

    private static ObjectMapper mapper = new ObjectMapper();

    private final long id;
    private final JsonNode order;

    private OrderCreatedEvent(long id, JsonNode order) {
        this.id = id;
        this.order = order;
    }

    public static OrderCreatedEvent of(PurchaseOrder order) {
        ObjectNode asJson = mapper.createObjectNode()
                .put("id", order.getId())
                .put(
"customerId", order.getCustomerId())
                .put(
"orderDate", order.getOrderDate().toString());

        ArrayNode items = asJson.putArray(
"lineItems");

        for (OrderLine orderLine : order.getLineItems()) {
        items.add(
                mapper.createObjectNode()
                .put(
"id", orderLine.getId())
                .put(
"item", orderLine.getItem())
                .put(
"quantity", orderLine.getQuantity())
                .put(
"totalPrice", orderLine.getTotalPrice())
                .put(
"status", orderLine.getStatus().name())
            );
        }

        return new OrderCreatedEvent(order.getId(), asJson);
    }

    @Override
    public String getAggregateId() {
        return String.valueOf(id);
    }

    @Override
    public String getAggregateType() {
        return
"Order";
    }

    @Override
    public String getType() {
        return
"OrderCreated";
    }

    @Override
    public JsonNode getPayload() {
        return order;
    }
}

请注意Jackson 如何ObjectMapper用于创建事件有效负载的JSON表示。
现在让我们看看消耗任何被激活的代码ExportedEvent并对outbox表进行相应的写操作:

@ApplicationScoped
public class EventSender {

    @PersistenceContext
    private EntityManager entityManager;

    public void onExportedEvent(@Observes ExportedEvent event) {
        OutboxEvent outboxEvent = new OutboxEvent(
                event.getAggregateType(),
                event.getAggregateId(),
                event.getType(),
                event.getPayload()
        );

        entityManager.persist(outboxEvent);
        entityManager.remove(outboxEvent);
    }
}

它相当简单:对于每个事件,CDI运行时将调用该onExportedEvent()方法。OutboxEvent实体的一个实例持久存储在数据库中 - 并立即删除!
起初这可能会令人惊讶。但是,在记住基于日志的CDC如何工作时,它是有意义的:它不会检查数据库中表的实际内容,而是会关闭仅附加事务日志。一旦事务提交,调用persist()并将在日志中remove()创建一个INSERT和一个DELETE条目。之后,Debezium将处理这些事件:对于任何事件INSERT,具有事件有效负载的消息将被发送到Apache Kafka。DELETE另一方面,事件可以被忽略,因为从发件箱表中删除仅仅是技术性,不需要任何传播到消息代理。因此,我们可以通过CDC捕获添加到发件箱表中的事件,但是当查看表本身的内容时,它将始终为空。这意味着表格不需要额外的磁盘空间(除了将在某个时刻自动丢弃的日志文件元素),也不需要单独的管理过程来阻止它无限增长。

注册Debezium连接器
有了outbox实现,就可以注册Debezium Postgres连接器了,这样它就可以捕获发件箱表中的任何新事件并将它们转发给Apache Kafka。这可以通过将以下JSON请求发布到Kafka Connect的REST API来完成:

{
    "name": "outbox-connector",
   
"config": {
       
"connector.class" : "io.debezium.connector.postgresql.PostgresConnector",
       
"tasks.max" : "1",
       
"database.hostname" : "order-db",
       
"database.port" : "5432",
       
"database.user" : "postgresuser",
       
"database.password" : "postgrespw",
       
"database.dbname" : "orderdb",
       
"database.server.name" : "dbserver1",
       
"schema.whitelist" : "inventory",
       
"table.whitelist" : "inventory.outboxevent",
       
"tombstones.on.delete" : "false",
       
"transforms" : "router",
       
"transforms.router.type" : "io.debezium.examples.outbox.routingsmt.EventRouter"
    }
}

这将设置一个实例io.debezium.connector.postgresql.PostgresConnector,从指定的Postgres实例捕获更改。请注意,通过表白名单,仅outboxevent捕获表中的更改。它还应用了名为的单个消息转换(SMT)EventRouter。

删除Kafka主题中的事件
通过设置tombstones.on.deleteto false,当从发件箱表中删除事件记录时,连接器将不会发出删除标记(“tombstones”)。这是有道理的,因为从发件箱表中删除不应影响相应Kafka主题中事件的保留。相反,可以在Kafka中配置事件主题的特定保留时间,例如,将所有采购订单事件保留30天。

或者,可以使用压缩的主题。这需要对发件箱表中的事件设计进行一些更改:

他们必须描述整个集合; 因此,例如,表示取消单个订单行的事件也应描述包含采购订单的完整当前状态; 这样,在日志压缩运行之后,当只看到与给定订单有关的最后一个事件时,消费者也能够获得采购订单的整个状态。

它们必须还有一个boolean属性,指示特定事件是否表示删除事件的聚合根。OrderDeleted然后,可以由下一节中描述的事件路由SMT使用这样的事件(例如类型)来为该聚合根生成删除标记。然后,当OrderDeleted事件已写入主题时,日志压缩将删除与给定采购订单相关的所有事件。

当然,在删除事件时,事件流将不再从一开始就可以重新播放。根据具体的业务需求,仅保留给定采购订单,客户等的最终状态可能就足够了。这可以通过使用压缩的主题和主题delete.retention.ms设置的足够值来实现。另一个选择可能是将历史事件移动到某种冷存储(例如Amazon S3存储桶),如果需要可以从中检索它们,然后从Kafka主题中读取最新事件。采用哪种方法取决于开发和运行解决方案的团队的具体要求,预期数据量和专业知识。


主题路由
默认情况下,Debezium连接器会将源自一个给定表的所有更改事件发送到同一主题,即我们最终会得到一个名为Kafka的主题dbserver1.inventory.outboxevent,该主题将包含所有事件,包括订单事件,客户事件等。
为了简化仅对特定事件类型感兴趣的消费者的实现,更有意义的是,具有多个主题,例如OrderEvents,CustomerEvents等等。例如,装运服务可能对任何客户事件不感兴趣。通过仅订阅该OrderEvents主题,它将确保永远不会收到任何客户事件。
为了将从发件箱表捕获的更改事件路由到不同的主题,使用该自定义SMT EventRouter。以下是其apply()方法的代码,Kafka Connect将为Debezium连接器发出的每条记录调用它:

@Override
public R apply(R record) {
    // Ignoring tombstones just in case
    if (record.value() == null) {
        return record;
    }

    Struct struct = (Struct) record.value();
    String op = struct.getString(
"op");

   
// ignoring deletions in the outbox table
    if (op.equals(
"d")) {
        return null;
    }
    else if (op.equals(
"c")) {
        Long timestamp = struct.getInt64(
"ts_ms");
        Struct after = struct.getStruct(
"after");

        String key = after.getString(
"aggregateid");
        String topic = after.getString(
"aggregatetype") + "Events";

        String eventId = after.getString(
"id");
        String eventType = after.getString(
"type");
        String payload = after.getString(
"payload");

        Schema valueSchema = SchemaBuilder.struct()
            .field(
"eventType", after.schema().field("type").schema())
            .field(
"ts_ms", struct.schema().field("ts_ms").schema())
            .field(
"payload", after.schema().field("payload").schema())
            .build();

        Struct value = new Struct(valueSchema)
            .put(
"eventType", eventType)
            .put(
"ts_ms", timestamp)
            .put(
"payload", payload);

        Headers headers = record.headers();
        headers.addString(
"eventId", eventId);

        return record.newRecord(topic, null, Schema.STRING_SCHEMA, key, valueSchema, value,
                record.timestamp(), headers);
    }
   
// not expecting update events, as the outbox table is "append only",
   
// i.e. event records will never be updated
    else {
        throw new IllegalArgumentException(
"Record of unexpected op type: " + record);
    }
}

当收到删除事件(op= d)时,它将丢弃该事件,因为从发件箱表中删除事件记录与下游消费者无关。收到创建事件(op= c)时,事情变得更有趣。这样的记录将传播到Apache Kafka。
Debezium的更改事件具有复杂的结构,包含所表示行的old(before)和new(after)状态。要传播的事件结构是从after状态获得的。在aggregatetype从捕获的事件记录值被用来构建主题的名称将事件发送到。例如,aggregatetype设置为的事件Order将发送到OrderEvents主题。aggregateid用作消息密钥,确保该聚合的所有消息都将进入该主题的同一分区。消息值是包含原始事件有效负载(编码为JSON)的结构,指示事件何时生成的时间戳和事件类型。最后,事件UUID作为Kafka头字段传播。这允许消费者进行有效的重复检测,而不必检查实际的消息内容。

Apache Kafka中的事件
现在让我们来看看OrderEvents和CustomerEvents主题。
如果您已经检查了示例源并通过Docker Compose启动了所有组件(请参阅示例项目中的README.md文件以获取更多详细信息),您可以通过订单服务的REST API下载采购订单,如下所示:

cat resources/data/create-order-request.json | http POST http://localhost:8080/order-service/rest/orders

同样,可以取消特定的订单行:

cat resources/data/cancel-order-line-request.json | http PUT http://localhost:8080/order-service/rest/orders/1/lines/2

当使用诸如非常实用的kafkacat实用程序之类的工具时,您现在应该在OrderEvents主题中看到类似这样的消息:

kafkacat -b kafka:9092 -C -o beginning -f 'Headers: %h\nKey: %k\nValue: %s\n' -q -t OrderEvents

Headers: eventId=d03dfb18-8af8-464d-890b-09eb8b2dbbdd
Key: "4"
Value: {
"eventType":"OrderCreated","ts_ms":1550307598558,"payload":"{\"id\": 4, \"lineItems\": [{\"id\": 7, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 8, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \񓠃-01-31T12:13:01\", \"customerId\": 123}"}
Headers: eventId=49f89ea0-b344-421f-b66f-c635d212f72c
Key:
"4"
Value: {
"eventType":"OrderLineUpdated","ts_ms":1550308226963,"payload":"{\"orderId\": 4, \"newStatus\": \"CANCELLED\", \"oldStatus\": \"ENTERED\", \"orderLineId\": 7}"}

payload具有消息值的字段是原始事件的字符串ified JSON表示。Debezium Postgres连接器将JSONB列作为字符串发出(使用io.debezium.data.Json逻辑类型名称),这就是引号被转义的原因。JQ用处,更具体地说,它是fromjson操作者,用于显示在一个更可读的方式事件负载:

kafkacat -b kafka:9092 -C -o beginning -t Order | jq '.payload | fromjson'

{
  "id": 4,
 
"lineItems": [
    {
     
"id": 7,
     
"item": "Debezium in Action",
     
"status": "ENTERED",
     
"quantity": 2,
     
"totalPrice": 39.98
    },
    {
     
"id": 8,
     
"item": "Debezium for Dummies",
     
"status": "ENTERED",
     
"quantity": 1,
     
"totalPrice": 29.99
    }
  ],
 
"orderDate": "2019-01-31T12:13:01",
 
"customerId": 123
}
{
 
"orderId": 4,
 
"newStatus": "CANCELLED",
 
"oldStatus": "ENTERED",
 
"orderLineId": 7
}

您还可以查看CustomerEvents主题,以便在添加采购订单时检查表示创建发票的事件。

消费服务中的重复检测
此时,我们实现的发件箱模式功能齐全; 当订单服务收到下订单(或取消订单行)的请求时,它将在其数据库的purchaseorder和orderline表中保持相应的状态。同时,在同一事务中,相应的事件条目将添加到同一数据库中的发件箱表中。Debezium Postgres连接器捕获对该表的任何插入,并将事件路由到与给定事件所代表的聚合类型相对应的Kafka主题。
为了总结,让我们探讨另一种微服务(例如货运服务)如何使用这些消息。该服务的切入点是常规的Kafka消费者实现,这不是太令人兴奋,因此为了简洁起见在此省略。您可以在示例存储库中找到其源代码。对于Order主题上的每个传入消息,消费者调用OrderEventHandler:

@ApplicationScoped
public class OrderEventHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderEventHandler.class);

    @Inject
    private MessageLog log;

    @Inject
    private ShipmentService shipmentService;

    @Transactional
    public void onOrderEvent(UUID eventId, String key, String event) {
        if (log.alreadyProcessed(eventId)) {
            LOGGER.info("Event with UUID {} was already retrieved, ignoring it", eventId);
            return;
        }

        JsonObject json = Json.createReader(new StringReader(event)).readObject();
        JsonObject payload = json.containsKey(
"schema") ? json.getJsonObject("payload") :json;

        String eventType = payload.getString(
"eventType");
        Long ts = payload.getJsonNumber(
"ts_ms").longValue();
        String eventPayload = payload.getString(
"payload");

        JsonReader payloadReader = Json.createReader(new StringReader(eventPayload));
        JsonObject payloadObject = payloadReader.readObject();

        if (eventType.equals(
"OrderCreated")) {
            shipmentService.orderCreated(payloadObject);
        }
        else if (eventType.equals(
"OrderLineUpdated")) {
            shipmentService.orderLineUpdated(payloadObject);
        }
        else {
            LOGGER.warn(
"Unkown event type");
        }

        log.processed(eventId);
    }
}

完成的第一件事onOrderEvent()是检查之前是否已处理具有给定UUID的事件。如果是这样,将忽略对该同一事件的任何进一步调用。这是为了防止由此数据管道的“至少一次”语义引起的任何重复事件处理。例如,在确认分别使用源数据库或消息传递代理检索特定事件之前,可能会发生Debezium连接器或使用服务失败。在这种情况下,在重新启动Debezium或消费服务之后,可能会再次处理一些事件。将事件UUID传播为Kafka消息头允许有效地检测和排除消费者中的重复。
如果第一次收到消息,则解析消息值,并ShippingService使用事件有效负载调用与特定事件类型对应的方法的业务方法。最后,消息被标记为使用消息日志处理。
这MessageLog只是跟踪服务的本地数据库中表中所有消耗的事件:

@ApplicationScoped
public class MessageLog {

    @PersistenceContext
    private EntityManager entityManager;

    @Transactional(value=TxType.MANDATORY)
    public void processed(UUID eventId) {
        entityManager.persist(new ConsumedMessage(eventId, Instant.now()));
    }

    @Transactional(value=TxType.MANDATORY)
    public boolean alreadyProcessed(UUID eventId) {
        return entityManager.find(ConsumedMessage.class, eventId) != null;
    }
}

这样,如果由于某种原因回滚事务,原始消息也不会被标记为已处理,并且异常将冒泡到Kafka事件消费者循环。这允许稍后重新尝试处理该消息。
请注意,在将任何不可处理的消息重新路由到死信队列或类似消息之前,更完整的实现应该只负责重试给定消息一定次数。消息日志表上也应该有一些内容; 周期性地,可以删除早于消费者与代理提交的当前偏移的所有事件,因为它确保这些消息不会再次传播给消费者。

总结
发件箱模式是在不同微服务之间传播数据的好方法。
通过仅修改单个资源(源服务自己的数据库),它避免了在不共享一个公共事务上下文(数据库和Apache Kafka)的情况下同时更改多个资源的任何潜在不一致。通过首先写入数据库,源服务立即“读取您自己的写入”语义,这对于一致的用户体验很重要,允许在写入后调用的查询方法立即反映任何数据更改。
同时,该模式使异步事件传播到其他微服务。Apache Kafka是服务之间消息传递的高度可扩展和可靠的主干。给定正确的主题保留设置,新的消费者可能在最初生成事件后很长时间内出现,并根据事件历史建立自己的本地状态。
将Apache Kafka置于整体架构的中心也可确保所涉及服务的分离。例如,如果解决方案的单个组件失效或在一段时间内不可用,例如在更新期间,事件将在稍后处理:在重新启动之后,Debezium连接器将继续从它离开的位置拖出发件箱表。之前。同样,任何消费者都将继续处理其先前偏移的主题。通过跟踪已经成功处理的消息,可以检测重复项并从重复处理中排除重复项。
当然,不同服务之间的这种事件管道最终是一致的,即诸如运输服务之类的消费者可能落后于诸如订单服务之类的生产者。通常,这很好,并且可以根据应用程序的业务逻辑进行处理。例如,通常不需要在下订单的同一秒内创建货件。此外,整体解决方案的端到端延迟通常很低(几秒甚至亚秒范围),这要归功于基于日志的变更数据捕获,它允许近实时发送事件。
要记住的最后一件事是,通过发件箱公开的事件的结构应该被视为发射服务的API的一部分。即在需要时,应仔细调整其结构并考虑兼容性因素。这是为了确保在升级生产服务时不会意外破坏任何消费者。同时,消费者在处理消息时应该宽容,例如在遇到接收事件中的未知属性时不会失败。
非常感谢Hans-Peter Grahsl,Jiri Pechanec,Justin Holmes和RenéKerner在撰写这篇文章时的反馈!

关于Debezium
Debezium是一个开源分布式平台,可将现有数据库转换为事件流,因此应用程序几乎可以立即查看和响应数据库中每个已提交的行级更改。Debezium构建于Kafka之上,提供Kafka Connect兼容连接器,可监控特定的数据库管理系统。Debezium记录了Kafka日志中数据更改的历史记录,因此您的应用程序可以随时停止和重新启动,并且可以轻松地使用它在未运行时丢失的所有事件,从而确保正确且完整地处理所有事件。Debezium是开源的下Apache许可证,版本2.0