使用Spring Boot + Kafka实现分布式事务 - Piotr


在本文中,您将学习如何在 Spring Boot 应用程序中将 Kafka 事务与 Spring Kafka 项目一起使用。为了运行 Kafka 集群,我们将使用Upstash
如果您想自己尝试一下,可以随时查看我的源代码。为此,您需要克隆我的 GitHub 存储库。之后,您应该按照我的指示进行操作。让我们开始。

Spring Boot 中的 Kafka 入门
我已经使用 Web Dashboard 在 Upstash 上创建了一个 Kafka集群。所有连接凭据都是自动生成的。您可以在集群的主页上找到并复制它们。


假设我们有一个用户名作为 KAFKA_USER 变量和一个密码作为 KAFKA_PASS 变量,我们需要在application.yml文件中提供以下 Spring 配置:

spring:
  application.name: transactions-service
  kafka:
    bootstrap-servers: inviting-camel-5620-eu1-kafka.upstash.io:9092
    properties:
      security.protocol: SASL_SSL
      sasl.mechanism: SCRAM-SHA-256
      sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="${KAFKA_USER}" password="${KAFKA_PASS}";

这是所需依赖项的列表。由于我们交换 JSON 消息,我们需要 Jackson 库进行序列化或反序列化。当然,我们还需要包含 Spring Boot starter 和 Spring Kafka。

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
</dependency>

transactions-service正在生成和发送订单。我们将transactions在应用启动时创建测试主题。

@SpringBootApplication
public class TransactionsService {

   public static void main(String args) {
      SpringApplication.run(TransactionsService.class, args);
   }

   @Bean
   public NewTopic transactionsTopic() {
      return TopicBuilder.name("transactions")
          .partitions(3)
          .replicas(1)
          .build();
   }

}

在 Spring Boot 中启用 Kafka 事务
在 Kafka 中,生产者通过向事务协调器发出请求来启动事务。您可以在 Confluent 博客上的以下文章中找到该过程的详细说明。

使用 Spring Boot,我们只需要设置 spring.kafka.producer.transaction-id-prefix 属性来启用事务。KafkaTransactionManager Spring Boot 将通过自动配置一个bean 并将其连接到侦听器容器中来完成其余的 工作。这是负责消息生产者的配置的一部分。我们使用 JsonSerializer 将数据从对象序列化为 JSON。交易前缀是tx-.

spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.LongSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      transaction-id-prefix: tx-

在我们的场景中,我们将在单个事务中发送 10 条消息。为了观察消费者端的日志,我们将后续尝试之间的延迟设置为 1 秒。

@Transactional
public void generateAndSendPackage() 
      throws InterruptedException, TransactionException {
   for (long i = 0; i < 10; i++) {
      Order t = new Order(id++, i+1, i+2, 1000, "NEW");
      ListenableFuture<SendResult<Long, Order>> result =
         kafkaTemplate.send("transactions", t.getId(), t);
      result.addCallback(callback);
      Thread.sleep(1000);
   }
}

在 Kafka 消费者端启用事务
在第一步中,我们将只打印传入的消息。我们需要用@KafkaListener. 目标主题是transactions,消费群体是a。此外,我们必须添加@Transactional注释以启用对侦听方法的事务支持。

@Service
public class TransactionsListener {

   private static final Logger LOG = LoggerFactory
          .getLogger(TransactionsListener.class);

   @KafkaListener(
          id = "transactions",
          topics = "transactions",
          containerGroup = "a",
          concurrency = "3")
   @Transactional
   public void listen(Order order) {
      LOG.info("{}", order);
   }
}

让我们先运行生产者应用程序。要做的就是转到transactions-service目录并执行命令mvn spring-boot:run。
为 Spring Kafka 事务启用更详细的日志是一个好主意。为此,将以下行添加到application.yml文件中:

logging:
  level:
    org.springframework.transaction: trace
    org.springframework.kafka.transaction: debug

之后,让我们运行消费者应用程序。为此,请转到accounts-service目录并运行与以前相同的命令。您应该会看到在 Upstash 控制台中创建的以下主题:


该transactions-service应用程序公开了用于发送消息的 REST 端点。它只是开始在我在上一节中提到的单个事务中生成和发送 10 条消息的过程。让我们调用端点:
$ curl -X POST http://localhost:8080/transactions

让我们看看生产者端的日志。发送完所有消息后,它提交了事务。

现在,让我们看看它在消费者方面的表现。所有消息都是在生产者应用程序发送后立即接收的。这不是我们所期望的……

为了验证发生了什么,我们需要查看消费者应用程序日志。这是一个带有 Kafka 消费者设置的片段。如您所见,默认情况下,Spring Kafka 将为 Spring Boot事务隔离级别设置read_uncommitted。


