微服务事务发件箱模式电子邮件案例代码


事务发件箱模式适用于在微服务的事务边界内保持一致性至关重要的场景。它确保本地事务和事件发布以原子方式发生,从而防止数据不一致。另一方面,Saga 模式旨在协调跨多个微服务的长期运行的业务流程,处理分布式事务,并在发生故障时采取补偿措施。
通过在我们的用户注册系统中应用事务发件箱模式,我们实现了服务之间的可靠通信,维护了数据一致性,并增强了系统的整体稳健性。

在微服务架构中,各个服务被设计为独立运行,通常处理业务功能的不同方面。但是,当操作需要跨多个服务进行协调时,确保数据一致性就成为一项关键挑战。这是因为每个服务都有自己的数据库和事务边界,这使得这些服务在交互时很难保持一致状态。

例如,在用户注册系统中,注册新用户可能涉及跨多个服务的几个步骤:

  1. 在用户服务的数据库中保存用户详细信息。
  2. 通过电子邮件服务发送确认电子邮件。
  3. 在分析服务中更新用户分析。

在理想情况下,所有这些步骤都应成功完成,以确保系统保持一致。但是,在此过程中的各个阶段都可能发生故障。例如,用户详细信息可能已正确保存在数据库中,但电子邮件服务可能无法发送确认电子邮件,或者分析服务可能已关闭,从而导致服务之间的注册数据不同步。这会导致数据不一致,并可能导致以下问题:
  • 尽管存储了用户的详细信息,但他们仍收不到确认电子邮件。
  • 分析服务不能准确反映注册用户的数量。
  • 难以跨服务追踪用户注册情况。

在高可用性系统中,此类挑战变得更加严重,因为微服务是分布式的,并且实时运行,而且通常规模庞大。如果数据一致性管理不善,可能会影响整个系统的可靠性和可信度。

为了应对这些挑战,事务发件箱模式提供了一种确保跨服务一致性的解决方案。通过将事件生成与事件处理分离并利用可靠的存储机制,该模式有助于确保事件(如“用户注册”事件)不会丢失或重复,即使在服务发生故障的情况下也是如此。此模式在原子性(即确保所有步骤都成功或都不成功)和微服务之间的可靠通信至关重要的场景中特别有用。

事务性发件箱模式通过将事件(例如“用户已注册”事件)持久化到与业务数据位于同一数据库中的发件箱表中来运行。这意味着业务操作(例如,保存用户详细信息)和事件记录发生在同一事务中。一旦提交事务,就保证事件被持久化。然后,一个单独的进程(发件箱处理器)从此发件箱读取事件并将它们异步发布到下游服务,例如电子邮件和分析服务。这种方法确保最终的一致性,并通过在通信链中的任何步骤失败时启用重试来提供对服务故障的恢复能力。

关键组件
发件箱实体
表示存储事件的发件箱表的结构。

  • 事件 ID:事件的唯一标识符。
  • 事件类型:描述事件的类型(例如UserRegistered)。
  • 事件有效负载:包含与事件相关的数据。
  • 元数据:时间戳或路由详细信息等附加信息。

交易逻辑
在事务边界内触发领域事件生成的业务逻辑。

  • 根据业务操作生成领域事件。
  • 在同一事务内更新应用程序的状态。

发件箱持久化机制
在发件箱表中持久保存事件的机制。

  • 利用关系数据库存储同一事务内的事件。
  • 发件箱表中的常见字段:事件ID、事件类型、事件负载、元数据。

发件箱处理器
负责处理来自发件箱的事件的后台进程。

  • 定期扫描发件箱表以查找新事件。
  • 将事件发布到消息代理或事件存储。
  • 处理失败事件发布的重试。

消息代理或事件存储
向下游服务广播事件的系统。
  • 示例包括 Apache Kafka、RabbitMQ 或 AWS SNS。
  • 确保可靠的事件传递,并将事件生产者与消费者分离。

活动消费者
订阅并处理来自消息代理的事件的服务。
  • 订阅特定主题或队列。
  • 处理传入事件并相应地更新其状态。

重试机制
即使出现瞬时故障也能确保事件的可靠传递。

  • 在发件箱处理器中实现针对失败事件发布的重试机制。
  • 应用指数退避、断路器模式或自定义重试逻辑。

错误处理
管理事件生成、持久化或发布期间可能发生的错误。

  • 实现错误记录和监控。
  • 定义事件处理期间处理错误的策略。


编码实现:发件箱模式
步骤 1:定义发件箱实体

