使用Redis实现简单的事件驱动架构 [DDD、事件溯源和一致性哈希] - core27


用 Redis 模拟 Kafka实现事件驱动架构,

Apache Kafka 已成为大多数技术栈中的主流组件。使用 Kafka 的好处包括确保事件中的因果顺序,同时保持并行性,通过在服务器之间快速复制分区来恢复故障,等等。
然而,运行 Kafka 也面临着一系列挑战。虽然许多工程团队都希望将 Kafka 添加到他们的堆栈中并与“真正的”工程师一起赢得一席之地,但运营开销构成了强大的进入障碍。

在这篇文章中,我们将重点介绍如何构建一个看起来像传统单体应用程序但又是松散耦合的事件驱动系统的系统。为此,我们依赖于从领域驱动设计、事件溯源和一致性哈希等概念中学习。

有序事件
大多数系统关心事件的顺序。大多数系统中的排序仅限于所考虑的域。例如,当我们查看帖子到一个线程时,我们关心的是相对于帖子的排序。当我们查看金融系统时,排序主要限于账户。大型系统中事件的全局排序很少有用,但可能是相关的。

场景:帖子被添加到一个线程Thread中
假设我们对每个添加的帖子都有相当多的后期处理,这反过来会更新线程的某些属性。
这创建了一个相当好的场景来说明分区的使用。

在这种情况下,默认方法是将所有帖子发送到队列中,并让一群工作人员(或消费者)完成工作。这为我们提供了系统所需的并行性,但在我们与多个消费者打交道的那一刻,顺序就会丢失。
我们保留顺序的唯一方法是确保我们一次处理一个任务,从而才能反映该线程上发生的事情的真实顺序。
下一个明显的想法是使用每个线程的专用队列来处理相同的问题,但如果我们知道我们将生成大量线程,那立即感觉像是矫枉过正。

分区
分区只是将我们的排队系统分解为专门的分区。因此,如果我们从一个天真的估计开始,即8个工人每分钟能够处理1600个事件,那么我们的设计就从16个分区开始。
你可能需要做更多的工作来确保你的估计是好的,但在这个例子中,我们将以假设它是好的来工作。我们还为一个分区分配一个worker,因为我们希望每个分区都能始终保持因果排序。

现在我们需要确保一个特定线程的帖子都被路由到同一个分区。每个分区都由一个消费者管理,所以我们的排序不会被打乱。

重要的是要记住,"队列分区 "或 "专用分区 "是一个抽象的结构。它实际上只是一个队列。我们使用分区这个术语,因为它使我们很容易与该领域广泛使用的术语保持一致。

一致性哈希
我们将使用一致哈希散列作为一种手段,将属于特定线程的所有帖子路由到同一队列分区(或队列)。
在我们的例子中,我们将使用Murmurhash和一个由名为uHashRing的库管理这个持续体。

将我们的队列视为一个连续体
现在,如果我们简单地将所有的8个队列放在一个圆圈中,我们会得到这样的结果。让我们把这个称为连续体,因为第7个队列后面是第一个队列,即第0个队列。

现在,一致性散列允许我们使用threadId将一个给定的任务/工作映射到一个特定的队列。因此,在这种情况下,我们使用threadId作为分区的关键。
这里需要注意的一个重要方面是,我们没有把我们的队列称为后处理队列。它们不是专用队列。你可以把一个Transaction事务事件扔到这里,并期望相应的消费者(和事件处理程序)来处理它。

事件
在前面的几段话中,我们已经说了很多关于事件的内容,但我们还没有真正定义事件的含义。
我们的系统会把事件看成是发生在我们系统中的事实。
事实通常是指以某种方式改变了系统状态的事情(或者是失败的事情)。
例如,PostCreatedEvent发生在一个新帖子被创建时。同样地,当帖子被更新时,PostUpdatedEvent也会发生。
你可以将一个事件映射到你系统中的大多数CRUD操作。
如果将你的系统设计成领域,你会惊讶地发现一个应用服务所触发的事件的数量。