使用 Spring Kafka 深入研究事务
为了解决上一节中事务的问题,我们需要更改application.yml文件中的默认隔离级别。由于spring.kafka.consumer.properties我们必须将isolation.level属性设置read_commited为如下所示。

spring:
  application.name: accounts-service
  kafka:
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "*"
        isolation.level: read_committed

之后,让我们accounts-service再次运行该应用程序。

现在,生产者提交事务后,所有消息都已收到。
当我们将@KafkaListener concurrency参数设置为3 时,共有三个消费者线程。

下一步,我们将在生产者端测试事务的回滚。
为此,我们将修改生成和发送订单的方法。现在,generateAndSendPackage得到一个布尔参数,它指示事务是否应该回滚。

@Transactional
public void generateAndSendPackage(boolean error)
       throws InterruptedException {
   for (long i = 0; i < 10; i++) {
      Order t = new Order(id++, i+1, i+2, 1000, "NEW");
      ListenableFuture<SendResult<Long, Order>> result =
            kafkaTemplate.send("transactions", t.getId(), t);
      result.addCallback(callback);
      if (error && i > 5)
         throw new RuntimeException();
      Thread.sleep(1000);
   }
}

这是我们测试的日志。在发送六个订单后,该方法会抛出一个RuntimeException并且 Spring 回滚一个事务。正如预期的那样,消费者应用程序没有收到任何消息。

重要的是要知道 Spring 回滚默认情况下仅适用于 未经检查 的异常。
要回滚 检查 的异常,我们需要 rollbackFor在@Transactional注解上指定。

事务生产者甚至在提交事务之前就向 Kafka 集群发送消息。您可以在上一节中看到它,如果隔离级别为read_uncommited ,侦听器会持续接收消息。
因此,如果我们在生产者端回滚事务,则在回滚之前发送的消息会到达 Kafka 代理。例如,我们可以在主题transactions的 Upstash 实时消息视图中看到它。


下图这是transactions主题中a消费者组所有分区的当前偏移量值。
在发送第一个包含 10 条消息的包后,我们成功提交,并使用第二个包回滚了事务。在这种情况下,偏移量的总和为 10。但实际上,这些分区上的当前最新偏移量是不同的。

为了验证它,例如,我们可以将侦听器的消费者组名称更改为b并启动accounts-service.

@KafkaListener(
     id = "transactions",
     topics = "transactions",
     containerGroup = "b",
     concurrency = "3")
@Transactional
public void listen(Order order) {
   LOG.info("{}", order);
}

这是b消费者组的当前偏移量值。

当然,消息已回滚。
但这里要理解的重要一点是,这些操作发生在 Kafka 代理端。事务协调器更改 Kafka 偏移量的值。我们可以很容易地验证消费者在回滚后不会收到消息,即使我们使用该spring.kafka.consumer.auto-offset-reset属性将初始偏移量设为earliest最早的 。


添加数据库
在本节中,我们将使用新功能扩展我们的场景。我们的应用程序会将订单状态存储在数据库中。仅出于演示目的,我们将使用内存数据库 H2。此场景需要两个依赖项:H2 和 Spring Data JPA。

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
  <groupId>com.h2database</groupId>
  <artifactId>h2</artifactId>
  <scope>runtime</scope>
</dependency>

那里OrderGroup存储了包裹的当前状态(SENT、CONFIRMED、ROLLBACK)、单个包裹中的订单总数以及由accounts-service.

@Entity
public class OrderGroup {

   @Id
   @GeneratedValue(strategy = GenerationType.IDENTITY)
   private Long id;
   private String status;
   private int totalNoOfOrders;
   private int processedNoOfOrders;

   // GETTERS/SETTERS ...
}

为了管理实体,我们使用 Spring Data repository 模式:

public interface OrderGroupRepository extends 
   CrudRepository<OrderGroup, Long> {
}

我们还将在accounts-service应用程序中包含一个数据库。当它处理传入的订单时,它会在源账户和目标账户之间进行转账。它将帐户余额存储在数据库中。

@Entity
public class Account {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private int balance;

   // GETTERS/SETTERS ...
}

和以前一样,有一个用于管理Account实体的存储库 bean。

public interface AccountRepository extends
   CrudRepository<Account, Long> {
}

我们还需要修改Order应用程序之间交换的消息。它必须包含groupId用于处理确认的字段。

public class Order {

    private Long id;
    private Long sourceAccountId;
    private Long targetAccountId;
    private int amount;
    private String status;
    private Long groupId;

   // GETTERS/SETTERS ...
}

下面的图表说明了我们所描述场景的架构。


处理跨多个资源的事务
包含 Spring Data JPA 后,有两个注册的TransactionManagerbean 名称transactionManager和kafkaTransactionManager. 因此我们需要在@Transactional注解中选择事务管理器的名称。

  1. 第一步,我们向数据库添加一个新实体。主键id在数据库中自动生成,然后返回给对象。
  2. 之后,我们获取 groupId并生成该组内的订单序列。

