使用Kafka Streams和Spring Boot实现微服务Saga分布式事务 - Piotr


在本文中,您将学习如何在 Spring Boot 中使用 Kafka Streams。我们将依赖 Spring Kafka 项目。为了很好地解释它是如何工作的,我们将实现一个 saga 模式。saga 模式是一种跨微服务管理分布式事务的方法。该过程的关键阶段是发布触发本地事务的事件。微服务通过消息代理交换此类事件。事实证明,Kafka Streams 可以帮助我们。让我们看看如何!
源代码:
如果您想自己尝试一下,可以随时查看我的源代码。为此,您需要克隆我的 GitHub 存储库。之后,您应该按照我的指示进行操作。
除了 Spring Kafka,您还可以使用 Spring Cloud Stream for Kafka。您可以在本文中阅读更多相关信息。Spring Cloud Stream 提供了一些有用的功能,例如 DLQ 支持、默认的 JSON 序列化或交互式查询。
 
架构
我们将创建一个简单的系统,由三个微服务组成。订单服务将订单发送到名为订单的Kafka主题。其他两个微服务stock-service和payment-service都会监听传入的事件。在接收到这些事件后,它们会验证是否有可能执行该订单。例如,如果客户账户上没有足够的资金,订单将被拒绝。否则,支付服务接受订单,并向支付-订单主题发送一个响应。库存服务也是如此,除了验证库存产品的数量并向库存-订单主题发送一个响应。

然后,订单服务通过订单的ID将来自库存-订单和付款-订单主题的两个流连接起来。如果两个订单都被接受,它就确认了一个分布式交易。另一方面,如果一个订单被接受,而第二个订单被拒绝,它就会执行回滚。在这种情况下,它只是生成一个新的订单事件并将其发送到订单主题。我们可以把订单主题看作是订单状态变化的一个流,或者就像一个有最后状态的表。下面的图片直观地展示了我们的场景。

 
Kafka Streams with Spring Boot
让我们从订单服务开始实施。令人惊讶的是,没有针对Kafka的Spring Boot启动器(除非我们使用Spring Cloud Stream)。因此,我们需要包含spring-kafka依赖项。为了处理流,我们还需要直接包含kafka-streams模块。由于订单服务暴露了一些REST端点,所以需要添加Spring Boot Web启动器。

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
</dependency>

订单服务是我们方案中最重要的微服务。它充当订单网关和传奇模式的协调者。它需要我们架构中使用的所有三个主题。为了在应用程序启动时自动创建主题,我们需要定义以下bean类。

@Bean
public NewTopic orders() {
   return TopicBuilder.name("orders")
         .partitions(3)
         .compact()
         .build();
}

@Bean
public NewTopic paymentTopic() {
   return TopicBuilder.name("payment-orders")
         .partitions(3)
         .compact()
         .build();
}

@Bean
public NewTopic stockTopic() {
   return TopicBuilder.name("stock-orders")
         .partitions(3)
         .compact()
         .build();
}

然后,让我们来定义我们的第一个Kafka流。
要做到这一点,我们需要使用StreamsBuilder Bean。
订单服务接收来自支付服务(在payment-events主题)和股票服务(在stock-events主题)的事件。每一个事件都包含先前由订单服务设置的ID。如果我们通过订单的ID将这两个流连接成一个流,我们将能够确定我们的交易状态。这个算法非常简单。如果支付服务和股票服务都接受了订单,交易的最终状态就是确认。如果两个服务都拒绝了订单,最终的状态是REJECTED。最后一个选项是ROLLBACK--当一个服务接受了订单,而一个服务拒绝了它。
下面是OrderManageService Bean中的描述方法:

@Service
public class OrderManageService {

