Python中的发件箱模式源码


微服务/模块之间最常用的通信方式之一是通过事件进行异步通信。
实施可靠的消息传递有时可能具有挑战性。在今天的文章中,我将向您介绍如何实现发件箱模式以保证事件的传递和可靠的消息传递。

发件箱模式(事务性发件箱或存储转发事件发布者)是解决方案。
我们希望确保我们的业务实体和我们的业务事件存储在同一个事务中。

完整的代码示例,您可以在 GitHub 点击标题。

应用服务:

# blog/examples/src/outbox_pattern/library/application/service.py

class LibraryCardService:
    def __init__(
        self,
        library_card_repository: ILibraryCardRepository,
        event_bus: IEventBus,
        session: Session
    ) -> None:
        self._library_card_repository = library_card_repository
        self._event_bus = event_bus
        self._session = session

    def create(self, owner_id: str) -> LibraryCard:
        library_card = LibraryCard.create(EntityId.of(owner_id))

        with self._session.begin():
            events = library_card.events
            self._library_card_repository.save(library_card)

            for event in events:
                self._event_bus.publish(event)

        return library_card

我们将实体和事件保存在同一个事务中。
我们的业务对象的events属性返回需要发布的事件列表:

# blog/examples/src/outbox_pattern/library/domain/library_card.py

class LibraryCard:
    ...

    @property
    def events(self) -> list[Event]:
        return self._domain_events[:]

    @classmethod
    def create(cls, owner_id: EntityId) -> "LibraryCard":
        instance = cls(
            id=EntityId.new_one(),
            owner_id=owner_id,
            rentals=[],
            status=Status.ACTIVE
        )
        instance._add_domain_event(
            LibraryCardCreated(
                id=uuid.uuid4().hex,
                occurred_on=datetime.utcnow(),
                card_id=str(instance.id),
                owner_id=str(instance._owner_id),
            )
        )
        return instance

    def _add_domain_event(self, event: Event) -> None:
        self._domain_events.append(event)

    ...

这些事件是不可变的简单 python 数据类:

# blog/examples/src/outbox_pattern/library/domain/events.py

@dataclass(frozen=True)
class Event:
    id: str
    occurred_on: datetime


@dataclass(frozen=True)
class LibraryCardCreated(Event):
    card_id: str
    owner_id: str


OutboxMessage定义:

# blog/examples/src/outbox_pattern/outbox/message.py

@dataclass(frozen=True)
class MessageType:
    qualified_name: str

    def module_name(self) -> str:
        without_class_name = self.qualified_name.split(".")[:-1]
        return
".".join(without_class_name)

    def class_name(self) -> str:
        return self.qualified_name.split(
".")[-1]

    def __str__(self) -> str:
        return self.qualified_name


@dataclass
class OutboxMessage:
    id: EntityId
    occurred_on: datetime
    type: MessageType
    data: dict[str, Any]
    processed_on: Optional[datetime]

好的,现在我们可以转到消息发件箱了。我们的消息发件箱界面如下所示:

# blog/examples/src/outbox_pattern/outbox/message_outbox.py

class IMessageOutbox(ABC):
    @abstractmethod
    def save(self, event: Event) -> None:
        pass

    @abstractmethod
    def mark_as_published(self, message: OutboxMessage) -> None:
        pass

    @abstractmethod
    def to_publish(self) -> list[OutboxMessage]:
        pass


我决定使用 SQLAlchemy 实现:

# blog/examples/src/outbox_pattern/outbox/sql_alchemy_message_outbox.py

class OutboxMessageModel(Base):
    __tablename__ = "outbox_messages"

    id = Column(CHAR(32), primary_key=True)
    occurred_on = Column(DateTime, nullable=False)
    type = Column(String, nullable=False)
    data = Column(JSON, nullable=False)
    processed_on = Column(DateTime)

    def __str__(self) -> str:
        return (
            f"OutboxMessage(id={self.id}, occurred_on={self.occurred_on}, "
            f"type={self.type}, processed_on={self.processed_on})"
        )


