使用 Debezium 实现真正的原子微服务以确保数据一致性 – brainDOSE

21-11-04 banq

传统的微服务发件箱模式实现需要开发人员手动创建发件箱事件表并编写代码将数据从发件箱表发送到相应的消息平台。Debezium 发件箱事件路由器和发件箱 Quarkus 扩展一起解决了这个问题,并通过声明性实现强制执行标准方法来做到这一点。这使开发人员可以专注于业务逻辑实现并实现更快的应用程序交付。

 

概述

您可能已经知道微服务设计的最佳实践之一是使用它自己的私有数据服务(在有状态应用程序的情况下)实现应用程序服务,如每个服务模式的数据库中所述。最终,当您的应用程序环境发展壮大时,您将拥有许多微服务和数据服务,如果不是数千个,也可能是数百个。这些服务作为独立的可部署模块运行,但它们很可能需要在它们之间或与 3rd 方系统交换数据。

例如,当一个支付请求被提交到 CASA 服务时,它需要自动更新自己的数据库,同时将交易数据发送到其他感兴趣的消费者服务,例如核心银行。大多数情况下,您还需要在每个处理阶段(CASA 和核心银行业务)记录交易的审计跟踪副本。您不希望 CASA 服务负责更新审计跟踪数据服务或直接调用核心银行服务,而不是原子性,它们现在彼此紧密依赖。

您可能会考虑跨两个服务更新数据服务,但是微服务没有干净的方法(像传统的 2PC 事务管理一样的自动化方法)来跨越多个系统的事务会话,在大多数情况下,系统可能不支持 2PC(两个-阶段提交),例如消息平台、文件系统等。情况会很快变得复杂,可能会导致数据一致性和完整性问题。

这是发件箱模式来拯救的地方。

 

发件箱模式

根据发件箱模式,不是让应用程序在多个数据服务上执行 CRUD 操作或调用其他服务来执行此操作,应用程序服务应该只在其自己的数据库上执行 CRUD 操作。为了与其他消费者服务共享数据,它应该将相关数据插入到同一数据库或模式中的发件箱表中。这将确保全部失败或全部成功的情况,因为所有这些 CRUD 操作都在同一个本地事务会话中执行。

需要一个单独的独立机制将这些数据从发件箱表中继到相应的消费者服务。在我们的示例中,需要使用数据进行进一步处理的核心银行业务。

说到这种消息传递机制,其中一个选项就是流行的事件驱动平台——Apache Kafka,本文将在示例中使用它,如下图基于 Apache Kafka 和 Debezium 的发件箱模式所示。

注意每个应用服务都有自己的数据库(这里是PostgreSQL数据库),每个数据库都有自己的业务数据表和对应的发件箱事件表。Debezium 负责使用这些发件箱表消息并将它们生成给 Apache Kafka。

您可以看到这是理想的微服务架构,它促进了原子性,但同时防止了数据一致性问题。最重要的是,Debezium 发件箱模式与发件箱 Quarkus 扩展一起消除了许多手动步骤,让开发人员专注于业务逻辑实现。

 

什么是Debezium?

Debezium是一个用于变更数据捕获 (CDC) 的开源分布式平台。它提供非侵入式 CDC 来捕获应用程序提交的数据库插入、更新和删除,并将这些更改流式传输到 Apache Kafka。

Debezium 建立在 Apache Kafka 之上。它是使用Kafka Connect 框架实现的。每个数据库集成都是捕获数据库更改的 Kafka 源连接器实现。

Debezium 目前提供以下连接器:

您可以使用简单消息转换 (SMT)应用声明性消息转换。SMT 允许您为 Kafka 消息转换定义谓词。谓词指定如何有条件地将转换应用于连接器处理的消息子集。您可以将谓词分配给您为源连接器(例如 Debezium)或接收器连接器配置的转换。

Debezium 提供了多种 SMT,您可以使用它们在 Kafka Connect 将记录保存到 Kafka 主题之前修改事件记录。

