Spring Cloud Stream Kafka事务介绍


Apache Kafka 提供了基础事务支持,而 Spring for Apache Kafka(又名 Spring Kafka)库则在 Spring 侧扩展了这种支持,使 Spring 开发人员能够更自然地依赖 Spring Framework 中提供的传统事务支持来使用它。

Spring Cloud Stream 中的 Kafka 绑定器进一步加强了 Spring 对 Apache Kafka 的支持,使在 Spring Cloud Stream Kafka 应用程序中使用相同的支持成为可能。

要使用 Apache Kafka Binder,您需要将spring-cloud-stream-binder-kafka依赖项添加到 Spring Cloud Stream 应用程序,如以下 Maven 示例所示:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

在本系列博客的第一部分中,我们将简要介绍 Kafka 事务、一些有助于依赖事务的用例分析,以及 Apache Kafka 和 Spring 生态系统中的事务构建模块。

在许多用例中,都需要在 Apache Kafka 中以事务方式发布、消费和处理记录。在生产者启动的应用程序或实现消耗-处理-生产模式的流程中以事务方式生产记录时,记录会以原子方式写入 Kafka。如果出了问题,整个流程会回滚,事务不会提交。

需要记住的一点是,Apache Kafka 与支持事务的关系数据库不同,后者在发生事务回滚时不会持久保存数据记录,而 Apache Kafka 仍会将记录发布到主题分区。

这种行为是由于 Apache Kafka 基于日志的只附加、不可变的基本架构造成的,它不允许对记录进行任何修改,例如在将记录添加到记录日志后再将其删除。

有人可能会问,使用事务有什么好处,因为当事务中止时,记录可能会发布到主题分区,从而可能导致消费者看到这些记录。但是,具有适当隔离级别的消费者永远不会看到回滚的记录,即使回滚事务的记录在主题分区中。

因此,从端到端的角度来看,整个流程都保证是完全事务性的。

事务用例
在 Kafka 应用程序中,事务通常会增加大量开销。在 Apache Kafka 中使用事务时,每条记录都必须向记录添加特殊事务日志,向特殊事务状态主题发送事务标记,等等。所有这些步骤都会耗费时间和空间,增加整体延迟。因此,每个应用程序都必须通过分析用例来仔细研究是否需要事务支持。

事务提供了一种主要保护数据的方法,以提供 ACID 功能。它通过提供原子性、一致性、数据隔离和持久性来确保数据完整性。

在当今企业的一些关键任务用例中,使用事务并依赖其带来的 ACID 语义是非常可取的。关于何时使用事务并证明其带来的开销是合理的,并没有简单明了的答案。你必须审视应用程序,评估其中的利害关系。

事务的典型例子是任何需要处理财务数据的应用:

鲍勃给爱丽丝汇款,从鲍勃的账户中扣款,而爱丽丝则记入贷方。

如果在这个过程中出现任何差错,整个过程都会回滚,就像什么都没发生一样,因为我们不希望流程处于杂乱无章的状态。

如果流程从鲍勃的账户中扣款,但爱丽丝却没有入账(反之亦然),那就有问题了。

从 Apache Kafka 的角度来看,这里有几个问题:

  • 首先,一条信息来到 Kafka 处理器,要求从鲍勃的账户和接收者的信息中扣款。
  • 处理器对信息进行处理,然后向另一个主题发送一条消息,表明从鲍勃的账户中发生了扣款。
  • 之后,另一条信息表明 爱丽丝现在已被记入贷方。

这一过程中的各种操作需要复杂的协调,以确保一切按预期进行。

任何时候,如果我们有类似的多个相关事件,事务都可以帮助确保数据完整性并提供 ACID 语义。在这个示例中,单个事件独立存在的意义并不大,但它们结合在一起形成了整个流程,因此需要事务性来确保数据完整性。

如果我们想推广这种模式,我们可以说,任何时候我们都可以使用消耗-处理-发布模式来完成关键任务,在这种模式下,如果一个组件发生故障,整个处理器需要像没有发生故障一样运行,那么使用事务就是一个潜在的解决方案。

