网上商店DDD系统的领域事件的消息流程与状态设计 - itnext

21-05-24 banq

本文是SpringBoot + DDD + Apache Kafka实现最终一致性的教程与源码第四部分:

这篇文章中说明了三种状态更改事件消息传递流程。

  1. 对客户的更改动作会触发由Accounts服务生成的事件消息,该事件消息在accounts.customer.changeKafka主题上发布,并由Orders服务使用;
  2. 批准订单会触发动作会由订单服务产生的事件消息,该事件消息会在orders.order.fulfillKafka主题上发布,并由配送服务使用;
  3. 订单状态的更改动作会触发由配送服务产生的事件消息,该消息在fulfillment.order.changeKafka主题上发布,并由订单服务使用;

这些状态更改事件消息流中的每一个在Kafka主题的生产者和消费者方面都遵循相同的架构模式。

让我们检查每个状态更改事件消息传递流及其背后的代码。

 

客户状态变更

当Customer服务创建或更新新的客户实体时,将生成一条CustomerChangeEvent消息并将其发送到accounts.customer.changeKafka主题。该消息由Orders服务检索和使用。这就是订单服务最终记录所有可能下订单的客户的方式。通过Kafka,可以说订单的客户联系信息最终与帐户的客户联系信息一致。

有多种方法可以触发将消息发送到Kafka。对于此特定状态更改,Customer服务使用侦听器。扩展AbstractMongoEventListener的侦听器类,监听Customer实体的onAfterSave事件。

@Slf4j
@Controller
public class AfterSaveListener extends AbstractMongoEventListener<Customer> {

    @Value("${spring.kafka.topic.accounts-customer}")
    private String topic;

    private Sender sender;

    @Autowired
    public AfterSaveListener(Sender sender) {

        this.sender = sender;
    }

    @Override
    public void onAfterSave(AfterSaveEvent<Customer> event) {

        log.info("onAfterSave event='{}'", event);
        Customer customer = event.getSource();

        CustomerChangeEvent customerChangeEvent = new CustomerChangeEvent();
        customerChangeEvent.setId(customer.getId());
        customerChangeEvent.setName(customer.getName());
        customerChangeEvent.setContact(customer.getContact());
        customerChangeEvent.setAddresses(customer.getAddresses());

        sender.send(topic, customerChangeEvent);
    }
}

侦听器通过实例化带有客户信息的CustomerChangeEvent新事件来处理事件,并将其传递给Sender类:

@Slf4j
public class Sender {

    @Autowired
    private KafkaTemplate<String, CustomerChangeEvent> kafkaTemplate;

    public void send(String topic, CustomerChangeEvent payload) {

        log.info("sending payload='{}' to topic='{}'", payload, topic);
        kafkaTemplate.send(topic, payload);
    }
}

Sender使用KafkaTemplate将消息发送到accounts.customer.changeKafka主题,如下所示。由于消息顺序对于确保按顺序处理对客户信息的更改至关重要,因此所有消息都将发送到具有单个分区的单个主题。

 

Orders服务的Receiver类消费使用Accounts服务生成的CustomerChangeEvent消息:

@Slf4j
@Component
public class Receiver {

    @Autowired
    private CustomerOrdersRepository customerOrdersRepository;

    @Autowired
    private MongoTemplate mongoTemplate;

    private CountDownLatch latch = new CountDownLatch(1);

    public CountDownLatch getLatch() {

        return latch;
    }

    @KafkaListener(topics = "${spring.kafka.topic.accounts-customer}")
    public void receiveCustomerOrder(CustomerOrders customerOrders) {

        log.info("received payload='{}'", customerOrders);
        latch.countDown();
        customerOrdersRepository.save(customerOrders);
    }

    @KafkaListener(topics = "${spring.kafka.topic.fulfillment-order}")
    public void receiveOrderStatusChangeEvents(OrderStatusChangeEvent orderStatusChangeEvent) {

        log.info("received payload='{}'", orderStatusChangeEvent);
        latch.countDown();

        Criteria criteria = Criteria.where("orders.guid")
                .is(orderStatusChangeEvent.getGuid());
        Query query = Query.query(criteria);

        Update update = new Update();
        update.addToSet("orders.$.orderStatusEvents", orderStatusChangeEvent.getOrderStatusEvent());
        mongoTemplate.updateFirst(query, update, "customer.orders");
    }
}

Receiver与配送服务、订购服务的类的配置有所不同。

订单服务从多个主题接收消息,每个主题都包含具有不同有效负载结构的消息。每种类型的消息都必须反序列化为不同的对象类型。为此,ReceiverConfig类使用Apache Kafka的StringDeserializer。Orders服务ReceiverConfig引用了Spring Kafka的AbstractKafkaListenerContainerFactory类setMessageConverter方法,该方法允许动态对象类型匹配:

@Configuration
@EnableKafka
public class ReceiverConfigNotConfluent implements ReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Override
    @Bean
    public Map<String, Object> consumerConfigs() {

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);

        return props;
    }

    @Override
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {

        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
                new StringDeserializer(),
                new StringDeserializer()
        );
    }

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setMessageConverter(new StringJsonMessageConverter());

        return factory;
    }

    @Override
    @Bean
    public Receiver receiver() {

        return new Receiver();
    }
}

 