   public Order confirm(Order orderPayment, Order orderStock) {
      Order o = new Order(orderPayment.getId(),
             orderPayment.getCustomerId(),
             orderPayment.getProductId(),
             orderPayment.getProductCount(),
             orderPayment.getPrice());
      if (orderPayment.getStatus().equals("ACCEPT") &&
                orderStock.getStatus().equals("ACCEPT")) {
         o.setStatus("CONFIRMED");
      } else if (orderPayment.getStatus().equals("REJECT") &&
                orderStock.getStatus().equals("REJECT")) {
         o.setStatus("REJECTED");
      } else if (orderPayment.getStatus().equals("REJECT") ||
                orderStock.getStatus().equals("REJECT")) {
         String source = orderPayment.getStatus().equals("REJECT")
                    ? "PAYMENT" : "STOCK";
         o.setStatus("ROLLBACK");
         o.setSource(source);
      }
      return o;
   }

}

最后,我们的流的实现。我们需要定义KStream bean。我们正在使用KStream的join方法连接两个流。连接的窗口是10秒。结果是,我们正在设置订单的状态,并向订单主题发送一个新的订单。我们使用与发送新订单相同的主题。

@Autowired
OrderManageService orderManageService;

@Bean
public KStream<Long, Order> stream(StreamsBuilder builder) {
   JsonSerde<Order> orderSerde = new JsonSerde<>(Order.class);
   KStream<Long, Order> stream = builder
         .stream("payment-orders", Consumed.with(Serdes.Long(), orderSerde));

   stream.join(
         builder.stream("stock-orders"),
         orderManageService::confirm,
         JoinWindows.of(Duration.ofSeconds(10)),
         StreamJoined.with(Serdes.Long(), orderSerde, orderSerde))
      .peek((k, o) -> LOG.info("Output: {}", o))
      .to("orders");

   return stream;
}


 
SpringBoot配置
在Spring Boot中,应用程序的名称默认为Kafka Streams的消费者组的名称。因此,我们应该在application.yml中设置。当然,我们还需要设置Kafka bootstrap服务器的地址。最后,我们要配置事件序列化的默认键和值。它同时适用于标准和流处理。

spring.application.name: orders
spring.kafka:
  bootstrap-servers: 127.0.0.1:56820
  producer:
    key-serializer: org.apache.kafka.common.serialization.LongSerializer
    value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  streams:
    properties:
      default.key.serde: org.apache.kafka.common.serialization.Serdes$LongSerde
      default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
      spring.json.trusted.packages: "*"


从Kafka主题发送和接收事件
在上一节中,我们讨论了如何创建一个新的Kafka流,作为连接其他两个流的结果。
现在,让我们看看如何处理传入的消息。我们可以以支付服务为例来考虑这个问题。
它监听传入的订单。如果它收到一个新的订单,它将在客户的账户上执行预订,并向payment-orders主题发送一个带有预订状态的响应。如果它收到来自订单服务的交易确认,它将提交交易或回滚。为了启用Kafka监听器,我们应该用@EnableKafka来注解主类。此外,监听方法必须用@KafkaListener来注解。下面的方法监听订单主题上的事件,并在支付消费者组中运行。

@SpringBootApplication
@EnableKafka
public class PaymentApp {

    private static final Logger LOG = LoggerFactory.getLogger(PaymentApp.class);

    public static void main(String args) {
        SpringApplication.run(PaymentApp.class, args);
    }

    @Autowired
    OrderManageService orderManageService;

    @KafkaListener(id = "orders", topics = "orders", groupId = "payment")
    public void onEvent(Order o) {
        LOG.info("Received: {}" , o);
        if (o.getStatus().equals("NEW"))
            orderManageService.reserve(o);
        else
            orderManageService.confirm(o);
    }
}

这里是前面代码片段中使用的OrderManageService的实现。如果它收到新状态的订单,它将执行预订。在预订过程中,它将订单的价格从 amountAvailable 字段中减去,并在 amountReserved 字段中添加相同的值。然后,它设置订单的状态,并使用KafkaTemplate向payment-orders主题发送响应。在确认阶段,它不发送任何响应事件。它可以执行回滚,这意味着--将订单的价格从reserved字段中减去,并将其添加到amountAvailable字段中。否则,它只是 "提交 "交易事务,从保留金额字段中减去价格。

@Service
public class OrderManageService {

   private static final String SOURCE = "payment";
   private static final Logger LOG = LoggerFactory.getLogger(OrderManageService.class);
   private CustomerRepository repository;
   private KafkaTemplate<Long, Order> template;