一个事件也映射了系统的周围状态。
让我们设计一个创建帖子的应用服务:

from typing import List
from .services.base import ServiceBase
from sqlalchemy.session import Session

class PostService(ServiceBase)
    def __init__(self, thread_id: UUID, params:   PostCreateAPIParams, db_session: Session):
        self.thread_id = thread_id
        self.params = params
        self.user: Union[User, None] = None
        self.post: Union[Post, None] =  None
        self.db_session = db_session
        self.errors: List[str] = []
        self.error_code: Union[str, None] = None
 
     async def __call__(self)
        return await self.invoke()

     async def invoke(self):
         await self.find_thread()
         await self.verify_author()
         await self.create_post()
         await self.build_response_dao()
         await self.trigger_events()
         return self   

     async def find_thread(self):
    # truncated for brevity
    pass
    
    
     async def trigger_events(self):
    user_dao: UserDAO = UserDAO.from_orm(self.user) if self.user else None
        post_dao: PostDAO = PostDAO.from_orm(self.post) if self.post else None
        thread_dao: ThreadDAO = ThreadDAO.from_orm(self.thread) if self.thread else None

        if await self.has_errors:
            event_dao = PostCreatedEventDAO(
                user=user_dao,
                thread=thread_dao,
                post=post_dao,
                params=self.params,
                errors=self.errors,
                error_code= self.error_code
            )
        else:
            event_dao = PostCreationFailedEventDAO(
        user=user_dao,
                thread=thread_dao,
                params=self.params,
                post=post_dao
                errors=self.errors,
                error_code= self.error_code
            )     
                 
       partition_key = (
       str(self.thread.id) if self.thread else "PostCreationFailedEvent"
        )
       await SystemEventService.trigger(partition_key=partition_key, event_dao=event_dao, db_session=self.db_session)
       return self


在这个例子中, trigger_events方法决定了要发布的事实。在这种情况下,它收集了周围的上下文。这也可能包括请求参数(如传递到事件中的params属性)。然而,什么是正确捕获的上下文也取决于上下文:)。

因此,我们的最终事件可能看起来像这样。请注意,该事件没有一个 updated_at 属性,因为我们认为事件是不可改变的事实。我们不能撤消已经发生的事情。