其他领域的更多高级示例
想象一下,一个机票预订系统需要发布一个有多个行程的预订信息。如果由于某种原因,系统无法发布整个预订信息,则需要中止流程并重新开始。

  • 一家经纪公司需要向清算所发送包含多个买入订单的请求。假设该进程无法将单个订单作为单一原子单元发布到清算所使用的消息系统。在这种情况下,经纪商必须重新发送订单。
  • 向保险公司发送病人测试数据的医疗计费系统必须向消息系统发布病人的各种相关测试数据。
  • 在线游戏系统需要跟踪玩家在游戏中的位置,并通过事务处理将其发送到中央服务器,以确保所有玩家都能看到正确的坐标,而不是部分更新的位置。
  • 零售商的库存补货系统需要将各种相关产品的状态信息作为单一原子单元发送。
  • 在线电子商务订购系统需要在单个原子聚合操作中发布订单详细信息(如订单条目、账户持有人信息、发货信息等)。

与外部数据库同步
当你需要与其他事务系统同步时,事务就成了另一种得心应手的用例。除了向 Kafka 发布数据外,假设您还必须在关系数据库中持久化记录或某些派生信息,所有这些都需要在单个原子操作中完成。如果一个系统发送数据失败,我们就必须回滚。如果每次发布到 Kafka 的只有一条记录,没有其他任何相关操作,那么就不需要使用事务,我们将在本系列博客的下一部分看到这一点。不过,即使您只向 Kafka 主题发布一次,但在同一流程中使用了关系数据库操作,也有必要使用事务来确保数据完整性。

发布到多个 Kafka 主题
纯生产者应用程序中事务的另一个用例是发布到多个 Kafka 主题。假设您有一些关键通知形式的关键业务数据(如订单详情),您希望发布到多个 Kafka 主题,订单详情的一部分发布到订单主题,另一部分发布到发货主题。在这种情况下,我们可以使用事务来确保端到端的数据完整性。

概括上述事务性用例
上述使用案例并不是必须进行事务处理的全部案例。当今企业中还存在许多其他使用案例,它们与我们所研究的使用案例的主旨并无太大区别,都需要在消息传递系统中进行事务处理。

下面的列表总结了 Apache Kafka 中的事务可以派上用场的一般用例:

  • 消费-处理-发布(Consume-process-publish)系统,该系统需要将记录作为单个原子单元发布,并提供精确的一次性语义交付保证。
  • 多个相关的发布事件,单独发布没有意义。
  • 以单个原子单位向多个主题发布数据。
  • 与外部事务管理器同步。

下面是所有这些不同情况的图示。它涵盖了我们上面考虑过的各种情况,如消费-处理-生产、多个生产者、与外部事务同步等。处理器消耗入站主题中的数据,执行业务逻辑,将一些信息持久化到数据库系统,并发布到多个 Kafka 主题。

Apache Kafka 中的事务
简单了解一下从高层次实现事务性的 Kafka 客户端 API 还是值得的。

需要注意的一点是,说到普通消费者,Kafka 中并没有所谓的事务消费者,但有事务感知消费者。

消费者通过设置隔离级别来实现事务感知。

默认情况下,Kafka 消费者可以看到上游生产者的所有记录,甚至是未提交的记录,因为 Kafka 消费者的默认隔离级别是 read_uncommitted。

Kafka 消费者必须使用 read_committed 的隔离级别来提供端到端的事务语义。

我们将在本系列博客的后续章节中了解如何在 Spring Cloud Stream 中实现这一点。

在生产者方面,应用程序需要依赖 Kafka 客户端提供的一些 API 方法。让我们来看看其中重要的几种。

要使应用程序具有事务性,Kafka 客户端需要一个事务 ID。应用程序通过名为 transactional.id 的 Kafka 生产者属性提供该 ID,事务协调器通过注册该 ID 来启动事务。事务协调器使用该 ID 跟踪事务的各个方面,如初始化、持续进展、提交等。

下面的列表总结了与事务相关的重要生产者 API 方法。

  • initTransactions() - 每个生产者调用一次以启动事务支持。初始化 Kafka 事务。
  • beginTransaction() - 在发送记录前开始事务。
  • sendOffsetsToTransaction() - 向事务发送消耗的记录偏移量。
  • commitTransaction() - 提交事务。
  • abortTransaction() - 终止事务。

