使用 Go、SSE 和 htmx 实时更新网站


在这个例子中,我们将在服务器端呈现 HTML,除了 htmx 库之外不使用任何 JavaScript 代码进行交互。

完整示例在GitHub上。您可以使用 docker-compose 在本地运行它。

使用的工具

  • Echo — 我喜欢的轻量级 HTTP 路由器,它的错误处理比 更简单net/http。
  • templ — 一个基于代码生成的 HTML 模板库。有时它可能会很奇怪,但总的来说,我对它很满意,比 更喜欢它html/template。最好与 IDE 插件一起使用。
  • htmx — 一个无需编写 JavaScript 即可使用 AJAX 和 SSE 的库。
  • Watermill——我们维护的用于处理消息的事件驱动库。
  • PostgreSQL和Google Cloud Pub/Sub用于存储和消息传递基础设施。(您可以选择不同的 Pub/Sub 进行消息传递,甚至是 Postgres。)

在设计 SSE 端点时,您必须决定有效负载应该是什么、何时发送更新以及向谁发送。

有效负载
有效负载只是文本,如何编码由您决定。它可以是常规的 JSON API 响应,也可以是直接嵌入网站的 HTML。请记住,每行都应有前缀data:,并且有效负载需要以两个新行 ( \n\n) 结尾。

您需要一种方法来了解应用程序中何时发生更改,以便推送更新。例如,如果用户收到一条消息,您会在 UI 中显示一个红色气泡。

SSE 端点是长期运行的,因此您可能有数百或数千个 goroutine 在后台运行,您必须通知它们发生更改。作为响应,每个 goroutine 都应向客户端发送一个事件。由于您可能正在运行多个服务实例,因此这无法在内存中工作。

您通常只想通知某些用户发生的事情。如果我向您发送消息,我希望通知出现在您的屏幕上,但不会出现在其他任何人的屏幕上。因此,您需要一种方法来过滤发生的事情并选择谁应该获得更新(以及触发哪些 SSE 端点)。

在此示例中,其过程如下:

  • 内容:帖子“统计”模型,包括 HTML 格式的浏览量和反应数量。
  • 时间:当有人看到帖子或对其做出反应时。
  • 收件人:所有看到更新帖子的人。其他帖子未更新。

实现 SSE 端点
虽然您完全可以从头开始(或使用库)创建 SSE 端点,而且它并不复杂,但困难的部分是触发更新以响应发生的事情。(并且通过网络执行此操作,因为在生产中很少运行单个服务实例。)与运行 HTTP 服务器一样,您不想在这里重新发明轮子。

我们通常使用Watermill来处理与事件相关的任何事情。这是我们维护的一个 Go 库,它抽象了 Pub/Subs 的低级细节。(GitHub 上的星星数量接近 7k⭐️)。您可以将它与任何现有代码库一起使用,因为它不是一个框架,而是一个轻量级库(就像 htmx 一样)。它支持许多 Pub/Subs,因此可以轻松地从您已有的基础设施(甚至是 SQL 数据库)开始。

Watermill
首先,你需要一个Pub/Sub — 一个让你处理网络上消息的系统(也称为“消息代理”或“队列”)。常见的选择是 Kafka 或 RabbitMQ,但它也可以是一个 SQL 数据库。

Watermill 将所有 Pub/Sub 抽象为两个接口:

type Publisher interface {
    Publish(topic string, messages ...*Message) error
    Close() error
}

type Subscriber interface {
    Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
    Close() error
}

您可以发布信息,也可以订阅信息。其中总会涉及到一个主题--一个决定谁会收到信息的字符串。

Watermill 的核心部分是 "消息"。它就像 net/http 软件包中的请求(Request)。最简单的消息只有一个可选 ID 和一个有效载荷。有效载荷是一片字节,因此可以使用任何你想要的编排方式(JSON、协议缓冲区、纯字符串等)。

msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))

虽然 Watermill 的所有组件都基于发布者和订阅者接口,但直接使用它们是相对低级的 API。在本例中,我们将使用 Watermill 的 CQRS 组件,它是更高级别的 API。它基于相同的理念,但去除了一些模板,如序列化和反序列化。我们将使用 EventBus 发布事件,并使用 EventProcessor 订阅事件。

我们希望发布两项事件活动:

  • PostViewed 在有人看到帖子时发布。
  • PostReactionAdded 在有人对帖子做出反应时发布。