配送Fulfillment订单状态状态更改

当“配送”实体中的“订单”状态更改为“已批准”以外的其他任何内容时,“配送Fulfillment”服务会生成一条OrderStatusChangeEvent消息,并将其发送到fulfillment.order.changeKafka主题。该消息由Orders订单服务检索和使用。这是订单服务跟踪所有CustomerOrder生命周期事件的方式,从最初的“已创建”状态到最终的“已接收”状态。

Fulfillment服务通过FulfillmentController类公开了多个端点,该类模拟了订单状态的变化。它们允许将订单的状态从“已批准”更改为“正在处理”,“已发货”,“在途”,最后更改为“已接收”。此更改适用于所有符合条件的订单。

每个状态更改都会触发对MongoDB中的Fulfillment文档的更改。每次更改还会生成一个Kafka消息,该消息包含OrderStatusChangeEvent在消息有效负载中。Fulfillment服务的Sender类负责处理此问题。

请注意,在此示例中,这两个事件未在原子事务中处理。更新数据库或发送消息都可能会独立失败,这将导致数据一致性丢失。在现实世界中,我们必须使用少数几种常见的架构模式,确保这两个独立的动作作为一个事务成功或失败,以确保数据的一致性。

@Slf4j
public class Sender {

    @Autowired
    private KafkaTemplate<String, OrderStatusChangeEvent> kafkaTemplate;

    public void send(String topic, OrderStatusChangeEvent payload) {

        log.info("sending payload='{}' to topic='{}'", payload, topic);
        kafkaTemplate.send(topic, payload);
    }
}

SenderConfi类处理Sender类的配置。这个Spring Kafka生产者配置类使用Spring Kafka的JsonSerializer类将OrderStatusChangeEvent对象序列化为JSON消息有效负载。该类几乎与SenderConfig“订单和帐户”服务中的类相同。

@Configuration
@EnableKafka
public class SenderConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return props;
    }

    @Bean
    public ProducerFactory<String, OrderStatusChangeEvent> producerFactory() {

        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, OrderStatusChangeEvent> kafkaTemplate() {

        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public Sender sender() {

        return new Sender();
    }
}

Sender使用KafkaTemplate将消息发送到orders.order.fulfill卡夫卡主题,由于消息顺序并不重要,因此如果消息量很大,则可以将消息发送到具有多个分区的主题。

订单服务中Receiver类负责消费由配送服务生产的OrderStatusChangeEvent消息:

@Slf4j
@Component
public class Receiver {

    @Autowired
    private CustomerOrdersRepository customerOrdersRepository;

    @Autowired
    private MongoTemplate mongoTemplate;

    private CountDownLatch latch = new CountDownLatch(1);

    public CountDownLatch getLatch() {

        return latch;
    }

    @KafkaListener(topics = "${spring.kafka.topic.accounts-customer}")
    public void receiveCustomerOrder(CustomerOrders customerOrders) {

        log.info("received payload='{}'", customerOrders);
        latch.countDown();
        customerOrdersRepository.save(customerOrders);
    }

    @KafkaListener(topics = "${spring.kafka.topic.fulfillment-order}")
    public void receiveOrderStatusChangeEvents(OrderStatusChangeEvent orderStatusChangeEvent) {

        log.info("received payload='{}'", orderStatusChangeEvent);
        latch.countDown();

        Criteria criteria = Criteria.where("orders.guid")
                .is(orderStatusChangeEvent.getGuid());
        Query query = Query.query(criteria);

        Update update = new Update();
        update.addToSet("orders.$.orderStatusEvents", orderStatusChangeEvent.getOrderStatusEvent());
        mongoTemplate.updateFirst(query, update, "customer.orders");
    }
}

如上所述,Orders服务的配置与从Kafka接收消息的Fulfillment服务不同。Orders服务接收来自多个主题的消息。这里的ReceiverConfig类使用StringDeserializer反序列化所有消息;Orders服务的ReceiverConfig类引用Spring Kafka AbstractKafkaListenerContainerFactory类的setMessageConverter方法,该方法允许动态对象类型匹配。

@Configuration
@EnableKafka
public class ReceiverConfigNotConfluent implements ReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Override
    @Bean
    public Map<String, Object> consumerConfigs() {

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);

        return props;
    }

    @Override
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {

        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
                new StringDeserializer(),
                new StringDeserializer()
        );
    }

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setMessageConverter(new StringJsonMessageConverter());

        return factory;
    }

    @Override
    @Bean
    public Receiver receiver() {

        return new Receiver();
    }
}

Orders服务从每个Kafka主题中使用消息都与Receiver类中的一个方法相关联(如上所示)。此方法接受特定的对象类型作为输入参数,表示消息内容需要反序列化到的对象类型。对于OrderStatusChangeEvent消息,将调用receiveOrderStatusChangeEvents方法来使用fulfillment.order.change Kafka主题中的消息。

猜你喜欢