微服务/模块之间最常用的通信方式之一是通过事件进行异步通信。
实施可靠的消息传递有时可能具有挑战性。在今天的文章中,我将向您介绍如何实现发件箱模式以保证事件的传递和可靠的消息传递。
发件箱模式(事务性发件箱或存储转发事件发布者)是解决方案。
我们希望确保我们的业务实体和我们的业务事件存储在同一个事务中。
完整的代码示例,您可以在 GitHub 点击标题。
应用服务:
# blog/examples/src/outbox_pattern/library/application/service.py |
我们将实体和事件保存在同一个事务中。
我们的业务对象的events属性返回需要发布的事件列表:
# blog/examples/src/outbox_pattern/library/domain/library_card.py |
这些事件是不可变的简单 python 数据类:
# blog/examples/src/outbox_pattern/library/domain/events.py |
OutboxMessage定义:
# blog/examples/src/outbox_pattern/outbox/message.py |
好的,现在我们可以转到消息发件箱了。我们的消息发件箱界面如下所示:
# blog/examples/src/outbox_pattern/outbox/message_outbox.py |
我决定使用 SQLAlchemy 实现:
# blog/examples/src/outbox_pattern/outbox/sql_alchemy_message_outbox.py
class OutboxMessageModel(Base):
__tablename__ = "outbox_messages"
id = Column(CHAR(32), primary_key=True)
occurred_on = Column(DateTime, nullable=False)
type = Column(String, nullable=False)
data = Column(JSON, nullable=False)
processed_on = Column(DateTime)
def __str__(self) -> str:
return (
f"OutboxMessage(id={self.id}, occurred_on={self.occurred_on}, "
f"type={self.type}, processed_on={self.processed_on})"
)
class SqlAlchemyMessageOutbox(IMessageOutbox):
def __init__(self, session: Session) -> None:
self._session = session
def _to_outbox_message(self, model: OutboxMessageModel) -> OutboxMessage:
return OutboxMessage(
id=EntityId.of(model.id),
occurred_on=model.occurred_on,
type=MessageType(model.type),
data=model.data,
processed_on=model.processed_on,
)
def save(self, event: Event) -> None:
data = asdict(event)
outbox_message = OutboxMessageModel(
id=str(EntityId.new_one()),
occurred_on=datetime.utcnow(),
type=f"{type(event).__module__}.{type(event).__name__}",
data=data,
)
self._session.add(outbox_message)
def to_publish(self) -> list[OutboxMessage]:
stmt = (
select(OutboxMessageModel)
.where(OutboxMessageModel.processed_on == null())
.order_by(OutboxMessageModel.occurred_on)
.limit(100)
)
models: list[OutboxMessageModel] = self._session.execute(stmt).scalars().all()
result = []
for model in models:
result.append(self._to_outbox_message(model))
return result
def mark_as_published(self, message: OutboxMessage) -> None:
self._session.merge(
OutboxMessageModel(
id=str(message.id),
occurred_on=message.occurred_on,
type=str(message.type),
data=message.data,
processed_on=datetime.utcnow(),
)
)
正如你在上面看到的那样:
- save方法将事件作为一个参数,将其转换为OutboxMessageModel,并将其保存在数据库中。type属性是完全合格的类名。data属性是一个序列化的事件的数据。
- to_publish方法返回所有属性为processed_on等于null的消息。
- mark_as_published接收消息并将 processed_on属性设置为datetime.utcnow()值。
然后我们想将我们的消息发件箱作为依赖项传递给我们的EventBus:
# blog/examples/src/outbox_pattern/shared/event_bus.py |
最后一个组件是发件箱处理器。负责从数据库中获取未处理的消息,处理它们,并使用processed_on设置为正确值的属性再次保存。
# blog/examples/src/outbox_pattern/outbox/outbox_processor.py |
程序如下:
- 获取所有需要发布的消息
- 根据保存在数据库中的完全限定类名构建具体事件实例
- 使用一些消息传递库发布事件。对于本文,我使用了 apos
- 将此消息保存为已发布,以便在下一次执行期间不会对其进行处理。
如果在第 3 点出现任何问题,消息将不会被标记为已处理。 OutboxProcessor将在下次执行时尝试再次处理它。
正如我之前提到OutboxProcessor的需要定期运行(我使用rocketry来完成它。如果您还没有听说过这个库,我鼓励您检查一下。它功能强大,轻巧且非常易于使用)。
from rocketry import Rocketry |
这种方法给了我们“至少一次交付”