每个事件都有一个事件处理程序,用于更新数据库中的帖子统计信息(与 HTTP 处理程序的概念类似)。(处理程序还应发布 PostStatsUpdated 事件。我们将用它来触发 SSE 更新。

发布事件
首先,让我们创建一个发布者。我使用的是 Google Cloud Pub/Sub Publisher,但它也可以与 Watermill 支持的任何其他 Publisher 互换使用。配置只需要一个项目 ID。

(请注意,Google Cloud Pub/Sub 只是 Watermill 支持的 Pub/Sub 之一。您可以轻松将其更改为其他支持的 Pub/Sub。就像 ORM 可以与 MySQL、PostgreSQL 和 SQLite 一起使用一样)。

logger := watermill.NewStdLogger(false, false)

publisher, err := googlecloud.NewPublisher(
    googlecloud.PublisherConfig{
        ProjectID: cfg.PubSubProjectID,
    },
    logger,
)


发布器处理消息,这意味着您必须将事件(结构)编组为字节并选择将它们发布到哪些主题。对所有遵循某些约定的事件和主题使用相同的编组是很常见的,例如事件名称是其中的一部分。

我们将使用EventBus组件来简化发布 API。您可以将其视为发布者的高级包装器(如您所见,它是第一个参数)。您只需传递一次配置选项,然后就可以使用单个方法调用发布事件。

eventBus, err := cqrs.NewEventBusWithConfig(
    publisher,
    cqrs.EventBusConfig{
        GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
            return params.EventName, nil
        },
        Marshaler: cqrs.JSONMarshaler{},
        Logger:    logger,
    },
)

配置需要一个 Marshaler,因此我们使用 cqrs.JSONMarshaler{}(所有消息都将被 marshal 为 JSON)。

GeneratePublishTopic 函数会根据可用参数返回主题名称。我们没有直接向 Publish 传递主题,而是定义了这个函数来根据消息确定主题。每次发布消息时,EventBus 都会调用该函数。在本例中,我们将使用 params.EventName。因此,如果你考虑这样一个结构

type PostViewed struct {
    PostID int <code>json:"post_id"</code>
}

它将发布在 PostViewed 主题上。(所选 marshaler 提供了提取事件名称的方法)。

使用事件总线发布事件非常简单。有了 marshaler 和 GeneratePublishTopic 的设置,我们就可以将事件结构传递给 Publish,剩下的就在幕后完成了。在 HTTP 处理程序中,我们可以使用类似下面这样的功能:

event := PostViewed{
    PostID: post.ID,
}

err = h.eventBus.Publish(ctx, event)

订阅事件
我决定让 HTTP 端点只发布事件。事件处理程序会异步更新数据库中的帖子统计信息。这样,客户端就无需等待更改的应用,而视图最终将通过 SSE 更新。

我们需要两个事件处理程序来更新数据库中的统计信息。第一个处理程序更新视图计数,第二个处理程序更新反应计数。

用于订阅事件的 CQRS 组件是 EventProcessor。与 EventBus 一样,最初也需要进行一些设置。但有了它,以后编写处理程序就会非常轻松。它背后的理念与 EventBus 类似,但却是相反的。

首先,创建一个路由器。它的概念与大家熟悉的 HTTP 路由器类似。该组件在后台运行,并将消息路由到处理程序。

router, err := message.NewRouter(message.RouterConfig{}, logger)

与 HTTP 路由器类似,Watermill 路由器也支持中间件。例如,你可以添加 Recoverer 中间件,这样处理程序中的恐慌就不会炸毁你的服务器。

router.AddMiddleware(middleware.Recoverer)

现在我们可以创建 EventProcessor。

eventProcessor, err := cqrs.NewEventProcessorWithConfig(
    router, 
    cqrs.EventProcessorConfig{
        GenerateSubscribeTopic: func(params cqrs.EventProcessorGenerateSubscribeTopicParams) (string, error) {
            return params.EventName, nil
        },
        SubscriberConstructor: func(params cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) {
            return googlecloud.NewSubscriber(
                googlecloud.SubscriberConfig{
                    ProjectID: cfg.PubSubProjectID,
                    GenerateSubscriptionName: func(topic string) string {
                        return fmt.Sprintf("%v_%v", topic, params.HandlerName)
                    },
                },
                logger,
            )
        },
        Marshaler: cqrs.JSONMarshaler{},
        Logger:    logger,
    },
)

第一个参数是路由器。这类似于 EventBus 对发布者的 "包装"。

然后是配置。Marshaler 和 GenerateSubscribeTopic 与 EventBus 中的概念相同。唯一不同的是,它们位于 Pub/Sub 的另一端。在事件总线中,Marshaler 会对消息进行 Marshal,然后由函数决定将其发布到哪个主题。而在这里,Marshaler 会将消息卸载回结构体,然后由 GenerateSubscribeTopic 决定向哪个主题订阅。

SubscriberConstructor 正如其名:它返回一个新的订阅者。你可能会问,为什么不使用单个订阅者,就像我们在 EventBus 中使用的发布者那样呢?

发布消息非常简单:你只需 marshal 一个结构体,将字节发送到一个主题,然后就大功告成了。订阅则是更有趣的地方。例如,同一服务有两个副本。如何确保只有一个副本收到来自 Pub/Sub 的消息?

策略取决于 Pub/Sub。在谷歌云 Pub/Sub 中,您可以使用绑定到主题的单个 "订阅",并在副本之间共享。这就是为什么订阅者构造函数在这种情况下非常有用。它允许我们为每种事件类型指定使用哪种订阅。在本例中,订阅将主题名称与处理程序名称连接起来。例如,PostViewed_UpdateViews。

