Java实现PostgreSQL事件驱动LISTEN/NOTIFY支持

本文简要介绍了PostgreSQL 中的LISTEN和NOTIFY命令,以及如何在 JDBC 连接中使用它们。

什么是LISTEN和NOTIFY?
PostgreSQL 支持使用LISTEN和NOTIFY命令在服务器和连接的客户端之间进行异步通信。这些 PostgreSQL 特有的扩展使我们能够将数据库用作一个简单的消息系统,从而允许我们从数据库中生成客户端可以响应的事件。这可用于许多用途,例如实时仪表板、缓存失效、数据审计等等。

1. 监听通知
我们使用LISTEN命令来注册接收事件的兴趣。该命令会获取我们要监听的频道名称:

postgres=# LISTEN my_channel;
LISTEN

一旦我们完成此操作,我们的连接就可以接收此通道上发生的事件的异步通知。

每个对这些通知感兴趣的连接都会收到它们,因此系统实际上是广播消息,而不是将它们发送给单个接收者。这意味着我们可以使用此机制轻松地将数据库中正在发生的事件告知每个客户端。

请注意,如果我们使用psql,则不会自动接收通知。相反,我们需要再次执行LISTEN命令,这样我们就会显示自上次执行以来发出的所有通知:

postgres=# LISTEN my_channel;
LISTEN
.....
postgres=# LISTEN my_channel;
LISTEN
Asynchronous notification "my_channel" with payload "Hello, World!" received from server process with PID 66.

在这里,我们可以看到某个连接引发了带有有效载荷“Hello, world!”的事件,并且我们的监听连接已经收到了通知。

虽然我们可以注册的监听器数量没有上限,但每个监听器必须保持其数据库连接打开才能接收通知,因此最大连接数限制实际上起到了限制的作用。此外,每个监听器都会使用一定量的资源,因此过多的监听器可能会导致性能问题。

2. 发出通知
现在我们知道了如何监听事件,接下来我们需要能够触发这些事件。我们可以使用NOTIFY命令来触发事件。该命令需要传入频道名称和要发送的消息:

postgres=# NOTIFY my_channel, 'Hello, World!';
NOTIFY

当执行此命令时,所有之前执行过相应LISTEN 命令的连接都可以接收此事件,正如我们之前看到的。

我们的有效载荷是可选的,但如果我们不提供或提供NULL,系统将视为提供了空字符串。它的最大大小为 8,000 字节。如果我们尝试发送超过该值的内容,则会收到错误,并且不会通知任何监听器。

通知会参与事务。这意味着,如果我们在活动事务期间发出通知,系统将不会发送通知,直到事务提交为止。这也意味着,如果事务回滚,系统将根本不会发送通知。

3. 动态消息
NOTIFY命令要求发送的消息必须精确指定。我们无法动态生成该消息,包括简单的字符串连接:

postgres=# NOTIFY my_channel, 'Hello, ' || 'World';
ERROR:  syntax error at or near "||"
LINE 1: NOTIFY my_channel, 'Hello, ' || 'World!';

但是,我们可以使用pg_notify函数来生成通知。它可以接收任何形式的消息:

postgres=# SELECT pg_notify('my_channel', 'Hello, ' || 'World!');
 pg_notify
<hr>
(1 row)

在这种情况下,频道名称必须以字符串形式提供,有效负载也必须以单独的字符串形式提供。我们可以根据需要以任何方式构造这些字符串,包括使用 SQL 语句的结果。

4. 从触发器引发事件
我们可以通过执行适当的语句自行触发事件,也可以让数据库自动触发事件。例如,我们可以注册在适当时间执行的触发器函数,这些函数也可以生成以下通知:

CREATE OR REPLACE FUNCTION notify_table_change() RETURNS TRIGGER AS $$
    BEGIN
        PERFORM pg_notify('table_change', TG_TABLE_NAME);
        RETURN NEW;
    END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER table_change 
    AFTER INSERT OR UPDATE OR DELETE ON table_name
    FOR EACH ROW EXECUTE PROCEDURE notify_table_change();

完成此操作后,如果在table_name表中创建、更新或删除行,此触发器将自动在table_change通道上发送带有更改的表名称的通知。

使用 JDBC 发出通知
我们可以从 JDBC 发出通知,就像我们已经看到的一样。

首先,我们需要连接数据库。目前我们可以使用官方驱动程序。让我们将它们添加到我们的构建中:

<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.7.6</version>
</dependency>

然后我们可以正常创建连接:

Connection connection = DriverManager.getConnection("jdbc:postgresql://localhost:5432/postgres", "postgres", "mysecretpassword");

一旦连接成功,我们就可以使用NOTIFY命令或pg_notify()函数发出通知:

try (Statement statement = connection.createStatement()) {
    statement.execute("NOTIFY my_channel, 'Hello, NOTIFY!'");
}