在发送记录之前,我们需要初始化并开始事务。然后,继续进行数据处理。如果我们在发布记录时消耗了一条记录,就必须使用生产者向事务发送消耗记录的偏移量。之后,事务提交或中止操作就可以继续了(commitTransaction 或 abortTransaction)。当我们调用 commitTransaction 方法时,正是偏移量被 Kafka 客户端原子式发送到 consumer_offsets 主题的时候。

Spring for Apache Kafka 中的事务支持
在使用 Spring for Apache Kafka 或依赖于 Spring Cloud Stream Kafka binder的 Spring for Apache Kafka 等框架时,它们带来的好处是允许应用程序将主要精力放在业务逻辑上,因为这些框架会处理我们在上文看到的底层模板事务序列。

使用 Spring for Apache Kafka 或其他框架(如使用它的 Spring Cloud Stream)会带来很多好处,因为这样我们就不必担心编写底层模板序列(如上所述),从而确保所有事务步骤都能成功。

可以想象,这里有很多活动部件,如果省略了某个步骤或没有按照预期完成某个步骤,就会导致应用程序容易出错。就 Spring 而言,我们提到的框架会代表应用程序开发人员处理这些问题。让我们简单了解一下它是如何做到这一点的。

Spring for Apache Kafka 框架通过提供 Spring 开发人员所熟悉的一致的事务编程模型,隐藏了所有这些底层细节。因此,在使用 Spring for Apache Kafka 或其他框架(如 Spring Cloud Stream)时,应用程序只需专注于应用程序的业务逻辑,而无需处理复杂的底层事务相关事宜。

KafkaTransactionManager
Spring for Apache Kafka 如何提供这种一致的事务编程模型?简而言之,Spring 开发人员传统上使用事务注解或编程方法,例如直接在应用程序中使用事务模板(TransactionTemplate)来创建本地事务。这些机制需要事务管理器来实现事务处理。Spring for Apache Kafka 提供了事务管理器实现。KafkaTransactionManager 是 Spring Framework 中 PlatformTransactionManager 的一种实现。你可以将该事务管理器与事务注解(Transactional annotation)一起使用,也可以在本地事务中使用事务模板(TransactionTemplate)。KafkaTransactionManager 使用生产者工厂来创建 Kafka 生产者,并提供了开始、提交和回滚事务的 API。

KafkaResourceHolder
Spring for Apache Kafka 还提供了一个 KafkaResourceHolder,用于保存 Kafka 生产者资源。Spring for Apache Kafka 中的 KafkaTemplate 会触发给定生产者工厂的当前线程绑定 KafkaResourceHolder。在消费者发起的事务中,消息监听容器会进行绑定,而生产者工厂与 KafkaTransactionManager 使用的生产者工厂相同。这样,事务就可以使用同一个事务生产者来满足所有发布需求。

除了上述组件,Spring for Apache Kafka 还提供了其他实用工具来处理与事务相关的问题。


在 Spring Cloud Stream Kafka Binder中启用事务发送消息
在 Spring Cloud Stream 的 Kafka Binder中启用事务的主要驱动因素是一个单一属性:spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix。

当该属性具有有效的前缀字符串时,Spring Cloud Stream 中的 Kafka Binder就会确保底层 KafkaTemplate 通过使用事务来发布数据。

顺便提一下,该属性可指示 Spring Cloud Stream 在使用处理器模式(消耗-处理-生产或读-处理-写模式)时使消费者具有事务感知能力。

代码:

@SpringBootApplication
@RestController
public class SpringCloudStreamProducer {

   @Autowired
   StreamBridge streamBridge;

   @Autowired Sender sender;

   @Autowired
   DefaultBinderFactory binderFactory;

   public static void main(String args) {
       SpringApplication.run(SpringCloudStreamProducer.class, args);
   }

   @PostMapping("/send-data")
   public void publishData() throws InterruptedException {
       sender.send(streamBridge);
   }

   @Component
   static class Sender {

     @Transactional        
     public void send(StreamBridge streamBridge)      
     {
       for (int i = 0; i < 5; i++) {
            streamBridge.send(
"mySupplier-out-0", "data-" + i);           
       }
     }
   }