class SqlAlchemyMessageOutbox(IMessageOutbox):
    def __init__(self, session: Session) -> None:
        self._session = session

    def _to_outbox_message(self, model: OutboxMessageModel) -> OutboxMessage:
        return OutboxMessage(
            id=EntityId.of(model.id),
            occurred_on=model.occurred_on,
            type=MessageType(model.type),
            data=model.data,
            processed_on=model.processed_on,
        )

    def save(self, event: Event) -> None:
        data = asdict(event)
        outbox_message = OutboxMessageModel(
            id=str(EntityId.new_one()),
            occurred_on=datetime.utcnow(),
            type=f"{type(event).__module__}.{type(event).__name__}",
            data=data,
        )
        self._session.add(outbox_message)

    def to_publish(self) -> list[OutboxMessage]:
        stmt = (
            select(OutboxMessageModel)
            .where(OutboxMessageModel.processed_on == null())
            .order_by(OutboxMessageModel.occurred_on)
            .limit(100)
        )

        models: list[OutboxMessageModel] = self._session.execute(stmt).scalars().all()

        result = []
        for model in models:
            result.append(self._to_outbox_message(model))

        return result

    def mark_as_published(self, message: OutboxMessage) -> None:
        self._session.merge(
            OutboxMessageModel(
                id=str(message.id),
                occurred_on=message.occurred_on,
                type=str(message.type),
                data=message.data,
                processed_on=datetime.utcnow(),
            )
        )

正如你在上面看到的那样:

  • save方法将事件作为一个参数,将其转换为OutboxMessageModel,并将其保存在数据库中。type属性是完全合格的类名。data属性是一个序列化的事件的数据。
  • to_publish方法返回所有属性为processed_on等于null的消息。
  • mark_as_published接收消息并将 processed_on属性设置为datetime.utcnow()值。

然后我们想将我们的消息发件箱作为依赖项传递给我们的EventBus:

# blog/examples/src/outbox_pattern/shared/event_bus.py

class IEventBus(ABC):
    @abstractmethod
    def publish(self, event: Event) -> None:
        pass


class StoreAndForwardEventBus(IEventBus):
    def __init__(self, message_outbox: IMessageOutbox) -> None:
        self._message_outbox = message_outbox

    def publish(self, event: Event) -> None:
        self._message_outbox.save(event)

最后一个组件是发件箱处理器。负责从数据库中获取未处理的消息,处理它们,并使用processed_on设置为正确值的属性再次保存。

# blog/examples/src/outbox_pattern/outbox/outbox_processor.py

class OutboxProcessor:
    def __init__(
        self,
        message_outbox: IMessageOutbox,
        session: Session,
        messenger: Apos
    ) -> None:
        self._message_outbox = message_outbox
        self._session = session
        self._messenger = messenger
        self._logger: FilteringBoundLogger = structlog.get_logger()

    def _get_cls_for(self, message_type: MessageType) -> Type:
        module = importlib.import_module(message_type.module_name())
        return getattr(module, message_type.class_name())  # type: ignore

    def process_outbox_message(self) -> None:
        with self._session.begin():
            messages = self._message_outbox.to_publish()

            for message in messages:
                event_cls = self._get_cls_for(message.type)
                event = event_cls(**message.data)
                self._messenger.publish_event(event)
                self._logger.info(f"Publishing event {event}")
                self._message_outbox.mark_as_published(message)


程序如下:

  1. 获取所有需要发布的消息
  2. 根据保存在数据库中的完全限定类名构建具体事件实例
  3. 使用一些消息传递库发布事件。对于本文,我使用了 apos
  4. 将此消息保存为已发布,以便在下一次执行期间不会对其进行处理。

如果在第 3 点出现任何问题,消息将不会被标记为已处理。 OutboxProcessor将在下次执行时尝试再次处理它。
正如我之前提到OutboxProcessor的需要定期运行(我使用rocketry来完成它。如果您还没有听说过这个库,我鼓励您检查一下。它功能强大,轻巧且非常易于使用)。

from rocketry import Rocketry
from rocketry.conditions.api import every

from src.outbox_pattern import messenger
from src.outbox_pattern.outbox.outbox_processor import OutboxProcessor
from src.outbox_pattern.outbox.sql_alchemy_message_outbox import SqlAlchemyMessageOutbox
from src.outbox_pattern.shared.db import Db

app = Rocketry()


@app.task(every("10 seconds"))
def process_messages() -> None:
    session = Db(
"sqlite:///db.sqlite").session
    message_outbox = SqlAlchemyMessageOutbox(session)
    processor = OutboxProcessor(message_outbox, session, messenger)

    processor.process_outbox_message()


if __name__ ==
"__main__":
    app.run()

这种方法给了我们“至少一次交付”