   public OrderManageService(CustomerRepository repository, KafkaTemplate<Long, Order> template) {
      this.repository = repository;
      this.template = template;
   }

   public void reserve(Order order) {
      Customer customer = repository.findById(order.getCustomerId()).orElseThrow();
      LOG.info("Found: {}", customer);
      if (order.getPrice() < customer.getAmountAvailable()) {
         order.setStatus("ACCEPT");
         customer.setAmountReserved(customer.getAmountReserved() + order.getPrice());
         customer.setAmountAvailable(customer.getAmountAvailable() - order.getPrice());
      } else {
         order.setStatus("REJECT");
      }
      order.setSource(SOURCE);
      repository.save(customer);
      template.send("payment-orders", order.getId(), order);
      LOG.info("Sent: {}", order);
   }

   public void confirm(Order order) {
      Customer customer = repository.findById(order.getCustomerId()).orElseThrow();
      LOG.info("Found: {}", customer);
      if (order.getStatus().equals("CONFIRMED")) {
         customer.setAmountReserved(customer.getAmountReserved() - order.getPrice());
         repository.save(customer);
      } else if (order.getStatus().equals("ROLLBACK") && !order.getSource().equals(SOURCE)) {
         customer.setAmountReserved(customer.getAmountReserved() - order.getPrice());
         customer.setAmountAvailable(customer.getAmountAvailable() + order.getPrice());
         repository.save(customer);
      }

   }
}

 
库存服务stock-service
一个类似的逻辑在库存服务方面被实现。然而,它使用字段productCount而不是订单的价格,并对所需数量的订单产品进行预订。下面是库存服务中OrderManageService类的实现。

@Service
public class OrderManageService {

   private static final String SOURCE = "stock";
   private static final Logger LOG = LoggerFactory.getLogger(OrderManageService.class);
   private ProductRepository repository;
   private KafkaTemplate<Long, Order> template;

   public OrderManageService(ProductRepository repository, KafkaTemplate<Long, Order> template) {
      this.repository = repository;
      this.template = template;
   }

   public void reserve(Order order) {
      Product product = repository.findById(order.getProductId()).orElseThrow();
      LOG.info("Found: {}", product);
      if (order.getStatus().equals("NEW")) {
         if (order.getProductCount() < product.getAvailableItems()) {
            product.setReservedItems(product.getReservedItems() + order.getProductCount());
            product.setAvailableItems(product.getAvailableItems() - order.getProductCount());
            order.setStatus("ACCEPT");
            repository.save(product);
         } else {
            order.setStatus("REJECT");
         }
         template.send("stock-orders", order.getId(), order);
         LOG.info("Sent: {}", order);
      }
   }

   public void confirm(Order order) {
      Product product = repository.findById(order.getProductId()).orElseThrow();
      LOG.info("Found: {}", product);
      if (order.getStatus().equals("CONFIRMED")) {
         product.setReservedItems(product.getReservedItems() - order.getProductCount());
         repository.save(product);
      } else if (order.getStatus().equals("ROLLBACK") && !order.getSource().equals(SOURCE)) {
         product.setReservedItems(product.getReservedItems() - order.getProductCount());
         product.setAvailableItems(product.getAvailableItems() + order.getProductCount());
         repository.save(product);
      }
   }

}

 
用Spring Boot查询Kafka流
现在,让我们考虑以下场景:
首先,订单服务收到一个新的订单(通过REST API)并将其发送到Kafka主题。然后,这个订单被其他两个微服务接收。一旦他们发回一个积极的响应(或消极的),订单服务就会把它们作为流来处理,并改变订单的状态。具有新状态的订单将被排放到与之前相同的主题。那么,我们在哪里存储订单的当前状态的数据呢?在Kafka主题中。我们将再次在我们的Spring Boot应用程序中使用Kafka Streams。但是这一次,我们利用了KTable的优势。让我们来看看我们的场景的可视化情况。



好了,让我们在订单服务中定义另一个Kafka Streams bean。我们将获得相同的订单主题作为一个流。我们将把它转换为Kafka表,并在一个持久的存储中实现它。由于这一点,我们将能够轻松地从我们的REST控制器中查询存储。

