在本文中,您将学习如何在 Spring Boot 应用程序中将 Kafka 事务与 Spring Kafka 项目一起使用。为了运行 Kafka 集群,我们将使用Upstash。
如果您想自己尝试一下,可以随时查看我的源代码。为此,您需要克隆我的 GitHub 存储库。之后,您应该按照我的指示进行操作。让我们开始。
Spring Boot 中的 Kafka 入门
我已经使用 Web Dashboard 在 Upstash 上创建了一个 Kafka集群。所有连接凭据都是自动生成的。您可以在集群的主页上找到并复制它们。
假设我们有一个用户名作为 KAFKA_USER 变量和一个密码作为 KAFKA_PASS 变量,我们需要在application.yml文件中提供以下 Spring 配置:
spring: application.name: transactions-service kafka: bootstrap-servers: inviting-camel-5620-eu1-kafka.upstash.io:9092 properties: security.protocol: SASL_SSL sasl.mechanism: SCRAM-SHA-256 sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="${KAFKA_USER}" password="${KAFKA_PASS}";
|
这是所需依赖项的列表。由于我们交换 JSON 消息,我们需要 Jackson 库进行序列化或反序列化。当然,我们还需要包含 Spring Boot starter 和 Spring Kafka。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>
|
transactions-service正在生成和发送订单。我们将transactions在应用启动时创建测试主题。
@SpringBootApplication public class TransactionsService {
public static void main(String args) { SpringApplication.run(TransactionsService.class, args); }
@Bean public NewTopic transactionsTopic() { return TopicBuilder.name("transactions") .partitions(3) .replicas(1) .build(); }
}
|
在 Spring Boot 中启用 Kafka 事务
在 Kafka 中,生产者通过向事务协调器发出请求来启动事务。您可以在 Confluent 博客上的以下文章中找到该过程的详细说明。
使用 Spring Boot,我们只需要设置 spring.kafka.producer.transaction-id-prefix 属性来启用事务。KafkaTransactionManager Spring Boot 将通过自动配置一个bean 并将其连接到侦听器容器中来完成其余的 工作。这是负责消息生产者的配置的一部分。我们使用 JsonSerializer 将数据从对象序列化为 JSON。交易前缀是tx-.
spring: kafka: producer: key-serializer: org.apache.kafka.common.serialization.LongSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer transaction-id-prefix: tx-
|
在我们的场景中,我们将在单个事务中发送 10 条消息。为了观察消费者端的日志,我们将后续尝试之间的延迟设置为 1 秒。
@Transactional public void generateAndSendPackage() throws InterruptedException, TransactionException { for (long i = 0; i < 10; i++) { Order t = new Order(id++, i+1, i+2, 1000, "NEW"); ListenableFuture<SendResult<Long, Order>> result = kafkaTemplate.send("transactions", t.getId(), t); result.addCallback(callback); Thread.sleep(1000); } }
|
在 Kafka 消费者端启用事务
在第一步中,我们将只打印传入的消息。我们需要用@KafkaListener. 目标主题是transactions,消费群体是a。此外,我们必须添加@Transactional注释以启用对侦听方法的事务支持。
@Service public class TransactionsListener {
private static final Logger LOG = LoggerFactory .getLogger(TransactionsListener.class);
@KafkaListener( id = "transactions", topics = "transactions", containerGroup = "a", concurrency = "3") @Transactional public void listen(Order order) { LOG.info("{}", order); } }
|
让我们先运行生产者应用程序。要做的就是转到transactions-service目录并执行命令mvn spring-boot:run。
为 Spring Kafka 事务启用更详细的日志是一个好主意。为此,将以下行添加到application.yml文件中:
logging: level: org.springframework.transaction: trace org.springframework.kafka.transaction: debug
|
之后,让我们运行消费者应用程序。为此,请转到accounts-service目录并运行与以前相同的命令。您应该会看到在 Upstash 控制台中创建的以下主题:
该transactions-service应用程序公开了用于发送消息的 REST 端点。它只是开始在我在上一节中提到的单个事务中生成和发送 10 条消息的过程。让我们调用端点:
$ curl -X POST http://localhost:8080/transactions
让我们看看生产者端的日志。发送完所有消息后,它提交了事务。
现在,让我们看看它在消费者方面的表现。所有消息都是在生产者应用程序发送后立即接收的。这不是我们所期望的……
为了验证发生了什么,我们需要查看消费者应用程序日志。这是一个带有 Kafka 消费者设置的片段。如您所见,默认情况下,Spring Kafka 将为 Spring Boot事务隔离级别设置read_uncommitted。
使用 Spring Kafka 深入研究事务
为了解决上一节中事务的问题,我们需要更改application.yml文件中的默认隔离级别。由于spring.kafka.consumer.properties我们必须将isolation.level属性设置read_commited为如下所示。
spring: application.name: accounts-service kafka: consumer: key-deserializer: org.apache.kafka.common.serialization.LongDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer properties: spring.json.trusted.packages: "*" isolation.level: read_committed
|
之后,让我们accounts-service再次运行该应用程序。
现在,生产者提交事务后,所有消息都已收到。
当我们将@KafkaListener concurrency参数设置为3 时,共有三个消费者线程。
下一步,我们将在生产者端测试事务的回滚。
为此,我们将修改生成和发送订单的方法。现在,generateAndSendPackage得到一个布尔参数,它指示事务是否应该回滚。
@Transactional public void generateAndSendPackage(boolean error) throws InterruptedException { for (long i = 0; i < 10; i++) { Order t = new Order(id++, i+1, i+2, 1000, "NEW"); ListenableFuture<SendResult<Long, Order>> result = kafkaTemplate.send("transactions", t.getId(), t); result.addCallback(callback); if (error && i > 5) throw new RuntimeException(); Thread.sleep(1000); } }
|
这是我们测试的日志。在发送六个订单后,该方法会抛出一个RuntimeException并且 Spring 回滚一个事务。正如预期的那样,消费者应用程序没有收到任何消息。
重要的是要知道 Spring 回滚默认情况下仅适用于 未经检查 的异常。
要回滚 检查 的异常,我们需要 rollbackFor在@Transactional注解上指定。
事务生产者甚至在提交事务之前就向 Kafka 集群发送消息。您可以在上一节中看到它,如果隔离级别为read_uncommited ,侦听器会持续接收消息。
因此,如果我们在生产者端回滚事务,则在回滚之前发送的消息会到达 Kafka 代理。例如,我们可以在主题transactions的 Upstash 实时消息视图中看到它。
下图这是transactions主题中a消费者组所有分区的当前偏移量值。
在发送第一个包含 10 条消息的包后,我们成功提交,并使用第二个包回滚了事务。在这种情况下,偏移量的总和为 10。但实际上,这些分区上的当前最新偏移量是不同的。
为了验证它,例如,我们可以将侦听器的消费者组名称更改为b并启动accounts-service.
@KafkaListener( id = "transactions", topics = "transactions", containerGroup = "b", concurrency = "3") @Transactional public void listen(Order order) { LOG.info("{}", order); }
|
这是b消费者组的当前偏移量值。
当然,消息已回滚。
但这里要理解的重要一点是,这些操作发生在 Kafka 代理端。事务协调器更改 Kafka 偏移量的值。我们可以很容易地验证消费者在回滚后不会收到消息,即使我们使用该spring.kafka.consumer.auto-offset-reset属性将初始偏移量设为earliest最早的 。
添加数据库
在本节中,我们将使用新功能扩展我们的场景。我们的应用程序会将订单状态存储在数据库中。仅出于演示目的,我们将使用内存数据库 H2。此场景需要两个依赖项:H2 和 Spring Data JPA。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <scope>runtime</scope> </dependency>
|
那里OrderGroup存储了包裹的当前状态(SENT、CONFIRMED、ROLLBACK)、单个包裹中的订单总数以及由accounts-service.
@Entity public class OrderGroup {
@Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; private String status; private int totalNoOfOrders; private int processedNoOfOrders;
// GETTERS/SETTERS ... }
|
为了管理实体,我们使用 Spring Data repository 模式:
public interface OrderGroupRepository extends CrudRepository<OrderGroup, Long> { }
|
我们还将在accounts-service应用程序中包含一个数据库。当它处理传入的订单时,它会在源账户和目标账户之间进行转账。它将帐户余额存储在数据库中。
@Entity public class Account {
@Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; private int balance;
// GETTERS/SETTERS ... }
|
和以前一样,有一个用于管理Account实体的存储库 bean。
public interface AccountRepository extends CrudRepository<Account, Long> { }
|
我们还需要修改Order应用程序之间交换的消息。它必须包含groupId用于处理确认的字段。
public class Order {
private Long id; private Long sourceAccountId; private Long targetAccountId; private int amount; private String status; private Long groupId;
// GETTERS/SETTERS ... }
|
下面的图表说明了我们所描述场景的架构。
处理跨多个资源的事务
包含 Spring Data JPA 后,有两个注册的TransactionManagerbean 名称transactionManager和kafkaTransactionManager. 因此我们需要在@Transactional注解中选择事务管理器的名称。
- 第一步,我们向数据库添加一个新实体。主键id在数据库中自动生成,然后返回给对象。
- 之后,我们获取 groupId并生成该组内的订单序列。
当然,这两个操作(保存到数据库,发送到 Kafka)都是同一个事务的一部分。
@Transactional("kafkaTransactionManager") public void sendOrderGroup(boolean error) throws InterruptedException { OrderGroup og = repository.save(new OrderGroup("SENT", 10, 0)); generateAndSendPackage(error, og.getId()); }
private void generateAndSendPackage(boolean error, Long groupId) throws InterruptedException { for (long i = 0; i < 10; i++) { Order o = new Order(id++, i+1, i+2, 1000, "NEW", groupId); ListenableFuture<SendResult<Long, Order>> result = kafkaTemplate.send("transactions", o.getId(), o); result.addCallback(callback); if (error && i > 5) throw new RuntimeException(); Thread.sleep(1000); } }
|
该accounts-service应用程序侦听传入的订单。它在单独的事务中处理每个订单。它检查客户账户中是否有足够的资金进行转账。如果有足够的钱,它会执行转账。最后,它将响应发送到 transactions-service事务状态。消息被发送到orders主题。
@KafkaListener( id = "transactions", topics = "transactions", groupId = "a", concurrency = "3") @Transactional("kafkaTransactionManager") public void listen(Order order) { LOG.info("Received: {}", order); process(order); }
private void process(Order order) { Account accountSource = repository .findById(order.getSourceAccountId()) .orElseThrow(); Account accountTarget = repository .findById(order.getTargetAccountId()) .orElseThrow(); if (accountSource.getBalance() >= order.getAmount()) { accountSource.setBalance(accountSource.getBalance() - order.getAmount()); repository.save(accountSource); accountTarget.setBalance(accountTarget.getBalance() + order.getAmount()); repository.save(accountTarget); order.setStatus("PROCESSED"); } else { order.setStatus("FAILED"); } LOG.info("After processing: {}", order); kafkaTemplate.send("orders", order.getId(), order); }
|
transactions-service监听关于该orders主题的订单确认。一旦收到消息,它就会增加订单组中已处理订单的数量,并将当前结果存储在数据库中。我们应该使用默认的 Spring 事务管理器,因为我们不向 Kafka 发送任何消息。
@KafkaListener( id = "orders", topics = "orders", groupId = "a", concurrency = "3") @Transactional("transactionManager") public void listen(Order order) { LOG.info("{}", order); OrderGroup og = repository .findById(order.getGroupId()) .orElseThrow(); if (order.getStatus().equals("PROCESSED")) { og.setProcessedNoOfOrders(og.getProcessedNoOfOrders() + 1); og = repository.save(og); LOG.info("Current: {}", og); } }
|
不要忘记OrderGroup在事务期间锁定记录。由于我们同时处理消息(使用 3 个线程),我们需要锁定OrderGroup记录,直到我们更新processedNoOfOrders列的值:
public interface OrderGroupRepository extends CrudRepository<OrderGroup, Long> {
@Lock(LockModeType.PESSIMISTIC_WRITE) Optional<OrderGroup> findById(Long groupId); }
|
让我们测试一个积极的场景。我们将生成一组应确认的订单。为此,让我们调用我们的端点POST /transactions:
$ curl -X 'POST' 'http://localhost:8080/transactions' \ -H 'Content-Type: application/json' \ -d 'false'
|
以下是accounts-service应用程序的日志:
我们还可以查看transactions-service应用程序生成的日志:
最后,我们可以通过调用以下端点来验证订单组的当前状态:
$ curl -X GET 'http://localhost:8080/transactions'
|
如果我们回滚事务会发生什么?使用以下命令自行尝试:
$ curl -X 'POST' 'http://localhost:8080/transactions' \ -H 'Content-Type: application/json' \ -d 'true'
|
最后的想法
您可以使用 Spring Kafka 项目通过 Spring Boot 轻松处理 Kafka 事务。您可以将您的应用程序与数据库集成并处理跨多个资源的事务。但是,需要澄清一件事——Kafka 不支持 XA 事务。可能会导致数据不一致。Spring 没有解决这种情况,它只是在后台执行两个事务。当@Transactional方法退出时,Spring Boot 会先提交数据库事务,然后提交 Kafka 事务。您可以更改该顺序以首先启用 Kafka 事务提交,方法是将外部方法配置为使用 DataSourceTransactionManager,将内部方法 配置为使用KafkaTransactionManager.
我们能以某种方式解决那个案子吗?当然。例如,Debezium 项目允许您将数据库更改流式传输到 Kafka 主题中。使用这种方法,您只需提交数据库中的更改,然后配置 Debezium 以将更改的事件发送到 Kafka。
GitHub 存储库