正如承诺的那样,设置完成后,添加消息处理程序就非常简单了。请注意,函数是通用的(具有推断类型),因此您可以使用强类型事件!处理程序名称用于生成订阅名称,因此在处理程序中必须是唯一的。

err = eventProcessor.AddHandlers(
    cqrs.NewEventHandler(
        "UpdateViews"
        func(ctx context.Context, event *PostViewed) error {
            return repo.UpdatePost(ctx, event.PostID, func(post *Post) {
                post.Views++
            })
        },
    ),
    cqrs.NewEventHandler(
       
"UpdateReactions",
        func(ctx context.Context, event *PostReactionAdded) error {
            return repo.UpdatePost(ctx, event.PostID, func(post *Post) {
                post.Reactions[event.ReactionID]++
            })
        },
    ),
)

最后一部分是运行路由器,就像运行 HTTP 服务器一样。

go func() {
    err := router.Run(context.Background())
    if err != nil {
        panic(err)
    }
}()

发布帖子统计已更新
我们将使用另一个事件来触发 SSE 更新:PostStatsUpdated。它包括帖子的 ID 和已更新内容的记录(查看次数或反应 ID)。

type PostStatsUpdated struct {
    PostID          int     <code>json:"post_id"</code>
    ViewsUpdated    bool    <code>json:
"views_updated"</code>
    ReactionUpdated *string <code>json:
"reaction_updated"</code>
}

更新帖子后,双方处理人员都应发布该PostStatsUpdated事件。

err = eventProcessor.AddHandlers(
    cqrs.NewEventHandler(
        "UpdateViews",
        func(ctx context.Context, event *PostViewed) error {
            err = repo.UpdatePost(ctx, event.PostID, func(post *Post) {
                post.Views++
            })
            if err != nil {
                return err
            }

            statsUpdated := PostStatsUpdated{
                PostID:       event.PostID,
                ViewsUpdated: true,
            }

            return eventBus.Publish(ctx, statsUpdated)
        },
    ),
    cqrs.NewEventHandler(
       
"UpdateReactions",
        func(ctx context.Context, event *PostReactionAdded) error {
            err := repo.UpdatePost(ctx, event.PostID, func(post *Post) {
                post.Reactions[event.ReactionID]++
            })
            if err != nil {
                return err
            }

            statsUpdated := PostStatsUpdated{
                PostID:          event.PostID,
                ReactionUpdated: &event.ReactionID,
            }

            return eventBus.Publish(ctx, statsUpdated)
        },
    ),
)

SSE 路由器
现在是时候实现 SSE 端点了。Watermill 还提供了一个可以与其他内部组件很好地配合使用的 SSE 组件。

主要组件称为 SSE Router,其背后的想法非常简单。当您调用其AddHandler方法时,它会订阅配置的订阅者中的给定主题。该方法返回一个常规 HTTP 处理程序,您可以将其与您想要的任何 HTTP 路由器一起使用。每当所选主题中出现消息时,它将以扇出方式传播到其中所有正在运行的 SSE 端点。

....更多点击标题

配置订阅者
还记得我们希望每个事件一次仅由一个服务副本处理的部分吗?对于用于 SSE 的事件,您需要一种违反直觉的方法:所有订阅者都需要处理每个事件,因为 SSE 端点将在所有服务实例上运行。

...更多点击标题

页面
难题的最后一部分是客户端代码。


其他需要考虑的事项
两种 SSE 端点
如何从 SSE 端点返回事件完全由您决定。以下是两种适用于不同场景的方法。

  1. 一个端点,在最初和每次更新时都返回相同的数据模型。每次触发更新时,您都会“刷新”模型,也许嵌入在网站上。这就是我们在上例中使用的。
  2. 一个端点,最初不返回任何内容,然后在发生更新时不断发送唯一更新。例如,您可以将每个新事件附加到某个列表中。这就是您实现通知或网络聊天的方式。

至少一次投递
在使用几乎任何 Pub/Sub 时,您必须注意“至少一次”的交付保证。您可能会因为网络问题或服务器在错误的时间停机而两次收到同一条消息。
不要试图绕过这个问题。相反,接受这种情况的发生,并将你的处理程序设计为幂等的。这意味着处理消息两次(或更多)与处理一次具有相同的效果。

在上面的例子中,我们没有对此进行防范。如果同一条消息被处理两次,它会在数据库中添加额外的视图或反应。在这种情况下,这不是什么大问题,我们可以忍受。一种防止这种情况发生的方法是将已处理的消息 ID 存储在数据库中,并在每次更新时检查它。

注意HTTP/1.1
在现代浏览器中,HTTP/1.1 上每个服务器最多可打开 6 个连接,这在使用 SSE 时可能会成为大问题。如果有人在多个标签页中打开您的网站,您的网站将无法正常运行。

为获得最佳效果,请在不受此限制的情况下使用 SSE 和 HTTP/2。大多数现代网络服务器都支持 HTTP/2,因此请确保启用它。

更多点击标题