  @Bean
  KafkaTransactionManager customKafkaTransactionManager() {
     KafkaMessageChannelBinder kafka = (KafkaMessageChannelBinder)this.binderFactory.getBinder(
"kafka", MessageChannel.class);
     ProducerFactory<byte, byte> transactionalProducerFactory = kafka.getTransactionalProducerFactory();
     KafkaTransactionManager kafkaTransactionManager = new KafkaTransactionManager(transactionalProducerFactory);
     return kafkaTransactionManager;
  }
}
属性配置:
spring:
  cloud:
   stream:
     bindings:
       mySupplier-out-0:
         destination: my-topic
     kafka:
       binder:
         Transaction:
        transaction-id-prefix: mySupplier-
producer:
             configuration:
               retries: 1
               acks: all

在方法上添加 @Transactional 注解时,Spring 会在幕后为该方法创建事务代理。当 Spring Cloud Stream 调用事务方法时,代理会拦截该调用,然后通过代理对象进行实际调用。

我们提供的自定义 KafkaTransactionManager Bean 有两个作用。

  • 首先,它使 Spring Boot 应用 @EnableTransactionManagerment。
  • 它还提供了绑定器内部使用的相同生产者工厂,以便事务注解在应用事务时使用适当的资源。

当 Spring Boot 检测到可用的事务管理器 bean 时,它会自动为我们应用 @EnableTransactionManagement 注解,该注解负责检测 @Transactional 注解,然后通过 Spring AOP 代理和建议机制添加拦截器。

换句话说,Spring AOP 会为 @Transactional 方法创建一个代理,并包含 AOP 建议。

如果不应用 @EnableTransactionManagement 注解,Spring 不会触发任何代理和拦截机制。由于 EnableTransactionManagement 注解对于上述各种原因至关重要,因此我们必须提供一个事务管理器 Bean。否则,方法上的事务注解就没有任何作用。

请注意,我们是从绑定器中获取事务生产者工厂,并将其用于 KafkaTransactionManager 的构造函数中。当该 bean 出现在应用程序中时,现在所有记录的整个发布过程都是在单个事务的范围内进行的。我们在跟踪日志中只看到 beginTransaction...commitTransaction 的单个序列,这意味着只有一个合适的事务执行所有发布操作。

在幕后,这些都是事件发生的顺序:

  • 只要调用了注释为 Transactional 的方法,事务拦截器就会通过 AOP 代理机制启动,并使用自定义的 KafkaTransactionManager 启动一个新事务。
  • 当事务管理器开始事务时,事务管理器使用的资源--事务资源持有者(又名,从生产者工厂获得的生产者)--会绑定到事务。
  • 当该方法调用 StreamBridgesend 方法时,底层 KafkaTemplate 将使用自定义 KafkaTransactionManager 创建的相同事务资源。由于一个事务已经在进行中,因此它不会启动另一个事务,而是在同一个事务生产者上进行发布。
  • 在调用更多发送方法时,它不会启动新事务。相反,它会通过原始事务中使用的同一生产者资源进行发布。
  • 方法退出时,如果没有错误,拦截器会要求事务管理器提交事务。如果方法中的任何发送操作或其他操作出现异常,拦截器会要求事务管理器回滚事务。这些调用最终会触发 KafkaResourceHolder 的提交或回滚方法,从而调用 Kafka 生产者提交或回滚事务。

由于我们的示例中只有一个自定义 KafkaTransactionManager Bean,因此我们可以直接使用 Transactional 注解。另一方面,如果我们有多个自定义 KafkaTransactionManager Bean,我们就必须用正确的 Bean 名称来限定 @Transactional 注解。

如果我们在不使用自定义 KafkaTransactionManager 的情况下运行应用程序呢?
如果我们移除自定义的 KafkaTransactionManager 并运行此应用程序,可以看到它创建了五个单独的事务,而不是一个单一的事务。如果启用 TRACE 日志,就能在日志中看到 beginTransaction...commitTransaction 的五个序列。

