使用 POSTGRES 作为消息队列

本文研究了 Postgres 的轻量级通知机制,并讨论如何利用它来实现简单但有效的基于推送的消息队列。它还研究了使用此队列在 Kubernetes 部署上的副本之间进行通信,以及实现通用任务处理框架。

作为消息队列的 Postgres
当然,Postgres 是一种关系数据库,实现了 SQL 标准的大部分功能。
除此之外,Postgres 还实现了许多其他非标准化的功能,这些功能也可以通过对 SQL 的扩展来执行。
其中一个功能就是 LISTEN 和 NOTIFY 机制,它允许跨数据库连接发送异步消息。
当然,这些命令也可以通过 JDBC 发出。

举一个简单的 hello-world 例子,考虑让 JVM 监听一个给定的 hello_world_channel:

try (Connection conn = getConnection()) {
 
  try (Statement stmt = conn.createStatement()) {
    stmt.execute(“LISTEN hello_world_channel");
  }
 
  PGNotification notifications =  conn
    .unwrap(PgConnection.class)
    .getNotifications(0);
 
  System.out.println(
   
"Hello " + notifications[0].getParameter() + "!");
}

要接收通知,需要指定要 LISTEN 到的频道名称。通道名称可以任意选择。要接收通知,需要解除与 Postgres JDBC 驱动程序 PgConnection 的连接。接收到的通知可以通过超时读取,如果想无限期等待,则超时时间为 0。

现在,第二个 JVM 也可以通过类似的简单设置发送通知:

try (
    Connection conn = getConnection(); 
    Statement stmt = conn.createStatement()) {
  stmt.execute("NOTIFY hello_world_channel, ‘World’");
}

这将导致第一个 JVM 打印 Hello World!。

定义触发器以创建简单的消息队列
通常,通知不是直接发送的,而是通过表上的触发器发送的。例如,要实现上述消息队列,可以从一个简单的表开始,如下所示:

CREATE TABLE MY_MESSAGES (
  RECEIVER VARCHAR(200),
  ID SERIAL,
  PAYLOAD JSON,
  PROCESSED BOOLEAN);

为了在消息插入表时触发通知,如下所示的函数在 Postgres 的过程语言 pgSQL 中实现了这一点,而不改变插入的行:

CREATE FUNCTION MY_MESSAGES_FCT()
RETURNS TRIGGER AS
$BODY$
BEGIN
  PERFORM pg_notify(‘my_message_queue’, NEW.RECEIVER);
  RETURN NEW;
END;
$BODY$
LANGUAGE PLPGSQL;

在上面的函数中,调用了 pg_notify 函数,该函数只是触发一个 NOTIFY,其中第二个参数作为负载,但避免了可能的 SQL字符串连接可能会发生注入。现在可以将此函数安装为 MY_MESSAGES 中任何插入的触发器:

CREATE TRIGGER MY_MESSAGES_TRG
AFTER INSERT ON MY_MESSAGES
FOR EACH ROW
EXECUTE PROCEDURE MY_MESSAGES_FCT();

通过这种方式,一个或多个侦听器可以在新消息到达时收到通知,例如作为 Kubernetes 部署中的副本。

Postgres 通知和连接池
Postgres 通知机制的一个注意事项是,它通常需要创建一个专用连接来接收通知。这是由于该连接用于通过 JDBC 客户端在打开连接和执行 LISTEN 语句时建立的通道发送通知。这就要求连接必须是长寿命的,而这通常与池数据源不兼容。相反,我们应该通过 DriverManager API 创建一个专用连接。

请注意,这也会占用 Postgres 服务器上的一个完整连接,而连接通常也是池化的。因此,如果有太多的 JVM 占用专用连接来监听通知,Postgres 服务器可能会开始拒绝新的连接尝试。因此,有必要增加 Postgres 服务器实例中允许的最大并发连接数。由于用于接收通知的连接经常空闲运行,只需要很少的机器资源,因此这通常不是一个重大变化。恰恰相反,如果监听通知可以替代对数据库的频繁轮询,那么这种方法甚至可以释放资源。

除了这个缺点,Postgres 的方法还带来了一个不那么明显的优点。以 Oracle 为例,数据库不需要专用连接。不过,这要求数据库能够主动调用指定主机和端口上的通知应用程序。这可能并不总是可行,例如在 Kubernetes 上,多个副本共享一个主机。

在 Postgres 上使用 Spring Integration 的 jdbc 消息队列
随着 Spring Integration 第六版的即将推出,这一功能将在 Spring 整合中可用。Spring 整合已经提供了基于 JDBC 的队列实现。但到目前为止,它只提供轮询消息,或在单个 JVM 中操作同一队列对象时接收推送消息。通过定义与上述触发器类似的触发器(如 Spring integration 的 schema-postgres.sql 文件所建议),Spring integration 可以接收通过常规 JdbcChannelMessageStore 发送的消息。

该消息允许向给定通道发送带有任何可序列化有效载荷的消息,如下所示:

JdbcChannelMessageStore messageStore = 
  new JdbcChannelMessageStore(getDataSource());
messageStore.setChannelMessageStoreQueryProvider(
  new PostgresChannelMessageStoreQueryProvider());
 
messageStore.addMessageToGroup(
  “some-channel”, 
  new GenericMessage<>(“World”);


现在,Spring Integration 6 允许通过推送通知从任何其他已连接的 JVM 接收:

PostgresChannelMessageTableSubscriber subscriber = 
  new PostgresChannelMessageTableSubscriber(() -> 
    DriverManager.getConnection(
      getJdbcUrl(), 
      getUsername(), 
      getPassword()).unwrap(PgConnection.class);
subscriber.start()
 
PostgresSubscribableChannel channel = 
  new PostgresSubscribableChannel(
    messageStore, 
    "some-channel",
     subscriber);
channel.subscribe(message -> System.out.println(
  “Hello “ + message.getPayload() + “!”);

以前,像这样的虚拟机间通信只能通过轮询通道获取新消息,而上述机制允许不同虚拟机之间进行准即时通信。在创建已使用 Postgres 的多节点应用程序时,这可以作为在虚拟机之间进行通信的一种简便方法。

例如,可以使用 Spring 集成的 LockRegistryLeaderInitiator 来确定执行非共享工作的节点。如果多个节点都能接收到供该领导者节点处理的 HTTP 消息,那么这些节点现在就可以通过 JDBC 消息存储转发该调用,从而即时通知领导者。

这只需几行代码就能实现,而且无需扩展技术栈,采用 Zookeeper 等其他技术。

实现通用任务处理器并向工作人员推送通知
对于使用 Postgres 进行 JVM 间通信的真实示例,挪威税务机关提供了一个用于通用任务处理的精简库数据库的通知 API。如果创建了一批新任务,多个工作节点会收到额外工作的通知,并被唤醒以轮询新消息。这项工作将继续进行,直到没有其他任务可用为止,此时工作人员将重新入睡。

这显示了通知机制的另一个优势,它允许同时通知给定通道的任意数量的侦听器,而无需将表行预先分配给给定节点。得益于 Postgres 的多版本并发控制,这种分配可以根据从数据库中进行选择来决定,其中每个节点都可以获取行锁来确定其表中的任务,而不需要在可能的替代通知框架内进行单独的分配实现。所有这些使得 Postgres 成为使用数据库作为队列的不错选择,特别是如果 Postgres 已经是技术堆栈的一部分。