This is Part 4 of the tutorial and source code for implementing eventual consistency with Spring Boot, DDD, and Apache Kafka.
This post describes three state-change event messaging flows:
- A change to a customer triggers an event message produced by the Accounts service, published on the accounts.customer.change Kafka topic, and consumed by the Orders service;
- Approving an order triggers an event message produced by the Orders service, published on the orders.order.fulfill Kafka topic, and consumed by the Fulfillment service;
- A change to an order's status triggers an event message produced by the Fulfillment service, published on the fulfillment.order.change Kafka topic, and consumed by the Orders service.
Each of these state-change event messaging flows follows the same architectural pattern on both the producer and consumer sides of the Kafka topic.
Let's examine each state-change event messaging flow and the code behind it.
Customer State Change
When a Customer entity is created or updated, the Accounts service generates a CustomerChangeEvent message and sends it to the accounts.customer.change Kafka topic. The message is retrieved and consumed by the Orders service. This is how the Orders service eventually has a record of every customer who may place an order. Via Kafka, it can be said that the Orders service's customer contact information is eventually consistent with the Accounts service's customer contact information.

There are several ways to trigger a message being sent to Kafka. For this particular state change, the Accounts service uses a listener. The listener class, which extends AbstractMongoEventListener, listens for the onAfterSave event for a Customer entity.
@Slf4j
@Controller
public class AfterSaveListener extends AbstractMongoEventListener<Customer> {

    @Value("${spring.kafka.topic.accounts-customer}")
    private String topic;

    private Sender sender;

    @Autowired
    public AfterSaveListener(Sender sender) {
        this.sender = sender;
    }

    @Override
    public void onAfterSave(AfterSaveEvent<Customer> event) {
        log.info("onAfterSave event='{}'", event);
        Customer customer = event.getSource();
        CustomerChangeEvent customerChangeEvent = new CustomerChangeEvent();
        customerChangeEvent.setId(customer.getId());
        customerChangeEvent.setName(customer.getName());
        customerChangeEvent.setContact(customer.getContact());
        customerChangeEvent.setAddresses(customer.getAddresses());
        sender.send(topic, customerChangeEvent);
    }
}
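The CustomerChangeEvent payload class itself is not listed in this part. Based on the setters used in the listener above, a minimal sketch might look like the following; the Lombok @Data annotation and the exact field types (Name, Contact, Address) are assumptions, and imports are omitted as in the other listings.

@Data
public class CustomerChangeEvent {

    private String id;                 // identifier type assumed
    private Name name;                 // value-object types assumed
    private Contact contact;
    private List<Address> addresses;
}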
The listener handles the event by instantiating a new CustomerChangeEvent with the customer's information and passing it to the Sender class:
@Slf4j
public class Sender {

    @Autowired
    private KafkaTemplate<String, CustomerChangeEvent> kafkaTemplate;

    public void send(String topic, CustomerChangeEvent payload) {
        log.info("sending payload='{}' to topic='{}'", payload, topic);
        kafkaTemplate.send(topic, payload);
    }
}
The Sender uses a KafkaTemplate to send the message to the accounts.customer.change Kafka topic, as shown above. Since message order is critical to ensure that changes to a customer's information are processed in order, all messages are sent to a single topic with a single partition.
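Creating the topic itself is not shown in the article. Below is a minimal sketch of how the single-partition topic could be declared with Spring Kafka's TopicBuilder (available in Spring Kafka 2.3+); the configuration class name and the replication factor are assumptions, and the topic could just as well be created outside the application.

@Configuration
public class KafkaTopicConfig {

    @Value("${spring.kafka.topic.accounts-customer}")
    private String accountsCustomerTopic;

    // A single partition preserves a global ordering of CustomerChangeEvent messages.
    @Bean
    public NewTopic accountsCustomerChangeTopic() {
        return TopicBuilder.name(accountsCustomerTopic)
                .partitions(1)
                .replicas(1)
                .build();
    }
}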
The Orders service's Receiver class consumes the CustomerChangeEvent messages produced by the Accounts service:
@Slf4j
@Component
public class Receiver {

    @Autowired
    private CustomerOrdersRepository customerOrdersRepository;

    @Autowired
    private MongoTemplate mongoTemplate;

    private CountDownLatch latch = new CountDownLatch(1);

    public CountDownLatch getLatch() {
        return latch;
    }

    @KafkaListener(topics = "${spring.kafka.topic.accounts-customer}")
    public void receiveCustomerOrder(CustomerOrders customerOrders) {
        log.info("received payload='{}'", customerOrders);
        latch.countDown();
        customerOrdersRepository.save(customerOrders);
    }

    @KafkaListener(topics = "${spring.kafka.topic.fulfillment-order}")
    public void receiveOrderStatusChangeEvents(OrderStatusChangeEvent orderStatusChangeEvent) {
        log.info("received payload='{}'", orderStatusChangeEvent);
        latch.countDown();
        Criteria criteria = Criteria.where("orders.guid")
                .is(orderStatusChangeEvent.getGuid());
        Query query = Query.query(criteria);
        Update update = new Update();
        update.addToSet("orders.$.orderStatusEvents",
                orderStatusChangeEvent.getOrderStatusEvent());
        mongoTemplate.updateFirst(query, update, "customer.orders");
    }
}
The Orders service's Receiver class is configured differently than that of the Fulfillment service.
The Orders service receives messages from multiple topics, each containing messages with a different payload structure. Each type of message must be deserialized into a different object type. To do this, the ReceiverConfig class uses Apache Kafka's StringDeserializer. The Orders service's ReceiverConfig also references the setMessageConverter method of Spring Kafka's AbstractKafkaListenerContainerFactory class, which allows dynamic object type matching:
@Configuration
@EnableKafka
public class ReceiverConfigNotConfluent implements ReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Override
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return props;
    }

    @Override
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
                new StringDeserializer(),
                new StringDeserializer());
    }

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setMessageConverter(new StringJsonMessageConverter());
        return factory;
    }

    @Override
    @Bean
    public Receiver receiver() {
        return new Receiver();
    }
}
Fulfillment Order Status Change
When the status of the Order in a Fulfillment entity is changed to anything other than 'Approved', the Fulfillment service generates an OrderStatusChangeEvent message and sends it to the fulfillment.order.change Kafka topic. The message is retrieved and consumed by the Orders service. This is how the Orders service tracks all of a CustomerOrder's lifecycle events, from the initial 'Created' status to the final 'Received' status.
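The OrderStatusChangeEvent payload class is not listed in this part either. Judging from the accessors used by the Orders service's Receiver shown earlier (getGuid() and getOrderStatusEvent()), a minimal sketch could look like the following; the field types and the nested OrderStatusEvent type are assumptions.

@Data
public class OrderStatusChangeEvent {

    private UUID guid;                         // identifies the order being updated (type assumed)
    private OrderStatusEvent orderStatusEvent; // the new lifecycle event, e.g. 'Shipped' (type assumed)
}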

The Fulfillment service exposes several endpoints through the FulfillmentController class that simulate changes in order status. They allow an order's status to be moved from 'Approved' to 'Processing', then 'Shipped', 'In Transit', and finally 'Received'. The change is applied to all orders that meet the criteria.
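The FulfillmentController endpoints are not listed in this part. The sketch below only illustrates what one of these simulation endpoints might look like; the request mapping, the property name, the repository method, the accessors on Fulfillment, and the way the event is constructed are all assumptions rather than the article's actual code.

@RestController
public class FulfillmentController {

    @Autowired
    private FulfillmentRepository fulfillmentRepository; // hypothetical Spring Data repository

    @Autowired
    private Sender sender;

    @Value("${spring.kafka.topic.fulfillment-order}")
    private String topic;

    // Hypothetical endpoint: move every eligible order from 'Approved' to 'Processing',
    // persist the change, and publish an OrderStatusChangeEvent for each order.
    @PostMapping("/fulfillments/process")
    public void processApprovedOrders() {
        for (Fulfillment fulfillment : fulfillmentRepository.findByStatus("Approved")) {
            fulfillment.setStatus("Processing");                           // assumed accessor
            fulfillmentRepository.save(fulfillment);

            OrderStatusChangeEvent event = new OrderStatusChangeEvent();
            event.setGuid(fulfillment.getOrderGuid());                      // assumed accessor
            event.setOrderStatusEvent(new OrderStatusEvent("Processing"));  // assumed constructor
            sender.send(topic, event);
        }
    }
}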
Each status change triggers a change to the corresponding Fulfillment document in MongoDB. Each change also generates a Kafka message containing an OrderStatusChangeEvent in the message payload. The Fulfillment service's Sender class handles this.
Note that in this example the two actions are not handled in an atomic transaction: updating the database and sending the message can each fail independently, which would leave the data inconsistent. In the real world, one of a few common architectural patterns, such as a transactional outbox, must be used to ensure that these two independent actions succeed or fail as a unit, preserving data consistency.
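A minimal sketch of the transactional-outbox idea mentioned above follows. It assumes a MongoDB replica set (multi-document transactions require one), a MongoTransactionManager bean, and a hypothetical outbox collection; a separate relay process, not shown, would read the outbox and publish the events to Kafka. The article's actual Sender class, shown next, simply publishes the event directly.

@Configuration
public class MongoTxConfig {

    // Enables @Transactional for MongoDB (requires a replica set).
    @Bean
    public MongoTransactionManager transactionManager(MongoDatabaseFactory dbFactory) {
        return new MongoTransactionManager(dbFactory);
    }
}

@Service
public class FulfillmentStatusService {

    @Autowired
    private MongoTemplate mongoTemplate;

    // Both writes commit or roll back together; a relay later publishes
    // the outbox documents to the fulfillment.order.change topic.
    @Transactional
    public void changeStatus(Fulfillment fulfillment, OrderStatusChangeEvent event) {
        mongoTemplate.save(fulfillment);
        mongoTemplate.save(event, "fulfillment.order.outbox"); // hypothetical outbox collection
    }
}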
@Slf4j
public class Sender {

    @Autowired
    private KafkaTemplate<String, OrderStatusChangeEvent> kafkaTemplate;

    public void send(String topic, OrderStatusChangeEvent payload) {
        log.info("sending payload='{}' to topic='{}'", payload, topic);
        kafkaTemplate.send(topic, payload);
    }
}
The SenderConfig class handles the configuration of the Sender class. This Spring Kafka producer configuration class uses Spring Kafka's JsonSerializer class to serialize OrderStatusChangeEvent objects into JSON message payloads. The class is nearly identical to the SenderConfig classes in the Orders and Accounts services.
@Configuration
@EnableKafka
public class SenderConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, OrderStatusChangeEvent> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, OrderStatusChangeEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public Sender sender() {
        return new Sender();
    }
}
The Sender uses a KafkaTemplate to send the message to the fulfillment.order.change Kafka topic. Since message order is not critical here, the messages could be sent to a topic with multiple partitions if message volume were high.
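If the topic were given multiple partitions, per-order ordering could still be preserved by keying each message with the order's GUID, because Kafka only guarantees ordering within a single partition. This is not part of the article's code, just a sketch of that variation of the Sender's send method (it assumes getGuid() returns a value with a stable string form):

public void send(String topic, OrderStatusChangeEvent payload) {
    log.info("sending payload='{}' to topic='{}'", payload, topic);
    // Messages with the same key always go to the same partition, so the
    // events for a given order stay in order even across multiple partitions.
    kafkaTemplate.send(topic, payload.getGuid().toString(), payload);
}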
The Receiver class in the Orders service consumes the OrderStatusChangeEvent messages produced by the Fulfillment service:
@Slf4j
@Component
public class Receiver {

    @Autowired
    private CustomerOrdersRepository customerOrdersRepository;

    @Autowired
    private MongoTemplate mongoTemplate;

    private CountDownLatch latch = new CountDownLatch(1);

    public CountDownLatch getLatch() {
        return latch;
    }

    @KafkaListener(topics = "${spring.kafka.topic.accounts-customer}")
    public void receiveCustomerOrder(CustomerOrders customerOrders) {
        log.info("received payload='{}'", customerOrders);
        latch.countDown();
        customerOrdersRepository.save(customerOrders);
    }

    @KafkaListener(topics = "${spring.kafka.topic.fulfillment-order}")
    public void receiveOrderStatusChangeEvents(OrderStatusChangeEvent orderStatusChangeEvent) {
        log.info("received payload='{}'", orderStatusChangeEvent);
        latch.countDown();
        Criteria criteria = Criteria.where("orders.guid")
                .is(orderStatusChangeEvent.getGuid());
        Query query = Query.query(criteria);
        Update update = new Update();
        update.addToSet("orders.$.orderStatusEvents",
                orderStatusChangeEvent.getOrderStatusEvent());
        mongoTemplate.updateFirst(query, update, "customer.orders");
    }
}
As mentioned above, the Orders service is configured differently from the Fulfillment service when it comes to receiving messages from Kafka. The Orders service receives messages from multiple topics. Here, the ReceiverConfig class deserializes all messages using a StringDeserializer; the Orders service's ReceiverConfig references the setMessageConverter method of Spring Kafka's AbstractKafkaListenerContainerFactory class, which allows dynamic object type matching:
@Configuration
@EnableKafka
public class ReceiverConfigNotConfluent implements ReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Override
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return props;
    }

    @Override
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
                new StringDeserializer(),
                new StringDeserializer());
    }

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setMessageConverter(new StringJsonMessageConverter());
        return factory;
    }

    @Override
    @Bean
    public Receiver receiver() {
        return new Receiver();
    }
}
Each Kafka topic the Orders service consumes from is associated with a method in the Receiver class (shown above). That method accepts a specific object type as its input parameter, which indicates the object type the message payload needs to be deserialized into. For OrderStatusChangeEvent messages, the receiveOrderStatusChangeEvents method is called to consume messages from the fulfillment.order.change Kafka topic.