您可以编写一个事务消费者 Spring Cloud Stream 应用程序,并将其隔离级别设置为 read_committed,从而验证这种行为。具体方法是使用 spring.cloud.stream.kafka.binder.configuration.isolation.level 属性并将其值设置为 read_committed。出于测试目的,在 for 循环中添加 Thread.sleep 或其他等待机制来模拟每次 StreamBridgesend 之后的行为。我们可以看到,无论等待与否,只要每次发送方法调用返回,消费者就会收到数据,从而证明整个操作并非由一个事务执行,而是每次发送都发生在自己的事务中。

我们之所以看到每次发送都有独立的事务,是因为事务注解没有完成我们期望它完成的工作。事务注解只有在事务管理器 bean 可用且其生产者工厂与绑定器使用的工厂相同的情况下才会起作用。

如果 Spring Boot 通过 spring.kafka.producer.transaction-id-prefix 检测到配置中的 transaction-id-prefix 属性,它就会自动配置 KafkaTransactionManager。不过,由于我们是在 Spring Cloud Stream 上下文中,所以必须使用 spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix,因为这是我们向框架发出信号,以便为binder和相关事务生产者工厂创建内部事务管理器的方式。如果我们提供适当的 spring.kafka 前缀,让 Spring Boot 自动为我们配置一个 KakaTransactionManager 会怎样?虽然这很诱人,但却行不通,因为自动配置的事务管理器使用的生产者工厂与绑定器使用的生产者工厂不同。因此,我们必须提供一个自定义的 KafkaTransactionManager,使用与binder相同的生产者工厂。这正是我们上面所做的。


Spring Cloud Stream Kafka 和 Exactly-Once 语义
Apache Kafka 中的 "一次性 "语义针对的是读-处理-写(或消耗-转换-产生)应用。有时,我们会对 "一次 "到底在做什么感到困惑。是最初的消费、数据处理,还是最后的生产?

Apache Kafka 保证整个 "读取->处理-写入"(read->process-write)序列的 "一次"(actly-once)语义。

在这个序列中,读取和处理部分总是至少一次--例如,如果部分处理或写入因任何原因失败。如果依赖精确一次交付,事务就非常关键,这样才能成功完成或回滚数据的最终发布。

一个潜在的副作用是,初始消费和处理可能会发生多次。例如,如果事务回滚,消费者偏移量不会更新,下一次轮询(如果是在 Spring Cloud Stream 中重试或应用程序重启)将重新交付相同的记录并再次进行处理。

因此,在消费和处理(转换)部分至少保证一次,这是需要理解的关键点。

任何以 read_committed 隔离级别运行的下游消费者都只能从上游处理器获得一次信息。

因此,我们必须明白,在一次准确交付的世界中,处理器和下游消费者必须协调,才能从一次准确交付的语义中获益。

任何以 read_uncommitted 隔离级别运行的已生成主题的消费者都可能看到重复的数据。

另一个需要注意的问题是,由于记录的消费和处理可能会发生多次,应用程序代码需要遵循惰性模式,这主要是在代码与外部系统(如数据库)交互时需要考虑的问题。在这种情况下,应用程序应确保用户代码不会产生副作用。

让我们重温一下之前看到的简单消耗-处理-产生循环的代码。

@Bean
public Consumer<PersonEvent> process(TxCode txCode) {
   return txCode::run;
}

@Component
class TxCode {

   @Transactional
   void run(PersonEvent pe) {
       Person person = new Person();
       person.setName(pe.getName());

       Person savedPerson = repository.save(person);

       PersonEvent event = new PersonEvent();
       event.setName(savedPerson.getName());
       event.setType("PersonSaved");
       streamBridge.send(
"process-out-0", event);
   }
}

正如我们之前看到的,要使该应用程序具有事务性,我们必须为 spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix 配置属性提供一个合适的值。

只需在 Spring Cloud Stream 中提供该属性,上述代码段就能完全实现一次交付。

整个端到端流程在事务边界内运行(尽管在上述示例中我们有两个事务)。