{
   "event_name": "PostCreatedEvent"
   
"event_id": "0fb6a4d4-ae65-4f18-be44-edb9ace6b5bb",
   
"event_version": "v1.0",
   
"time": "2022-09-03T04:16:59.294509+00:00",
   
"payload": {
          
"user": { "user_id":"1a1269ee-6b6f-4325-8562-cb169a68e7b3", "is_blocked": false, "first_name": "Siddharth", "last_name": "R", "email": "sid@........},
          
"post" :{ "post_id": "fa3e7b12-4908-4d53-be11-629e6f47ae90", "thread_id": "666d0404-0756-4b93-892f-19d9b4b25a99", ...... },
          
"thread": { "thread_id": "666d0404-0756-4b93-892f-19d9b4b25a99", .....},
          
"params": { .... },
          
"errors": [ ... ],
          
"error_code": ""
    },
   
"created_at": "2022-09-03T04:16:59.294"
   
"logged_at": "2022-09-03T04:16:59.294"
}

在我们的例子中,应用服务通过调用触发方法将事件转给一个叫做SystemEventsService的服务。  该方法在为我们实际发布该事件之前做了一系列的工作。它通过我们先前看到的连续体运行,根据我们传递给它的分区键识别队列(和相应的工作者worker)。  这几乎就是我们需要一致的散列的原因。这可以确保我们的事件总是由同一个分区(和工作者)处理。

因此,一旦我们为我们的任务确定了工作者,我们就要求工作者

  • 保留该事件,以备我们以后需要再来处理它
  • 将其发布给所有相关的工作者
  • 让我们订阅该任务的事件驱动型工作负载触发其工作流程。

将事件分配到正确的分区

@staticmethod
async def trigger(
    partition_key: str, event_dao: SystemEventDAO
):    

    try:
        worker: SystemEventPartitionConfig = await SystemEventsService._get_worker(
            partition_key=partition_key
        )
        worker_func = getattr(system_events_workers, worker.worker_name)
        log_info(msg=f"Trigger called with worker: {worker.worker_name}")
        worker_func.delay(event_dao=event_dao.json())
    except (OperationalError, ConnectionError) as e:
        log_error(msg=f
"[RedisError] {e}", e=e, method="trigger", loc=f"{__name__}")
    except Exception as e:
        log_error(
            msg=f
"SystemEventError: {e}", e=e, method="trigger", loc=f"{__name__}"
        )
    return

@staticmethod
async def _get_worker(partition_key: str) -> SystemEventPartitionConfig:
   
"""For a given string it returns the worker that should process the event by running it through a murmurr hashing
    function and uses that to fetch the nodes from the continuum
"""

    node = ring.get(key=partition_key)
    nodename = node.get(
"nodename", None)

    if not nodename:
        raise ValueError(
"Could not find a node in the continuum for key {node}")

    node_config = continuum.get(nodename, None)
    if not node_config:
        raise ValueError(
           
"Could not find a node in the continuum for key {nodename}"
        )

    config_attrs = {
"partition_key": partition_key, "partition_id": nodename}
    config_attrs = {**node_config, **config_attrs}
    return SystemEventPartitionConfig(**config_attrs)

事件驱动的系统
下面是最好的部分:
现在整个系统可以让你把你的应用程序作为一系列的异步事件处理程序来运行,这些处理程序可以在特定的事件上被调用。当一个事件到达正确的分区时,工作者会将该事件分配给一系列的事件处理程序。

async def create_system_event(
    task_type, event_dao: SystemEventDAO, db_session: Session = None
):
    if not db_session:
        db_session = get_session()

    system_event: Union[SystemEvent, None] = None

    try:
        if event_dao.event_name not in SYSTEM_GENERATED_REQUEST_EVENTS:
            system_event = await SystemEventsService.create(
                event_dao=event_dao, db_session=db_session
            )

            if system_event:
                log_info(
                    msg=f"system_event with id: {system_event.id} created for event_name: {system_event.event_name}"
                )
                event_dao.id = system_event.id
        else:
            log_info(
                msg=f
"system_generated_request_event with name: {event_dao.event_name} ready for processing."
            )
        await EventHandler.process(event_dao=event_dao, db_session=db_session)
    except Exception as e:
        log_error(
            msg=f
"Error handling events: {event_dao.event_name}: {e} \\n {traceback.print_exc()}"
        )
        capture_exc(error=e)
    finally:
        db_session.close()
    return system_event


分区工作者持久化该事件,并将其分派给EventHandler。 事件处理程序是一系列可独立部署的函数,可以做任何你想做的事情。
如:

handlers = [
    PostInsightsGeneratorEventHandler,
    ThreadActivityManagerEventHandler,
    SpamDetectionEventHandler,
    #...,
    #....,
    ImageResizerEventHandler,
]

而我们的处理器可以以任何你喜欢的方式处理它们。在这里,我们按顺序处理它们,但它也可以并行派发:

class EventHandler:
    @staticmethod
    async def process(event_dao: SystemEventDAO, db_session: Session = None):
        log_info(msg=f'Event: {event_dao.event_name} arrived')
        
        for event_handler in handlers:
            await event_handler.process(event_dao=event_dao, db_session=db_session)
        return


处理程序本身是一个相当简单的类,它检查相关的事件。

class PostInsightsGeneratorEventHandler(event_dao: SystemEventDAO, db_session: Session):
    if not event_dao.event_name == "PostCreatedEvent" :
         return 

    log_info(msg=f
"PostCreatedEventHandler called with {event_dao.event_id}")
    # Do whatever you need to here