在分布式系统中,我们经常面临保持数据库和外部系统同步的挑战。想象一下将订单保存到数据库,然后将消息发布到消息代理。如果任何一个操作失败,您的系统最终都会处于不一致的状态。
发件箱模式通过将消息发布视为数据库事务的一部分来解决此问题。我们不会直接发布消息,而是将它们保存到数据库中的发件箱表中,以确保原子操作。然后,一个单独的进程会可靠地发布这些消息。
在本期通讯中,我们将深入研究在 .NET 中实现这种模式,涵盖从设置到扩展的所有内容。
为什么我们需要发件箱模式?
事务性发件箱模式解决了分布式系统中的一个常见问题。当您需要同时做两件事时就会出现此问题:保存数据并与外部组件通信。
考虑发送订单确认电子邮件、通知其他系统有关新客户注册的信息或在下订单后更新库存水平等场景。这些场景都涉及本地数据更改以及外部通信或更新。
例如,假设有一个微服务需要:
如果其中一个步骤失败,您的系统可能会处于不一致的状态。也许订单已保存,但其他人都不知道。或者每个人都认为有新订单,但实际上它不在数据库中。
以下是CreateOrderCommandHandler没有发件箱的模式:
public class CreateOrderCommandHandler( IOrderRepository orderRepository, IProductInventoryChecker inventoryChecker, IUnitOfWork unitOfWork, IEventBus eventBus) : IRequestHandler<CreateOrderCommand, OrderDto> { public async Task<OrderDto> Handle(CreateOrderCommand request, CancellationToken cancellationToken) { var order = new Order(request.CustomerId, request.ProductId, request.Quantity, inventoryChecker);
await orderRepository.AddAsync(order);
await unitOfWork.CommitAsync(cancellationToken);
// The database transaction is completed at this point.
await eventBus.Send(new OrderCreatedIntegrationEvent(order.Id));
return new OrderDto { Id = order.Id, Total = order.Total }; } }
|
此代码存在潜在的一致性问题。提交数据库事务后,可能会出现以下两种情况:
应用程序可能会在事务提交后、事件发送前崩溃。订单将在数据库中创建,但其他系统对此一无所知。
当我们尝试发送事件时,事件总线可能已关闭或无法访问。这将导致订单在未通知其他系统的情况下创建。
事务性发件箱模式通过确保数据库更新和事件发布被视为单个原子操作来帮助解决此问题。
序列图可说明了发件箱模式如何解决我们的一致性挑战。我们不再尝试将保存数据和发送消息作为单独的步骤,而是将订单和发件箱消息保存在单个数据库事务中。这是一个全有或全无的操作 - 我们不能处于不一致的状态。
单独的发件箱处理器负责实际的消息发送。它会不断检查发件箱表中的未发送消息并将其发布到消息队列。处理器在成功发布后将消息标记为已发送,以防止重复。
这里需要注意的一点是,发件箱模式为我们提供了至少一次的传递。发件箱消息将至少发送一次,但如果需要重试,也可能发送多次。这意味着我们必须让我们的消息消费者具有幂等性。
实现发件箱模式
首先,让我们创建用于存储消息的发件箱表:
CREATE TABLE outbox_messages ( id UUID PRIMARY KEY, type VARCHAR(255) NOT NULL, content JSONB NOT NULL, occurred_on_utc TIMESTAMP NOT NULL, processed_on_utc TIMESTAMP NULL, error TEXT NULL );
// We can consider adding this index since we will be querying for unprocessed messages often CREATE INDEX idx_outbox_messages_processed_on_utc ON outbox_messages (processed_on_utc);
|
我将使用 PostgreSQL 作为本例的数据库。请注意列jsonb的类型content。如果将来需要,它允许索引和查询 JSON 数据。
现在,让我们创建一个类来代表我们的发件箱条目:
public sealed class OutboxMessage { public Guid Id { get; init; } public string Type { get; init; } public string Content { get; init; } public DateTime OccurredOnUtc { get; init; } public DateTime? ProcessedOnUtc { get; init; } public string? Error { get; init; } }
|
以下是向发件箱添加消息的方法:
public async Task AddToOutbox<T>(T message, NpgsqlDataSource dataSource) { var outboxMessage = new OutboxMessage { Id = Guid.NewGuid(), OccurredOnUtc = DateTime.UtcNow, Type = typeof(T).FullName, // We'll need this for deserialization Data = JsonConvert.SerializeObject(message) };
await using var connection = await dataSource.OpenConnectionAsync(); await connection.ExecuteAsync( @""" INSERT INTO outbox_messages (id, occurred_on_utc, type, content) VALUES (@Id, @OccurredOnUtc, @Type, @Content::jsonb) """, outboxMessage); }
|
实现此目的的一种优雅方法是使用域事件来表示通知。当域中发生重大事件时,我们将引发域事件。在完成交易之前,我们可以拾取所有事件并将其存储为发件箱消息。您可以从工作单元或使用EF Core 拦截器执行此操作。
处理发件箱
我们需要的下一个组件是 Outbox 处理器。这可能是一个物理上独立的进程,也可能是同一进程中的后台工作程序。
我将使用 Quartz 来安排发件箱处理的后台作业。这是一个强大的库,对安排重复作业提供了出色的支持。
现在,让我们实现OutboxProcessorJob:
[DisallowConcurrentExecution] public class OutboxProcessorJob( NpgsqlDataSource dataSource, IPublishEndpoint publishEndpoint, Assembly integrationEventsAssembly) : IJob { public async Task Execute(IJobExecutionContext context) { await using var connection = await dataSource.OpenConnectionAsync(); await using var transaction = await connection.BeginTransactionAsync();
// You can make the limit a parameter, to control the batch size. // We can also select just the id, type, and content columns. var messages = await connection.QueryAsync<OutboxMessage>( @""" SELECT * FROM outbox_messages WHERE processed_on_utc IS NULL ORDER BY occurred_on_utc LIMIT 100 """, transaction: transaction);
foreach (var message in messages) { try { var messageType = integrationEventsAssembly.GetType(message.Type); var deserializedMessage = JsonConvert.DeserializeObject(message.Content, messageType);
// We should introduce retries here to improve reliablity. await publishEndpoint.Publish(deserializedMessage);
await connection.ExecuteAsync( @""" UPDATE outbox_messages SET processed_on_utc = @ProcessedOnUtc WHERE id = @Id """, new { ProcessedOnUtc = DateTime.UtcNow, message.Id }, transaction: transaction); } catch (Exception ex) { // We can also introduce error logging here.
await connection.ExecuteAsync( @""" UPDATE outbox_messages SET processed_on_utc = @ProcessedOnUtc, error = @Error WHERE id = @Id """, new { ProcessedOnUtc = DateTime.UtcNow, Error = ex.ToString(), message.Id }, transaction: transaction); } }
await transaction.CommitAsync(); } }
|
此方法使用轮询定期从数据库获取未处理的消息。轮询会增加数据库的负载,因为我们需要频繁查询未处理的消息。
处理发件箱消息的另一种方法是使用事务日志跟踪。我们可以使用Postgres 逻辑复制来实现这一点。数据库将更改从预写日志 (WAL) 传输到我们的应用程序,我们将处理这些消息并将其发布到消息代理。您可以使用它来实现基于推送的发件箱处理器。
注意事项和权衡
发件箱模式虽然有效,但会引入额外的复杂性和数据库写入。在高吞吐量系统中,监控其性能以确保其不会成为瓶颈至关重要。
我建议在 Outbox 处理器中实现重试机制以提高可靠性。考虑对瞬时故障使用指数退避,对持续性问题使用断路器,以防止系统在中断期间过载。
实施幂等消息消费者至关重要。网络问题或处理器重启可能会导致同一条消息被多次传送,因此您的消费者必须安全地处理重复处理。
随着时间的推移,发件箱表可能会大幅增长,从而可能影响数据库性能。尽早实施归档策略非常重要。考虑将已处理的消息移至冷存储或在一定时间后删除它们。
扩展发件箱处理
随着系统的发展,您可能会发现单个发件箱处理器无法处理大量消息。这会导致事件发生和消费者处理之间的延迟增加。
一种直接的方法是增加发件箱处理器作业的频率。您应该考虑每隔几秒运行一次。这可以显著减少消息处理的延迟。
另一个有效的策略是在提取未处理的消息时增加批处理大小。通过每次运行处理更多消息,您可以提高吞吐量。但是,请注意不要使批处理过大,以免导致事务长时间运行。
对于高容量系统,并行处理发件箱可能非常有效。实施锁定机制来认领一批消息,允许多个处理器同时工作而不会发生冲突。您可以使用SELECT ... FOR UPDATE SKIP LOCKED来认领一批消息。这种方法可以大大提高您的处理能力。
总结
发件箱模式是维护分布式系统中数据一致性的强大工具。通过将数据库操作与消息发布分离,发件箱模式可确保您的系统即使在出现故障时也能保持可靠性。
记住要保持你的消费者幂等,实施适当的扩展策略,并管理你的发件箱表的增长。
虽然这增加了一些复杂性,但保证消息传递的好处使其成为许多场景中的有价值的模式。
如果您希望以健壮、可用于生产的方式实现 Outbox 模式,则可以查看实用清洁架构。 它包括有关实现 Outbox 模式的完整部分,以及用于构建可维护和可扩展的 .NET 应用程序的其他基本模式。