正如我们之前看到的,如果我们想要做的不仅仅是一个空字符串,比如使用绑定参数,那么我们需要使用pg_notify:

try (PreparedStatement statement = connection.prepareStatement("SELECT pg_notify(?, ?)")) {
    statement.setString(1,
"my_channel");
    statement.setString(2,
"Hello, pg_notify!");
    statement.execute();
}

两种方法的工作方式相同,并且系统会按照预期传递通知:

postgres=# postgres=# LISTEN my_channel;
LISTEN
Asynchronous notification "my_channel" with payload "Hello, NOTIFY!" received from server process with PID 390.
Asynchronous notification
"my_channel" with payload "Hello, pg_notify!" received from server process with PID 390.

在这里,我们可以看到我们的监听会话成功接收了 Java 代码发出的两个通知。

使用官方 JDBC 驱动程序
虽然使用 JDBC 发出通知很简单,但监听通知却比较复杂。从数据库接收异步消息并不是 JDBC 规范的正式组成部分,因此我们需要借助驱动程序特定的功能。

我们需要做的第一件事是执行LISTEN 语句:

try (Statement statement = connection.createStatement()) {
    statement.execute("LISTEN my_channel");
}

但是,为了接收通知本身,我们需要在原始PGConnection对象上使用getNotifications() 方法。这意味着我们首先需要确保获取到正确的连接类型:

PGConnection pgConnection = connection.unwrap(org.postgresql.PGConnection.class);

然后,我们调用getNotifications()来获取已收到的所有通知。我们需要循环执行此操作,并以合适的频率轮询数据库:

while (!Thread.currentThread().isInterrupted()) {
    PGNotification[] notifications = pgConnection.getNotifications(1000);
    if (notifications != null) {
        // React to notifications
    }
}

收到通知后,我们可以按照自己喜欢的方式做出反应。但是,直到下次调用getNotifications()时,我们才会收到更多通知,因此,如果我们需要有效地对更多通知做出反应,请记住不要停止此循环。

我们有三种不同的getNotifications()调用方式。最简单的方法是不带参数,这样它会立即返回所有未完成的通知。然而,这不是推荐的解决方案。还有一个版本,它会设置一个超时时间(以毫秒为单位),线程会阻塞该时间:

PGNotification[] notifications = pgConnection.getNotifications(100);

在这种情况下,呼叫将在超时或任何通知可用后返回(以先发生者为准)。

如果我们调用此版本时将超时值设为0,那么它将永远阻塞。这实际上意味着我们只会在收到任何通知时才返回。如果我们在专用线程上运行此方法,那么这将使管理更加轻松,因为我们不再需要进行任何空闲等待。

使用 PGJDBC-NG 进行监听
如果我们想在不轮询数据库的情况下接收通知,可以使用一些替代驱动程序来实现。PGJDBC -NG 驱动程序与PostgreSQL 兼容,同时提供一些更高级的功能,包括注册通知回调的功能。

在使用它们之前,我们需要将它们添加到我们的构建中:

<dependency>
    <groupId>com.impossibl.pgjdbc-ng</groupId>
    <artifactId>pgjdbc-ng</artifactId>
    <version>0.8.9</version>
</dependency>

然后,我们可以像平常一样创建连接,只是这次我们使用jdbc:pgsql类型的 URL ,而不是 jdbc:postgresql。例如:

Connection connection = DriverManager.getConnection("jdbc:pgsql://localhost:5432/postgres", "postgres", "mysecretpassword");

我们仍然需要在连接上执行LISTEN 命令,与之前完全相同。不过,这次我们可以注册一个监听器,以便在通知发生时接收回调。为此,我们需要实现PGNotificationListener接口:

class Listener implements PGNotificationListener {
    @Override
    public void notification(int processId, String channelName, String payload) {
        LOG.info("Received notification: Channel='{}', Payload='{}', PID={}",
                channelName, payload, processId);
    }
}

然后我们可以使用我们的连接注册一个实例:

PGConnection pgConnection = connection.unwrap(com.impossibl.postgres.api.jdbc.PGConnection.class);
pgConnection.addNotificationListener(new Listener());

此时,只要连接处于活动状态,我们就会在发出通知时自动收到通知,而无需轮询数据库:

10:34:03.104 [PG-JDBC I/O (1)] INFO com.baeldung.listennotify.JdbcLiveTest -- Received notification: Channel='my_channel', Payload='Hello, NOTIFY!', PID=844
10:34:03.106 [PG-JDBC I/O (1)] INFO com.baeldung.listennotify.JdbcLiveTest -- Received notification: Channel='my_channel', Payload='Hello, pg_notify!', PID=844

这不仅让我们更容易管理,而且对我们的应用程序来说也更高效,因为我们不再需要轮询数据库等待某些事情发生。