我们有一个在容器中调用监听器时启动的外部 Kafka 事务,以及由事务拦截器启动的另一个 JPA 事务。

  • 当 StreamBridge 发送发生时,会使用与初始 Kafka 事务相同的事务资源,但直到控制权返回容器后才会提交。
  • 当方法退出时,JPA 事务就会提交。
  • 假设这里出了问题,数据库操作抛出了异常。在这种情况下,JPA 不会提交,它会回滚,异常会传播回监听容器,此时 Kafka 事务也会回滚。
  • 另一方面,如果 JPA 操作成功,但 Kafka 发布失败并抛出异常,JPA 不会提交,而是回滚,异常会传播到监听器。

在上面的代码中,如果我们不是与外部事务管理器同步,而只是发布到 Kafka,那么我们就不需要使用 @Transactional 注解,我们甚至可以在 txCode 方法中内联代码,作为消费者 lambda 的一部分:

@Bean
public Consumer<PersonEvent> process() {
   return pe -> {
      Person person = new Person();
       person.setName(pe.getName());
       PersonEvent event = new PersonEvent();
       event.setName(person.getName());
       event.setType("PersonSaved");
       streamBridge.send(
"process-out-0", event);

   }
}

在这种情况下,我们只有容器在调用监听器时发起的 Kafka 事务。当代码通过 StreamBridge 发送方法发布记录时,KafkaTemplate 会使用与初始事务相同的事务生产者工厂。

这两种情况下的故事都是,我们是完全事务性的,最终的发布只在事务中完成一次。隔离级别为 read_committed 的下游消费者应该只消费一次。

Kafka Streams和精确一次性语义
具有讽刺意味的是,Kafka Streams 应用程序原本是 Apache Kafka 添加事务支持和精确一次性语义的原因,但我们还没有谈论它。

原因在于,在 Kafka Streams 应用程序中实现精确一次性语义非常简单,而且几乎不难。正如他们所说,这只是一个配置旋钮。要了解有关 Kafka Streams 中精确一次性语义的更多信息,请参阅 Confluent 的这篇博客 blog from Confluent


要使用 Kafka Streams 绑定器,您只需使用以下 Maven 坐标将其添加到 Spring Cloud Stream 应用程序中:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>

与普通的基于 Kafka 客户端的应用程序一样,在 Kafka Streams 中,当你以消耗-处理-产生模式产生最终输出时,actly-once 保证就会发挥作用,这意味着所产生数据的任何下游消费者只要使用 read_committed 隔离级别,就会消耗正好一个数据。

Kafka Streams 配置属性 processing.guarantee 属性可在 Kafka Streams 应用程序中启用精确一次性语义。

你可以在 Spring Cloud Stream 中通过设置 spring.cloud.stream.kafka.streams.binder.configuration.processing.guarantee 属性来设置它。

  • 你需要将值设为 exactly_once。
  • 默认情况下,Kafka Streams 使用 at_least_once 的值。

有状态 Kafka Streams 应用程序中通常会发生三种主要活动:

  • 记录的初始消费
  • 通过更新日志主题更新状态存储。
  • 生成数据

模式是接收记录并进行处理。
  • 在此过程中,任何状态信息都会具体化到状态存储中,从而更新特定的更新日志主题。
  • 最后,出站记录会发布到另一个 Kafka 主题。

如果你注意到这种模式,它看起来与我们已经见过的许多场景相似,除了状态存储部分。

将 processing.guarantee 设置为 exactly_once,Kafka Streams 就能保证,如果在这些活动中出现异常或应用程序崩溃,整个单元会原子回滚,就像什么都没发生过一样。

应用程序重启后,处理器会再次消耗记录、处理记录并最终发布数据。

由于数据发布是在幕后以事务方式进行的,因此在数据永远发布之前,任何隔离级别为 read_committed 的下游消费者都不会消费该记录,而处理器会处理所有实现事务性所需的工作(如提交已消费记录的偏移量等),从而保证一次交付。

Kafka Streams 的准确一次性交付保证是针对从 Kafka 相关活动的角度来看记录的端到端消费、处理和发布。当外部系统存在时,它不提供这种保证。

例如,假设您的代码与外部系统有交互,如数据库插入或更新操作。在这种情况下,应用程序将自行决定如何参与事务。

在这种情况下,Spring 的事务支持又派上了用场。