当然,这两个操作(保存到数据库,发送到 Kafka)都是同一个事务的一部分。
@Transactional("kafkaTransactionManager")
public void sendOrderGroup(boolean error) throws InterruptedException {
   OrderGroup og = repository.save(new OrderGroup("SENT", 10, 0));
   generateAndSendPackage(error, og.getId());
}

private void generateAndSendPackage(boolean error, Long groupId)
      throws InterruptedException {
   for (long i = 0; i < 10; i++) {
      Order o = new Order(id++, i+1, i+2, 1000, "NEW", groupId);
      ListenableFuture<SendResult<Long, Order>> result =
         kafkaTemplate.send("transactions", o.getId(), o);
      result.addCallback(callback);
      if (error && i > 5)
         throw new RuntimeException();
      Thread.sleep(1000);
   }
}

该accounts-service应用程序侦听传入的订单。它在单独的事务中处理每个订单。它检查客户账户中是否有足够的资金进行转账。如果有足够的钱,它会执行转账。最后,它将响应发送到 transactions-service事务状态。消息被发送到orders主题。

@KafkaListener(
   id = "transactions",
   topics = "transactions",
   groupId = "a",
   concurrency = "3")
@Transactional("kafkaTransactionManager")
public void listen(Order order) {
   LOG.info("Received: {}", order);
   process(order);
}

private void process(Order order) {
   Account accountSource = repository
      .findById(order.getSourceAccountId())
      .orElseThrow();
   Account accountTarget = repository
      .findById(order.getTargetAccountId())
      .orElseThrow();
   if (accountSource.getBalance() >= order.getAmount()) {
      accountSource.setBalance(accountSource.getBalance() - order.getAmount());
      repository.save(accountSource);
      accountTarget.setBalance(accountTarget.getBalance() + order.getAmount());
      repository.save(accountTarget);
      order.setStatus("PROCESSED");
   } else {
      order.setStatus("FAILED");
   }
   LOG.info("After processing: {}", order);
   kafkaTemplate.send("orders", order.getId(), order);
}

transactions-service监听关于该orders主题的订单确认。一旦收到消息,它就会增加订单组中已处理订单的数量,并将当前结果存储在数据库中。我们应该使用默认的 Spring 事务管理器,因为我们不向 Kafka 发送任何消息。

@KafkaListener(
      id = "orders",
      topics = "orders",
      groupId = "a",
      concurrency = "3")
@Transactional("transactionManager")
public void listen(Order order) {
   LOG.info("{}", order);
   OrderGroup og = repository
      .findById(order.getGroupId())
      .orElseThrow();
   if (order.getStatus().equals("PROCESSED")) {      
      og.setProcessedNoOfOrders(og.getProcessedNoOfOrders() + 1);
      og = repository.save(og);
      LOG.info("Current: {}", og);
   }
}

不要忘记OrderGroup在事务期间锁定记录。由于我们同时处理消息(使用 3 个线程),我们需要锁定OrderGroup记录,直到我们更新processedNoOfOrders列的值:

public interface OrderGroupRepository extends
        CrudRepository<OrderGroup, Long> {

    @Lock(LockModeType.PESSIMISTIC_WRITE)
    Optional<OrderGroup> findById(Long groupId);
}

让我们测试一个积极的场景。我们将生成一组应确认的订单。为此,让我们调用我们的端点POST /transactions:

$ curl -X 'POST' 'http://localhost:8080/transactions' \
  -H 'Content-Type: application/json' \
  -d 'false'

以下是accounts-service应用程序的日志:

我们还可以查看transactions-service应用程序生成的日志:

最后,我们可以通过调用以下端点来验证订单组的当前状态:

$ curl -X GET 'http://localhost:8080/transactions'

如果我们回滚事务会发生什么?使用以下命令自行尝试:

$ curl -X 'POST' 'http://localhost:8080/transactions' \
  -H 'Content-Type: application/json' \
  -d 'true'

最后的想法
您可以使用 Spring Kafka 项目通过 Spring Boot 轻松处理 Kafka 事务。您可以将您的应用程序与数据库集成并处理跨多个资源的事务。但是,需要澄清一件事——Kafka 不支持 XA 事务。可能会导致数据不一致。Spring 没有解决这种情况,它只是在后台执行两个事务。当@Transactional方法退出时,Spring Boot 会先提交数据库事务,然后提交 Kafka 事务。您可以更改该顺序以首先启用 Kafka 事务提交,方法是将外部方法配置为使用 DataSourceTransactionManager,将内部方法 配置为使用KafkaTransactionManager.

我们能以某种方式解决那个案子吗?当然。例如,Debezium 项目允许您将数据库更改流式传输到 Kafka 主题中。使用这种方法,您只需提交数据库中的更改,然后配置 Debezium 以将更改的事件发送到 Kafka。

GitHub 存储库