@Entity 
@Table(name = "outbox"
public  class  OutboxEvent { 

    @Id 
    @GeneratedValue(strategy = GenerationType.IDENTITY) 
    private Long id; 

    @Column(name =
"event_type"
    private String eventType; 

    @Column(name =
"event_payload", columnDefinition = "TEXT"
    private String eventPayload; 

    public  OutboxEvent () {} 

    public  OutboxEvent (String eventType, String eventPayload) { 
        this .eventType = eventType; 
        this .eventPayload = eventPayload; 
    } 
}

第二步:用户服务

@Service 
public  class  UserService { 

    @Autowired 
    private UserRepository userRepository; 

    @Autowired 
    private OutboxEventRepository outboxEventRepository; 

    @Transactional 
    public  void  registerUser (User user) { 
        // 保存用户详细信息
        userRepository.save(user); 

       
// 生成 UserRegistered 事件
        String  eventPayload  = buildUserRegisteredEventPayload(user); 

       
// 在同一事务内将事件保存到发件箱
        OutboxEvent  outboxEvent  =  new  OutboxEvent (
"UserRegistered" , eventPayload); 
        outboxEventRepository.save(outboxEvent); 
    } 

    private String buildUserRegisteredEventPayload (User user) { 
       
// 使用用户详细信息构造事件有效负载
        ObjectMapper  objectMapper  =  new  ObjectMapper (); 
        try
            return objectMapper.writeValueAsString(user); 
        } catch (JsonProcessingException e) { 
            throw  new  RuntimeException (
"Error serializing user data" , e); 
        } 
    } 
}

步骤 3:发件箱处理器

@Component 
public  class  OutboxProcessor { 

    @Autowired 
    private OutboxEventRepository outboxEventRepository; 

    @Autowired 
    private KafkaTemplate<String, String> kafkaTemplate; 

    @Scheduled(fixedRate = 5000)  // 每 5 秒运行一次
    public  void  processOutboxEvents () { 
        List<OutboxEvent> outboxEvents = outboxEventRepository.findAll(); 

        for (OutboxEvent outboxEvent : outboxEvents) { 
            try
               
// 将事件发布到 Kafka
                 kafkaTemplate.send(
"user-events" , outboxEvent.getEventType(), outboxEvent.getEventPayload()).get(); 

               
// 发送成功后从发件箱中删除事件
                outboxEventRepository.delete(outboxEvent); 
            } catch (Exception e) { 
               
// 记录错误并在下一个循环中重试
                System.err.println(
"发送事件失败:" + e.getMessage()); 
            } 
        } 
    } 
}

步骤 4:电子邮件服务和分析服务
两种服务都会有 Kafka 消费者来订阅user-events主题并UserRegistered相应地处理事件。

电子邮件服务:

@Service 
public  class  EmailService { 

    @KafkaListener (topics = "user-events" , groupId = "email-group"
    public  void  handleUserRegisteredEvent ( @Payload  String eventPayload ) { 
       
// 反序列化事件负载
        ObjectMapper objectMapper = new  ObjectMapper (); 
        try
            User user = objectMapper. readValue (eventPayload, User . class ); 
           
// 向用户发送确认电子邮件的逻辑
            sendConfirmationEmail (user); 
        } catch ( JsonProcessingException e) { 
            System . err . println (
"反序列化事件负载时出错: " + e. getMessage ()); 
        } 
    } 

   

 private  void  sendConfirmationEmail ( User user ) { 
        // 实现电子邮件发送逻辑
        System . out . println (
"正在发送确认电子邮件至 " + user. getEmail ()); 
    } 
}

分析服务:

@Service 
public  class  AnalyticsService { 

    @KafkaListener (topics = "user-events" , groupId = "analytics-group"
    public  void  handleUserRegisteredEvent ( @Payload  String eventPayload ) { 
       
// 反序列化事件负载
        ObjectMapper objectMapper = new  ObjectMapper (); 
        try
            User user = objectMapper. readValue (eventPayload, User . class ); 
           
// 使用新用户数据更新分析的逻辑
            updateAnalytics (user); 
        } catch ( JsonProcessingException e) { 
            System . err . println (
"反序列化事件负载时出错: " + e. getMessage ()); 
        } 
    } 

    private  void  updateAnalytics ( User user ) { 
       
// 实现分析更新逻辑
        System . out . println (
"正在更新用户的分析 " + user. getId ()); 
    } 
}


Saga 模式与 Outbox 模式用例
1、Saga模式
Saga 模式用于管理跨多个服务的分布式事务。每个服务执行其本地事务并发布事件以触发下一步。如果某个步骤失败,则使用补偿事务来撤消之前的操作。
示例场景:

  • 步骤1:用户服务注册用户并发布UserRegistered事件。
  • 第 2 步:电子邮件服务发送确认电子邮件。
  • 步骤3:如果发送电子邮件失败,则需要补偿事务来回滚用户注册或妥善处理失败。

2、发件箱模式
Outbox 模式可确保事件发布和本地事务在同一微服务中以原子方式发生。当您需要将事件作为本地事务的一部分可靠地发布时,该模式特别有用。
在我们的示例中的用法:

  • 用户服务UserRegistered在同一事务中将用户详细信息和事件保存在发件箱表中。
  • 发件箱处理器异步读取发件箱表并将事件发布到消息代理。
  • 这可以保证事件不会丢失并最终传递到电子邮件服务和分析服务。