正如我们在本系列中多次看到的那样,可以将与数据库交互的代码封装在一个单独的方法中,使用 @Transactional 注解对其进行注解,并提供一个适当的事务管理器,例如我们看到的 JPA 管理器。

当这种方法抛出异常时,JPA 事务会回滚,异常会传播到 Kafka Streams 处理器代码,最终传播回 Kafka Streams 框架本身,然后回滚原始 Kafka 事务。

值得在此再次重申的是,我们必须明白,从流拓扑中的处理器调用这些操作时,必须编写代码来处理惰性,因为 "精确一次 "只适用于整个流程,而不适用于序列中的单个读取和处理。

消费-加工-生产
假设您的代码与外部系统有交互,如数据库插入或更新操作。在这种情况下,应用程序将自行决定如何参与事务。
在这种情况下,Spring 的事务支持又派上了用场。

让我们看看事件驱动和流应用程序中的一个关键模式,称为“消费-处理-生产”模式。在 Spring Cloud Stream 中,此类模式的实现如下所示:

@Bean
public Function<PersonEvent, PersonEvent> process(TxCode txCode) {
  return pe -> txCode.run(pe);
}

@Component
class TxCode {

   @Transactional
   PersonEvent run(PersonEvent pe) {
       Person person = new Person();
       person.setName(pe.getName());

       Person savedPerson = repository.save(person);

       PersonEvent event = new PersonEvent();
       event.setName(savedPerson.getName());
       event.setType("PersonSaved");
       return event;
   }
}

我们有一个 Spring Cloud Stream 函数,它消耗输入主题中的 PersonEvent,然后调用函数的 lambda 表达式主体中的函数进行处理。该函数返回另一个 PersonEvent,我们将其发布到出站的 Kafka 主题中。

如果不是在事务性上下文中,我们可以将上述 run 方法内联为函数 lambda 表达式的一部分。不过,要实现事务语义,@Transactional 注解必须用在不同类中的方法上。

要使Binder具有事务性,请确保为 spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix 提供一个有效值。

上面的代码是完全事务性的吗?但实际上,它只是端到端的部分事务性代码。让我们看看事件发生的顺序。

Binder是事务性的,因为我们提供了事务 ID 前缀。

当消费者轮询消息监听器容器中的记录时,它会调用其 TrasactionTemplateexecute 方法中的内部监听器方法。
因此,执行监听器方法(调用用户方法)的整个端到端过程都是在 KafkaTransactionManager 启动的事务中运行的。

事务启动时,事务同步管理器(TransactionSynchronizationManager)会将资源(生产者)绑定到事务上。当用户方法(注释为 @Transactional 的方法)被调用时,事务拦截器会拦截该调用,让封装的 AOP 建议来处理实际调用。

因为我们有一个 JpaTransactionManager,所以事务拦截器会使用该管理器并启动一个新事务。是否要与现有事务同步取决于每个事务管理器的实现。

就 JpaTransactionManager(以及许多其他类似的数据库事务管理器实现)而言,它不允许与现有事务同步。

因此,JPA 事务是独立运行的,当run方法退出时,事务拦截器会使用 JPA 事务管理器执行提交或回滚操作。

这样,JPA 事务管理器就完成了它的工作。

此时,方法调用的响应将返回调用者,即 Spring Cloud Stream 基础架构。

  • Spring Cloud Stream 中的机制会接收该响应并将其发送到 Kafka 中的出站主题。
  • 它使用初始事务开始时绑定的相同事务生产者。
  • 发送记录后,控制返回消息监听器容器,然后提交或回滚事务。

以下是该序列中的步骤:

  1. Kafka 消费者接收记录。
  2. Spring Kafka 中的容器通过使用 TransactionTemplate 的 execute 方法调用监听器。
  3. KafkaTransactionManager 启动一个新事务:绑定 Kafka 资源(生产者)。
  4. 当它到达用户代码时,事务拦截器最终会拦截该调用并启动一个新的 JPA 事务。
  5. 然后,AOP 代理调用实际方法。
  6. 方法退出时,JpaTransactionManager 会提交或回滚。
  7. 该方法的输出会返回到 Spring Cloud Stream 中的调用者。
  8. 然后使用步骤 4 中的相同事务资源将响应发送到 Kafka outbound。
  9. 控制返回消息监听容器,KafkaTransactionManager 提交或回滚。