@Bean
public KTable<Long, Order> table(StreamsBuilder builder) {
   KeyValueBytesStoreSupplier store =
         Stores.persistentKeyValueStore("orders");
   JsonSerde<Order> orderSerde = new JsonSerde<>(Order.class);
   KStream<Long, Order> stream = builder
         .stream("orders", Consumed.with(Serdes.Long(), orderSerde));
   return stream.toTable(Materialized.<Long, Order>as(store)
         .withKeySerde(Serdes.Long())
         .withValueSerde(orderSerde));
}


如果我们在同一台机器上运行多个订单服务实例,覆盖状态存储的默认位置也很重要。要做到这一点,我们应该为每一个实例定义以下独特的属性。
spring.kafka.streams.state-dir: /tmp/kafka-streams/1

不幸的是,Spring Boot中没有内置支持Kafka流的交互式查询。然而,我们可以使用自动配置的StreamsBuilderFactoryBean将KafkaStreams实例注入控制器。然后我们可以在 "物化 materialized"名称下查询状态存储。这当然是非常微不足道的例子。我们只是从KTable获取所有的订单。

@GetMapping
public List<Order> all() {
   List<Order> orders = new ArrayList<>();
      ReadOnlyKeyValueStore<Long, Order> store = kafkaStreamsFactory
         .getKafkaStreams()
         .store(StoreQueryParameters.fromNameAndType(
                "orders",
                QueryableStoreTypes.keyValueStore()));
   KeyValueIterator<Long, Order> it = store.all();
   it.forEachRemaining(kv -> orders.add(kv.value));
   return orders;
}

在同一个OrderController中,也有一个方法用于发送一个新订单到Kafka主题。

@PostMapping
public Order create(@RequestBody Order order) {
   order.setId(id.incrementAndGet());
   template.send("orders", order.getId(), order);
   LOG.info("Sent: {}", order);
   return order;
}
 

测试场景
在我们运行我们的示例微服务之前,我们需要启动 Kafka 的本地实例。通常,我为此使用 Redpanda。它是一个兼容 Kafka API 的流媒体平台。与 Kafka 相比,在本地运行它相对容易。您需要做的就是在rpk本地安装CLI(这里是macOS的说明)。之后,您可以使用以下命令创建单节点实例:
$ rpk container start
运行后,它将打印您的节点的地址。对我来说,是 127.0.0.1:56820。您应该将该地址作为spring.kafka.bootstrap-servers属性的值。您还可以使用以下命令显示已创建主题的列表:
$ rpk topic list --brokers 127.0.0.1:56820
然后,让我们运行我们的微服务。从 开始,order-service因为它正在创建所有必需的主题并构建 Kafka Streams 实例。您可以使用 REST 端点发送单个订单:

$ curl -X 'POST' \
  'http://localhost:8080/orders' \
  -H 'Content-Type: application/json' \
  -d '{
  "customerId": 10,
  "productId": 10,
  "productCount": 5,
  "price": 100,
  "status": "NEW"
}'


以下 bean 负责生成 10000 个随机订单:

@Service
public class OrderGeneratorService {

   private static Random RAND = new Random();
   private AtomicLong id = new AtomicLong();
   private Executor executor;
   private KafkaTemplate<Long, Order> template;

   public OrderGeneratorService(Executor executor, KafkaTemplate<Long, Order> template) {
      this.executor = executor;
      this.template = template;
   }

   @Async
   public void generate() {
      for (int i = 0; i < 10000; i++) {
         int x = RAND.nextInt(5) + 1;
         Order o = new Order(id.incrementAndGet(), RAND.nextLong(100) + 1, RAND.nextLong(100) + 1, "NEW");
         o.setPrice(100 * x);
         o.setProductCount(x);
         template.send("orders", o.getId(), o);
      }
   }
}

可以通过调用端点来启动该过程POST /orders/generate。
无论是决定发送单个订单还是生成多个随机订单,您都可以使用以下端点轻松查询订单状态:
$ curl http://localhost:8080/orders
这是由应用程序和 Kafka Streams 生成的主题结构,用于执行连接操作并将订单保存KTable为状态存储。