以下是 Debezium 提供的 SMT 列表。

  • 主题路由:根据应用于原始主题名称的正则表达式将记录重新路由到不同的主题。
  • 基于内容的路由:根据事件内容将选定的事件重新路由到其他主题。
  • 新记录状态提取:从 Debezium 更改事件中提取字段名称和值的平面结构,促进无法处理 Debezium 复杂事件结构的接收器连接器。
  • MongoDB 新文档状态提取[url=https://debezium.io/documentation/reference/1.6/transformations/event-flattening.html]新记录状态提取[/url] :SMT特定于 MongoDB 的对应部分 。
  • 发件箱事件路由器:提供一种在多个(微)服务之间安全可靠地交换数据的方法。
  • 消息过滤:根据连接器的内容,将过滤器应用于连接器发出的更改事件。这使您可以仅传播与您相关的那些记录。

 

使用 Debezium 实现发件箱

Debezium 引入了发件箱事件路由器,试图从非常早期的版本开始就提供发件箱模式实现。

来自 Red Hat 的Gunnar MorlingReliable Microservices Data Exchange with the Outbox Pattern 中写了一篇关于这个概念的有趣且鼓舞人心的文章,它提供了如何使用 Debezium 实现这个发件箱模式的蓝图。

在文章中,Gunnar 概述了单独使用 Debezium 执行此操作的手动方法。今天,我们将研究如何使用Debezium Outbox RouterDebezium Quarkus Extension自动执行大部分手动步骤。通过这个新的 Quarkus 扩展,它提供了一种声明式方法,使开发人员的生活更轻松,例如自动创建发件箱表、基于事件的路由等。

Debezium Quarkus 扩展功能目前处于孵化状态,即根据收到的反馈,确切的语义、配置选项等可能会在未来的修订版中发生变化。

 

使用 Debezium 发件箱模式的虚构支付交易实现

让我们通过浏览我根据下图创建的示例来深入了解细节。

如图所示,casa-service做了两件事:

  • 为 CASA 事务请求和查询提供 REST 接口。它在自己的名为casa-postgres的数据库上执行必要的数据库插入和查询(casa表)。
  • 通过 Kafka Topic ( casa.response.events ) 使用来自核心服务的响应消息,并根据收到的响应更新casa表。

core-service 消费使用来自casa.events主题的Kafka消息,并按照core-postgres数据库中casa 表在各自的账户实现账户平衡。

如您所见,这 2 个业务服务纯粹是在执行自己的业务逻辑处理并维护自己的私有数据库。

每个数据库中都有发件箱表(CasaOutboxEvent和ResponseOutboxEvent)。Debezium 在 Quarkus 扩展的帮助下提供了开箱即用的发件箱表实现、持久性和事件流。作为开发人员,您可以自由地专注于您的业务逻辑实现。

那么这些发件箱表格和消息是如何创建的呢?

您需要做一些事情。让我们以casa-service为例。

 

Maven 依赖

首先,通过在pom.xml 中插入以下 maven 依赖项以及其他依赖项,使您的 Quarkus 应用程序能够使用 Debezium Outbox Quarkus 扩展。

<dependency>
   <groupId>io.debezium</groupId>
   <artifactId>debezium-quarkus-outbox</artifactId>
   <version>1.7.0.Alpha1</version>
</dependency>

事件数据模型

创建一个数据模型来表示您希望发送给消费者的数据,在这种情况下,消费者是core-service。下面是 POJO CasaEventData.java代码片段的样子。这个 POJO 数据模型将由我们稍后要创建的CasaEvent.java使用。

/**
 * Provide the event data model for Casa outbox event implementation.
 * Provides the implementation for Debezium Outbox Pattern.
 */
@RegisterForReflection
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public class CasaEventData {
    /**
     * Casa unique transaction id.
     */
    private String id;
    private String recipientAccountNo;
    private String sourceAccountNo;
    private double amount;
    /**
     * Casa created timestamp to indicates the timestamp when the Casa create event is being fired.
     */
    private Instant createdTimestamp;
    /**
     * Core processed timestamp to indicates the timestamp when the core processed the transaction.
     */
    private Instant coreProcessedTimestamp;
    /**
     * Audit timestamp to indicates the timestamp when the audit event is being fired.
     */
    private Instant auditTimestamp;
    /**
     * Response received timestamp to indicates the timestamp when the response from core is received by origination.
     */
    private Instant responseReceivedTimestamp;
    /**
     * Referecense information for recipient.
     */
    private String recipientReference;
    /**
     * Payment type. @see blog.braindose.paygate.model.PaymentTypes
     */
    private PaymentTypes paymentType;
    /**
     * Status of casa processing. @see blog.braindose.paygate.model.Status
     */
    private Status status;
    /**
     * Messages response from core backend if any. This could be error message when processing failed.
     */
    private String responseMessages;
 
    /**
     * Kafka header id
     * This is an unique id for each kafka message. Can be used to perform deduplication in the event of duplicated message sent by producer in the event of message resent due to unforeseen failure.
     */
    private String messageId;
 
    /**
     * Event source. This should be unique identifiable value for auditing purpose. Suggest to use Class.getName()
     */
    private String eventSources;
    /**
     * Event timestamp to provides timestamp when the event is being fired.
     */
    private Instant eventTimestamp;
 
   /// ... more omitted codes

 

发件箱事件实现

CasaEvent.java是实现io.debezium.outbox.quarkus.ExportedEvent的 Java 类。它为事件信息提供了必要的实现,例如有效载荷、聚合、事件类型等。有效载荷应为JsonNode类型,这是我们决定将哪些数据作为有效载荷发送的地方。

/**
 * Casa Event for Outbox pattern implementation using Debezium.
 */
@Immutable
public class CasaEvent implements ExportedEvent<String, JsonNode> {
 
    private static ObjectMapper mapper = new ObjectMapper();
 
    /**
     * Unique Casa transaction id. @see blog.braindose.opay.casa.Casaid
     */
    private final String id;
    /**
     * Payload to be sent to Kafka in JSON format.
     */
    private final JsonNode casa;
    /**
     * Timestamp for outbox pattern implementation. Defaulted to Casa transactionTimestamp. @see blog.braindose.opay.casa.Casa#transactionTimestamp
     */
    private final Instant timestamp;
 
    public CasaEvent(CasaEventData casa) {
        this.id = casa.getId();
        this.timestamp = casa.getEventTimestamp();
        this.casa = convertToJson(casa);
    }
 
    private JsonNode convertToJson(CasaEventData casa) {
        ObjectNode asJson = mapper.createObjectNode()
                .put("id", casa.getId())
                .put("recipientAccountNo", casa.getRecipientAccountNo())
                .put("sourceAccountNo", casa.getSourceAccountNo())
                .put("amount", casa.getAmount())
                .put("recipientReference", casa.getRecipientReference())
                .put("paymentType", casa.getPaymentType().toString())
                .put("createdTimestamp", casa.getCreatedTimestamp().toString())
                .put("eventSources", casa.getEventSources().toString())
                .put("eventTimestamp", casa.getEventTimestamp().toString())
                .put("status", casa.getStatus().toString());
        return asJson;
    }
 
    @Override
    public String getAggregateId() {
        return id;
    }
 
    @Override
    public String getAggregateType() {
        return "casa";
    }
 
    @Override
    public JsonNode getPayload() {
        return casa;
    }
 
    @Override
    public String getType() {
        return "payment";
    }
 
    @Override
    public Instant getTimestamp() {
        return timestamp;
    }
 
}

该CasaEvent将deserialised并填充到由Quarkus扩展API下表结构。当在您的代码中调用javax.enterprise.event.Event 中的fire(ExportedEvent)方法时,由CasaEvent表示的数据将插入到此表中。我们稍后会谈到这一点。

Column     |          Type   | Modifiers
--------------+------------------------+-----------
id        | uuid            | not null
aggregatetype | character varying(255) | not null
aggregateid  | character varying(255) | not null
type       | character varying(255) | not null
payload     | jsonb                  |

以下信息取自 Debezium 文档。SMT 将使用通过CasaEvent.java实现提供的这些信息来构造要发送到 Apache Kafka 的正确消息。

  • ID:包含事件的唯一 ID。在发件箱消息中,此值是一个头部。例如,您可以使用此 ID 删除重复消息。要从不同的发件箱表格列中获取事件的唯一 ID,请 在连接器配置中设置 table.field.event.id SMT 选项
  • aggregatetype:包含 SMT 附加到连接器向其发出发件箱消息的主题名称的值。默认行为是此值替换 SMT 选项中的默认 ${routedByValue} 变量 route.topic.replacement。例如,在默认配置中,  route.by.field SMT 选项设置为 aggregatetype ,  route.topic.replacement SMT 选项设置为 outbox.event.${routedByValue}。假设您的应用程序向发件箱表中添加了两条记录。在第一条记录中, aggregatetype 列中的值为 customers。在第二条记录中, aggregatetype 列中的值为 orders。连接器向outbox.event.customers 主题发出第一条记录 。连接器向outbox.event.orders 主题发出第二条记录 。要从不同的发件箱表格列中获取此值,请设置 route.by.field 连接器配置中的 SMT 选项
  • aggregateid:包含事件键,它为负载提供 ID。SMT 使用此值作为发出的发件箱消息中的键。这对于维护 Kafka 分区中的正确顺序很重要。要从不同的发件箱表格列中获取事件密钥 ,请在连接器配置中设置 table.field.event.key SMT 选项
  • payload:发件箱更改事件的表示。默认结构是 JSON。默认情况下,Kafka 消息值仅由该 payload 值组成。但是,如果发件箱事件配置为包含附加字段,则 Kafka 消息值包含一个封装了有效负载和附加字段的信封,并且每个字段都单独表示。有关更多信息,请参阅 使用附加字段发送消息。要从不同的发件箱表列获取事件负载 ,请在连接器配置中设置 table.field.event.payload SMT 选项
  • 其他自定义列:发件箱表中的任何其他列都可以 添加到 有效负载部分内的发件箱事件中,也可以作为消息标题添加到发件箱事件中。一个例子可能是一列 eventType ,它传达了一个用户定义的值,有助于对事件进行分类或组织。

 

触发发件箱事件

您需要做的下一件事是在您的应用程序代码中触发发件箱事件。让我们看一下CasaResource.java,它提供了用于创建 CASA 请求的 REST 接口。

/**
 * Provides REST interfaces for Casa services.
 */
@Path("casa")
public class CasaResource {
 
    @Inject
    Event<ExportedEvent<?, ?>> event;
     
    private static final Logger LOGGER = Logger.getLogger(CasaResource.class);
    private CasaEventData casaEventData = null;
    private boolean failed = false;
 
    /**
     * Create a new Casa transaction
     * @param casa
     * @return
     */
    @POST
    @Produces(MediaType.APPLICATION_JSON)
    @Consumes(MediaType.APPLICATION_JSON)
    @Transactional
    public Casa add(Casa casa) {
        try{
            casa.id = GenTxnId.id(TxnTypes.CASA);
            casa.createdTimestamp = Instant.now();
            casa.status = Status.SUBMITTED;
            casaEventData = new CasaEventData(casa.id, casa.recipientAccountNo, casa.sourceAccountNo, casa.amount, casa.createdTimestamp, casa.recipientReference, casa.paymentType, casa.status);
            casaEventData.setEventTimestamp(casa.createdTimestamp);
            casaEventData.setEventSources(Casa.class.getName());
            casa.persistAndFlush();
            event.fire(new CasaEvent(casaEventData));
        }
        catch(PersistenceException e){
            failed = true;
            if (casaEventData != null){
                casaEventData.setStatus(Status.FAILED);
                casaEventData.setResponseMessages("Error creating the Casa record in database.");
            }
            LOGGER.error("Error creating the Casa record in database.", e);
            throw e;
        }
        finally{
            if (casaEventData != null){
                event.fire(new CasaAuditEvent(casaEventData, EventTypes.PAYMENT, AggregateTypes.AUDIT_CASA));
                if (failed) event.fire(new CasaFailedEvent(casaEventData));
            }
        }
        return casa;
    }
    /// more codes omitted
    ...
    ...
    ...
 
}

从上面的代码可以看出,你注入javax.enterprise.event.Event作为事件

@Inject
Event<ExportedEvent<?, ?>> event;

然后使用它来触发以CasaEventData作为参数的发件箱事件。从编码的角度来看,这就是您需要做的所有事情。简单干净!

event.fire(new CasaEvent(casaEventData));

 

Application.Properties

我们需要做的最后一件事是配置application.properties。这是为您希望命名的发件箱表指定名称的地方。您还可以配置是否要在插入发件箱表中的事件数据后将其删除。对于生产,您可能希望这样做以节省数据库存储使用量。一旦插入事件数据,您就不需要将事件数据保留在发件箱表中,因为 Debezium 已经捕获了插入事件,之后就不需要它们了。

# Debezium outbox
quarkus.debezium-outbox.table-name=CasaOutboxEvent
%dev.quarkus.debezium-outbox.remove-after-insert=false
%prod.quarkus.debezium-outbox.remove-after-insert=true

还有许多其他配置可用于 Quarkus 实现来自定义发件箱行为。

 

卡夫卡连接集群

一旦应用程序实现准备就绪。您需要有一个正在运行的 Kafka Connect 集群,并带有适当的 Debezium 连接器插件。您可以配置连接器插件,在此示例中是用于 PostgresQLDebezium 连接器,还有一些额外的配置,如下面的casa-service示例。

{
    "name": "outbox-connector-casa",
    "config" : {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "casa-postgres",
        "database.port": "5432",
        "database.user": "casa",
        "database.password": "casa",
        "database.dbname": "casa",
        "database.server.name": "casa-event",
        "schema.include.list": "payment",
        "table.include.list": "payment.CasaOutboxEvent",
        "tombstones.on.delete": "false",
        "transforms": "outbox",
        "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
        "transforms.outbox.route.topic.replacement": "${routedByValue}.events",
        "transforms.outbox.table.field.event.timestamp": "timestamp",
        "transforms.outbox.table.field.event.id": "aggregateid",
        "transforms.outbox.table.fields.additional.placement": "type:header:eventType"
    }
}

Kafka Connector 容器在Docker Hub 中可用。您还可以从 GitHub获取Dockerfile的副本并自己构建一个版本。

我们正在使用由transforms键指定的发件箱转换。紧随其后的是其他 Debezium Outbox SMT 配置,可以通过以“ transforms.outbox ”开头的键轻松识别。“。可以在此处找到这些 SMT 设置的详细信息。其他是标准的 Kafka Connector 配置,例如与数据库服务器相关的设置。

请注意为transforms.outbox.route.topic.replacement配置的值。$ {} routedByValue是指aggregateType在CasaEvent.java。这允许我们将不同类型事务的相应消息动态路由到不同的 Kafka 主题。在这种情况下,值为“casa”,Kafka Topic 变为casa.events。

所述transforms.outbox.table.field.event.id被配置为将aggregateId,其意。使用唯一的事务 ID 作为 Kafka 消息键,我们可以在本示例的后面部分使用 Kafka Streams 轻松地对来自不同主题的这些消息进行转换。

请注意transforms.outbox.table.fields.additional.placement,它指定通过从CasaEvent.java注入eventType将其他头字段放入 Kafka 消息头中。这是非常有用的调整,你可能必须场景中使用单一卡夫卡主题捕捉到相同的交易类型的事件,而是针对不同的状态,如例如通过贡纳尔Morling给出。就我而言,我现在正在使用它并保持原样。

 

一些额外的实现

有了上面的内容,我已经为核心服务复制了类似的方法,并且在很短的时间内完成了它。

为了让事情变得更复杂(实际上并非如此),我还使用相同的方法来实现casa-service和core-service的审计跟踪事件,正如您从以下代码片段中看到的那样。由于我使用的是相同的发件箱表结构,我基本上可以重用我的许多Java类,并且眨眼间完成了事件跟踪事件实现。

if (casaEventData != null){
   event.fire(new CasaAuditEvent(casaEventData, EventTypes.PAYMENT, AggregateTypes.AUDIT_CASA));
   if (failed) event.fire(new CasaFailedEvent(casaEventData));
}

随着它变得更容易,我还实现了捕获FailedEvent,以便可以将这些失败的事务捕获到另一个 Kafka 主题中,以便可能的人工干预或使用工作流进行一些自动化处理。在这种情况下,casa-service和core-service将不需要担心如何处理那些失败的事务。整洁的!

 

使用 Kafka Streams 进行审计跟踪聚合

审计事件是来自casa-service和core-service的信息片段。我们需要聚合这些断开连接的信息,以便为每个事务创建一个完整的单一审计跟踪条目,并将其存储到数据库中以进行安全保存和验证(本示例中未涵盖)。这就是 Kafka Streams 派上用场的地方。我基本上做了一个简单的join(),然后是一个reduce()来做到这一点。这很容易完成,因为我的 Kafka Message 键是我之前配置的唯一事务 ID。

负载使用的是JsonNode,我注意到它在初始阶段被捕获时被反序列化为带有额外双引号的 JSON 字符串。使用ObjectMapperSerde将无法直接序列化为 Java 对象,我别无选择,只能将其序列化为 String 格式。我希望这可以在将来 GA 时得到改进。

builder.stream(
            KAFKA_TOPIC_CASA_AUDIT,
            Consumed.with(Serdes.String(), Serdes.String()))        // Serialized JSON string from JsonNode creates extra double quotes, causing it is not possible to use Jackson to deserialize into Java object
            .join(
                coreAuditStream,
                (casaAudit, coreAudit) -> {
                    // Multiple audit entries since Casa service received the response from core service.
                    List<AuditEntry> auditEntries = new ArrayList<>();
                    try {
                        LOGGER.debug("Processing casa audit trail...");
                        CasaEventData casaAuditObj = createCasaEventData(casaAudit);
                        auditEntries.add(createAuditEntry(casaAuditObj));
                         
                        LOGGER.debug("Processing core audit trail for casa transaction ...");
 
                        auditEntries.add(createAuditEntry(createCasaEventData(coreAudit)));
                         
                        AuditData<Casa> auditData = new AuditData<>(
                            casaAuditObj.getId(), 
                            auditEntries, 
                            new Casa(casaAuditObj.getRecipientAccountNo(), casaAuditObj.getSourceAccountNo(), casaAuditObj.getAmount(), casaAuditObj.getRecipientReference()), 
                            Instant.now().toString(), 
                            casaAuditObj.getStatus().toString());
 
                        String jsonInString = mapper.writeValueAsString(auditData);
                        LOGGER.debug("Joined result = " + jsonInString);
                        return jsonInString;
                         
                    } catch (JsonProcessingException e) {
                        LOGGER.error("Problem parsing Kafka message into JSON.");
                        throw new RuntimeException("Problem parsing Kafka message into JSON", e);
                    }
                },
                JoinWindows.of(Duration.ofMinutes(KAFKA_STREAMS_JOINWINDOW_DURATION)),
                Joined.with(Serdes.String(), Serdes.String(), Serdes.String())
            )
            .groupByKey()
            .reduce(            // deduplication of audit trail ... 
                (value1, value2) -> AuditData.reduce(value1, value2),
                Materialized.with(Serdes.String(), Serdes.String())
            )
            .toStream()
            //.print(org.apache.kafka.streams.kstream.Printed.toSysOut())
            .to(KAFKA_TOPIC_PAYMENT_AUDIT, Produced.with(Serdes.String(), Serdes.String()))
        ;

 

运行示例

请前往GitHub并在本地磁盘中克隆项目的副本。在命令提示符中导航到模块目录。按照README.md 中的说明构建模块,然后运行以下docker compose命令以将所有服务作为容器启动。

docker compose up --build

使用docker ps检查所有容器的状态,等待它们变得健康。

使用以下配置创建必要的 Kafka 连接器。这里有 3 个连接器需要注册。

  • outbox-connector-casa – 这是casa-postgres数据库中发件箱表的 Debezium 连接器
  • outbox-connector-core – 这是core-postgres数据库中发件箱表的 Debezium 连接器
  • mongodb-sink – 这是审计表的 Kafka MongoDB 连接器。

{
    "name": "outbox-connector-casa",
    "config" : {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "casa-postgres",
        "database.port": "5432",
        "database.user": "casa",
        "database.password": "casa",
        "database.dbname": "casa",
        "database.server.name": "casa-event",
        "schema.include.list": "payment",
        "table.include.list": "payment.CasaOutboxEvent",
        "tombstones.on.delete": "false",
        "transforms": "outbox",
        "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
        "transforms.outbox.route.topic.replacement": "${routedByValue}.events",
        "transforms.outbox.table.field.event.timestamp": "timestamp",
        "transforms.outbox.table.field.event.id": "aggregateid",
        "transforms.outbox.table.fields.additional.placement": "type:header:eventType"
    }
}
{
    "name": "outbox-connector-core",
    "config" : {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "core-postgres",
        "database.port": "5432",
        "database.user": "core",
        "database.password": "core",
        "database.dbname": "core",
        "database.server.name": "core-event",
        "schema.include.list": "core",
        "table.include.list": "core.ResponseOutboxEvent",
        "tombstones.on.delete": "false",
        "transforms": "outbox",
        "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
        "transforms.outbox.route.topic.replacement": "${routedByValue}.events",
        "transforms.outbox.table.field.event.timestamp": "timestamp",
        "transforms.outbox.table.field.event.id": "aggregateid",
        "transforms.outbox.table.fields.additional.placement": "type:header:eventType"
    }
}
{
    "name": "mongodb-sink",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "tasks.max": 1,
        "topics": "payment.audit.events",
        "connection.uri": "mongodb://audit:audit@audit-mongodb:27017",
        "database": "audit",
        "collection": "payment",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": false,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
        "max.num.retries": 3
    }
}

Kafka Connect 集群 URL 是http://localhost:9080/connectors。使用curl或Postman为前面提到的每个连接器执行 HTTP 发布以创建上述连接器。

以下是您将在审计表中看到的结果。

rs0:PRIMARY> use audit
switched to db audit
rs0:PRIMARY> db.payment.find().pretty();
{
    "_id" : ObjectId("613981a793f9aa4f64c33eb6"),
    "payload" : {
        "amount" : 50.58,
        "recipientAccountNo" : "1-987654-1234-4569",
        "recipientReference" : "Payment for lunch",
        "sourceAccountNo" : "1-234567-4321-9876"
    },
    "id" : "1-20210909-033739893-17839",
    "lastStatus" : "COMPLETED",
    "auditEntries" : [
        {
            "eventSource" : "blog.braindose.opay.casa.Casa",
            "eventTimestamp" : "2021-09-09T03:37:39.905947Z",
            "responseMessages" : null,
            "status" : "SUBMITTED"
        },
        {
            "eventSource" : "blog.braindose.opay.core.casa.ConsumeCasa",
            "eventTimestamp" : "2021-09-09T03:37:42.180891Z",
            "responseMessages" : null,
            "status" : "COMPLETED"
        },
        {
            "eventSource" : "blog.braindose.opay.casa.ConsumeCasaResponse",
            "eventTimestamp" : "2021-09-09T03:37:43.271790Z",
            "responseMessages" : null,
            "status" : "COMPLETED"
        }
    ],
    "eventTimestamp" : "2021-09-09T03:37:45.720514Z"
}

 

GitHub 上的示例代码

1
猜你喜欢