那么,问题出在哪里呢?

它看起来是事务性的,但实际上只是部分事务性。

一开始的主要问题是,整个端到端流程超出了单个原子事务的范围,这是一个重大问题。

这里有两个事务--Kafka 和 JPA,而且 JPA 和 Kafka 事务之间没有同步。

如果数据库事务提交了,而 Kafka 发送失败了,那么就没有办法回滚 JPA 事务。

我们可能会认为 ChainedTransactionManager 可以帮上忙。虽然这种直觉有一定的道理,但在上述代码中却行不通。

因为在调用监听器方法时容器中创建了 Kafka 事务,所以 ChainedTransactionManager 不会从提供给它的任何 Kafka 事务管理器中创建任何新的 Kafka 事务。在退出用户方法时,我们仍有一个 JPA 事务需要提交或回滚。Kafka 事务必须等到调用返回容器后才能提交或回滚。

问题在于我们使用了 Spring Cloud Stream 中的一个函数,该函数使框架能够发布到 Kafka。

在我们的案例中,任何用户指定的事务(如 JPA 事务)都会在 Spring Cloud Stream 执行 Kafka 发布之前发生。我们需要确保用户代码是向 Kafka 发布的代码,这样才能将整个事务代码视为一个单元。

为此,我们应改用 Consumer 而不是 Function,然后使用 StreamBridge API 发布到 Kafka。看看修改后的代码:

@Bean
public Consumer<PersonEvent> process(TxCode txCode) {
   return txCode::run;
}


然后,我们使用与上述相同的 TxCode:

@Component
class TxCode {

   @Transactional
   void run(PersonEvent pe) {
       Person person = new Person();
       person.setName(pe.getName());

       Person savedPerson = repository.save(person);

       PersonEvent event = new PersonEvent();
       event.setName(savedPerson.getName());
       event.setType("PersonSaved");
       streamBridge.send(
"process-out-0", event);
   }
}

请注意,run方法不会返回任何内容,但我们会通过 StreamBridge API 明确向外发送 Kafka 主题。

让我们来看看发生这些变化的事件顺序

  1. Kafka 消费者接收记录。
  2. Spring Kafka 中的容器通过使用 TransactionTemplate 的 execute 方法调用监听器。
  3. KafkaTransactionManager 启动新事务。
  4. 绑定 Kafka 资源(生产者)。
  5. 当到达用户代码时,拦截器会拦截该调用,并使用 JpaTransactionManager 启动一个新事务。
  6. 实际的用户方法被调用。
  7. 作为方法执行的一部分,Kafka 发送操作是通过 StreamBridge 进行的。底层 KafkaTemplate 使用步骤 4 中绑定的相同事务生产者工厂。
  8. 方法退出时,JpaTransactionManager 会提交或回滚。
  9. 最后,当 Kafka 事务提交(或回滚)时,控制权返回 TransactionTemplateexecute 方法。

请特别注意上面的第 7 步。当 KafkaTemplate 检测到已经有一个正在进行的 Kafka 事务(从步骤 3 开始)时,它不会与 JPA 事务同步,尽管 KafkaTemplate 可以这样做。现有的 Kafka 事务具有优先权,它会加入该事务。

尽管我们仍然有两个独立的事务,但从端到端事务的角度来看,事情是原子的。如果通过 StreamBridge 进行的 Kafka 发布操作失败,JPA 和 Kafka 事务都不会执行提交操作。两者都会回滚。同样,如果数据库操作失败,两个事务仍会回滚。

但是,总是有可能出现一个事务提交而另一个事务回滚的情况,因此应用代码必须处理记录的去重以实现容错

通过以上讨论,我们深入探讨了编写事务型 Spring Cloud Stream 应用程序的各种选项,这些应用程序在消费和生产到 Apache Kafka 的同时,必须与外部事务系统(如数据库)进行交互。

(说白了,似乎Spring + Kafka无法实现真正统一的事务,因为JPA事务管理器和Kafka都是“自以为是的强者”,两者无法链接到一起。)